nghttpx: Migrate backend stream to another h2 session on graceful shutdown

This commit is contained in:
Tatsuhiro Tsujikawa 2016-09-15 00:25:41 +09:00
parent 8bac5899cc
commit f267e400fa
6 changed files with 130 additions and 47 deletions

View File

@ -770,34 +770,73 @@ Http2Session *ClientHandler::select_http2_session(
auto &http2_avail_freelist = shared_addr->http2_avail_freelist;
if (http2_avail_freelist.size() >= min) {
auto session = http2_avail_freelist.head;
session->remove_from_freelist();
for (auto session = http2_avail_freelist.head; session;) {
auto next = session->dlnext;
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Use Http2Session " << session
<< " from http2_avail_freelist";
}
session->remove_from_freelist();
if (session->max_concurrency_reached(1)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
<< session << ").";
// session may be in graceful shutdown period now.
if (session->max_concurrency_reached(0)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this)
<< "Maximum streams have been reached for Http2Session("
<< session << "). Skip it";
}
session = next;
continue;
}
} else {
session->add_to_avail_freelist();
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Use Http2Session " << session
<< " from http2_avail_freelist";
}
if (session->max_concurrency_reached(1)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
<< session << ").";
}
} else {
session->add_to_avail_freelist();
}
return session;
}
return session;
}
DownstreamAddr *selected_addr = nullptr;
for (auto &addr : shared_addr->addrs) {
if (addr.proto != PROTO_HTTP2 || (addr.http2_extra_freelist.size() == 0 &&
addr.connect_blocker->blocked())) {
if (addr.in_avail || addr.proto != PROTO_HTTP2 ||
(addr.http2_extra_freelist.size() == 0 &&
addr.connect_blocker->blocked())) {
continue;
}
if (addr.in_avail) {
for (auto session = addr.http2_extra_freelist.head; session;) {
auto next = session->dlnext;
// session may be in graceful shutdown period now.
if (session->max_concurrency_reached(0)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this)
<< "Maximum streams have been reached for Http2Session("
<< session << "). Skip it";
}
session->remove_from_freelist();
session = next;
continue;
}
break;
}
if (addr.http2_extra_freelist.size() == 0 &&
addr.connect_blocker->blocked()) {
continue;
}

View File

@ -137,7 +137,8 @@ Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool,
chunked_request_(false),
chunked_response_(false),
expect_final_response_(false),
request_pending_(false) {
request_pending_(false),
request_header_sent_(false) {
auto &timeoutconf = get_config()->http2.timeout;
@ -885,7 +886,7 @@ bool Downstream::accesslog_ready() const { return resp_.http_status > 0; }
void Downstream::add_retry() { ++num_retry_; }
bool Downstream::no_more_retry() const { return num_retry_ > 5; }
bool Downstream::no_more_retry() const { return num_retry_ > 50; }
void Downstream::set_request_downstream_host(const StringRef &host) {
request_downstream_host_ = host;
@ -895,10 +896,13 @@ void Downstream::set_request_pending(bool f) { request_pending_ = f; }
bool Downstream::get_request_pending() const { return request_pending_; }
void Downstream::set_request_header_sent(bool f) { request_header_sent_ = f; }
bool Downstream::request_submission_ready() const {
return (request_state_ == Downstream::HEADER_COMPLETE ||
request_state_ == Downstream::MSG_COMPLETE) &&
request_pending_ && response_state_ == Downstream::INITIAL;
(request_pending_ || !request_header_sent_) &&
response_state_ == Downstream::INITIAL;
}
int Downstream::get_dispatch_state() const { return dispatch_state_; }

View File

@ -302,7 +302,11 @@ public:
DefaultMemchunks *get_request_buf();
void set_request_pending(bool f);
bool get_request_pending() const;
void set_request_header_sent(bool f);
// Returns true if request is ready to be submitted to downstream.
// When sending pending request, get_request_pending() should be
// checked too because this function may return true when
// get_request_pending() returns false.
bool request_submission_ready() const;
// downstream response API
@ -471,6 +475,8 @@ private:
// has not been established or should be checked before use;
// currently used only with HTTP/2 connection.
bool request_pending_;
// true if downstream request header is considered to be sent.
bool request_header_sent_;
};
} // namespace shrpx

View File

@ -117,11 +117,11 @@ void Http2DownstreamConnection::detach_downstream(Downstream *downstream) {
auto &resp = downstream_->response();
if (submit_rst_stream(downstream) == 0) {
http2session_->signal_write();
}
if (downstream_->get_downstream_stream_id() != -1) {
if (submit_rst_stream(downstream) == 0) {
http2session_->signal_write();
}
http2session_->consume(downstream_->get_downstream_stream_id(),
resp.unconsumed_body_length);

View File

@ -665,7 +665,6 @@ int Http2Session::submit_request(Http2DownstreamConnection *dconn,
}
dconn->attach_stream_data(sd.get());
dconn->get_downstream()->set_downstream_stream_id(stream_id);
streams_.append(sd.release());
return 0;
@ -1396,8 +1395,17 @@ int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
auto downstream = sd->dconn->get_downstream();
if (!downstream ||
downstream->get_downstream_stream_id() != frame->hd.stream_id) {
if (!downstream) {
return 0;
}
if (frame->hd.type == NGHTTP2_HEADERS &&
frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
assert(downstream->get_downstream_stream_id() == -1);
downstream->set_downstream_stream_id(frame->hd.stream_id);
downstream->set_request_header_sent(true);
} else if (downstream->get_downstream_stream_id() != frame->hd.stream_id) {
return 0;
}
@ -1422,29 +1430,49 @@ int on_frame_not_send_callback(nghttp2_session *session,
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, http2session) << "Failed to send control frame type="
<< static_cast<uint32_t>(frame->hd.type)
<< "lib_error_code=" << lib_error_code << ":"
<< ", lib_error_code=" << lib_error_code << ": "
<< nghttp2_strerror(lib_error_code);
}
if (frame->hd.type == NGHTTP2_HEADERS &&
lib_error_code != NGHTTP2_ERR_STREAM_CLOSED &&
lib_error_code != NGHTTP2_ERR_STREAM_CLOSING) {
// To avoid stream hanging around, flag Downstream::MSG_RESET.
auto sd = static_cast<StreamData *>(
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if (!sd) {
return 0;
}
if (!sd->dconn) {
return 0;
}
auto downstream = sd->dconn->get_downstream();
if (!downstream ||
downstream->get_downstream_stream_id() != frame->hd.stream_id) {
return 0;
}
downstream->set_response_state(Downstream::MSG_RESET);
call_downstream_readcb(http2session, downstream);
if (frame->hd.type != NGHTTP2_HEADERS ||
lib_error_code == NGHTTP2_ERR_STREAM_CLOSED ||
lib_error_code == NGHTTP2_ERR_STREAM_CLOSING) {
return 0;
}
auto sd = static_cast<StreamData *>(
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if (!sd) {
return 0;
}
if (!sd->dconn) {
return 0;
}
auto downstream = sd->dconn->get_downstream();
if (!downstream) {
return 0;
}
if (lib_error_code == NGHTTP2_ERR_START_STREAM_NOT_ALLOWED) {
// Migrate to another downstream connection.
auto upstream = downstream->get_upstream();
if (upstream->on_downstream_reset(downstream, false)) {
// This should be done for h1 upstream only. Deleting
// ClientHandler for h2 or SPDY upstream may lead to crash.
delete upstream->get_client_handler();
}
return 0;
}
if (downstream->get_downstream_stream_id() != frame->hd.stream_id) {
return 0;
}
// To avoid stream hanging around, flag Downstream::MSG_RESET.
downstream->set_response_state(Downstream::MSG_RESET);
call_downstream_readcb(http2session, downstream);
return 0;
}
} // namespace
@ -1815,7 +1843,8 @@ void Http2Session::submit_pending_requests() {
for (auto dconn = dconns_.head; dconn; dconn = dconn->dlnext) {
auto downstream = dconn->get_downstream();
if (!downstream || !downstream->request_submission_ready()) {
if (!downstream || !downstream->get_request_pending() ||
!downstream->request_submission_ready()) {
continue;
}
@ -2153,6 +2182,7 @@ int Http2Session::handle_downstream_push_promise_complete(
auto upstream = promised_downstream->get_upstream();
promised_downstream->set_request_state(Downstream::MSG_COMPLETE);
promised_downstream->set_request_header_sent(true);
if (upstream->on_downstream_push_promise_complete(downstream,
promised_downstream) != 0) {

View File

@ -344,6 +344,10 @@ int HttpDownstreamConnection::push_request_headers() {
auto &httpconf = get_config()->http;
// Set request_sent to true because we write request into buffer
// here.
downstream_->set_request_header_sent(true);
// For HTTP/1.0 request, there is no authority in request. In that
// case, we use backend server's host nonetheless.
auto authority = StringRef(downstream_hostport);