From d5dcbf6f3bdd4e12acfdfc8634507b6adcb4350f Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Thu, 21 Aug 2014 21:22:16 +0900 Subject: [PATCH] nghttpx: Fix possible flow control issue Previously we only update consumed flow control window when number of bytes read in nghttp2 and spdylay callback is 0. Now we notify nghttp2 library the consumed bytes even if number of bytes read > 0. This change also uses newly added spdylay_session_consume() API, so we require spdylay >= 1.3.0. --- configure.ac | 2 +- src/shrpx_downstream.cc | 20 ++++-- src/shrpx_downstream.h | 4 +- src/shrpx_downstream_connection.h | 2 +- src/shrpx_http2_downstream_connection.cc | 78 +++++++++++---------- src/shrpx_http2_downstream_connection.h | 2 +- src/shrpx_http2_session.cc | 2 +- src/shrpx_http2_upstream.cc | 29 ++++---- src/shrpx_http2_upstream.h | 3 +- src/shrpx_http_downstream_connection.cc | 7 +- src/shrpx_http_downstream_connection.h | 2 +- src/shrpx_https_upstream.cc | 17 +++-- src/shrpx_https_upstream.h | 3 +- src/shrpx_spdy_upstream.cc | 86 +++++++++--------------- src/shrpx_spdy_upstream.h | 5 +- src/shrpx_upstream.h | 3 +- 16 files changed, 133 insertions(+), 132 deletions(-) diff --git a/configure.ac b/configure.ac index 99dbfcb3..be35adf0 100644 --- a/configure.ac +++ b/configure.ac @@ -312,7 +312,7 @@ fi # spdylay (for src/nghttpx and src/h2load) have_spdylay=no if test "x${request_spdylay}" != "xno"; then - PKG_CHECK_MODULES([LIBSPDYLAY], [libspdylay >= 1.2.3], + PKG_CHECK_MODULES([LIBSPDYLAY], [libspdylay >= 1.3.0], [have_spdylay=yes], [have_spdylay=no]) if test "x${have_spdylay}" = "xyes"; then AC_DEFINE([HAVE_SPDYLAY], [1], [Define to 1 if you have `spdylay` library.]) diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index 771abd65..4a5d0bc4 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -150,13 +150,13 @@ void Downstream::pause_read(IOCtrlReason reason) } } -int Downstream::resume_read(IOCtrlReason reason) +int Downstream::resume_read(IOCtrlReason reason, size_t consumed) { if(dconn_) { - return dconn_->resume_read(reason); - } else { - return 0; + return dconn_->resume_read(reason, consumed); } + + return 0; } void Downstream::force_resume_read() @@ -873,6 +873,12 @@ size_t Downstream::get_request_datalen() const return request_datalen_; } +void Downstream::dec_request_datalen(size_t len) +{ + assert(request_datalen_ >= len); + request_datalen_ -= len; +} + void Downstream::reset_request_datalen() { request_datalen_ = 0; @@ -883,6 +889,12 @@ void Downstream::add_response_datalen(size_t len) response_datalen_ += len; } +void Downstream::dec_response_datalen(size_t len) +{ + assert(response_datalen_ >= len); + response_datalen_ -= len; +} + size_t Downstream::get_response_datalen() const { return response_datalen_; diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index 98581d25..76244644 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -59,7 +59,7 @@ public: void set_priority(int32_t pri); int32_t get_priority() const; void pause_read(IOCtrlReason reason); - int resume_read(IOCtrlReason reason); + int resume_read(IOCtrlReason reason, size_t consumed); void force_resume_read(); // Set stream ID for downstream HTTP2 connection. void set_downstream_stream_id(int32_t stream_id); @@ -151,6 +151,7 @@ public: int push_upload_data_chunk(const uint8_t *data, size_t datalen); int end_upload_data(); size_t get_request_datalen() const; + void dec_request_datalen(size_t len); void reset_request_datalen(); bool request_pseudo_header_allowed() const; bool expect_response_body() const; @@ -224,6 +225,7 @@ public: void set_expect_final_response(bool f); bool get_expect_final_response() const; void add_response_datalen(size_t len); + void dec_response_datalen(size_t len); size_t get_response_datalen() const; void reset_response_datalen(); bool response_pseudo_header_allowed() const; diff --git a/src/shrpx_downstream_connection.h b/src/shrpx_downstream_connection.h index 8b56a1e7..c00af2cd 100644 --- a/src/shrpx_downstream_connection.h +++ b/src/shrpx_downstream_connection.h @@ -47,7 +47,7 @@ public: virtual int end_upload_data() = 0; virtual void pause_read(IOCtrlReason reason) = 0; - virtual int resume_read(IOCtrlReason reason) = 0; + virtual int resume_read(IOCtrlReason reason, size_t consumed) = 0; virtual void force_resume_read() = 0; virtual bool get_output_buffer_full() = 0; diff --git a/src/shrpx_http2_downstream_connection.cc b/src/shrpx_http2_downstream_connection.cc index bf3b8cd9..3afaaf87 100644 --- a/src/shrpx_http2_downstream_connection.cc +++ b/src/shrpx_http2_downstream_connection.cc @@ -203,11 +203,12 @@ ssize_t http2_data_read_callback(nghttp2_session *session, DCLOG(FATAL, dconn) << "evbuffer_remove() failed"; return NGHTTP2_ERR_CALLBACK_FAILURE; } - if(nread == 0) { + + if(nread > 0) { // This is important because it will handle flow control // stuff. - if(downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER, - downstream) != 0) { + if(downstream->get_upstream()->resume_read + (SHRPX_NO_BUFFER, downstream, nread) != 0) { // In this case, downstream may be deleted. return NGHTTP2_ERR_CALLBACK_FAILURE; } @@ -218,40 +219,32 @@ ssize_t http2_data_read_callback(nghttp2_session *session, return NGHTTP2_ERR_DEFERRED; } - if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { - if(!downstream->get_upgrade_request() || - (downstream->get_response_state() == Downstream::HEADER_COMPLETE && - !downstream->get_upgraded())) { - *data_flags |= NGHTTP2_DATA_FLAG_EOF; - } else { - downstream->disable_downstream_wtimer(); + break; + } - return NGHTTP2_ERR_DEFERRED; - } - break; + if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { + if(!downstream->get_upgrade_request() || + (downstream->get_response_state() == Downstream::HEADER_COMPLETE && + !downstream->get_upgraded())) { + *data_flags |= NGHTTP2_DATA_FLAG_EOF; } else { - if(evbuffer_get_length(body) == 0) { - // Check get_request_state() == MSG_COMPLETE just in case - if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { - *data_flags |= NGHTTP2_DATA_FLAG_EOF; - break; - } + downstream->disable_downstream_wtimer(); - downstream->disable_downstream_wtimer(); - - return NGHTTP2_ERR_DEFERRED; - } - } - } else { - // Send WINDOW_UPDATE before buffer is empty to avoid delay - // because of RTT. - if(!downstream->get_output_buffer_full() && - downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER, - downstream) == -1) { - // In this case, downstream may be deleted. return NGHTTP2_ERR_DEFERRED; } break; + } else { + if(evbuffer_get_length(body) == 0) { + // Check get_request_state() == MSG_COMPLETE just in case + if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { + *data_flags |= NGHTTP2_DATA_FLAG_EOF; + break; + } + + downstream->disable_downstream_wtimer(); + + return NGHTTP2_ERR_DEFERRED; + } } } @@ -517,7 +510,8 @@ int Http2DownstreamConnection::end_upload_data() return 0; } -int Http2DownstreamConnection::resume_read(IOCtrlReason reason) +int Http2DownstreamConnection::resume_read +(IOCtrlReason reason, size_t consumed) { int rv; @@ -530,17 +524,21 @@ int Http2DownstreamConnection::resume_read(IOCtrlReason reason) return 0; } - rv = http2session_->consume(downstream_->get_downstream_stream_id(), - downstream_->get_response_datalen()); + if(consumed > 0) { + assert(downstream_->get_response_datalen() >= consumed); - if(rv != 0) { - return -1; + rv = http2session_->consume(downstream_->get_downstream_stream_id(), + consumed); + + if(rv != 0) { + return -1; + } + + downstream_->dec_response_datalen(consumed); + + http2session_->notify(); } - downstream_->reset_response_datalen(); - - http2session_->notify(); - return 0; } diff --git a/src/shrpx_http2_downstream_connection.h b/src/shrpx_http2_downstream_connection.h index 18fd11e6..494e337b 100644 --- a/src/shrpx_http2_downstream_connection.h +++ b/src/shrpx_http2_downstream_connection.h @@ -50,7 +50,7 @@ public: virtual int end_upload_data(); virtual void pause_read(IOCtrlReason reason) {} - virtual int resume_read(IOCtrlReason reason); + virtual int resume_read(IOCtrlReason reason, size_t consumed); virtual void force_resume_read() {} virtual bool get_output_buffer_full(); diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index bd38524e..7286e8dc 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -1037,7 +1037,7 @@ int on_response_headers(Http2Session *http2session, if(downstream->get_upgraded()) { downstream->set_response_connection_close(true); // On upgrade sucess, both ends can send data - if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream) != 0) { + if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream, 0) != 0) { // If resume_read fails, just drop connection. Not ideal. delete upstream->get_client_handler(); return -1; diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 4e22cdd8..2a499e2d 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -886,9 +886,7 @@ void downstream_writecb(bufferevent *bev, void *ptr) return; } auto dconn = static_cast(ptr); - auto downstream = dconn->get_downstream(); - auto upstream = static_cast(downstream->get_upstream()); - upstream->resume_read(SHRPX_NO_BUFFER, downstream); + dconn->on_write(); } } // namespace @@ -1101,15 +1099,14 @@ ssize_t downstream_data_read_callback(nghttp2_session *session, downstream->disable_upstream_wtimer(); } - if(nread == 0) { - if(downstream->resume_read(SHRPX_NO_BUFFER) != 0) { - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - - if(((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) { - return NGHTTP2_ERR_DEFERRED; - } + if(nread > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nread) != 0) { + return NGHTTP2_ERR_CALLBACK_FAILURE; } + + if(nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) { + return NGHTTP2_ERR_DEFERRED; + } + return nread; } } // namespace @@ -1361,15 +1358,17 @@ bool Http2Upstream::get_flow_control() const void Http2Upstream::pause_read(IOCtrlReason reason) {} -int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream) +int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed) { if(get_flow_control()) { - if(consume(downstream->get_stream_id(), - downstream->get_request_datalen()) != 0) { + assert(downstream->get_request_datalen() >= consumed); + + if(consume(downstream->get_stream_id(), consumed) != 0) { return -1; } - downstream->reset_request_datalen(); + downstream->dec_request_datalen(consumed); } return send(); diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index d720e683..00b333f6 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -65,7 +65,8 @@ public: int error_reply(Downstream *downstream, unsigned int status_code); virtual void pause_read(IOCtrlReason reason); - virtual int resume_read(IOCtrlReason reason, Downstream *downstream); + virtual int resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed); virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 12fcd594..1c2bb055 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -408,7 +408,7 @@ void HttpDownstreamConnection::pause_read(IOCtrlReason reason) ioctrl_.pause_read(reason); } -int HttpDownstreamConnection::resume_read(IOCtrlReason reason) +int HttpDownstreamConnection::resume_read(IOCtrlReason reason, size_t consumed) { ioctrl_.resume_read(reason); return 0; @@ -475,7 +475,7 @@ int htp_hdrs_completecb(http_parser *htp) if(downstream->get_upgraded()) { // Upgrade complete, read until EOF in both ends - if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream) != 0) { + if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream, 0) != 0) { return -1; } downstream->set_request_state(Downstream::HEADER_COMPLETE); @@ -652,6 +652,9 @@ int HttpDownstreamConnection::on_read() int HttpDownstreamConnection::on_write() { + auto upstream = downstream_->get_upstream(); + upstream->resume_read(SHRPX_NO_BUFFER, downstream_, + downstream_->get_request_datalen()); return 0; } diff --git a/src/shrpx_http_downstream_connection.h b/src/shrpx_http_downstream_connection.h index df525fa0..5a69ec17 100644 --- a/src/shrpx_http_downstream_connection.h +++ b/src/shrpx_http_downstream_connection.h @@ -49,7 +49,7 @@ public: virtual int end_upload_data(); virtual void pause_read(IOCtrlReason reason); - virtual int resume_read(IOCtrlReason reason); + virtual int resume_read(IOCtrlReason reason, size_t consumed); virtual void force_resume_read(); virtual bool get_output_buffer_full(); diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index f41f682e..ee8a9177 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -422,7 +422,8 @@ int HttpsUpstream::on_write() } } - rv = downstream->resume_read(SHRPX_NO_BUFFER); + rv = downstream->resume_read(SHRPX_NO_BUFFER, + downstream->get_response_datalen()); } return rv; } @@ -442,7 +443,8 @@ void HttpsUpstream::pause_read(IOCtrlReason reason) ioctrl_.pause_read(reason); } -int HttpsUpstream::resume_read(IOCtrlReason reason, Downstream *downstream) +int HttpsUpstream::resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed) { if(ioctrl_.resume_read(reason)) { // Process remaining data in input buffer here because these bytes @@ -491,7 +493,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) upstream->delete_downstream(); // Process next HTTP request - if(upstream->resume_read(SHRPX_MSG_BLOCK, 0) == -1) { + if(upstream->resume_read(SHRPX_MSG_BLOCK, nullptr, 0) == -1) { return; } } @@ -537,7 +539,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) upstream->delete_downstream(); // Process next HTTP request - if(upstream->resume_read(SHRPX_MSG_BLOCK, 0) == -1) { + if(upstream->resume_read(SHRPX_MSG_BLOCK, nullptr, 0) == -1) { return; } @@ -579,7 +581,7 @@ void https_downstream_writecb(bufferevent *bev, void *ptr) auto downstream = dconn->get_downstream(); auto upstream = static_cast(downstream->get_upstream()); // May return -1 - upstream->resume_read(SHRPX_NO_BUFFER, downstream); + upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0); } } // namespace @@ -631,7 +633,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) } if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { upstream->delete_downstream(); - if(upstream->resume_read(SHRPX_MSG_BLOCK, 0) == -1) { + if(upstream->resume_read(SHRPX_MSG_BLOCK, nullptr, 0) == -1) { return; } } @@ -661,7 +663,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) } if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { upstream->delete_downstream(); - if(upstream->resume_read(SHRPX_MSG_BLOCK, 0) == -1) { + if(upstream->resume_read(SHRPX_MSG_BLOCK, nullptr, 0) == -1) { return; } } @@ -903,6 +905,7 @@ int HttpsUpstream::on_downstream_body(Downstream *downstream, ULOG(FATAL, this) << "evbuffer_add() failed"; return -1; } + if(downstream->get_chunked_response()) { if(evbuffer_add(output, "\r\n", 2) != 0) { ULOG(FATAL, this) << "evbuffer_add() failed"; diff --git a/src/shrpx_https_upstream.h b/src/shrpx_https_upstream.h index b2bb3309..c7f46ae5 100644 --- a/src/shrpx_https_upstream.h +++ b/src/shrpx_https_upstream.h @@ -59,7 +59,8 @@ public: int error_reply(unsigned int status_code); virtual void pause_read(IOCtrlReason reason); - virtual int resume_read(IOCtrlReason reason, Downstream *downstream); + virtual int resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed); virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 6175c777..8064a16e 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -107,6 +107,10 @@ void on_stream_close_callback return; } + upstream->consume(stream_id, downstream->get_request_datalen()); + + downstream->reset_request_datalen(); + if(downstream->get_request_state() == Downstream::CONNECT_FAIL) { upstream->remove_downstream(downstream); // downstrea was deleted @@ -295,7 +299,8 @@ void on_data_chunk_recv_callback(spdylay_session *session, auto downstream = upstream->find_downstream(stream_id); if(!downstream) { - upstream->handle_ign_data_chunk(len); + upstream->consume(stream_id, len); + return; } @@ -303,7 +308,9 @@ void on_data_chunk_recv_callback(spdylay_session *session, if(downstream->push_upload_data_chunk(data, len) != 0) { upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); - upstream->handle_ign_data_chunk(len); + + upstream->consume(stream_id, len); + return; } @@ -456,7 +463,7 @@ SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler) flow_control_ = true; initial_window_size_ = 1 << get_config()->http2_upstream_window_bits; rv = spdylay_session_set_option(session_, - SPDYLAY_OPT_NO_AUTO_WINDOW_UPDATE, &val, + SPDYLAY_OPT_NO_AUTO_WINDOW_UPDATE2, &val, sizeof(val)); assert(rv == 0); } else { @@ -619,9 +626,7 @@ void spdy_downstream_writecb(bufferevent *bev, void *ptr) return; } auto dconn = static_cast(ptr); - auto downstream = dconn->get_downstream(); - auto upstream = static_cast(downstream->get_upstream()); - upstream->resume_read(SHRPX_NO_BUFFER, downstream); + dconn->on_write(); } } // namespace @@ -834,14 +839,12 @@ ssize_t spdy_data_read_callback(spdylay_session *session, downstream->disable_upstream_wtimer(); } - if(nread == 0) { - if(downstream->resume_read(SHRPX_NO_BUFFER) != 0) { - return SPDYLAY_ERR_CALLBACK_FAILURE; - } + if(nread > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nread) != 0) { + return SPDYLAY_ERR_CALLBACK_FAILURE; + } - if(*eof != 1) { - return SPDYLAY_ERR_DEFERRED; - } + if(nread == 0 && *eof != 1) { + return SPDYLAY_ERR_DEFERRED; } return nread; @@ -1091,40 +1094,19 @@ bool SpdyUpstream::get_flow_control() const void SpdyUpstream::pause_read(IOCtrlReason reason) {} -namespace { -int32_t determine_window_update_transmission(spdylay_session *session, - int32_t stream_id) -{ - int32_t recv_length, window_size; - if(stream_id == 0) { - recv_length = spdylay_session_get_recv_data_length(session); - window_size = 1 << get_config()->http2_upstream_connection_window_bits; - } else { - recv_length = spdylay_session_get_stream_recv_data_length - (session, stream_id); - window_size = 1 << get_config()->http2_upstream_window_bits; - } - if(recv_length != -1 && recv_length >= window_size / 2) { - return recv_length; - } - return -1; -} -} // namespace - -int SpdyUpstream::resume_read(IOCtrlReason reason, Downstream *downstream) +int SpdyUpstream::resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed) { if(get_flow_control()) { - int32_t delta; - delta = determine_window_update_transmission(session_, 0); - if(delta != -1) { - window_update(0, delta); - } - delta = determine_window_update_transmission - (session_, downstream->get_stream_id()); - if(delta != -1) { - window_update(downstream, delta); + assert(downstream->get_request_datalen() >= consumed); + + if(consume(downstream->get_stream_id(), consumed) != 0) { + return -1; } + + downstream->dec_request_datalen(consumed); } + return send(); } @@ -1142,24 +1124,22 @@ int SpdyUpstream::on_downstream_abort_request(Downstream *downstream, return send(); } -int SpdyUpstream::handle_ign_data_chunk(size_t len) +int SpdyUpstream::consume(int32_t stream_id, size_t len) { - int32_t window_size; + int rv; - if(spdylay_session_get_recv_data_length(session_) == -1) { - // No connection flow control - return 0; - } + rv = spdylay_session_consume(session_, stream_id, len); - window_size = 1 << get_config()->http2_upstream_connection_window_bits; - - if(recv_ign_window_size_ >= window_size / 2) { - window_update(0, recv_ign_window_size_); + if(rv != 0) { + ULOG(WARNING, this) << "spdylay_session_consume() returned error: " + << spdylay_strerror(rv); + return -1; } return 0; } + int SpdyUpstream::on_timeout(Downstream *downstream) { if(LOG_ENABLED(INFO)) { diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index 04d23297..e307196e 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -65,7 +65,8 @@ public: int error_reply(Downstream *downstream, unsigned int status_code); virtual void pause_read(IOCtrlReason reason); - virtual int resume_read(IOCtrlReason reason, Downstream *downstream); + virtual int resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed); virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, @@ -74,7 +75,7 @@ public: bool get_flow_control() const; - int handle_ign_data_chunk(size_t len); + int consume(int32_t stream_id, size_t len); void maintain_downstream_concurrency(); void initiate_downstream(std::unique_ptr downstream); diff --git a/src/shrpx_upstream.h b/src/shrpx_upstream.h index ae8793ab..758e5563 100644 --- a/src/shrpx_upstream.h +++ b/src/shrpx_upstream.h @@ -57,7 +57,8 @@ public: virtual int on_downstream_body_complete(Downstream *downstream) = 0; virtual void pause_read(IOCtrlReason reason) = 0; - virtual int resume_read(IOCtrlReason reason, Downstream *downstream) = 0; + virtual int resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed) = 0; }; } // namespace shrpx