From 0f9ed40bd9b42f552cfa2702eafd3ac57de7e845 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Tue, 22 Mar 2016 23:51:00 +0900 Subject: [PATCH] nghttpx: Share connection among different patterns if address set are same --- src/shrpx_client_handler.cc | 29 +++++++++------- src/shrpx_http2_session.cc | 19 +++++++---- src/shrpx_http_downstream_connection.cc | 11 +++--- src/shrpx_worker.cc | 45 +++++++++++++++++++++++-- src/shrpx_worker.h | 8 +++-- 5 files changed, 85 insertions(+), 27 deletions(-) diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index bdc4c32a..180dbb3a 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -666,7 +666,7 @@ void ClientHandler::pool_downstream_connection( << " in group " << group; } - auto &dconn_pool = group->dconn_pool; + auto &dconn_pool = group->shared_addr->dconn_pool; dconn_pool.add_downstream_connection(std::move(dconn)); } @@ -675,7 +675,8 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) { CLOG(INFO, this) << "Removing downstream connection DCONN:" << dconn << " from pool"; } - auto &dconn_pool = dconn->get_downstream_addr_group()->dconn_pool; + auto &dconn_pool = + dconn->get_downstream_addr_group()->shared_addr->dconn_pool; dconn_pool.remove_downstream_connection(dconn); } @@ -722,7 +723,9 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { } auto &group = worker_->get_downstream_addr_groups()[group_idx]; - auto &dconn_pool = group.dconn_pool; + auto &shared_addr = group.shared_addr; + auto &dconn_pool = shared_addr->dconn_pool; + auto dconn = dconn_pool.pop_downstream_connection(); if (!dconn) { @@ -731,25 +734,27 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { << " Create new one"; } - if (group.proto == PROTO_HTTP2) { - if (group.http2_freelist.empty() || - group.http2_freelist.size() < group.addrs.size()) { + if (shared_addr->proto == PROTO_HTTP2) { + auto &http2_freelist = shared_addr->http2_freelist; + + if (http2_freelist.empty() || + http2_freelist.size() < shared_addr->addrs.size()) { if (LOG_ENABLED(INFO)) { - if (group.http2_freelist.empty()) { + if (http2_freelist.empty()) { CLOG(INFO, this) << "http2_freelist is empty; create new Http2Session"; } else { CLOG(INFO, this) << "Create new Http2Session; current " - << group.http2_freelist.size() << ", min " - << group.addrs.size(); + << http2_freelist.size() << ", min " + << shared_addr->addrs.size(); } } auto session = make_unique( conn_.loop, worker_->get_cl_ssl_ctx(), worker_, &group); - group.http2_freelist.append(session.release()); + http2_freelist.append(session.release()); } - auto http2session = group.http2_freelist.head; + auto http2session = http2_freelist.head; if (http2session->max_concurrency_reached(1)) { if (LOG_ENABLED(INFO)) { @@ -757,7 +762,7 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { << http2session << "). Remove Http2Session from http2_freelist"; } - group.http2_freelist.remove(http2session); + http2_freelist.remove(http2session); } dconn = make_unique(http2session); diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index 84258896..954ec021 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -208,7 +208,7 @@ Http2Session::~Http2Session() { if (LOG_ENABLED(INFO)) { SSLOG(INFO, this) << "Removed from http2_freelist"; } - group_->http2_freelist.remove(this); + group_->shared_addr->http2_freelist.remove(this); } } @@ -280,7 +280,8 @@ int Http2Session::disconnect(bool hard) { int Http2Session::initiate_connection() { int rv = 0; - auto &addrs = group_->addrs; + auto &shared_addr = group_->shared_addr; + auto &addrs = shared_addr->addrs; auto worker_blocker = worker_->get_connect_blocker(); if (state_ == DISCONNECTED) { @@ -292,7 +293,7 @@ int Http2Session::initiate_connection() { return -1; } - auto &next_downstream = group_->next; + auto &next_downstream = shared_addr->next; auto end = next_downstream; for (;;) { @@ -643,7 +644,7 @@ void Http2Session::remove_downstream_connection( if (LOG_ENABLED(INFO)) { SSLOG(INFO, this) << "Append to Http2Session freelist"; } - group_->http2_freelist.append(this); + group_->shared_addr->http2_freelist.append(this); } } @@ -1242,6 +1243,10 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, return 0; } + case NGHTTP2_GOAWAY: + SSLOG(WARN, http2session) + << "GOAWAY received; error_code=" << frame->goaway.error_code; + return 0; default: return 0; } @@ -2056,9 +2061,11 @@ int Http2Session::handle_downstream_push_promise_complete( size_t Http2Session::get_num_dconns() const { return dconns_.size(); } bool Http2Session::in_freelist() const { + auto &shared_addr = group_->shared_addr; + auto &http2_freelist = shared_addr->http2_freelist; + return dlnext != nullptr || dlprev != nullptr || - group_->http2_freelist.head == this || - group_->http2_freelist.tail == this; + http2_freelist.head == this || http2_freelist.tail == this; } bool Http2Session::max_concurrency_reached(size_t extra) const { diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 893c9efc..55620e3e 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -158,8 +158,9 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { conn_.set_ssl(ssl); } - auto &addrs = group_->addrs; - auto &next_downstream = group_->next; + auto &shared_addr = group_->shared_addr; + auto &addrs = shared_addr->addrs; + auto &next_downstream = shared_addr->next; auto end = next_downstream; for (;;) { auto &addr = addrs[next_downstream]; @@ -511,7 +512,8 @@ void idle_readcb(struct ev_loop *loop, ev_io *w, int revents) { if (LOG_ENABLED(INFO)) { DCLOG(INFO, dconn) << "Idle connection EOF"; } - auto &dconn_pool = dconn->get_downstream_addr_group()->dconn_pool; + auto &dconn_pool = + dconn->get_downstream_addr_group()->shared_addr->dconn_pool; dconn_pool.remove_downstream_connection(dconn); // dconn was deleted } @@ -524,7 +526,8 @@ void idle_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { if (LOG_ENABLED(INFO)) { DCLOG(INFO, dconn) << "Idle connection timeout"; } - auto &dconn_pool = dconn->get_downstream_addr_group()->dconn_pool; + auto &dconn_pool = + dconn->get_downstream_addr_group()->shared_addr->dconn_pool; dconn_pool.remove_downstream_connection(dconn); // dconn was deleted } diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 13f84eb8..3b15c358 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -62,6 +62,28 @@ void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) { } } // namespace +namespace { +bool match_shared_downstream_addr( + const std::shared_ptr &lhs, + const std::shared_ptr &rhs) { + if (lhs->addrs.size() != rhs->addrs.size() || lhs->proto != rhs->proto) { + return false; + } + + for (auto &a : lhs->addrs) { + if (std::find_if(std::begin(rhs->addrs), std::end(rhs->addrs), + [&a](const DownstreamAddr &b) { + return a.host == b.host && a.port == b.port && + a.host_unix == b.host_unix; + }) == std::end(rhs->addrs)) { + return false; + } + } + + return true; +} +} // namespace + namespace { std::random_device rd; } // namespace @@ -103,12 +125,14 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, auto &dst = downstream_addr_groups_[i]; dst.pattern = src.pattern; - dst.addrs.resize(src.addrs.size()); - dst.proto = src.proto; + + auto shared_addr = std::make_shared(); + shared_addr->addrs.resize(src.addrs.size()); + shared_addr->proto = src.proto; for (size_t j = 0; j < src.addrs.size(); ++j) { auto &src_addr = src.addrs[j]; - auto &dst_addr = dst.addrs[j]; + auto &dst_addr = shared_addr->addrs[j]; dst_addr.addr = src_addr.addr; dst_addr.host = src_addr.host; @@ -118,6 +142,21 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, dst_addr.connect_blocker = make_unique(randgen_, loop_); } + + // share the connection if patterns have the same set of backend + // addresses. + auto end = std::begin(downstream_addr_groups_) + i; + auto it = std::find_if(std::begin(downstream_addr_groups_), end, + [&shared_addr](const DownstreamAddrGroup &group) { + return match_shared_downstream_addr( + group.shared_addr, shared_addr); + }); + + if (it == end) { + dst.shared_addr = shared_addr; + } else { + dst.shared_addr = (*it).shared_addr; + } } } diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index 17a9fee9..bd7be800 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -83,8 +83,7 @@ struct DownstreamAddr { TLSSessionCache tls_session_cache; }; -struct DownstreamAddrGroup { - ImmutableString pattern; +struct SharedDownstreamAddr { std::vector addrs; // Application protocol used in this group shrpx_proto proto; @@ -101,6 +100,11 @@ struct DownstreamAddrGroup { size_t next; }; +struct DownstreamAddrGroup { + ImmutableString pattern; + std::shared_ptr shared_addr; +}; + struct WorkerStat { size_t num_connections; };