diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index edb249c1..c4f8dec6 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -116,9 +116,8 @@ Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool, : dlnext(nullptr), dlprev(nullptr), request_start_time_(std::chrono::high_resolution_clock::now()), request_buf_(mcpool), response_buf_(mcpool), response_sent_bodylen_(0), - upstream_(upstream), blocked_link_(nullptr), response_datalen_(0), - num_retry_(0), stream_id_(stream_id), priority_(priority), - downstream_stream_id_(-1), + upstream_(upstream), blocked_link_(nullptr), num_retry_(0), + stream_id_(stream_id), priority_(priority), downstream_stream_id_(-1), response_rst_stream_error_code_(NGHTTP2_NO_ERROR), request_state_(INITIAL), response_state_(INITIAL), dispatch_state_(DISPATCH_NONE), upgraded_(false), chunked_request_(false), @@ -785,17 +784,6 @@ bool Downstream::get_expect_final_response() const { return expect_final_response_; } -void Downstream::add_response_datalen(size_t len) { response_datalen_ += len; } - -void Downstream::dec_response_datalen(size_t len) { - assert(response_datalen_ >= len); - response_datalen_ -= len; -} - -size_t Downstream::get_response_datalen() const { return response_datalen_; } - -void Downstream::reset_response_datalen() { response_datalen_ = 0; } - bool Downstream::expect_response_body() const { return http2::expect_response_body(req_.method, resp_.http_status); } diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index 0cbd91b4..c267096a 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -169,12 +169,20 @@ struct Request { struct Response { Response() - : fs(32), recv_body_length(0), http_status(0), http_major(1), - http_minor(1), connection_close(false) {} + : fs(32), recv_body_length(0), unconsumed_body_length(0), http_status(0), + http_major(1), http_minor(1), connection_close(false) {} + + void consume(size_t len) { + assert(unconsumed_body_length >= len); + unconsumed_body_length -= len; + } FieldStore fs; // the length of response body received so far int64_t recv_body_length; + // The number of bytes not consumed by the application yet. This is + // mainly for HTTP/2 backend. + size_t unconsumed_body_length; // HTTP status code unsigned int http_status; int http_major, http_minor; @@ -305,10 +313,6 @@ public: bool get_non_final_response() const; void set_expect_final_response(bool f); bool get_expect_final_response() const; - void add_response_datalen(size_t len); - void dec_response_datalen(size_t len); - size_t get_response_datalen() const; - void reset_response_datalen(); // Call this method when there is incoming data in downstream // connection. @@ -406,9 +410,6 @@ private: // only used by HTTP/2 or SPDY upstream BlockedLink *blocked_link_; - // The number of bytes not consumed by the application yet. - size_t response_datalen_; - size_t num_retry_; int32_t stream_id_; diff --git a/src/shrpx_http2_downstream_connection.cc b/src/shrpx_http2_downstream_connection.cc index e41d585d..2e1432e6 100644 --- a/src/shrpx_http2_downstream_connection.cc +++ b/src/shrpx_http2_downstream_connection.cc @@ -71,10 +71,12 @@ Http2DownstreamConnection::~Http2DownstreamConnection() { downstream_->get_downstream_stream_id() != -1) { submit_rst_stream(downstream_, error_code); - http2session_->consume(downstream_->get_downstream_stream_id(), - downstream_->get_response_datalen()); + auto &resp = downstream_->response(); - downstream_->reset_response_datalen(); + http2session_->consume(downstream_->get_downstream_stream_id(), + resp.unconsumed_body_length); + + resp.unconsumed_body_length = 0; http2session_->signal_write(); } @@ -105,15 +107,18 @@ void Http2DownstreamConnection::detach_downstream(Downstream *downstream) { if (LOG_ENABLED(INFO)) { DCLOG(INFO, this) << "Detaching from DOWNSTREAM:" << downstream; } + + auto &resp = downstream_->response(); + if (submit_rst_stream(downstream) == 0) { http2session_->signal_write(); } if (downstream_->get_downstream_stream_id() != -1) { http2session_->consume(downstream_->get_downstream_stream_id(), - downstream_->get_response_datalen()); + resp.unconsumed_body_length); - downstream_->reset_response_datalen(); + resp.unconsumed_body_length = 0; http2session_->signal_write(); } @@ -449,8 +454,6 @@ int Http2DownstreamConnection::resume_read(IOCtrlReason reason, } if (consumed > 0) { - assert(downstream_->get_response_datalen() >= consumed); - rv = http2session_->consume(downstream_->get_downstream_stream_id(), consumed); @@ -458,7 +461,9 @@ int Http2DownstreamConnection::resume_read(IOCtrlReason reason, return -1; } - downstream_->dec_response_datalen(consumed); + auto &resp = downstream_->response(); + + resp.unconsumed_body_length -= consumed; http2session_->signal_write(); } diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index 61e53282..881c4f26 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -1180,6 +1180,7 @@ int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, auto &resp = downstream->response(); resp.recv_body_length += len; + resp.unconsumed_body_length += len; auto upstream = downstream->get_upstream(); rv = upstream->on_downstream_body(downstream, data, len, false); @@ -1193,8 +1194,6 @@ int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, downstream->set_response_state(Downstream::MSG_RESET); } - downstream->add_response_datalen(len); - call_downstream_readcb(http2session, downstream); return 0; } diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index ce394549..b2e11e67 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -563,8 +563,8 @@ int HttpsUpstream::on_write() { if (output->rleft() == 0 && dconn && downstream->get_response_state() != Downstream::MSG_COMPLETE) { - if (downstream->resume_read(SHRPX_NO_BUFFER, - downstream->get_response_datalen()) != 0) { + if (downstream->resume_read(SHRPX_NO_BUFFER, resp.unconsumed_body_length) != + 0) { return -1; } @@ -601,8 +601,7 @@ int HttpsUpstream::on_write() { } } - return downstream->resume_read(SHRPX_NO_BUFFER, - downstream->get_response_datalen()); + return downstream->resume_read(SHRPX_NO_BUFFER, resp.unconsumed_body_length); } int HttpsUpstream::on_event() { return 0; }