From 46514074a487e147ca1f011396da37939b7c312f Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sat, 2 Apr 2016 23:11:03 +0900 Subject: [PATCH] nghttpx: Better load balancing between backend HTTP/2 servers --- src/shrpx_client_handler.cc | 145 ++++++++++++++++++++++++++++-------- src/shrpx_client_handler.h | 3 + src/shrpx_http2_session.cc | 125 ++++++++++++++++--------------- src/shrpx_http2_session.h | 25 ++++++- src/shrpx_http2_upstream.cc | 16 ++-- src/shrpx_https_upstream.cc | 15 ++-- src/shrpx_spdy_upstream.cc | 19 +++-- src/shrpx_worker.h | 12 ++- 8 files changed, 250 insertions(+), 110 deletions(-) diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 29b55e48..6985e079 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -47,6 +47,7 @@ #include "shrpx_downstream_connection_pool.h" #include "shrpx_downstream.h" #include "shrpx_http2_session.h" +#include "shrpx_connect_blocker.h" #ifdef HAVE_SPDYLAY #include "shrpx_spdy_upstream.h" #endif // HAVE_SPDYLAY @@ -680,6 +681,116 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) { dconn_pool.remove_downstream_connection(dconn); } +namespace { +// Returns true if load of |lhs| is lighter than that of |rhs|. +// Currently, we assume that lesser streams means lesser load. +bool load_lighter(const DownstreamAddr *lhs, const DownstreamAddr *rhs) { + return lhs->num_dconn < rhs->num_dconn; +} +} // namespace + +Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) { + auto &shared_addr = group.shared_addr; + + // First count the working backend addresses. 
+ size_t min = 0; + for (const auto &addr : shared_addr->addrs) { + if (addr.connect_blocker->blocked()) { + continue; + } + + ++min; + } + + if (min == 0) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "No working backend address found"; + } + + return nullptr; + } + + auto &http2_avail_freelist = shared_addr->http2_avail_freelist; + + if (http2_avail_freelist.size() >= min) { + auto session = http2_avail_freelist.head; + session->remove_from_freelist(); + + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Use Http2Session " << session + << " from http2_avail_freelist"; + } + + if (session->max_concurrency_reached(1)) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Maximum streams are reached for Http2Session(" + << session << ")."; + } + } else { + session->add_to_avail_freelist(); + } + return session; + } + + DownstreamAddr *selected_addr = nullptr; + + for (auto &addr : shared_addr->addrs) { + if (addr.http2_extra_freelist.size() == 0 && + addr.connect_blocker->blocked()) { + continue; + } + + if (addr.in_avail) { + continue; + } + + if (selected_addr == nullptr || load_lighter(&addr, selected_addr)) { + selected_addr = &addr; + } + } + + assert(selected_addr); + + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Selected DownstreamAddr=" << selected_addr + << ", index=" + << static_cast<size_t>(selected_addr - + shared_addr->addrs.data()); + } + + if (selected_addr->http2_extra_freelist.size()) { + auto session = selected_addr->http2_extra_freelist.head; + session->remove_from_freelist(); + + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Use Http2Session " << session + << " from http2_extra_freelist"; + } + + if (session->max_concurrency_reached(1)) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Maximum streams are reached for Http2Session(" + << session << ")."; + } + } else { + session->add_to_avail_freelist(); + } + return session; + } + + auto session = new Http2Session( + conn_.loop, shared_addr->tls ? 
worker_->get_cl_ssl_ctx() : nullptr, + worker_, &group, selected_addr); + + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Create new Http2Session " << session; + } + + session->add_to_avail_freelist(); + + return session; +} + std::unique_ptr<DownstreamConnection> ClientHandler::get_downstream_connection(Downstream *downstream) { size_t group_idx; @@ -735,38 +846,10 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { } if (shared_addr->proto == PROTO_HTTP2) { - auto &http2_freelist = shared_addr->http2_freelist; + auto http2session = select_http2_session(group); - Http2Session *http2session; - - if (http2_freelist.empty() || - http2_freelist.size() < shared_addr->addrs.size()) { - if (LOG_ENABLED(INFO)) { - if (http2_freelist.empty()) { - CLOG(INFO, this) - << "http2_freelist is empty; create new Http2Session"; - } else { - CLOG(INFO, this) << "Create new Http2Session; current " - << http2_freelist.size() << ", min " - << shared_addr->addrs.size(); - } - } - http2session = new Http2Session( - conn_.loop, shared_addr->tls ? worker_->get_cl_ssl_ctx() : nullptr, - worker_, &group); - } else { - http2session = http2_freelist.head; - http2_freelist.remove(http2session); - } - - if (http2session->max_concurrency_reached(1)) { - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Maximum streams are reached for Http2Session(" - << http2session - << "). Remove Http2Session from http2_freelist"; - } - } else { - http2_freelist.append(http2session); + if (http2session == nullptr) { + return nullptr; } dconn = make_unique<Http2DownstreamConnection>(http2session); diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 8d1bc7c0..e883d6f0 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -49,6 +49,7 @@ class ConnectBlocker; class DownstreamConnectionPool; class Worker; struct WorkerStat; +struct DownstreamAddrGroup; class ClientHandler { public: @@ -141,6 +142,8 @@ public: // header field. 
StringRef get_forwarded_for() const; + Http2Session *select_http2_session(DownstreamAddrGroup &group); + private: Connection conn_; ev_timer reneg_shutdown_timer_; diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index f18ada48..8527aa53 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -182,7 +182,8 @@ void initiate_connection_cb(struct ev_loop *loop, ev_timer *w, int revents) { } // namespace Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, - Worker *worker, DownstreamAddrGroup *group) + Worker *worker, DownstreamAddrGroup *group, + DownstreamAddr *addr) : dlnext(nullptr), dlprev(nullptr), conn_(loop, -1, nullptr, worker->get_mcpool(), @@ -194,10 +195,11 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, worker_(worker), ssl_ctx_(ssl_ctx), group_(group), - addr_(nullptr), + addr_(addr), session_(nullptr), state_(DISCONNECTED), connection_check_state_(CONNECTION_CHECK_NONE), + freelist_zone_(FREELIST_ZONE_NONE), flow_control_(false) { read_ = write_ = &Http2Session::noop; @@ -223,12 +225,7 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Http2Session::~Http2Session() { disconnect(true); - if (in_freelist()) { - if (LOG_ENABLED(INFO)) { - SSLOG(INFO, this) << "Removed from http2_freelist"; - } - group_->shared_addr->http2_freelist.remove(this); - } + remove_from_freelist(); } int Http2Session::disconnect(bool hard) { @@ -254,8 +251,6 @@ int Http2Session::disconnect(bool hard) { conn_.disconnect(); - addr_ = nullptr; - if (proxy_htp_) { proxy_htp_.reset(); } @@ -298,8 +293,6 @@ int Http2Session::disconnect(bool hard) { int Http2Session::initiate_connection() { int rv = 0; - auto &shared_addr = group_->shared_addr; - auto &addrs = shared_addr->addrs; auto worker_blocker = worker_->get_connect_blocker(); if (state_ == DISCONNECTED) { @@ -310,42 +303,6 @@ int Http2Session::initiate_connection() { } return -1; } - - auto &next_downstream = shared_addr->next; - auto 
end = next_downstream; - - for (;;) { - auto &addr = addrs[next_downstream]; - - if (++next_downstream >= addrs.size()) { - next_downstream = 0; - } - - auto &connect_blocker = addr.connect_blocker; - - if (connect_blocker->blocked()) { - if (LOG_ENABLED(INFO)) { - SSLOG(INFO, this) << "Backend server " - << util::to_numeric_addr(&addr.addr) - << " was not available temporarily"; - } - - if (end == next_downstream) { - return -1; - } - - continue; - } - - if (LOG_ENABLED(INFO)) { - SSLOG(INFO, this) << "Using downstream address idx=" << next_downstream - << " out of " << addrs.size(); - } - - addr_ = &addr; - - break; - } } auto &connect_blocker = addr_->connect_blocker; @@ -647,10 +604,12 @@ int Http2Session::downstream_connect_proxy() { void Http2Session::add_downstream_connection(Http2DownstreamConnection *dconn) { dconns_.append(dconn); + ++addr_->num_dconn; } void Http2Session::remove_downstream_connection( Http2DownstreamConnection *dconn) { + --addr_->num_dconn; dconns_.remove(dconn); dconn->detach_stream_data(); @@ -658,11 +617,14 @@ void Http2Session::remove_downstream_connection( SSLOG(INFO, this) << "Remove downstream"; } - if (!in_freelist() && !max_concurrency_reached()) { + if (freelist_zone_ == FREELIST_ZONE_NONE && !max_concurrency_reached()) { if (LOG_ENABLED(INFO)) { - SSLOG(INFO, this) << "Append to Http2Session freelist"; + SSLOG(INFO, this) << "Append to http2_extra_freelist, addr=" << addr_ + << ", freelist.size=" + << addr_->http2_extra_freelist.size(); } - group_->shared_addr->http2_freelist.append(this); + + add_to_extra_freelist(); } } @@ -2087,14 +2049,6 @@ int Http2Session::handle_downstream_push_promise_complete( size_t Http2Session::get_num_dconns() const { return dconns_.size(); } -bool Http2Session::in_freelist() const { - auto &shared_addr = group_->shared_addr; - auto &http2_freelist = shared_addr->http2_freelist; - - return dlnext != nullptr || dlprev != nullptr || - http2_freelist.head == this || http2_freelist.tail == this; -} - 
bool Http2Session::max_concurrency_reached(size_t extra) const { if (!session_) { return dconns_.size() + extra >= 100; @@ -2112,4 +2066,57 @@ DownstreamAddrGroup *Http2Session::get_downstream_addr_group() const { return group_; } +void Http2Session::add_to_avail_freelist() { + assert(freelist_zone_ == FREELIST_ZONE_NONE); + + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Append to http2_avail_freelist, group=" << group_ + << ", freelist.size=" + << group_->shared_addr->http2_avail_freelist.size(); + } + + freelist_zone_ = FREELIST_ZONE_AVAIL; + group_->shared_addr->http2_avail_freelist.append(this); + addr_->in_avail = true; +} + +void Http2Session::add_to_extra_freelist() { + assert(freelist_zone_ == FREELIST_ZONE_NONE); + + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Append to http2_extra_freelist, addr=" << addr_ + << ", freelist.size=" + << addr_->http2_extra_freelist.size(); + } + + freelist_zone_ = FREELIST_ZONE_EXTRA; + addr_->http2_extra_freelist.append(this); +} + +void Http2Session::remove_from_freelist() { + switch (freelist_zone_) { + case FREELIST_ZONE_NONE: + return; + case FREELIST_ZONE_AVAIL: + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Remove from http2_avail_freelist, group=" << group_ + << ", freelist.size=" + << group_->shared_addr->http2_avail_freelist.size(); + } + group_->shared_addr->http2_avail_freelist.remove(this); + addr_->in_avail = false; + break; + case FREELIST_ZONE_EXTRA: + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Remove from http2_extra_freelist, addr=" << addr_ + << ", freelist.size=" + << addr_->http2_extra_freelist.size(); + } + addr_->http2_extra_freelist.remove(this); + break; + } + + freelist_zone_ = FREELIST_ZONE_NONE; +} + } // namespace shrpx diff --git a/src/shrpx_http2_session.h b/src/shrpx_http2_session.h index 16fbac1d..c6253a25 100644 --- a/src/shrpx_http2_session.h +++ b/src/shrpx_http2_session.h @@ -56,10 +56,21 @@ struct StreamData { Http2DownstreamConnection *dconn; }; +enum FreelistZone { + 
// Http2Session object is not linked in any freelist. + FREELIST_ZONE_NONE, + // Http2Session object is linked in group scope + // http2_avail_freelist. + FREELIST_ZONE_AVAIL, + // Http2Session object is linked in address scope + // http2_extra_freelist. + FREELIST_ZONE_EXTRA +}; + class Http2Session { public: Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, - DownstreamAddrGroup *group); + DownstreamAddrGroup *group, DownstreamAddr *addr); ~Http2Session(); // If hard is true, all pending requests are abandoned and @@ -160,9 +171,14 @@ public: // streams. size_t get_num_dconns() const; - // Returns true if this object is included in freelist. See - // DownstreamAddrGroup object. - bool in_freelist() const; + // Adds to group scope http2_avail_freelist. + void add_to_avail_freelist(); + // Adds to address scope http2_extra_freelist. + void add_to_extra_freelist(); + + // Removes this object from any freelist. If this object is not + // linked from any freelist, this function does nothing. + void remove_from_freelist(); // Returns true if the maximum concurrency is reached. 
In other // words, the number of currently participated streams in this @@ -229,6 +245,7 @@ private: nghttp2_session *session_; int state_; int connection_check_state_; + int freelist_zone_; bool flow_control_; }; diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 3eaa3368..25c8d9e6 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -385,9 +385,9 @@ void Http2Upstream::start_downstream(Downstream *downstream) { void Http2Upstream::initiate_downstream(Downstream *downstream) { int rv; - rv = downstream->attach_downstream_connection( - handler_->get_downstream_connection(downstream)); - if (rv != 0) { + auto dconn = handler_->get_downstream_connection(downstream); + if (!dconn || + (rv = downstream->attach_downstream_connection(std::move(dconn))) != 0) { // downstream connection fails, send error page if (error_reply(downstream, 503) != 0) { rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); @@ -1739,6 +1739,8 @@ int Http2Upstream::on_downstream_reset(bool no_retry) { downstream->add_retry(); + std::unique_ptr<DownstreamConnection> dconn; + if (no_retry || downstream->no_more_retry()) { goto fail; } @@ -1746,8 +1748,12 @@ int Http2Upstream::on_downstream_reset(bool no_retry) { // downstream connection is clean; we can retry with new // downstream connection. 
- rv = downstream->attach_downstream_connection( - handler_->get_downstream_connection(downstream)); + dconn = handler_->get_downstream_connection(downstream); + if (!dconn) { + goto fail; + } + + rv = downstream->attach_downstream_connection(std::move(dconn)); if (rv != 0) { goto fail; } diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index 2501c116..a9bdd3e8 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -394,10 +394,10 @@ int htp_hdrs_completecb(http_parser *htp) { return 0; } - rv = downstream->attach_downstream_connection( - handler->get_downstream_connection(downstream)); + auto dconn = handler->get_downstream_connection(downstream); - if (rv != 0) { + if (!dconn || + (rv = downstream->attach_downstream_connection(std::move(dconn))) != 0) { downstream->set_request_state(Downstream::CONNECT_FAIL); return -1; @@ -1172,6 +1172,7 @@ void HttpsUpstream::on_handler_delete() { int HttpsUpstream::on_downstream_reset(bool no_retry) { int rv; + std::unique_ptr<DownstreamConnection> dconn; if (!downstream_->request_submission_ready()) { // Return error so that caller can delete handler @@ -1186,8 +1187,12 @@ int HttpsUpstream::on_downstream_reset(bool no_retry) { goto fail; } - rv = downstream_->attach_downstream_connection( - handler_->get_downstream_connection(downstream_.get())); + dconn = handler_->get_downstream_connection(downstream_.get()); + if (!dconn) { + goto fail; + } + + rv = downstream_->attach_downstream_connection(std::move(dconn)); if (rv != 0) { goto fail; } diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 040b5f6e..758ff525 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -335,9 +335,12 @@ void SpdyUpstream::start_downstream(Downstream *downstream) { } void SpdyUpstream::initiate_downstream(Downstream *downstream) { - int rv = downstream->attach_downstream_connection( - handler_->get_downstream_connection(downstream)); - if (rv != 0) { + int rv; + + auto dconn = 
handler_->get_downstream_connection(downstream); + + if (!dconn || + (rv = downstream->attach_downstream_connection(std::move(dconn))) != 0) { // If downstream connection fails, issue RST_STREAM. rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); downstream->set_request_state(Downstream::CONNECT_FAIL); @@ -1247,6 +1250,8 @@ int SpdyUpstream::on_downstream_reset(bool no_retry) { downstream->add_retry(); + std::unique_ptr<DownstreamConnection> dconn; + if (no_retry || downstream->no_more_retry()) { goto fail; } @@ -1254,8 +1259,12 @@ int SpdyUpstream::on_downstream_reset(bool no_retry) { // downstream connection is clean; we can retry with new // downstream connection. - rv = downstream->attach_downstream_connection( - handler_->get_downstream_connection(downstream)); + dconn = handler_->get_downstream_connection(downstream); + if (!dconn) { + goto fail; + } + + rv = downstream->attach_downstream_connection(std::move(dconn)); if (rv != 0) { goto fail; } diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index 9e3f9915..8224c497 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -81,6 +81,16 @@ struct DownstreamAddr { std::unique_ptr<ConnectBlocker> connect_blocker; // Client side TLS session cache TLSSessionCache tls_session_cache; + // Http2Session objects created for this address. This list chains + // all Http2Session objects that are not in group scope + // http2_avail_freelist, and have not reached maximum concurrency. + DList<Http2Session> http2_extra_freelist; + // true if Http2Session for this address is in group scope + // SharedDownstreamAddr.http2_avail_freelist + bool in_avail; + // total number of streams created in HTTP/2 connections for this + // address. + size_t num_dconn; }; struct SharedDownstreamAddr { @@ -94,7 +104,7 @@ struct SharedDownstreamAddr { // // TODO Verify that this approach performs better in performance // wise. 
- DList<Http2Session> http2_freelist; + DList<Http2Session> http2_avail_freelist; DownstreamConnectionPool dconn_pool; // Next downstream address index in addrs. size_t next;