diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index 11ef248e..edb249c1 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -116,9 +116,9 @@ Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool, : dlnext(nullptr), dlprev(nullptr), request_start_time_(std::chrono::high_resolution_clock::now()), request_buf_(mcpool), response_buf_(mcpool), response_sent_bodylen_(0), - upstream_(upstream), blocked_link_(nullptr), request_datalen_(0), - response_datalen_(0), num_retry_(0), stream_id_(stream_id), - priority_(priority), downstream_stream_id_(-1), + upstream_(upstream), blocked_link_(nullptr), response_datalen_(0), + num_retry_(0), stream_id_(stream_id), priority_(priority), + downstream_stream_id_(-1), response_rst_stream_error_code_(NGHTTP2_NO_ERROR), request_state_(INITIAL), response_state_(INITIAL), dispatch_state_(DISPATCH_NONE), upgraded_(false), chunked_request_(false), @@ -522,7 +522,7 @@ int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) { return -1; } - request_datalen_ += datalen; + req_.unconsumed_body_length += datalen; return 0; } @@ -785,15 +785,6 @@ bool Downstream::get_expect_final_response() const { return expect_final_response_; } -size_t Downstream::get_request_datalen() const { return request_datalen_; } - -void Downstream::dec_request_datalen(size_t len) { - assert(request_datalen_ >= len); - request_datalen_ -= len; -} - -void Downstream::reset_request_datalen() { request_datalen_ = 0; } - void Downstream::add_response_datalen(size_t len) { response_datalen_ += len; } void Downstream::dec_response_datalen(size_t len) { diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index 2a25b1f2..0cbd91b4 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -124,9 +124,15 @@ private: struct Request { Request() - : fs(16), recv_body_length(0), method(-1), http_major(1), http_minor(1), - upgrade_request(false), http2_upgrade_seen(false), - connection_close(false), http2_expect_body(false) {} + : fs(16), recv_body_length(0), unconsumed_body_length(0), method(-1), + http_major(1), http_minor(1), upgrade_request(false), + http2_upgrade_seen(false), connection_close(false), + http2_expect_body(false) {} + + void consume(size_t len) { + assert(unconsumed_body_length >= len); + unconsumed_body_length -= len; + } FieldStore fs; // Request scheme. For HTTP/2, this is :scheme header field value. @@ -144,6 +150,8 @@ struct Request { std::string path; // the length of request body received so far int64_t recv_body_length; + // The number of bytes not consumed by the application yet. + size_t unconsumed_body_length; int method; // HTTP major and minor version int http_major, http_minor; @@ -238,9 +246,6 @@ public: void set_chunked_request(bool f); int push_upload_data_chunk(const uint8_t *data, size_t datalen); int end_upload_data(); - size_t get_request_datalen() const; - void dec_request_datalen(size_t len); - void reset_request_datalen(); // Validates that received request body length and content-length // matches. bool validate_request_recv_body_length() const; @@ -402,7 +407,6 @@ private: BlockedLink *blocked_link_; // The number of bytes not consumed by the application yet. - size_t request_datalen_; size_t response_datalen_; size_t num_retry_; diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index c7c81f4a..ac719484 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -66,9 +66,11 @@ int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, return 0; } - upstream->consume(stream_id, downstream->get_request_datalen()); + auto &req = downstream->request(); - downstream->reset_request_datalen(); + upstream->consume(stream_id, req.unconsumed_body_length); + + req.unconsumed_body_length = 0; if (downstream->get_request_state() == Downstream::CONNECT_FAIL) { upstream->remove_downstream(downstream); @@ -1643,13 +1645,13 @@ void Http2Upstream::pause_read(IOCtrlReason reason) {} int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream, size_t consumed) { if (get_flow_control()) { - assert(downstream->get_request_datalen() >= consumed); - if (consume(downstream->get_stream_id(), consumed) != 0) { return -1; } - downstream->dec_request_datalen(consumed); + auto &req = downstream->request(); + + req.consume(consumed); } handler_->signal_write(); diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index eb1e9766..4f39ab05 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -773,8 +773,10 @@ int HttpDownstreamConnection::on_write() { ev_timer_stop(conn_.loop, &conn_.wt); if (input->rleft() == 0) { + auto &req = downstream_->request(); + upstream->resume_read(SHRPX_NO_BUFFER, downstream_, - downstream_->get_request_datalen()); + req.unconsumed_body_length); } return 0; diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index def2703c..e29dca1c 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -102,9 +102,11 @@ void on_stream_close_callback(spdylay_session *session, int32_t stream_id, return; } - upstream->consume(stream_id, downstream->get_request_datalen()); + auto &req = downstream->request(); - downstream->reset_request_datalen(); + upstream->consume(stream_id, req.unconsumed_body_length); + + req.unconsumed_body_length = 0; if (downstream->get_request_state() == Downstream::CONNECT_FAIL) { upstream->remove_downstream(downstream); @@ -1112,13 +1114,13 @@ void SpdyUpstream::pause_read(IOCtrlReason reason) {} int SpdyUpstream::resume_read(IOCtrlReason reason, Downstream *downstream, size_t consumed) { if (get_flow_control()) { - assert(downstream->get_request_datalen() >= consumed); - if (consume(downstream->get_stream_id(), consumed) != 0) { return -1; } - downstream->dec_request_datalen(consumed); + auto &req = downstream->request(); + + req.consume(consumed); } handler_->signal_write();