diff --git a/src/memchunk.h b/src/memchunk.h index d381e3cb..d373d06a 100644 --- a/src/memchunk.h +++ b/src/memchunk.h @@ -251,6 +251,29 @@ template struct Memchunks { return count - left; } + size_t remove(Memchunks &dest) { + assert(pool == dest.pool); + + if (head == nullptr) { + return 0; + } + + auto n = len; + + if (dest.tail == nullptr) { + dest.head = head; + } else { + dest.tail->next = head; + } + + dest.tail = tail; + dest.len += len; + + head = tail = nullptr; + len = 0; + + return n; + } size_t drain(size_t count) { auto ndata = count; auto m = head; diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index cdd33bb8..59c9f5f7 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -121,6 +121,7 @@ Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool, req_(balloc_), resp_(balloc_), request_start_time_(std::chrono::high_resolution_clock::now()), + blocked_request_buf_(mcpool), request_buf_(mcpool), response_buf_(mcpool), upstream_(upstream), @@ -142,7 +143,8 @@ Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool, request_pending_(false), request_header_sent_(false), accesslog_written_(false), - new_affinity_cookie_(false) { + new_affinity_cookie_(false), + blocked_request_data_eof_(false) { auto &timeoutconf = get_config()->http2.timeout; @@ -605,6 +607,12 @@ int Downstream::push_request_headers() { int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) { req_.recv_body_length += datalen; + if (!request_header_sent_) { + blocked_request_buf_.append(data, datalen); + req_.unconsumed_body_length += datalen; + return 0; + } + // Assumes that request headers have already been pushed to output // buffer using push_request_headers(). if (!dconn_) { @@ -621,6 +629,10 @@ int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen) { } int Downstream::end_upload_data() { + if (!request_header_sent_) { + blocked_request_data_eof_ = true; + return 0; + } if (!dconn_) { DLOG(INFO, this) << "dconn_ is NULL"; return -1; @@ -1052,4 +1064,12 @@ uint32_t Downstream::get_affinity_cookie_to_send() const { return 0; } +DefaultMemchunks *Downstream::get_blocked_request_buf() { + return &blocked_request_buf_; +} + +bool Downstream::get_blocked_request_data_eof() const { + return blocked_request_data_eof_; +} + } // namespace shrpx diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index c81fcf68..3e121eb4 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -356,6 +356,9 @@ public: // get_request_pending() returns false. bool request_submission_ready() const; + DefaultMemchunks *get_blocked_request_buf(); + bool get_blocked_request_data_eof() const; + // downstream response API const Response &response() const { return resp_; } Response &response() { return resp_; } @@ -491,6 +494,9 @@ private: // or not. StringRef request_downstream_host_; + // Data arrived in frontend before sending header fields to backend + // are stored in this buffer. + DefaultMemchunks blocked_request_buf_; DefaultMemchunks request_buf_; DefaultMemchunks response_buf_; @@ -547,6 +553,9 @@ private: bool accesslog_written_; // true if affinity cookie is generated for this request. bool new_affinity_cookie_; + // true if eof is received from client before sending header fields + // to backend. + bool blocked_request_data_eof_; }; } // namespace shrpx diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index 5fbe10ac..20186d08 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -1468,6 +1468,15 @@ int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame, if (frame->hd.type == NGHTTP2_HEADERS && frame->headers.cat == NGHTTP2_HCAT_REQUEST) { downstream->set_request_header_sent(true); + auto src = downstream->get_blocked_request_buf(); + if (src->rleft()) { + auto dest = downstream->get_request_buf(); + src->remove(*dest); + if (http2session->resume_data(sd->dconn) != 0) { + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + downstream->ensure_downstream_wtimer(); + } } if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) { diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index df78dfdf..b30a2899 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -590,7 +590,7 @@ int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, auto downstream = static_cast( nghttp2_session_get_stream_user_data(session, stream_id)); - if (!downstream || !downstream->get_downstream_connection()) { + if (!downstream) { if (upstream->consume(stream_id, len) != 0) { return NGHTTP2_ERR_CALLBACK_FAILURE; } diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index f50c0f4f..0bcc41ad 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -693,11 +693,37 @@ int HttpDownstreamConnection::push_request_headers() { // Don't call signal_write() if we anticipate request body. We call // signal_write() when we received request body chunk, and it // enables us to send headers and data in one writev system call. - if (connect_method || + if (connect_method || downstream_->get_blocked_request_buf()->rleft() || (!req.http2_expect_body && req.fs.content_length == 0)) { signal_write(); } + return process_blocked_request_buf(); +} + +int HttpDownstreamConnection::process_blocked_request_buf() { + auto src = downstream_->get_blocked_request_buf(); + + if (src->rleft()) { + auto dest = downstream_->get_request_buf(); + auto chunked = downstream_->get_chunked_request(); + if (chunked) { + auto chunk_size_hex = util::utox(src->rleft()); + dest->append(chunk_size_hex); + dest->append("\r\n"); + } + + src->remove(*dest); + + if (chunked) { + dest->append("\r\n"); + } + } + + if (downstream_->get_blocked_request_data_eof()) { + return end_upload_data(); + } + return 0; } diff --git a/src/shrpx_http_downstream_connection.h b/src/shrpx_http_downstream_connection.h index baea026c..554e9b94 100644 --- a/src/shrpx_http_downstream_connection.h +++ b/src/shrpx_http_downstream_connection.h @@ -89,6 +89,8 @@ public: int noop(); + int process_blocked_request_buf(); + private: Connection conn_; std::function on_read_, on_write_,