diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 6e89a5c4..d67c472e 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -386,11 +386,6 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl, get_config()->conn.upstream.ratelimit.read, writecb, readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold, get_config()->tls.dyn_rec.idle_timeout), - pinned_http2sessions_( - get_config()->conn.downstream.proto == PROTO_HTTP2 - ? make_unique>( - worker->get_downstream_addr_groups().size(), -1) - : nullptr), ipaddr_(ipaddr), port_(port), faddr_(faddr), @@ -714,15 +709,30 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { auto dconn_pool = worker_->get_dconn_pool(); if (downstreamconf.proto == PROTO_HTTP2) { - Http2Session *http2session; - auto &pinned = (*pinned_http2sessions_)[group]; - if (pinned == -1) { - http2session = worker_->next_http2_session(group); - pinned = http2session->get_index(); - } else { - auto dgrp = worker_->get_dgrp(group); - http2session = dgrp->http2sessions[pinned].get(); + auto &addr_group = worker_->get_downstream_addr_groups()[group]; + if (addr_group.http2_freelist.empty()) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) + << "http2_freelist is empty; create new Http2Session"; + } + auto session = make_unique( + conn_.loop, worker_->get_cl_ssl_ctx(), worker_, group); + addr_group.http2_freelist.append(session.release()); } + + auto http2session = addr_group.http2_freelist.head; + + // TODO max_concurrent_streams option must be independent from + // frontend and backend. + if (http2session->max_concurrency_reached(1)) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Maximum streams are reached for Http2Session(" + << http2session + << "). 
Remove Http2Session from http2_freelist"; + } + addr_group.http2_freelist.remove(http2session); + } + dconn = make_unique(dconn_pool, http2session); } else { dconn = make_unique(dconn_pool, group, diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 9e932ae2..42b03c26 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -144,7 +144,6 @@ private: Connection conn_; ev_timer reneg_shutdown_timer_; std::unique_ptr upstream_; - std::unique_ptr> pinned_http2sessions_; // IP address of client. If UNIX domain socket is used, this is // "localhost". std::string ipaddr_; diff --git a/src/shrpx_config.h b/src/shrpx_config.h index aba62253..1081f123 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -61,6 +61,7 @@ namespace shrpx { struct LogFragment; class ConnectBlocker; +class Http2Session; namespace ssl { @@ -304,6 +305,16 @@ struct DownstreamAddrGroup { ImmutableString pattern; std::vector addrs; + // List of Http2Sessions which are not fully utilized (i.e., the + // server-advertised maximum concurrency is not reached). We will + // coalesce as many streams as possible in one Http2Session to fully + // utilize the TCP connection. + // + // TODO Verify that this approach actually performs better in + // practice. + DList http2_freelist; + // Next downstream address index in addrs. 
+ size_t next; }; struct TicketKey { diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index f7ac9a82..7ed816a9 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -74,6 +74,10 @@ void connchk_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { SSLOG(INFO, http2session) << "ping timeout"; } http2session->disconnect(); + + if (http2session->get_num_dconns() == 0) { + delete http2session; + } return; default: if (LOG_ENABLED(INFO)) { @@ -92,6 +96,9 @@ void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { SSLOG(INFO, http2session) << "SETTINGS timeout"; if (http2session->terminate_session(NGHTTP2_SETTINGS_TIMEOUT) != 0) { http2session->disconnect(); + if (http2session->get_num_dconns() == 0) { + delete http2session; + } return; } http2session->signal_write(); @@ -109,6 +116,9 @@ void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { http2session->disconnect(http2session->get_state() == Http2Session::CONNECTING); + if (http2session->get_num_dconns() == 0) { + delete http2session; + } } } // namespace @@ -120,6 +130,9 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) { rv = http2session->do_read(); if (rv != 0) { http2session->disconnect(http2session->should_hard_fail()); + if (http2session->get_num_dconns() == 0) { + delete http2session; + } return; } http2session->connection_alive(); @@ -127,6 +140,9 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) { rv = http2session->do_write(); if (rv != 0) { http2session->disconnect(http2session->should_hard_fail()); + if (http2session->get_num_dconns() == 0) { + delete http2session; + } return; } } @@ -140,6 +156,9 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { rv = http2session->do_write(); if (rv != 0) { http2session->disconnect(http2session->should_hard_fail()); + if (http2session->get_num_dconns() == 0) { + delete http2session; + } return; } http2session->reset_connection_check_timer_if_not_checking(); @@ 
-147,8 +166,10 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { } // namespace Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, - Worker *worker, size_t group, size_t idx) - : conn_(loop, -1, nullptr, worker->get_mcpool(), + Worker *worker, size_t group) + : dlnext(nullptr), + dlprev(nullptr), + conn_(loop, -1, nullptr, worker->get_mcpool(), get_config()->conn.downstream.timeout.write, get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold, @@ -159,11 +180,9 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, addr_(nullptr), session_(nullptr), group_(group), - index_(idx), state_(DISCONNECTED), connection_check_state_(CONNECTION_CHECK_NONE), flow_control_(false) { - read_ = write_ = &Http2Session::noop; on_read_ = &Http2Session::read_noop; @@ -182,7 +201,17 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, settings_timer_.data = this; } -Http2Session::~Http2Session() { disconnect(); } +Http2Session::~Http2Session() { + disconnect(); + + if (in_freelist()) { + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Removed from http2_freelist"; + } + auto &addr_group = worker_->get_downstream_addr_groups()[group_]; + addr_group.http2_freelist.remove(this); + } +} int Http2Session::disconnect(bool hard) { if (LOG_ENABLED(INFO)) { @@ -252,8 +281,8 @@ int Http2Session::disconnect(bool hard) { int Http2Session::initiate_connection() { int rv = 0; - auto &groups = worker_->get_downstream_addr_groups(); - auto &addrs = groups[group_].addrs; + auto &addr_group = worker_->get_downstream_addr_groups()[group_]; + auto &addrs = addr_group.addrs; auto worker_blocker = worker_->get_connect_blocker(); if (state_ == DISCONNECTED) { @@ -265,7 +294,7 @@ int Http2Session::initiate_connection() { return -1; } - auto &next_downstream = worker_->get_dgrp(group_)->next; + auto &next_downstream = addr_group.next; auto end = next_downstream; for (;;) { @@ 
-598,6 +627,18 @@ void Http2Session::remove_downstream_connection( Http2DownstreamConnection *dconn) { dconns_.remove(dconn); dconn->detach_stream_data(); + + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Remove downstream"; + } + + if (!in_freelist() && !max_concurrency_reached()) { + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Append to Http2Session freelist"; + } + auto &addr_group = worker_->get_downstream_addr_groups()[group_]; + addr_group.http2_freelist.append(this); + } } void Http2Session::remove_stream_data(StreamData *sd) { @@ -1892,8 +1933,6 @@ const DownstreamAddr *Http2Session::get_addr() const { return addr_; } size_t Http2Session::get_group() const { return group_; } -size_t Http2Session::get_index() const { return index_; } - int Http2Session::handle_downstream_push_promise(Downstream *downstream, int32_t promised_stream_id) { auto upstream = downstream->get_upstream(); @@ -1986,4 +2025,24 @@ int Http2Session::handle_downstream_push_promise_complete( return 0; } +size_t Http2Session::get_num_dconns() const { return dconns_.size(); } + +bool Http2Session::in_freelist() const { + auto &addr_group = worker_->get_downstream_addr_groups()[group_]; + + return dlnext != nullptr || dlprev != nullptr || + addr_group.http2_freelist.head == this || + addr_group.http2_freelist.tail == this; +} + +bool Http2Session::max_concurrency_reached(size_t extra) const { + if (!session_) { + return dconns_.size() + extra >= 100; + } + + return dconns_.size() + extra >= + nghttp2_session_get_remote_settings( + session_, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS); +} + } // namespace shrpx diff --git a/src/shrpx_http2_session.h b/src/shrpx_http2_session.h index 62e1235c..bc81f420 100644 --- a/src/shrpx_http2_session.h +++ b/src/shrpx_http2_session.h @@ -57,7 +57,7 @@ struct StreamData { class Http2Session { public: Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, - size_t group, size_t idx); + size_t group); ~Http2Session(); // If hard is true, 
all pending requests are abandoned and @@ -149,13 +149,27 @@ public: size_t get_group() const; - size_t get_index() const; - int handle_downstream_push_promise(Downstream *downstream, int32_t promised_stream_id); int handle_downstream_push_promise_complete(Downstream *downstream, Downstream *promised_downstream); + // Returns number of downstream connections, including pushed + // streams. + size_t get_num_dconns() const; + + // Returns true if this object is included in freelist. See + // DownstreamAddrGroup object. + bool in_freelist() const; + + // Returns true if the maximum concurrency is reached. In other + // words, the number of currently participated streams in this + // session is equal or greater than the max concurrent streams limit + // advertised by server. If |extra| is nonzero, it is added to the + // number of current concurrent streams when comparing against + // server initiated concurrency limit. + bool max_concurrency_reached(size_t extra = 0) const; + enum { // Disconnected DISCONNECTED, @@ -184,6 +198,8 @@ public: using ReadBuf = Buffer<8_k>; + Http2Session *dlnext, *dlprev; + private: Connection conn_; DefaultMemchunks wb_; @@ -207,9 +223,6 @@ private: const DownstreamAddr *addr_; nghttp2_session *session_; size_t group_; - // index inside group, this is used to pin frontend to certain - // HTTP/2 backend for better throughput. 
- size_t index_; int state_; int connection_check_state_; bool flow_control_; diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index f0b6fd3c..e8f0e298 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -157,10 +157,10 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { conn_.set_ssl(ssl); } - auto &next_downstream = worker_->get_dgrp(group_)->next; - auto end = next_downstream; auto &groups = worker_->get_downstream_addr_groups(); auto &addrs = groups[group_].addrs; + auto &next_downstream = groups[group_].next; + auto end = next_downstream; for (;;) { auto &addr = addrs[next_downstream]; diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 2cb71acc..51ee9ad3 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -73,7 +73,6 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, : randgen_(rd()), dconn_pool_(get_config()->conn.downstream.addr_groups.size()), worker_stat_(get_config()->conn.downstream.addr_groups.size()), - dgrps_(get_config()->conn.downstream.addr_groups.size()), loop_(loop), sv_ssl_ctx_(sv_ssl_ctx), cl_ssl_ctx_(cl_ssl_ctx), @@ -98,24 +97,6 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, StringRef{session_cacheconf.memcached.host}, &mcpool_); } - auto &downstreamconf = get_config()->conn.downstream; - - if (downstreamconf.proto == PROTO_HTTP2) { - auto n = get_config()->http2.downstream.connections_per_worker; - size_t group = 0; - for (auto &dgrp : dgrps_) { - auto m = n; - if (m == 0) { - m = downstreamconf.addr_groups[group].addrs.size(); - } - for (size_t idx = 0; idx < m; ++idx) { - dgrp.http2sessions.push_back( - make_unique(loop_, cl_ssl_ctx, this, group, idx)); - } - ++group; - } - } - for (auto &group : downstream_addr_groups_) { for (auto &addr : group.addrs) { addr.connect_blocker = new ConnectBlocker(randgen_, loop_); @@ -255,22 +236,6 @@ 
WorkerStat *Worker::get_worker_stat() { return &worker_stat_; } DownstreamConnectionPool *Worker::get_dconn_pool() { return &dconn_pool_; } -Http2Session *Worker::next_http2_session(size_t group) { - auto &dgrp = dgrps_[group]; - auto &http2sessions = dgrp.http2sessions; - if (http2sessions.empty()) { - return nullptr; - } - - auto res = http2sessions[dgrp.next_http2session].get(); - ++dgrp.next_http2session; - if (dgrp.next_http2session >= http2sessions.size()) { - dgrp.next_http2session = 0; - } - - return res; -} - struct ev_loop *Worker::get_loop() const { return loop_; } @@ -285,11 +250,6 @@ bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; } MemchunkPool *Worker::get_mcpool() { return &mcpool_; } -DownstreamGroup *Worker::get_dgrp(size_t group) { - assert(group < dgrps_.size()); - return &dgrps_[group]; -} - MemcachedDispatcher *Worker::get_session_cache_memcached_dispatcher() { return session_cache_memcached_dispatcher_.get(); } diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index aa1531e6..35917ec9 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -67,17 +67,6 @@ namespace ssl { class CertLookupTree; } // namespace ssl -struct DownstreamGroup { - DownstreamGroup() : next_http2session(0), next(0) {} - - std::vector> http2sessions; - // Next index in http2sessions. - size_t next_http2session; - // Next downstream address index corresponding to - // Config::downstream_addr_groups[]. 
- size_t next; -}; - struct WorkerStat { WorkerStat(size_t num_groups) : num_connections(0) {} @@ -122,7 +111,6 @@ public: WorkerStat *get_worker_stat(); DownstreamConnectionPool *get_dconn_pool(); - Http2Session *next_http2_session(size_t group); struct ev_loop *get_loop() const; SSL_CTX *get_sv_ssl_ctx() const; SSL_CTX *get_cl_ssl_ctx() const; @@ -133,8 +121,6 @@ public: MemchunkPool *get_mcpool(); void schedule_clear_mcpool(); - DownstreamGroup *get_dgrp(size_t group); - MemcachedDispatcher *get_session_cache_memcached_dispatcher(); std::mt19937 &get_randgen(); @@ -161,7 +147,6 @@ private: MemchunkPool mcpool_; DownstreamConnectionPool dconn_pool_; WorkerStat worker_stat_; - std::vector dgrps_; std::unique_ptr session_cache_memcached_dispatcher_; #ifdef HAVE_MRUBY diff --git a/src/template.h b/src/template.h index 253ea215..3b1727fc 100644 --- a/src/template.h +++ b/src/template.h @@ -99,14 +99,17 @@ template bool test_flags(T t, F flags) { // T *dlnext, which point to previous element and next element in the // list respectively. template struct DList { - DList() : head(nullptr), tail(nullptr) {} + DList() : head(nullptr), tail(nullptr), n(0) {} - DList(const DList &) = delete; + // We should delete this copy ctor and assignment operator, but we + // need them where a copy is required before any item is added. If + // in doubt, mark them deleted and try to compile. 
+ DList(const DList &) = default; + DList &operator=(const DList &) = default; - DList &operator=(const DList &) = delete; - - DList(DList &&other) : head(other.head), tail(other.tail) { + DList(DList &&other) : head(other.head), tail(other.tail), n(other.n) { other.head = other.tail = nullptr; + other.n = 0; } DList &operator=(DList &&other) { @@ -115,11 +118,16 @@ template struct DList { } head = other.head; tail = other.tail; + n = other.n; + other.head = other.tail = nullptr; + other.n = 0; + return *this; } void append(T *t) { + ++n; if (tail) { tail->dlnext = t; t->dlprev = tail; @@ -130,6 +138,7 @@ template struct DList { } void remove(T *t) { + --n; auto p = t->dlprev; auto n = t->dlnext; if (p) { @@ -149,7 +158,10 @@ template struct DList { bool empty() const { return head == nullptr; } + size_t size() const { return n; } + T *head, *tail; + size_t n; }; template void dlist_delete_all(DList &dl) {