diff --git a/src/memchunk.h b/src/memchunk.h index 24f0f27b..e7c22d72 100644 --- a/src/memchunk.h +++ b/src/memchunk.h @@ -222,7 +222,7 @@ template struct Memchunks { } return ndata - count; } - int riovec(struct iovec *iov, int iovcnt) { + int riovec(struct iovec *iov, int iovcnt) const { if (!head) { return 0; } diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index d2947bcd..86ab02b3 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -731,7 +731,7 @@ int ClientHandler::perform_http2_upgrade(HttpsUpstream *http) { "Connection: Upgrade\r\n" "Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n" "\r\n"; - upstream->response_write(res, sizeof(res) - 1); + upstream->get_response_buf()->write(res, sizeof(res) - 1); upstream_ = std::move(upstream); signal_write(); diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index eefa99f2..a1f73030 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -1241,4 +1241,8 @@ bool Downstream::can_detach_downstream_connection() const { !response_connection_close_; } +DefaultMemchunks Downstream::pop_response_buf() { + return std::move(response_buf_); +} + } // namespace shrpx diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index f90e80ce..6960a236 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -342,6 +342,8 @@ public: // Returns true if downstream_connection can be detached and reused. bool can_detach_downstream_connection() const; + DefaultMemchunks pop_response_buf(); + enum { EVENT_ERROR = 0x1, EVENT_TIMEOUT = 0x2, diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index c885f2e7..82858fab 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -647,6 +647,57 @@ int on_frame_not_send_callback(nghttp2_session *session, } } // namespace +void Http2Upstream::set_pending_data_downstream(Downstream *downstream, + size_t n) { + pending_data_downstream_ = downstream; + data_pendinglen_ = n; +} + +namespace { +int send_data_callback(nghttp2_session *session, nghttp2_frame *frame, + const uint8_t *framehd, size_t length, + nghttp2_data_source *source, void *user_data) { + auto downstream = static_cast(source->ptr); + auto upstream = static_cast(downstream->get_upstream()); + auto body = downstream->get_response_buf(); + + auto wb = upstream->get_response_buf(); + + if (wb->wleft() < 9) { + return NGHTTP2_ERR_WOULDBLOCK; + } + + wb->write(framehd, 9); + + auto nwrite = std::min(length, wb->wleft()); + body->remove(wb->last, nwrite); + wb->write(nwrite); + if (nwrite < length) { + // We must store unsent amount of data to somewhere. We just tell + // libnghttp2 that we wrote everything, so downstream could be + // deleted. We handle this situation in + // Http2Upstream::remove_downstream(). + upstream->set_pending_data_downstream(downstream, length - nwrite); + } + + if (wb->rleft() == 0) { + downstream->disable_upstream_wtimer(); + } else { + downstream->reset_upstream_wtimer(); + } + + if (length > 0 && downstream->resume_read(SHRPX_NO_BUFFER, length) != 0) { + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + + if (length > 0) { + downstream->add_response_sent_bodylen(length); + } + + return nwrite < length ? NGHTTP2_ERR_PAUSE : 0; +} +} // namespace + namespace { uint32_t infer_upstream_rst_stream_error_code(uint32_t downstream_error_code) { // NGHTTP2_REFUSED_STREAM is important because it tells upstream @@ -748,6 +799,9 @@ nghttp2_session_callbacks *create_http2_upstream_callbacks() { nghttp2_session_callbacks_set_on_begin_headers_callback( callbacks, on_begin_headers_callback); + nghttp2_session_callbacks_set_send_data_callback(callbacks, + send_data_callback); + if (get_config()->padding) { nghttp2_session_callbacks_set_select_padding_callback( callbacks, http::select_padding_callback); @@ -764,8 +818,9 @@ Http2Upstream::Http2Upstream(ClientHandler *handler) ? get_config()->downstream_connections_per_frontend : 0, !get_config()->http2_proxy), - handler_(handler), session_(nullptr), data_pending_(nullptr), - data_pendinglen_(0), shutdown_handled_(false) { + pending_response_buf_(handler->get_worker()->get_mcpool()), + pending_data_downstream_(nullptr), handler_(handler), session_(nullptr), + data_pending_(nullptr), data_pendinglen_(0), shutdown_handled_(false) { int rv; @@ -870,17 +925,42 @@ int Http2Upstream::on_write() { wb_.reset(); } - if (data_pending_) { - auto n = std::min(wb_.wleft(), data_pendinglen_); - wb_.write(data_pending_, n); - if (n < data_pendinglen_) { + if (data_pendinglen_ > 0) { + if (data_pending_) { + auto n = std::min(wb_.wleft(), data_pendinglen_); + wb_.write(data_pending_, n); data_pending_ += n; data_pendinglen_ -= n; - return 0; - } - data_pending_ = nullptr; - data_pendinglen_ = 0; + if (data_pendinglen_ > 0) { + return 0; + } + + data_pending_ = nullptr; + } else { + auto n = std::min(wb_.wleft(), data_pendinglen_); + DefaultMemchunks *body; + if (pending_data_downstream_) { + body = pending_data_downstream_->get_response_buf(); + } else { + body = &pending_response_buf_; + } + body->remove(wb_.last, n); + wb_.write(n); + data_pendinglen_ -= n; + + if (data_pendinglen_ > 0) { + return 0; + } + + if (pending_data_downstream_) { + pending_data_downstream_ = nullptr; + } else { + // Downstream was already deleted, and we don't need its + // response data. + body->reset(); + } + } } for (;;) { @@ -1118,8 +1198,10 @@ ssize_t downstream_data_read_callback(nghttp2_session *session, } } - auto nread = body->remove(buf, length); - auto body_empty = body->rleft() == 0; + auto nread = std::min(body->rleft(), length); + auto body_empty = body->rleft() == nread; + + *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY; if (body_empty && downstream->get_response_state() == Downstream::MSG_COMPLETE) { @@ -1147,24 +1229,10 @@ ssize_t downstream_data_read_callback(nghttp2_session *session, } } - if (body_empty) { - downstream->disable_upstream_wtimer(); - } else { - downstream->reset_upstream_wtimer(); - } - - if (nread > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nread) != 0) { - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - if (nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) { return NGHTTP2_ERR_DEFERRED; } - if (nread > 0) { - downstream->add_response_sent_bodylen(nread); - } - return nread; } } // namespace @@ -1275,6 +1343,11 @@ void Http2Upstream::remove_downstream(Downstream *downstream) { nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(), nullptr); + if (downstream == pending_data_downstream_) { + pending_data_downstream_ = nullptr; + pending_response_buf_ = downstream->pop_response_buf(); + } + auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream); if (next_downstream) { @@ -1771,8 +1844,6 @@ void Http2Upstream::response_drain(size_t n) { wb_.drain(n); } bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; } -void Http2Upstream::response_write(void *data, size_t len) { - wb_.write(data, len); -} +Http2Upstream::WriteBuffer *Http2Upstream::get_response_buf() { return &wb_; } } // namespace shrpx diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index 11be767c..6dac8c9a 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -87,8 +87,6 @@ public: virtual void response_drain(size_t n); virtual bool response_empty() const; - void response_write(void *data, size_t len); - bool get_flow_control() const; // Perform HTTP/2 upgrade from |upstream|. On success, this object // takes ownership of the |upstream|. This function returns 0 if it @@ -114,6 +112,10 @@ public: using WriteBuffer = Buffer<32_k>; + WriteBuffer *get_response_buf(); + + void set_pending_data_downstream(Downstream *downstream, size_t n); + private: WriteBuffer wb_; std::unique_ptr pre_upstream_; @@ -121,9 +123,26 @@ private: ev_timer settings_timer_; ev_timer shutdown_timer_; ev_prepare prep_; + // A response buffer used to belong to Downstream object. This is + // moved here when response is partially written to wb_ in + // send_data_callback, but before writing them all, Downstream + // object was destroyed. On destruction of Downstream, + // pending_data_downstream_ becomes nullptr. + DefaultMemchunks pending_response_buf_; + // Downstream object whose DATA frame payload is partillay written + // to wb_ in send_data_callback. This field exists to keep track of + // its lifetime. When it is destroyed, its response buffer is + // transferred to pending_response_buf_, and this field becomes + // nullptr. + Downstream *pending_data_downstream_; ClientHandler *handler_; nghttp2_session *session_; const uint8_t *data_pending_; + // The length of lending data to be written into wb_. If + // data_pending_ is not nullptr, data_pending_ points to the data to + // write. Otherwise, pending_data_downstream_->get_response_buf() + // if pending_data_downstream_ is not nullptr, or + // pending_response_buf_ holds data to write. size_t data_pendinglen_; bool flow_control_; bool shutdown_handled_; diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 853dc087..31c444f1 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -455,7 +455,7 @@ void HttpDownstreamConnection::pause_read(IOCtrlReason reason) { int HttpDownstreamConnection::resume_read(IOCtrlReason reason, size_t consumed) { - if (!downstream_->response_buf_full()) { + if (downstream_->get_response_buf()->rleft() == 0) { ioctrl_.resume_read(reason); }