diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index 3134611d..9abc2a15 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -960,8 +960,32 @@ namespace { int on_frame_recv_callback (nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { + int rv; auto http2session = static_cast(user_data); + switch(frame->hd.type) { + case NGHTTP2_DATA: { + auto sd = static_cast + (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); + if(!sd || !sd->dconn) { + break; + } + auto downstream = sd->dconn->get_downstream(); + if(!downstream || + downstream->get_downstream_stream_id() != frame->hd.stream_id) { + break; + } + + auto upstream = downstream->get_upstream(); + rv = upstream->on_downstream_body(downstream, nullptr, 0, true); + if(rv != 0) { + http2session->submit_rst_stream(frame->hd.stream_id, + NGHTTP2_INTERNAL_ERROR); + downstream->set_response_state(Downstream::MSG_RESET); + } + call_downstream_readcb(http2session, downstream); + break; + } case NGHTTP2_HEADERS: return on_response_headers(http2session, session, frame); case NGHTTP2_RST_STREAM: { @@ -1039,7 +1063,7 @@ int on_data_chunk_recv_callback(nghttp2_session *session, } auto upstream = downstream->get_upstream(); - rv = upstream->on_downstream_body(downstream, data, len); + rv = upstream->on_downstream_body(downstream, data, len, false); if(rv != 0) { http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR); downstream->set_response_state(Downstream::MSG_RESET); diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 32672d49..005ee154 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -905,11 +905,13 @@ ssize_t downstream_data_read_callback(nghttp2_session *session, auto handler = upstream->get_client_handler(); auto body = downstream->get_response_body_buf(); assert(body); + int nread = evbuffer_remove(body, buf, length); if(nread == -1) { ULOG(FATAL, upstream) << "evbuffer_remove() failed"; return NGHTTP2_ERR_CALLBACK_FAILURE; } + if(nread == 0 && downstream->get_response_state() == Downstream::MSG_COMPLETE) { if(!downstream->get_upgraded()) { @@ -1094,7 +1096,8 @@ int Http2Upstream::on_downstream_header_complete(Downstream *downstream) // WARNING: Never call directly or indirectly nghttp2_session_send or // nghttp2_session_recv. These calls may delete downstream. int Http2Upstream::on_downstream_body(Downstream *downstream, - const uint8_t *data, size_t len) + const uint8_t *data, size_t len, + bool flush) { auto upstream = downstream->get_upstream(); auto handler = upstream->get_client_handler(); @@ -1104,11 +1107,18 @@ int Http2Upstream::on_downstream_body(Downstream *downstream, ULOG(FATAL, this) << "evbuffer_add() failed"; return -1; } - nghttp2_session_resume_data(session_, downstream->get_stream_id()); + + if(flush) { + nghttp2_session_resume_data(session_, downstream->get_stream_id()); + } auto outbuflen = handler->get_outbuf_length() + evbuffer_get_length(body); if(outbuflen > OUTBUF_MAX_THRES) { + if(!flush) { + nghttp2_session_resume_data(session_, downstream->get_stream_id()); + } + downstream->pause_read(SHRPX_NO_BUFFER); } diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index bd761297..02eebd18 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -69,7 +69,7 @@ public: virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, - const uint8_t *data, size_t len); + const uint8_t *data, size_t len, bool flush); virtual int on_downstream_body_complete(Downstream *downstream); bool get_flow_control() const; diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 7240340b..a4d35b8c 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -468,7 +468,7 @@ int htp_bodycb(http_parser *htp, const char *data, size_t len) { auto downstream = static_cast(htp->data); return downstream->get_upstream()->on_downstream_body - (downstream, reinterpret_cast(data), len); + (downstream, reinterpret_cast(data), len, true); } } // namespace @@ -507,7 +507,7 @@ int HttpDownstreamConnection::on_read() // For upgraded connection, just pass data to the upstream. int rv; rv = downstream_->get_upstream()->on_downstream_body - (downstream_, reinterpret_cast(mem), inputlen); + (downstream_, reinterpret_cast(mem), inputlen, true); if(rv != 0) { return rv; } diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index b41c2055..97fd6c1b 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -734,7 +734,8 @@ int HttpsUpstream::on_downstream_header_complete(Downstream *downstream) } int HttpsUpstream::on_downstream_body(Downstream *downstream, - const uint8_t *data, size_t len) + const uint8_t *data, size_t len, + bool flush) { int rv; if(len == 0) { diff --git a/src/shrpx_https_upstream.h b/src/shrpx_https_upstream.h index 423ffe8d..c99eff10 100644 --- a/src/shrpx_https_upstream.h +++ b/src/shrpx_https_upstream.h @@ -60,7 +60,7 @@ public: virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, - const uint8_t *data, size_t len); + const uint8_t *data, size_t len, bool flush); virtual int on_downstream_body_complete(Downstream *downstream); void reset_current_header_length(); diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 5815d904..6bfcd95f 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -919,7 +919,8 @@ int SpdyUpstream::on_downstream_header_complete(Downstream *downstream) // WARNING: Never call directly or indirectly spdylay_session_send or // spdylay_session_recv. These calls may delete downstream. int SpdyUpstream::on_downstream_body(Downstream *downstream, - const uint8_t *data, size_t len) + const uint8_t *data, size_t len, + bool flush) { auto upstream = downstream->get_upstream(); auto body = downstream->get_response_body_buf(); @@ -928,11 +929,18 @@ int SpdyUpstream::on_downstream_body(Downstream *downstream, ULOG(FATAL, this) << "evbuffer_add() failed"; return -1; } - spdylay_session_resume_data(session_, downstream->get_stream_id()); + + if(flush) { + spdylay_session_resume_data(session_, downstream->get_stream_id()); + } auto outbuflen = upstream->get_client_handler()->get_outbuf_length() + evbuffer_get_length(body); if(outbuflen > OUTBUF_MAX_THRES) { + if(!flush) { + spdylay_session_resume_data(session_, downstream->get_stream_id()); + } + downstream->pause_read(SHRPX_NO_BUFFER); } diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index bae8360d..06b0d6f8 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -64,7 +64,7 @@ public: virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_body(Downstream *downstream, - const uint8_t *data, size_t len); + const uint8_t *data, size_t len, bool flush); virtual int on_downstream_body_complete(Downstream *downstream); bool get_flow_control() const; diff --git a/src/shrpx_upstream.h b/src/shrpx_upstream.h index c524a95e..792daa36 100644 --- a/src/shrpx_upstream.h +++ b/src/shrpx_upstream.h @@ -49,7 +49,8 @@ public: virtual int on_downstream_header_complete(Downstream *downstream) = 0; virtual int on_downstream_body(Downstream *downstream, - const uint8_t *data, size_t len) = 0; + const uint8_t *data, size_t len, + bool flush) = 0; virtual int on_downstream_body_complete(Downstream *downstream) = 0; virtual void pause_read(IOCtrlReason reason) = 0;