From 21007da3924f1daff0891d7c94054e96e80cd28f Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sat, 27 Feb 2016 19:39:03 +0900 Subject: [PATCH] nghttpx: Rewrite backend HTTP/2 connection coalesce strategy Previously, we used one Http2Session object per DownstreamAddrGroup. This was not flexible, and we had to provision in advance how many HTTP/2 connections were required. The new strategy is to add Http2Session objects on demand. We compare the number of attached downstream connection objects with the concurrency limit advertised by the server. As long as the former is smaller than the latter, we attach new downstream connections to the existing Http2Session. Once the limit is reached, we create a new Http2Session object. If the number drops below the limit, we start to share that Http2Session object again. --- src/shrpx_client_handler.cc | 36 +++++++---- src/shrpx_client_handler.h | 1 - src/shrpx_config.h | 11 ++++ src/shrpx_http2_session.cc | 79 +++++++++++++++++++++---- src/shrpx_http2_session.h | 25 ++++++-- src/shrpx_http_downstream_connection.cc | 4 +- src/shrpx_worker.cc | 40 ------------- src/shrpx_worker.h | 15 ----- src/template.h | 22 +++++-- 9 files changed, 141 insertions(+), 92 deletions(-) diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 6e89a5c4..d67c472e 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -386,11 +386,6 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl, get_config()->conn.upstream.ratelimit.read, writecb, readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold, get_config()->tls.dyn_rec.idle_timeout), - pinned_http2sessions_( - get_config()->conn.downstream.proto == PROTO_HTTP2 - ? 
make_unique>( - worker->get_downstream_addr_groups().size(), -1) - : nullptr), ipaddr_(ipaddr), port_(port), faddr_(faddr), @@ -714,15 +709,30 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { auto dconn_pool = worker_->get_dconn_pool(); if (downstreamconf.proto == PROTO_HTTP2) { - Http2Session *http2session; - auto &pinned = (*pinned_http2sessions_)[group]; - if (pinned == -1) { - http2session = worker_->next_http2_session(group); - pinned = http2session->get_index(); - } else { - auto dgrp = worker_->get_dgrp(group); - http2session = dgrp->http2sessions[pinned].get(); + auto &addr_group = worker_->get_downstream_addr_groups()[group]; + if (addr_group.http2_freelist.empty()) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) + << "http2_freelist is empty; create new Http2Session"; + } + auto session = make_unique( + conn_.loop, worker_->get_cl_ssl_ctx(), worker_, group); + addr_group.http2_freelist.append(session.release()); } + + auto http2session = addr_group.http2_freelist.head; + + // TODO max_concurrent_streams option must be independent from + // frontend and backend. + if (http2session->max_concurrency_reached(1)) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Maximum streams are reached for Http2Session(" + << http2session + << "). Remove Http2Session from http2_freelist"; + } + addr_group.http2_freelist.remove(http2session); + } + dconn = make_unique(dconn_pool, http2session); } else { dconn = make_unique(dconn_pool, group, diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 9e932ae2..42b03c26 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -144,7 +144,6 @@ private: Connection conn_; ev_timer reneg_shutdown_timer_; std::unique_ptr upstream_; - std::unique_ptr> pinned_http2sessions_; // IP address of client. If UNIX domain socket is used, this is // "localhost". 
std::string ipaddr_; diff --git a/src/shrpx_config.h b/src/shrpx_config.h index aba62253..1081f123 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -61,6 +61,7 @@ namespace shrpx { struct LogFragment; class ConnectBlocker; +class Http2Session; namespace ssl { @@ -304,6 +305,16 @@ struct DownstreamAddrGroup { ImmutableString pattern; std::vector addrs; + // List of Http2Session objects which are not fully utilized (i.e., + // the server advertised maximum concurrency is not reached). We + // will coalesce as many streams as possible into one Http2Session + // to fully utilize the TCP connection. + // + // TODO Verify that this approach actually yields better + // performance. + DList http2_freelist; + // Next downstream address index in addrs. + size_t next; }; struct TicketKey { diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index f7ac9a82..7ed816a9 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -74,6 +74,10 @@ void connchk_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { SSLOG(INFO, http2session) << "ping timeout"; } http2session->disconnect(); + + if (http2session->get_num_dconns() == 0) { + delete http2session; + } return; default: if (LOG_ENABLED(INFO)) { @@ -92,6 +96,9 @@ void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { SSLOG(INFO, http2session) << "SETTINGS timeout"; if (http2session->terminate_session(NGHTTP2_SETTINGS_TIMEOUT) != 0) { http2session->disconnect(); + if (http2session->get_num_dconns() == 0) { + delete http2session; + } return; } http2session->signal_write(); @@ -109,6 +116,9 @@ void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { http2session->disconnect(http2session->get_state() == Http2Session::CONNECTING); + if (http2session->get_num_dconns() == 0) { + delete http2session; + } } } // namespace @@ -120,6 +130,9 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) { rv = http2session->do_read(); if (rv != 0) { 
http2session->disconnect(http2session->should_hard_fail()); + if (http2session->get_num_dconns() == 0) { + delete http2session; + } return; } http2session->connection_alive(); @@ -127,6 +140,9 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) { rv = http2session->do_write(); if (rv != 0) { http2session->disconnect(http2session->should_hard_fail()); + if (http2session->get_num_dconns() == 0) { + delete http2session; + } return; } } @@ -140,6 +156,9 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { rv = http2session->do_write(); if (rv != 0) { http2session->disconnect(http2session->should_hard_fail()); + if (http2session->get_num_dconns() == 0) { + delete http2session; + } return; } http2session->reset_connection_check_timer_if_not_checking(); @@ -147,8 +166,10 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { } // namespace Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, - Worker *worker, size_t group, size_t idx) - : conn_(loop, -1, nullptr, worker->get_mcpool(), + Worker *worker, size_t group) + : dlnext(nullptr), + dlprev(nullptr), + conn_(loop, -1, nullptr, worker->get_mcpool(), get_config()->conn.downstream.timeout.write, get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold, @@ -159,11 +180,9 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, addr_(nullptr), session_(nullptr), group_(group), - index_(idx), state_(DISCONNECTED), connection_check_state_(CONNECTION_CHECK_NONE), flow_control_(false) { - read_ = write_ = &Http2Session::noop; on_read_ = &Http2Session::read_noop; @@ -182,7 +201,17 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, settings_timer_.data = this; } -Http2Session::~Http2Session() { disconnect(); } +Http2Session::~Http2Session() { + disconnect(); + + if (in_freelist()) { + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Removed from http2_freelist"; + } + auto &addr_group = 
worker_->get_downstream_addr_groups()[group_]; + addr_group.http2_freelist.remove(this); + } +} int Http2Session::disconnect(bool hard) { if (LOG_ENABLED(INFO)) { @@ -252,8 +281,8 @@ int Http2Session::disconnect(bool hard) { int Http2Session::initiate_connection() { int rv = 0; - auto &groups = worker_->get_downstream_addr_groups(); - auto &addrs = groups[group_].addrs; + auto &addr_group = worker_->get_downstream_addr_groups()[group_]; + auto &addrs = addr_group.addrs; auto worker_blocker = worker_->get_connect_blocker(); if (state_ == DISCONNECTED) { @@ -265,7 +294,7 @@ int Http2Session::initiate_connection() { return -1; } - auto &next_downstream = worker_->get_dgrp(group_)->next; + auto &next_downstream = addr_group.next; auto end = next_downstream; for (;;) { @@ -598,6 +627,18 @@ void Http2Session::remove_downstream_connection( Http2DownstreamConnection *dconn) { dconns_.remove(dconn); dconn->detach_stream_data(); + + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Remove downstream"; + } + + if (!in_freelist() && !max_concurrency_reached()) { + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Append to Http2Session freelist"; + } + auto &addr_group = worker_->get_downstream_addr_groups()[group_]; + addr_group.http2_freelist.append(this); + } } void Http2Session::remove_stream_data(StreamData *sd) { @@ -1892,8 +1933,6 @@ const DownstreamAddr *Http2Session::get_addr() const { return addr_; } size_t Http2Session::get_group() const { return group_; } -size_t Http2Session::get_index() const { return index_; } - int Http2Session::handle_downstream_push_promise(Downstream *downstream, int32_t promised_stream_id) { auto upstream = downstream->get_upstream(); @@ -1986,4 +2025,24 @@ int Http2Session::handle_downstream_push_promise_complete( return 0; } +size_t Http2Session::get_num_dconns() const { return dconns_.size(); } + +bool Http2Session::in_freelist() const { + auto &addr_group = worker_->get_downstream_addr_groups()[group_]; + + return dlnext != nullptr || 
dlprev != nullptr || + addr_group.http2_freelist.head == this || + addr_group.http2_freelist.tail == this; +} + +bool Http2Session::max_concurrency_reached(size_t extra) const { + if (!session_) { + return dconns_.size() + extra >= 100; + } + + return dconns_.size() + extra >= + nghttp2_session_get_remote_settings( + session_, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS); +} + } // namespace shrpx diff --git a/src/shrpx_http2_session.h b/src/shrpx_http2_session.h index 62e1235c..bc81f420 100644 --- a/src/shrpx_http2_session.h +++ b/src/shrpx_http2_session.h @@ -57,7 +57,7 @@ struct StreamData { class Http2Session { public: Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, - size_t group, size_t idx); + size_t group); ~Http2Session(); // If hard is true, all pending requests are abandoned and @@ -149,13 +149,27 @@ public: size_t get_group() const; - size_t get_index() const; - int handle_downstream_push_promise(Downstream *downstream, int32_t promised_stream_id); int handle_downstream_push_promise_complete(Downstream *downstream, Downstream *promised_downstream); + // Returns number of downstream connections, including pushed + // streams. + size_t get_num_dconns() const; + + // Returns true if this object is included in freelist. See + // DownstreamAddrGroup object. + bool in_freelist() const; + + // Returns true if the maximum concurrency is reached. In other + // words, the number of streams currently participating in this + // session is equal to or greater than the max concurrent streams + // limit advertised by the server. If |extra| is nonzero, it is added + // to the number of current concurrent streams when comparing + // against the server advertised concurrency limit. 
+ bool max_concurrency_reached(size_t extra = 0) const; + enum { // Disconnected DISCONNECTED, @@ -184,6 +198,8 @@ public: using ReadBuf = Buffer<8_k>; + Http2Session *dlnext, *dlprev; + private: Connection conn_; DefaultMemchunks wb_; @@ -207,9 +223,6 @@ private: const DownstreamAddr *addr_; nghttp2_session *session_; size_t group_; - // index inside group, this is used to pin frontend to certain - // HTTP/2 backend for better throughput. - size_t index_; int state_; int connection_check_state_; bool flow_control_; diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index f0b6fd3c..e8f0e298 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -157,10 +157,10 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { conn_.set_ssl(ssl); } - auto &next_downstream = worker_->get_dgrp(group_)->next; - auto end = next_downstream; auto &groups = worker_->get_downstream_addr_groups(); auto &addrs = groups[group_].addrs; + auto &next_downstream = groups[group_].next; + auto end = next_downstream; for (;;) { auto &addr = addrs[next_downstream]; diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 2cb71acc..51ee9ad3 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -73,7 +73,6 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, : randgen_(rd()), dconn_pool_(get_config()->conn.downstream.addr_groups.size()), worker_stat_(get_config()->conn.downstream.addr_groups.size()), - dgrps_(get_config()->conn.downstream.addr_groups.size()), loop_(loop), sv_ssl_ctx_(sv_ssl_ctx), cl_ssl_ctx_(cl_ssl_ctx), @@ -98,24 +97,6 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, StringRef{session_cacheconf.memcached.host}, &mcpool_); } - auto &downstreamconf = get_config()->conn.downstream; - - if (downstreamconf.proto == PROTO_HTTP2) { - auto n = get_config()->http2.downstream.connections_per_worker; - 
size_t group = 0; - for (auto &dgrp : dgrps_) { - auto m = n; - if (m == 0) { - m = downstreamconf.addr_groups[group].addrs.size(); - } - for (size_t idx = 0; idx < m; ++idx) { - dgrp.http2sessions.push_back( - make_unique(loop_, cl_ssl_ctx, this, group, idx)); - } - ++group; - } - } - for (auto &group : downstream_addr_groups_) { for (auto &addr : group.addrs) { addr.connect_blocker = new ConnectBlocker(randgen_, loop_); @@ -255,22 +236,6 @@ WorkerStat *Worker::get_worker_stat() { return &worker_stat_; } DownstreamConnectionPool *Worker::get_dconn_pool() { return &dconn_pool_; } -Http2Session *Worker::next_http2_session(size_t group) { - auto &dgrp = dgrps_[group]; - auto &http2sessions = dgrp.http2sessions; - if (http2sessions.empty()) { - return nullptr; - } - - auto res = http2sessions[dgrp.next_http2session].get(); - ++dgrp.next_http2session; - if (dgrp.next_http2session >= http2sessions.size()) { - dgrp.next_http2session = 0; - } - - return res; -} - struct ev_loop *Worker::get_loop() const { return loop_; } @@ -285,11 +250,6 @@ bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; } MemchunkPool *Worker::get_mcpool() { return &mcpool_; } -DownstreamGroup *Worker::get_dgrp(size_t group) { - assert(group < dgrps_.size()); - return &dgrps_[group]; -} - MemcachedDispatcher *Worker::get_session_cache_memcached_dispatcher() { return session_cache_memcached_dispatcher_.get(); } diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index aa1531e6..35917ec9 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -67,17 +67,6 @@ namespace ssl { class CertLookupTree; } // namespace ssl -struct DownstreamGroup { - DownstreamGroup() : next_http2session(0), next(0) {} - - std::vector> http2sessions; - // Next index in http2sessions. - size_t next_http2session; - // Next downstream address index corresponding to - // Config::downstream_addr_groups[]. 
- size_t next; -}; - struct WorkerStat { WorkerStat(size_t num_groups) : num_connections(0) {} @@ -122,7 +111,6 @@ public: WorkerStat *get_worker_stat(); DownstreamConnectionPool *get_dconn_pool(); - Http2Session *next_http2_session(size_t group); struct ev_loop *get_loop() const; SSL_CTX *get_sv_ssl_ctx() const; SSL_CTX *get_cl_ssl_ctx() const; @@ -133,8 +121,6 @@ public: MemchunkPool *get_mcpool(); void schedule_clear_mcpool(); - DownstreamGroup *get_dgrp(size_t group); - MemcachedDispatcher *get_session_cache_memcached_dispatcher(); std::mt19937 &get_randgen(); @@ -161,7 +147,6 @@ private: MemchunkPool mcpool_; DownstreamConnectionPool dconn_pool_; WorkerStat worker_stat_; - std::vector dgrps_; std::unique_ptr session_cache_memcached_dispatcher_; #ifdef HAVE_MRUBY diff --git a/src/template.h b/src/template.h index 253ea215..3b1727fc 100644 --- a/src/template.h +++ b/src/template.h @@ -99,14 +99,17 @@ template bool test_flags(T t, F flags) { // T *dlnext, which point to previous element and next element in the // list respectively. template struct DList { - DList() : head(nullptr), tail(nullptr) {} + DList() : head(nullptr), tail(nullptr), n(0) {} - DList(const DList &) = delete; + // Ideally the copy ctor and copy assignment operator would be + // deleted, but we need them where a copy is made before any item is + // added to the list. If in doubt, delete them and try to compile. 
+ DList(const DList &) = default; + DList &operator=(const DList &) = default; - DList &operator=(const DList &) = delete; - - DList(DList &&other) : head(other.head), tail(other.tail) { + DList(DList &&other) : head(other.head), tail(other.tail), n(other.n) { other.head = other.tail = nullptr; + other.n = 0; } DList &operator=(DList &&other) { @@ -115,11 +118,16 @@ template struct DList { } head = other.head; tail = other.tail; + n = other.n; + other.head = other.tail = nullptr; + other.n = 0; + return *this; } void append(T *t) { + ++n; if (tail) { tail->dlnext = t; t->dlprev = tail; @@ -130,6 +138,7 @@ template struct DList { } void remove(T *t) { + --n; auto p = t->dlprev; auto n = t->dlnext; if (p) { @@ -149,7 +158,10 @@ template struct DList { bool empty() const { return head == nullptr; } + size_t size() const { return n; } + T *head, *tail; + size_t n; }; template void dlist_delete_all(DList &dl) {