From f267e400fa43cec27dc1e400ad661cbc27fc3d10 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Thu, 15 Sep 2016 00:25:41 +0900 Subject: [PATCH] nghttpx: Migrate backend stream to another h2 session on graceful shutdown --- src/shrpx_client_handler.cc | 71 ++++++++++++++++----- src/shrpx_downstream.cc | 10 ++- src/shrpx_downstream.h | 6 ++ src/shrpx_http2_downstream_connection.cc | 8 +-- src/shrpx_http2_session.cc | 78 ++++++++++++++++-------- src/shrpx_http_downstream_connection.cc | 4 ++ 6 files changed, 130 insertions(+), 47 deletions(-) diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 4feb973c..bf72a907 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -770,34 +770,73 @@ Http2Session *ClientHandler::select_http2_session( auto &http2_avail_freelist = shared_addr->http2_avail_freelist; if (http2_avail_freelist.size() >= min) { - auto session = http2_avail_freelist.head; - session->remove_from_freelist(); + for (auto session = http2_avail_freelist.head; session;) { + auto next = session->dlnext; - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Use Http2Session " << session - << " from http2_avail_freelist"; - } + session->remove_from_freelist(); - if (session->max_concurrency_reached(1)) { - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Maximum streams are reached for Http2Session(" - << session << ")."; + // session may be in graceful shutdown period now. + if (session->max_concurrency_reached(0)) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) + << "Maximum streams have been reached for Http2Session(" + << session << "). Skip it"; + } + + session = next; + + continue; } - } else { - session->add_to_avail_freelist(); + + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Use Http2Session " << session + << " from http2_avail_freelist"; + } + + if (session->max_concurrency_reached(1)) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Maximum streams are reached for Http2Session(" + << session << ")."; + } + } else { + session->add_to_avail_freelist(); + } + return session; } - return session; } DownstreamAddr *selected_addr = nullptr; for (auto &addr : shared_addr->addrs) { - if (addr.proto != PROTO_HTTP2 || (addr.http2_extra_freelist.size() == 0 && - addr.connect_blocker->blocked())) { + if (addr.in_avail || addr.proto != PROTO_HTTP2 || + (addr.http2_extra_freelist.size() == 0 && + addr.connect_blocker->blocked())) { continue; } - if (addr.in_avail) { + for (auto session = addr.http2_extra_freelist.head; session;) { + auto next = session->dlnext; + + // session may be in graceful shutdown period now. + if (session->max_concurrency_reached(0)) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) + << "Maximum streams have been reached for Http2Session(" + << session << "). Skip it"; + } + + session->remove_from_freelist(); + + session = next; + + continue; + } + + break; + } + + if (addr.http2_extra_freelist.size() == 0 && + addr.connect_blocker->blocked()) { continue; } diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index b30cab74..27cb8a18 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -137,7 +137,8 @@ Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool, chunked_request_(false), chunked_response_(false), expect_final_response_(false), - request_pending_(false) { + request_pending_(false), + request_header_sent_(false) { auto &timeoutconf = get_config()->http2.timeout; @@ -885,7 +886,7 @@ bool Downstream::accesslog_ready() const { return resp_.http_status > 0; } void Downstream::add_retry() { ++num_retry_; } -bool Downstream::no_more_retry() const { return num_retry_ > 5; } +bool Downstream::no_more_retry() const { return num_retry_ > 50; } void Downstream::set_request_downstream_host(const StringRef &host) { request_downstream_host_ = host; @@ -895,10 +896,13 @@ void Downstream::set_request_pending(bool f) { request_pending_ = f; } bool Downstream::get_request_pending() const { return request_pending_; } +void Downstream::set_request_header_sent(bool f) { request_header_sent_ = f; } + bool Downstream::request_submission_ready() const { return (request_state_ == Downstream::HEADER_COMPLETE || request_state_ == Downstream::MSG_COMPLETE) && - request_pending_ && response_state_ == Downstream::INITIAL; + (request_pending_ || !request_header_sent_) && + response_state_ == Downstream::INITIAL; } int Downstream::get_dispatch_state() const { return dispatch_state_; } diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index 8ae62847..d5514ea8 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -302,7 +302,11 @@ public: DefaultMemchunks *get_request_buf(); void set_request_pending(bool f); bool get_request_pending() const; + void set_request_header_sent(bool f); // Returns true if request is ready to be submitted to downstream. + // When sending pending request, get_request_pending() should be + // checked too because this function may return true when + // get_request_pending() returns false. bool request_submission_ready() const; // downstream response API @@ -471,6 +475,8 @@ private: // has not been established or should be checked before use; // currently used only with HTTP/2 connection. bool request_pending_; + // true if downstream request header is considered to be sent. + bool request_header_sent_; }; } // namespace shrpx diff --git a/src/shrpx_http2_downstream_connection.cc b/src/shrpx_http2_downstream_connection.cc index fb601f43..2a85f980 100644 --- a/src/shrpx_http2_downstream_connection.cc +++ b/src/shrpx_http2_downstream_connection.cc @@ -117,11 +117,11 @@ void Http2DownstreamConnection::detach_downstream(Downstream *downstream) { auto &resp = downstream_->response(); - if (submit_rst_stream(downstream) == 0) { - http2session_->signal_write(); - } - if (downstream_->get_downstream_stream_id() != -1) { + if (submit_rst_stream(downstream) == 0) { + http2session_->signal_write(); + } + http2session_->consume(downstream_->get_downstream_stream_id(), resp.unconsumed_body_length); diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index 9c258a32..9a8eee91 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -665,7 +665,6 @@ int Http2Session::submit_request(Http2DownstreamConnection *dconn, } dconn->attach_stream_data(sd.get()); - dconn->get_downstream()->set_downstream_stream_id(stream_id); streams_.append(sd.release()); return 0; @@ -1396,8 +1395,17 @@ int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame, auto downstream = sd->dconn->get_downstream(); - if (!downstream || - downstream->get_downstream_stream_id() != frame->hd.stream_id) { + if (!downstream) { + return 0; + } + + if (frame->hd.type == NGHTTP2_HEADERS && + frame->headers.cat == NGHTTP2_HCAT_REQUEST) { + assert(downstream->get_downstream_stream_id() == -1); + + downstream->set_downstream_stream_id(frame->hd.stream_id); + downstream->set_request_header_sent(true); + } else if (downstream->get_downstream_stream_id() != frame->hd.stream_id) { return 0; } @@ -1422,29 +1430,49 @@ int on_frame_not_send_callback(nghttp2_session *session, if (LOG_ENABLED(INFO)) { SSLOG(INFO, http2session) << "Failed to send control frame type=" << static_cast(frame->hd.type) - << "lib_error_code=" << lib_error_code << ":" + << ", lib_error_code=" << lib_error_code << ": " << nghttp2_strerror(lib_error_code); } - if (frame->hd.type == NGHTTP2_HEADERS && - lib_error_code != NGHTTP2_ERR_STREAM_CLOSED && - lib_error_code != NGHTTP2_ERR_STREAM_CLOSING) { - // To avoid stream hanging around, flag Downstream::MSG_RESET. - auto sd = static_cast( - nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); - if (!sd) { - return 0; - } - if (!sd->dconn) { - return 0; - } - auto downstream = sd->dconn->get_downstream(); - if (!downstream || - downstream->get_downstream_stream_id() != frame->hd.stream_id) { - return 0; - } - downstream->set_response_state(Downstream::MSG_RESET); - call_downstream_readcb(http2session, downstream); + if (frame->hd.type != NGHTTP2_HEADERS || + lib_error_code == NGHTTP2_ERR_STREAM_CLOSED || + lib_error_code == NGHTTP2_ERR_STREAM_CLOSING) { + return 0; } + + auto sd = static_cast( + nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); + if (!sd) { + return 0; + } + if (!sd->dconn) { + return 0; + } + auto downstream = sd->dconn->get_downstream(); + if (!downstream) { + return 0; + } + + if (lib_error_code == NGHTTP2_ERR_START_STREAM_NOT_ALLOWED) { + // Migrate to another downstream connection. + auto upstream = downstream->get_upstream(); + + if (upstream->on_downstream_reset(downstream, false)) { + // This should be done for h1 upstream only. Deleting + // ClientHandler for h2 or SPDY upstream may lead to crash. + delete upstream->get_client_handler(); + } + + return 0; + } + + if (downstream->get_downstream_stream_id() != frame->hd.stream_id) { + return 0; + } + + // To avoid stream hanging around, flag Downstream::MSG_RESET. + downstream->set_response_state(Downstream::MSG_RESET); + call_downstream_readcb(http2session, downstream); + return 0; } } // namespace @@ -1815,7 +1843,8 @@ void Http2Session::submit_pending_requests() { for (auto dconn = dconns_.head; dconn; dconn = dconn->dlnext) { auto downstream = dconn->get_downstream(); - if (!downstream || !downstream->request_submission_ready()) { + if (!downstream || !downstream->get_request_pending() || + !downstream->request_submission_ready()) { continue; } @@ -2153,6 +2182,7 @@ int Http2Session::handle_downstream_push_promise_complete( auto upstream = promised_downstream->get_upstream(); promised_downstream->set_request_state(Downstream::MSG_COMPLETE); + promised_downstream->set_request_header_sent(true); if (upstream->on_downstream_push_promise_complete(downstream, promised_downstream) != 0) { diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 65e2cb37..9a454a21 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -344,6 +344,10 @@ int HttpDownstreamConnection::push_request_headers() { auto &httpconf = get_config()->http; + // Set request_sent to true because we write request into buffer + // here. + downstream_->set_request_header_sent(true); + // For HTTP/1.0 request, there is no authority in request. In that // case, we use backend server's host nonetheless. auto authority = StringRef(downstream_hostport);