24 clientConnection(aConn),
28 mayUseConnection_(false),
29 connRegistered_(false)
41 if (
auto node = getTail()) {
56 connRegistered_ =
true;
63 return http->out.size == 0;
70 debugs(33, 5, clientConnection <<
", sz " <<
size <<
71 ", off " << (http->out.size +
size) <<
", len " <<
74 http->out.size +=
size;
76 switch (socketState()) {
83 debugs(33, 5, clientConnection <<
" Stream complete, keepalive is " <<
84 http->request->flags.proxyKeepalive);
88 if (!http->request->flags.proxyKeepalive)
89 clientConnection->close();
96 initiateClose(
"STREAM_UNPLANNED_COMPLETE");
100 initiateClose(
"STREAM_FAILED");
104 fatal(
"Hit unreachable code in Http::Stream::writeComplete\n");
111 debugs(33, 5, reply <<
" written " << http->out.size <<
" into " << clientConnection);
117 readBuffer.
offset = getNextRangeOffset();
119 readBuffer.
data = reqbuf;
127 return http->multipartRangeRequest();
133 debugs (33, 5,
"range: " << http->request->range <<
134 "; http offset " << http->out.offset <<
135 "; reply " << reply);
141 if (http->request->range) {
144 assert(http->range_iter.valid);
146 assert(canPackMoreRanges());
148 assert(http->range_iter.currentSpec());
150 int64_t start = http->range_iter.currentSpec()->offset +
151 http->range_iter.currentSpec()->length -
152 http->range_iter.debt();
153 debugs(33, 3,
"clientPackMoreRanges: in: offset: " << http->out.offset);
154 debugs(33, 3,
"clientPackMoreRanges: out:"
155 " start: " << start <<
156 " spec[" << http->range_iter.pos - http->request->range->begin() <<
"]:" <<
157 " [" << http->range_iter.currentSpec()->offset <<
158 ", " << http->range_iter.currentSpec()->offset +
159 http->range_iter.currentSpec()->length <<
"),"
160 " len: " << http->range_iter.currentSpec()->length <<
161 " debt: " << http->range_iter.debt());
162 if (http->range_iter.currentSpec()->length != -1)
163 assert(http->out.offset <= start);
168 }
else if (
const auto cr = reply ? reply->contentRange() :
nullptr) {
174 return http->out.offset + cr->spec.offset;
177 return http->out.offset;
191 if (!http->range_iter.debt()) {
192 debugs(33, 5,
"At end of current range spec for " << clientConnection);
194 if (http->range_iter.pos != http->range_iter.end)
195 ++http->range_iter.pos;
197 http->range_iter.updateSpec();
200 assert(!http->range_iter.debt() == !http->range_iter.currentSpec());
204 debugs(33, 5,
"returning " << (http->range_iter.currentSpec() ?
true :
false));
205 return http->range_iter.currentSpec() ? true :
false;
216 if (http->request->range) {
218 assert(http->range_iter.valid);
221 if (!canPackMoreRanges()) {
222 debugs(33, 5,
"Range request at end of returnable " <<
223 "range sequence on " << clientConnection);
227 }
else if (reply && reply->contentRange()) {
229 const int64_t &bytesSent = http->out.offset;
230 const int64_t &bytesExpected = reply->contentRange()->spec.length;
232 debugs(33, 7,
"body bytes sent vs. expected: " <<
233 bytesSent <<
" ? " << bytesExpected <<
" (+" <<
234 reply->contentRange()->spec.offset <<
")");
242 if (bytesSent == bytesExpected)
245 if (bytesSent > bytesExpected)
261 fatal (
"unreachable code\n");
273 debugs(11, 2,
"HTTP Client " << clientConnection);
274 debugs(11, 2,
"HTTP Client REPLY:\n---------\n" << mb->
buf <<
"\n----------");
280 if (multipartRangeRequest())
281 packRange(bodyData, mb);
282 else if (http->request->flags.chunkedReply) {
283 packChunk(bodyData, *mb);
285 size_t length = lengthToSend(bodyData.
range());
286 noteSentBodyBytes(length);
296 const auto answer = chl->fastCheck();
297 if (answer.allowed()) {
298 writeQuotaHandler = pool->createBucket();
299 fd_table[clientConnection->fd].writeQuotaHandler = writeQuotaHandler;
302 debugs(83, 4,
"Response delay pool " << pool->poolName <<
303 " skipped because ACL " << answer);
309 getConn()->write(mb);
316 if (!multipartRangeRequest() && !http->request->flags.chunkedReply) {
317 size_t length = lengthToSend(bodyData.
range());
318 noteSentBodyBytes(length);
319 getConn()->write(bodyData.
data, length);
325 if (multipartRangeRequest())
326 packRange(bodyData, &mb);
328 packChunk(bodyData, mb);
331 getConn()->write(&mb);
340 size_t maximum = available.
size();
342 if (!http->request->range)
345 assert(canPackMoreRanges());
347 if (http->range_iter.debt() == -1)
350 assert(http->range_iter.debt() > 0);
353 if (available.
start < http->range_iter.currentSpec()->offset)
356 return min(http->range_iter.debt(),
static_cast<int64_t
>(maximum));
362 debugs(33, 7, bytes <<
" body bytes");
363 http->out.offset += bytes;
365 if (!http->request->range)
368 if (http->range_iter.debt() != -1) {
369 http->range_iter.debt(http->range_iter.debt() - bytes);
370 assert (http->range_iter.debt() >= 0);
374 assert(http->range_iter.debt() >= -1);
391 (rep_tag.
str ? rep_tag.
str :
"<none>"));
419 const char *range_err =
nullptr;
424 auto contentRange = rep ? rep->
contentRange() :
nullptr;
427 range_err =
"no [parse-able] reply";
429 range_err =
"wrong status code";
431 range_err =
"too complex response";
433 range_err =
"wrong status code";
436 range_err =
"meaningless response";
439 range_err =
"unknown length";
440 else if (rep->
content_length != http->storeEntry()->mem().baseReply().content_length)
441 range_err =
"INCONSISTENT length";
446 else if (http->loggingTags().isTcpHit() &&
449 range_err =
"If-Range match failed";
451 else if (!http->request->range->canonize(rep))
452 range_err =
"canonization failed";
453 else if (http->request->range->isComplex())
454 range_err =
"too complex range header";
455 else if (!http->loggingTags().isTcpHit() && http->request->range->offsetLimitExceeded(roffLimit))
456 range_err =
"range outside range_offset_limit";
465 http->request->ignoreRange(range_err);
471 const auto actual_clen = http->prepPartialResponseGeneration();
473 const int spec_count = http->request->range->specs.size();
475 debugs(33, 3,
"range spec count: " << spec_count <<
479 if (spec_count == 1) {
480 const auto singleSpec = *http->request->range->begin();
496 debugs(33, 3,
"actual content length: " << actual_clen);
503 if (http->client_stream.tail)
512 return static_cast<clientStreamNode *
>(http->client_stream.tail->prev->data);
518 assert(http && http->getConn());
519 return http->getConn();
527 http->updateError(
error);
528 http->al->cache.code.err.update(lte);
542 connRegistered_ =
false;
550 debugs(33, 4, clientConnection <<
" because " << reason);
551 getConn()->stopSending(reason);
557 debugs(33, 2,
"Deferring request " << http->uri);
558 assert(flags.deferred == 0);
560 deferredparams.node =
node;
561 deferredparams.rep = rep;
562 deferredparams.queuedBuffer = receivedData;
569 if (http->request->range)
570 buildRangeHeader(rep);
580 const uint64_t length =
581 static_cast<uint64_t
>(lengthToSend(bodyData.
range()));
582 noteSentBodyBytes(length);
598 char const *buf = source.
data;
601 const size_t copy_sz = lengthToSend(available);
613 &http->storeEntry()->mem().freshestReply(),
620 debugs(33, 3,
"appending " << copy_sz <<
" bytes");
621 noteSentBodyBytes(copy_sz);
625 available.
start += copy_sz;
629 if (!canPackMoreRanges()) {
630 debugs(33, 3,
"Returning because !canPackMoreRanges.");
637 int64_t nextOffset = getNextRangeOffset();
638 assert(nextOffset >= http->out.offset);
639 int64_t skip = nextOffset - http->out.offset;
641 http->out.offset = nextOffset;
643 if (available.
size() <= (uint64_t)skip)
646 available.
start += skip;
657 clientConnection->close();
bool etagIsStrongEqual(const ETag &tag1, const ETag &tag2)
whether etags are strong-equal
void httpHeaderAddContRange(HttpHeader *, HttpHdrRangeSpec, int64_t)
#define SQUIDSTRINGPRINT(s)
static bool clientIfRangeMatch(ClientHttpRequest *http, HttpReply *rep)
int conn
the current server connection FD
void error(char *format,...)
HttpRequest *const request
StoreEntry * storeEntry() const
static void Reset()
forgets the current context, setting it to nil/unknown
void kick()
try to make progress on a transaction or read more I/O
int weak
true if it is a weak validator
const char * str
quoted-string
const HttpHdrRangeSpec * currentSpec() const
static int64_t const UnknownPosition
const HttpHdrContRange * contentRange() const
int64_t getRangeOffsetLimit()
AnyP::ProtocolVersion version
breakdown of protocol version label: (HTTP/ICY) and (0.9/1.0/1.1)
void set(const AnyP::ProtocolVersion &newVersion, Http::StatusCode newStatus, const char *newReason=nullptr)
Http::StatusCode status() const
retrieve the status code for this status line
void sendStartOfMessage(HttpReply *, StoreIOBuffer bodyData)
send HTTP reply message headers and maybe some initial payload
struct Http::Stream::@74 flags
void deferRecipientForLater(clientStreamNode *, HttpReply *, StoreIOBuffer receivedData)
DeferredParams deferredparams
size_t lengthToSend(Range< int64_t > const &available) const
bool canPackMoreRanges() const
void registerWithConn()
register this stream with the Server
void packChunk(const StoreIOBuffer &bodyData, MemBuf &)
ConnStateData * getConn() const
clientStream_status_t socketState()
Adapt stream status to account for Range cases.
void sendBody(StoreIOBuffer bodyData)
send some HTTP reply message payload
void buildRangeHeader(HttpReply *)
add Range headers (if any) to the given HTTP reply message
char reqbuf[HTTP_REQBUF_SZ]
clientStreamNode * getClientReplyContext() const
void packRange(StoreIOBuffer const &, MemBuf *)
void initiateClose(const char *reason)
terminate due to a send/write error (may continue reading)
int64_t getNextRangeOffset() const
bool multipartRangeRequest() const
bool startOfOutput() const
whether the reply has started being sent
void noteSentBodyBytes(size_t)
void finished()
clean up when the transaction has finished; may destroy 'this'
void writeComplete(size_t size)
update stream state after a write, may initiate more I/O
void prepareReply(HttpReply *)
clientStreamNode * getTail() const
Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq)
construct with HTTP/1.x details
void noteIoError(const Error &, const LogTagsErrors &)
update state to reflect I/O error
void pullData()
get more data to send
void append(const char *c, int sz) override
void init(mb_size_t szInit, mb_size_t szMax)
mb_size_t contentSize() const
available data size
static MessageDelayPools * Instance()
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
bool modifiedSince(const time_t ims, const int imslen=-1) const
int64_t objectLen() const
Range< int64_t > range() const
void httpRequestFree(void *data)
void clientPackTermBound(String boundary, MemBuf *mb)
put terminating boundary for multiparts to the buffer
ACLFilledChecklist * clientAclChecklistCreate(const acl_access *acl, ClientHttpRequest *http)
void clientPackRangeHdr(const HttpReplyPointer &rep, const HttpHdrRangeSpec *spec, String boundary, MemBuf *mb)
append a "part" HTTP header (as in a multi-part/range reply) to the buffer
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
@ STREAM_UNPLANNED_COMPLETE
void fatal(const char *message)
clientStream_status_t clientStreamStatus(clientStreamNode *thisObject, ClientHttpRequest *http)
void clientStreamRead(clientStreamNode *thisObject, ClientHttpRequest *http, StoreIOBuffer readBuffer)
void clientStreamDetach(clientStreamNode *thisObject, ClientHttpRequest *http)
void HTTPMSGLOCK(Http::Message *a)