From 011e3b325da8c1fa4e02e4d802bd57acdfe32f55 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Fri, 13 Feb 2015 22:41:50 +0900 Subject: [PATCH] nghttpx: Cancel backend request when frontend HTTP/1 connection is lost --- src/shrpx_client_handler.cc | 15 ++++++++++----- src/shrpx_http2_session.cc | 2 +- src/shrpx_http2_upstream.cc | 2 ++ src/shrpx_http_downstream_connection.cc | 2 +- src/shrpx_https_upstream.cc | 24 ++++++++++++++++-------- src/shrpx_spdy_upstream.cc | 2 ++ 6 files changed, 32 insertions(+), 15 deletions(-) diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index b611632e..6c059b4d 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -107,14 +107,15 @@ int ClientHandler::read_clear() { ev_timer_again(conn_.loop, &conn_.rt); for (;;) { - // we should process buffered data first before we read EOF. if (rb_.rleft() && on_read() != 0) { return -1; } - if (rb_.rleft()) { + if (rb_.rleft() == 0) { + rb_.reset(); + } else if (rb_.wleft() == 0) { + conn_.rlimit.stopw(); return 0; } - rb_.reset(); auto nread = conn_.read_clear(rb_.last, rb_.wleft()); @@ -199,10 +200,12 @@ int ClientHandler::read_tls() { if (rb_.rleft() && on_read() != 0) { return -1; } - if (rb_.rleft()) { + if (rb_.rleft() == 0) { + rb_.reset(); + } else if (rb_.wleft() == 0) { + conn_.rlimit.stopw(); return 0; } - rb_.reset(); auto nread = conn_.read_tls(rb_.last, rb_.wleft()); @@ -292,6 +295,7 @@ int ClientHandler::upstream_http2_connhd_read() { left_connhd_len_ -= nread; rb_.drain(nread); + conn_.rlimit.startw(); if (left_connhd_len_ == 0) { on_read_ = &ClientHandler::upstream_read; @@ -330,6 +334,7 @@ int ClientHandler::upstream_http1_connhd_read() { left_connhd_len_ -= nread; rb_.drain(nread); + conn_.rlimit.startw(); if (left_connhd_len_ == 0) { if (LOG_ENABLED(INFO)) { diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index c49ef848..eef42369 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -842,7 +842,7 @@ int on_response_headers(Http2Session *http2session, Downstream *downstream, if (downstream->get_upgraded()) { downstream->set_response_connection_close(true); // On upgrade sucess, both ends can send data - if (upstream->resume_read(SHRPX_MSG_BLOCK, downstream, 0) != 0) { + if (upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0) != 0) { // If resume_read fails, just drop connection. Not ideal. delete upstream->get_client_handler(); return -1; diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index a39dea63..3564c6d3 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -811,6 +811,7 @@ Http2Upstream::~Http2Upstream() { int Http2Upstream::on_read() { ssize_t rv = 0; auto rb = handler_->get_rb(); + auto rlimit = handler_->get_rlimit(); if (rb->rleft()) { rv = nghttp2_session_mem_recv(session_, rb->pos, rb->rleft()); @@ -826,6 +827,7 @@ int Http2Upstream::on_read() { // success. assert(static_cast(rv) == rb->rleft()); rb->reset(); + rlimit->startw(); } auto wb = handler_->get_wb(); diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index db353dfc..b664b8d3 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -526,7 +526,7 @@ int htp_hdrs_completecb(http_parser *htp) { if (downstream->get_upgraded()) { // Upgrade complete, read until EOF in both ends - if (upstream->resume_read(SHRPX_MSG_BLOCK, downstream, 0) != 0) { + if (upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0) != 0) { return -1; } downstream->set_request_state(Downstream::HEADER_COMPLETE); diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index 30439860..1c799c7e 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -253,6 +253,7 @@ http_parser_settings htp_hooks = { // one http request is fully received. int HttpsUpstream::on_read() { auto rb = handler_->get_rb(); + auto rlimit = handler_->get_rlimit(); auto downstream = get_downstream(); if (rb->rleft() == 0) { @@ -270,6 +271,7 @@ int HttpsUpstream::on_read() { } rb->reset(); + rlimit->startw(); if (downstream->request_buf_full()) { if (LOG_ENABLED(INFO)) { @@ -283,10 +285,22 @@ int HttpsUpstream::on_read() { return 0; } + if (downstream) { + // To avoid reading next pipelined request + switch (downstream->get_request_state()) { + case Downstream::INITIAL: + case Downstream::HEADER_COMPLETE: + break; + default: + return 0; + } + } + auto nread = http_parser_execute( &htp_, &htp_hooks, reinterpret_cast(rb->pos), rb->rleft()); rb->drain(nread); + rlimit->startw(); // Well, actually header length + some body bytes current_header_length_ += nread; @@ -303,8 +317,6 @@ int HttpsUpstream::on_read() { assert(downstream); if (downstream->get_request_state() == Downstream::CONNECT_FAIL) { - // Following paues_read is needed to avoid reading next data. - pause_read(SHRPX_MSG_BLOCK); error_reply(503); handler_->signal_write(); // Downstream gets deleted after response body is read. @@ -333,8 +345,6 @@ int HttpsUpstream::on_read() { return 0; } - pause_read(SHRPX_MSG_BLOCK); - return 0; } @@ -345,8 +355,6 @@ int HttpsUpstream::on_read() { << http_errno_description(htperr); } - pause_read(SHRPX_MSG_BLOCK); - unsigned int status_code; if (downstream && @@ -423,7 +431,7 @@ int HttpsUpstream::on_write() { // We need this if response ends before request. if (downstream->get_request_state() == Downstream::MSG_COMPLETE) { delete_downstream(); - return resume_read(SHRPX_MSG_BLOCK, nullptr, 0); + return resume_read(SHRPX_NO_BUFFER, nullptr, 0); } } @@ -441,7 +449,7 @@ void HttpsUpstream::pause_read(IOCtrlReason reason) { int HttpsUpstream::resume_read(IOCtrlReason reason, Downstream *downstream, size_t consumed) { - // downstream could be nullptr if reason is SHRPX_MSG_BLOCK. + // downstream could be nullptr if (downstream && downstream->request_buf_full()) { return 0; } diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 6ca21c11..4e76f683 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -68,6 +68,7 @@ ssize_t recv_callback(spdylay_session *session, uint8_t *buf, size_t len, auto upstream = static_cast(user_data); auto handler = upstream->get_client_handler(); auto rb = handler->get_rb(); + auto rlimit = handler->get_rlimit(); if (rb->rleft() == 0) { return SPDYLAY_ERR_WOULDBLOCK; @@ -77,6 +78,7 @@ ssize_t recv_callback(spdylay_session *session, uint8_t *buf, size_t len, memcpy(buf, rb->pos, nread); rb->drain(nread); + rlimit->startw(); return nread; }