diff --git a/configure.ac b/configure.ac index 99dbfcb3..be35adf0 100644 --- a/configure.ac +++ b/configure.ac @@ -312,7 +312,7 @@ fi # spdylay (for src/nghttpx and src/h2load) have_spdylay=no if test "x${request_spdylay}" != "xno"; then - PKG_CHECK_MODULES([LIBSPDYLAY], [libspdylay >= 1.2.3], + PKG_CHECK_MODULES([LIBSPDYLAY], [libspdylay >= 1.3.0], [have_spdylay=yes], [have_spdylay=no]) if test "x${have_spdylay}" = "xyes"; then AC_DEFINE([HAVE_SPDYLAY], [1], [Define to 1 if you have `spdylay` library.]) diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index 771abd65..4a5d0bc4 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -150,13 +150,13 @@ void Downstream::pause_read(IOCtrlReason reason) } } -int Downstream::resume_read(IOCtrlReason reason) +int Downstream::resume_read(IOCtrlReason reason, size_t consumed) { if(dconn_) { - return dconn_->resume_read(reason); - } else { - return 0; + return dconn_->resume_read(reason, consumed); } + + return 0; } void Downstream::force_resume_read() @@ -873,6 +873,12 @@ size_t Downstream::get_request_datalen() const return request_datalen_; } +void Downstream::dec_request_datalen(size_t len) +{ + assert(request_datalen_ >= len); + request_datalen_ -= len; +} + void Downstream::reset_request_datalen() { request_datalen_ = 0; @@ -883,6 +889,12 @@ void Downstream::add_response_datalen(size_t len) response_datalen_ += len; } +void Downstream::dec_response_datalen(size_t len) +{ + assert(response_datalen_ >= len); + response_datalen_ -= len; +} + size_t Downstream::get_response_datalen() const { return response_datalen_; diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index 98581d25..76244644 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -59,7 +59,7 @@ public: void set_priority(int32_t pri); int32_t get_priority() const; void pause_read(IOCtrlReason reason); - int resume_read(IOCtrlReason reason); + int resume_read(IOCtrlReason reason, size_t consumed); void force_resume_read(); // Set stream ID for downstream HTTP2 connection. void set_downstream_stream_id(int32_t stream_id); @@ -151,6 +151,7 @@ public: int push_upload_data_chunk(const uint8_t *data, size_t datalen); int end_upload_data(); size_t get_request_datalen() const; + void dec_request_datalen(size_t len); void reset_request_datalen(); bool request_pseudo_header_allowed() const; bool expect_response_body() const; @@ -224,6 +225,7 @@ public: void set_expect_final_response(bool f); bool get_expect_final_response() const; void add_response_datalen(size_t len); + void dec_response_datalen(size_t len); size_t get_response_datalen() const; void reset_response_datalen(); bool response_pseudo_header_allowed() const; diff --git a/src/shrpx_downstream_connection.h b/src/shrpx_downstream_connection.h index 8b56a1e7..c00af2cd 100644 --- a/src/shrpx_downstream_connection.h +++ b/src/shrpx_downstream_connection.h @@ -47,7 +47,7 @@ public: virtual int end_upload_data() = 0; virtual void pause_read(IOCtrlReason reason) = 0; - virtual int resume_read(IOCtrlReason reason) = 0; + virtual int resume_read(IOCtrlReason reason, size_t consumed) = 0; virtual void force_resume_read() = 0; virtual bool get_output_buffer_full() = 0; diff --git a/src/shrpx_http2_downstream_connection.cc b/src/shrpx_http2_downstream_connection.cc index bf3b8cd9..3afaaf87 100644 --- a/src/shrpx_http2_downstream_connection.cc +++ b/src/shrpx_http2_downstream_connection.cc @@ -203,11 +203,12 @@ ssize_t http2_data_read_callback(nghttp2_session *session, DCLOG(FATAL, dconn) << "evbuffer_remove() failed"; return NGHTTP2_ERR_CALLBACK_FAILURE; } - if(nread == 0) { + + if(nread > 0) { // This is important because it will handle flow control // stuff. - if(downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER, - downstream) != 0) { + if(downstream->get_upstream()->resume_read + (SHRPX_NO_BUFFER, downstream, nread) != 0) { // In this case, downstream may be deleted. return NGHTTP2_ERR_CALLBACK_FAILURE; } @@ -218,40 +219,32 @@ ssize_t http2_data_read_callback(nghttp2_session *session, return NGHTTP2_ERR_DEFERRED; } - if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { - if(!downstream->get_upgrade_request() || - (downstream->get_response_state() == Downstream::HEADER_COMPLETE && - !downstream->get_upgraded())) { - *data_flags |= NGHTTP2_DATA_FLAG_EOF; - } else { - downstream->disable_downstream_wtimer(); + break; + } - return NGHTTP2_ERR_DEFERRED; - } - break; + if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { + if(!downstream->get_upgrade_request() || + (downstream->get_response_state() == Downstream::HEADER_COMPLETE && + !downstream->get_upgraded())) { + *data_flags |= NGHTTP2_DATA_FLAG_EOF; } else { - if(evbuffer_get_length(body) == 0) { - // Check get_request_state() == MSG_COMPLETE just in case - if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { - *data_flags |= NGHTTP2_DATA_FLAG_EOF; - break; - } + downstream->disable_downstream_wtimer(); - downstream->disable_downstream_wtimer(); - - return NGHTTP2_ERR_DEFERRED; - } - } - } else { - // Send WINDOW_UPDATE before buffer is empty to avoid delay - // because of RTT. - if(!downstream->get_output_buffer_full() && - downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER, - downstream) == -1) { - // In this case, downstream may be deleted. return NGHTTP2_ERR_DEFERRED; } break; + } else { + if(evbuffer_get_length(body) == 0) { + // Check get_request_state() == MSG_COMPLETE just in case + if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { + *data_flags |= NGHTTP2_DATA_FLAG_EOF; + break; + } + + downstream->disable_downstream_wtimer(); + + return NGHTTP2_ERR_DEFERRED; + } } } @@ -517,7 +510,8 @@ int Http2DownstreamConnection::end_upload_data() return 0; } -int Http2DownstreamConnection::resume_read(IOCtrlReason reason) +int Http2DownstreamConnection::resume_read +(IOCtrlReason reason, size_t consumed) { int rv; @@ -530,17 +524,21 @@ int Http2DownstreamConnection::resume_read(IOCtrlReason reason) return 0; } - rv = http2session_->consume(downstream_->get_downstream_stream_id(), - downstream_->get_response_datalen()); + if(consumed > 0) { + assert(downstream_->get_response_datalen() >= consumed); - if(rv != 0) { - return -1; + rv = http2session_->consume(downstream_->get_downstream_stream_id(), + consumed); + + if(rv != 0) { + return -1; + } + + downstream_->dec_response_datalen(consumed); + + http2session_->notify(); } - downstream_->reset_response_datalen(); - - http2session_->notify(); - return 0; } diff --git a/src/shrpx_http2_downstream_connection.h b/src/shrpx_http2_downstream_connection.h index 18fd11e6..494e337b 100644 --- a/src/shrpx_http2_downstream_connection.h +++ b/src/shrpx_http2_downstream_connection.h @@ -50,7 +50,7 @@ public: virtual int end_upload_data(); virtual void pause_read(IOCtrlReason reason) {} - virtual int resume_read(IOCtrlReason reason); + virtual int resume_read(IOCtrlReason reason, size_t consumed); virtual void force_resume_read() {} virtual bool get_output_buffer_full(); diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index bd38524e..7286e8dc 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -1037,7 +1037,7 @@ int on_response_headers(Http2Session *http2session, if(downstream->get_upgraded()) { downstream->set_response_connection_close(true); // On upgrade sucess, both ends can send data - if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream) != 0) { + if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream, 0) != 0) { // If resume_read fails, just drop connection. Not ideal. delete upstream->get_client_handler(); return -1; diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 4e22cdd8..2a499e2d 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -886,9 +886,7 @@ void downstream_writecb(bufferevent *bev, void *ptr) return; } auto dconn = static_cast(ptr); - auto downstream = dconn->get_downstream(); - auto upstream = static_cast(downstream->get_upstream()); - upstream->resume_read(SHRPX_NO_BUFFER, downstream); + dconn->on_write(); } } // namespace @@ -1101,15 +1099,14 @@ ssize_t downstream_data_read_callback(nghttp2_session *session, downstream->disable_upstream_wtimer(); } - if(nread == 0) { - if(downstream->resume_read(SHRPX_NO_BUFFER) != 0) { - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - - if(((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) { - return NGHTTP2_ERR_DEFERRED; - } + if(nread > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nread) != 0) { + return NGHTTP2_ERR_CALLBACK_FAILURE; } + + if(nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) { + return NGHTTP2_ERR_DEFERRED; + } + return nread; } } // namespace @@ -1361,15 +1358,17 @@ bool Http2Upstream::get_flow_control() const void Http2Upstream::pause_read(IOCtrlReason reason) {} -int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream) +int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed) { if(get_flow_control()) { - if(consume(downstream->get_stream_id(), - downstream->get_request_datalen()) != 0) { + assert(downstream->get_request_datalen() >= consumed); + + if(consume(downstream->get_stream_id(), consumed) != 0) { return -1; } - downstream->reset_request_datalen(); + downstream->dec_request_datalen(consumed); } return send(); diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index d720e683..00b333f6 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -65,7 +65,8 @@ public: int error_reply(Downstream *downstream, unsigned int status_code); virtual void pause_read(IOCtrlReason reason); - virtual int resume_read(IOCtrlReason reason, Downstream *downstream); + virtual int resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed); virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 12fcd594..1c2bb055 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -408,7 +408,7 @@ void HttpDownstreamConnection::pause_read(IOCtrlReason reason) ioctrl_.pause_read(reason); } -int HttpDownstreamConnection::resume_read(IOCtrlReason reason) +int HttpDownstreamConnection::resume_read(IOCtrlReason reason, size_t consumed) { ioctrl_.resume_read(reason); return 0; @@ -475,7 +475,7 @@ int htp_hdrs_completecb(http_parser *htp) if(downstream->get_upgraded()) { // Upgrade complete, read until EOF in both ends - if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream) != 0) { + if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream, 0) != 0) { return -1; } downstream->set_request_state(Downstream::HEADER_COMPLETE); @@ -652,6 +652,9 @@ int HttpDownstreamConnection::on_read() int HttpDownstreamConnection::on_write() { + auto upstream = downstream_->get_upstream(); + upstream->resume_read(SHRPX_NO_BUFFER, downstream_, + downstream_->get_request_datalen()); return 0; } diff --git a/src/shrpx_http_downstream_connection.h b/src/shrpx_http_downstream_connection.h index df525fa0..5a69ec17 100644 --- a/src/shrpx_http_downstream_connection.h +++ b/src/shrpx_http_downstream_connection.h @@ -49,7 +49,7 @@ public: virtual int end_upload_data(); virtual void pause_read(IOCtrlReason reason); - virtual int resume_read(IOCtrlReason reason); + virtual int resume_read(IOCtrlReason reason, size_t consumed); virtual void force_resume_read(); virtual bool get_output_buffer_full(); diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index f41f682e..ee8a9177 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -422,7 +422,8 @@ int HttpsUpstream::on_write() } } - rv = downstream->resume_read(SHRPX_NO_BUFFER); + rv = downstream->resume_read(SHRPX_NO_BUFFER, + downstream->get_response_datalen()); } return rv; } @@ -442,7 +443,8 @@ void HttpsUpstream::pause_read(IOCtrlReason reason) ioctrl_.pause_read(reason); } -int HttpsUpstream::resume_read(IOCtrlReason reason, Downstream *downstream) +int HttpsUpstream::resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed) { if(ioctrl_.resume_read(reason)) { // Process remaining data in input buffer here because these bytes @@ -491,7 +493,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) upstream->delete_downstream(); // Process next HTTP request - if(upstream->resume_read(SHRPX_MSG_BLOCK, 0) == -1) { + if(upstream->resume_read(SHRPX_MSG_BLOCK, nullptr, 0) == -1) { return; } } @@ -537,7 +539,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) upstream->delete_downstream(); // Process next HTTP request - if(upstream->resume_read(SHRPX_MSG_BLOCK, 0) == -1) { + if(upstream->resume_read(SHRPX_MSG_BLOCK, nullptr, 0) == -1) { return; } @@ -579,7 +581,7 @@ void https_downstream_writecb(bufferevent *bev, void *ptr) auto downstream = dconn->get_downstream(); auto upstream = static_cast(downstream->get_upstream()); // May return -1 - upstream->resume_read(SHRPX_NO_BUFFER, downstream); + upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0); } } // namespace @@ -631,7 +633,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) } if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { upstream->delete_downstream(); - if(upstream->resume_read(SHRPX_MSG_BLOCK, 0) == -1) { + if(upstream->resume_read(SHRPX_MSG_BLOCK, nullptr, 0) == -1) { return; } } @@ -661,7 +663,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr) } if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { upstream->delete_downstream(); - if(upstream->resume_read(SHRPX_MSG_BLOCK, 0) == -1) { + if(upstream->resume_read(SHRPX_MSG_BLOCK, nullptr, 0) == -1) { return; } } @@ -903,6 +905,7 @@ int HttpsUpstream::on_downstream_body(Downstream *downstream, ULOG(FATAL, this) << "evbuffer_add() failed"; return -1; } + if(downstream->get_chunked_response()) { if(evbuffer_add(output, "\r\n", 2) != 0) { ULOG(FATAL, this) << "evbuffer_add() failed"; diff --git a/src/shrpx_https_upstream.h b/src/shrpx_https_upstream.h index b2bb3309..c7f46ae5 100644 --- a/src/shrpx_https_upstream.h +++ b/src/shrpx_https_upstream.h @@ -59,7 +59,8 @@ public: int error_reply(unsigned int status_code); virtual void pause_read(IOCtrlReason reason); - virtual int resume_read(IOCtrlReason reason, Downstream *downstream); + virtual int resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed); virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 6175c777..8064a16e 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -107,6 +107,10 @@ void on_stream_close_callback return; } + upstream->consume(stream_id, downstream->get_request_datalen()); + + downstream->reset_request_datalen(); + if(downstream->get_request_state() == Downstream::CONNECT_FAIL) { upstream->remove_downstream(downstream); // downstrea was deleted @@ -295,7 +299,8 @@ void on_data_chunk_recv_callback(spdylay_session *session, auto downstream = upstream->find_downstream(stream_id); if(!downstream) { - upstream->handle_ign_data_chunk(len); + upstream->consume(stream_id, len); + return; } @@ -303,7 +308,9 @@ void on_data_chunk_recv_callback(spdylay_session *session, if(downstream->push_upload_data_chunk(data, len) != 0) { upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); - upstream->handle_ign_data_chunk(len); + + upstream->consume(stream_id, len); + return; } @@ -456,7 +463,7 @@ SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler) flow_control_ = true; initial_window_size_ = 1 << get_config()->http2_upstream_window_bits; rv = spdylay_session_set_option(session_, - SPDYLAY_OPT_NO_AUTO_WINDOW_UPDATE, &val, + SPDYLAY_OPT_NO_AUTO_WINDOW_UPDATE2, &val, sizeof(val)); assert(rv == 0); } else { @@ -619,9 +626,7 @@ void spdy_downstream_writecb(bufferevent *bev, void *ptr) return; } auto dconn = static_cast(ptr); - auto downstream = dconn->get_downstream(); - auto upstream = static_cast(downstream->get_upstream()); - upstream->resume_read(SHRPX_NO_BUFFER, downstream); + dconn->on_write(); } } // namespace @@ -834,14 +839,12 @@ ssize_t spdy_data_read_callback(spdylay_session *session, downstream->disable_upstream_wtimer(); } - if(nread == 0) { - if(downstream->resume_read(SHRPX_NO_BUFFER) != 0) { - return SPDYLAY_ERR_CALLBACK_FAILURE; - } + if(nread > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nread) != 0) { + return SPDYLAY_ERR_CALLBACK_FAILURE; + } - if(*eof != 1) { - return SPDYLAY_ERR_DEFERRED; - } + if(nread == 0 && *eof != 1) { + return SPDYLAY_ERR_DEFERRED; } return nread; @@ -1091,40 +1094,19 @@ bool SpdyUpstream::get_flow_control() const void SpdyUpstream::pause_read(IOCtrlReason reason) {} -namespace { -int32_t determine_window_update_transmission(spdylay_session *session, - int32_t stream_id) -{ - int32_t recv_length, window_size; - if(stream_id == 0) { - recv_length = spdylay_session_get_recv_data_length(session); - window_size = 1 << get_config()->http2_upstream_connection_window_bits; - } else { - recv_length = spdylay_session_get_stream_recv_data_length - (session, stream_id); - window_size = 1 << get_config()->http2_upstream_window_bits; - } - if(recv_length != -1 && recv_length >= window_size / 2) { - return recv_length; - } - return -1; -} -} // namespace - -int SpdyUpstream::resume_read(IOCtrlReason reason, Downstream *downstream) +int SpdyUpstream::resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed) { if(get_flow_control()) { - int32_t delta; - delta = determine_window_update_transmission(session_, 0); - if(delta != -1) { - window_update(0, delta); - } - delta = determine_window_update_transmission - (session_, downstream->get_stream_id()); - if(delta != -1) { - window_update(downstream, delta); + assert(downstream->get_request_datalen() >= consumed); + + if(consume(downstream->get_stream_id(), consumed) != 0) { + return -1; } + + downstream->dec_request_datalen(consumed); } + return send(); } @@ -1142,24 +1124,22 @@ int SpdyUpstream::on_downstream_abort_request(Downstream *downstream, return send(); } -int SpdyUpstream::handle_ign_data_chunk(size_t len) +int SpdyUpstream::consume(int32_t stream_id, size_t len) { - int32_t window_size; + int rv; - if(spdylay_session_get_recv_data_length(session_) == -1) { - // No connection flow control - return 0; - } + rv = spdylay_session_consume(session_, stream_id, len); - window_size = 1 << get_config()->http2_upstream_connection_window_bits; - - if(recv_ign_window_size_ >= window_size / 2) { - window_update(0, recv_ign_window_size_); + if(rv != 0) { + ULOG(WARNING, this) << "spdylay_session_consume() returned error: " + << spdylay_strerror(rv); + return -1; } return 0; } + int SpdyUpstream::on_timeout(Downstream *downstream) { if(LOG_ENABLED(INFO)) { diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index 04d23297..e307196e 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -65,7 +65,8 @@ public: int error_reply(Downstream *downstream, unsigned int status_code); virtual void pause_read(IOCtrlReason reason); - virtual int resume_read(IOCtrlReason reason, Downstream *downstream); + virtual int resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed); virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, @@ -74,7 +75,7 @@ public: bool get_flow_control() const; - int handle_ign_data_chunk(size_t len); + int consume(int32_t stream_id, size_t len); void maintain_downstream_concurrency(); void initiate_downstream(std::unique_ptr downstream); diff --git a/src/shrpx_upstream.h b/src/shrpx_upstream.h index ae8793ab..758e5563 100644 --- a/src/shrpx_upstream.h +++ b/src/shrpx_upstream.h @@ -57,7 +57,8 @@ public: virtual int on_downstream_body_complete(Downstream *downstream) = 0; virtual void pause_read(IOCtrlReason reason) = 0; - virtual int resume_read(IOCtrlReason reason, Downstream *downstream) = 0; + virtual int resume_read(IOCtrlReason reason, Downstream *downstream, + size_t consumed) = 0; }; } // namespace shrpx