diff --git a/src/shrpx.cc b/src/shrpx.cc index c9103b99..4690a3f5 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -918,6 +918,7 @@ void fill_default_config() { mod_config()->no_server_push = false; mod_config()->host_unix = false; mod_config()->http2_downstream_connchk = false; + mod_config()->http2_downstream_connections_per_worker = 0; } } // namespace @@ -2064,6 +2065,11 @@ int main(int argc, char **argv) { } } + if (get_config()->http2_downstream_connections_per_worker == 0) { + mod_config()->http2_downstream_connections_per_worker = + get_config()->downstream_addrs.size(); + } + if (get_config()->rlimit_nofile) { struct rlimit lim = {static_cast(get_config()->rlimit_nofile), static_cast(get_config()->rlimit_nofile)}; diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 9ca38690..b7f1a50c 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -576,6 +576,9 @@ void ClientHandler::set_should_close_after_write(bool f) { void ClientHandler::pool_downstream_connection( std::unique_ptr dconn) { + if (!dconn->poolable()) { + return; + } if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get(); } @@ -605,7 +608,7 @@ ClientHandler::get_downstream_connection() { } auto dconn_pool = worker_->get_dconn_pool(); - auto http2session = worker_->get_http2_session(); + auto http2session = worker_->next_http2_session(); if (http2session) { dconn = make_unique(dconn_pool, http2session); @@ -628,12 +631,8 @@ ClientHandler::get_downstream_connection() { SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; } -Http2Session *ClientHandler::get_http2_session() const { - return worker_->get_http2_session(); -} - -ConnectBlocker *ClientHandler::get_http1_connect_blocker() const { - return worker_->get_http1_connect_blocker(); +ConnectBlocker *ClientHandler::get_connect_blocker() const { + return worker_->get_connect_blocker(); } void ClientHandler::direct_http2_upgrade() { diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 96021a59..ac18d4e7 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -93,8 +93,7 @@ public: void remove_downstream_connection(DownstreamConnection *dconn); std::unique_ptr get_downstream_connection(); SSL *get_ssl() const; - Http2Session *get_http2_session() const; - ConnectBlocker *get_http1_connect_blocker() const; + ConnectBlocker *get_connect_blocker() const; // Call this function when HTTP/2 connection header is received at // the start of the connection. void direct_http2_upgrade(); diff --git a/src/shrpx_config.h b/src/shrpx_config.h index d4cec158..b15c09f5 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -260,6 +260,7 @@ struct Config { size_t http2_downstream_window_bits; size_t http2_upstream_connection_window_bits; size_t http2_downstream_connection_window_bits; + size_t http2_downstream_connections_per_worker; size_t downstream_connections_per_host; size_t downstream_connections_per_frontend; // actual size of downstream_http_proxy_addr diff --git a/src/shrpx_downstream_connection.h b/src/shrpx_downstream_connection.h index 5594ccf7..49ca1beb 100644 --- a/src/shrpx_downstream_connection.h +++ b/src/shrpx_downstream_connection.h @@ -58,6 +58,9 @@ public: virtual void on_upstream_change(Upstream *uptream) = 0; virtual int on_priority_change(int32_t pri) = 0; + // true if this object is poolable. + virtual bool poolable() const = 0; + void set_client_handler(ClientHandler *client_handler); ClientHandler *get_client_handler(); Downstream *get_downstream(); diff --git a/src/shrpx_http2_downstream_connection.h b/src/shrpx_http2_downstream_connection.h index cee6265a..d0febf94 100644 --- a/src/shrpx_http2_downstream_connection.h +++ b/src/shrpx_http2_downstream_connection.h @@ -62,6 +62,10 @@ public: virtual void on_upstream_change(Upstream *upstream) {} virtual int on_priority_change(int32_t pri); + // This object is not poolable because we dont' have facility to + // migrate to another Http2Session object. + virtual bool poolable() const { return false; } + int send(); void attach_stream_data(StreamData *sd); diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index 19aac74e..d2b1dba4 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -40,6 +40,7 @@ #include "shrpx_ssl.h" #include "shrpx_http.h" #include "shrpx_worker.h" +#include "shrpx_connect_blocker.h" #include "http2.h" #include "util.h" #include "base64.h" @@ -140,14 +141,14 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { } // namespace Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, - Worker *worker) + ConnectBlocker *connect_blocker, Worker *worker) : conn_(loop, -1, nullptr, get_config()->downstream_write_timeout, get_config()->downstream_read_timeout, 0, 0, 0, 0, writecb, readcb, timeoutcb, this), - worker_(worker), ssl_ctx_(ssl_ctx), session_(nullptr), - data_pending_(nullptr), data_pendinglen_(0), addr_idx_(0), - state_(DISCONNECTED), connection_check_state_(CONNECTION_CHECK_NONE), - flow_control_(false) { + worker_(worker), connect_blocker_(connect_blocker), ssl_ctx_(ssl_ctx), + session_(nullptr), data_pending_(nullptr), data_pendinglen_(0), + addr_idx_(0), state_(DISCONNECTED), + connection_check_state_(CONNECTION_CHECK_NONE), flow_control_(false) { read_ = write_ = &Http2Session::noop; on_read_ = on_write_ = &Http2Session::noop; @@ -237,6 +238,14 @@ int Http2Session::initiate_connection() { int rv = 0; if (state_ == DISCONNECTED) { + if (connect_blocker_->blocked()) { + if (LOG_ENABLED(INFO)) { + DCLOG(INFO, this) + << "Downstream connection was blocked by connect_blocker"; + } + return -1; + } + auto worker_stat = worker_->get_worker_stat(); addr_idx_ = worker_stat->next_downstream; ++worker_stat->next_downstream; @@ -261,6 +270,7 @@ int Http2Session::initiate_connection() { get_config()->downstream_http_proxy_addr.storage.ss_family); if (conn_.fd == -1) { + connect_blocker_->on_failure(); return -1; } @@ -270,6 +280,7 @@ int Http2Session::initiate_connection() { SSLOG(ERROR, this) << "Failed to connect to the proxy " << get_config()->downstream_http_proxy_host.get() << ":" << get_config()->downstream_http_proxy_port; + connect_blocker_->on_failure(); return -1; } @@ -329,6 +340,7 @@ int Http2Session::initiate_connection() { conn_.fd = util::create_nonblock_socket( downstream_addr.addr.storage.ss_family); if (conn_.fd == -1) { + connect_blocker_->on_failure(); return -1; } @@ -337,6 +349,7 @@ int Http2Session::initiate_connection() { const_cast(&downstream_addr.addr.sa), downstream_addr.addrlen); if (rv != 0 && errno != EINPROGRESS) { + connect_blocker_->on_failure(); return -1; } @@ -358,12 +371,14 @@ int Http2Session::initiate_connection() { downstream_addr.addr.storage.ss_family); if (conn_.fd == -1) { + connect_blocker_->on_failure(); return -1; } rv = connect(conn_.fd, const_cast(&downstream_addr.addr.sa), downstream_addr.addrlen); if (rv != 0 && errno != EINPROGRESS) { + connect_blocker_->on_failure(); return -1; } @@ -1377,7 +1392,9 @@ void Http2Session::signal_write() { LOG(INFO) << "Start connecting to backend server"; } if (initiate_connection() != 0) { - SSLOG(FATAL, this) << "Could not initiate backend connection"; + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Could not initiate backend connection"; + } disconnect(true); } break; @@ -1513,6 +1530,8 @@ int Http2Session::connected() { return -1; } + connect_blocker_->on_success(); + if (LOG_ENABLED(INFO)) { SSLOG(INFO, this) << "Connection established"; } diff --git a/src/shrpx_http2_session.h b/src/shrpx_http2_session.h index 2aa0ae58..21b1c776 100644 --- a/src/shrpx_http2_session.h +++ b/src/shrpx_http2_session.h @@ -47,6 +47,7 @@ namespace shrpx { class Http2DownstreamConnection; class Worker; +class ConnectBlocker; struct StreamData { Http2DownstreamConnection *dconn; @@ -54,7 +55,8 @@ struct StreamData { class Http2Session { public: - Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker); + Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, + ConnectBlocker *connect_blocker, Worker *worker); ~Http2Session(); int check_cert(); @@ -192,6 +194,7 @@ private: // Used to parse the response from HTTP proxy std::unique_ptr proxy_htp_; Worker *worker_; + ConnectBlocker *connect_blocker_; // NULL if no TLS is configured SSL_CTX *ssl_ctx_; nghttp2_session *session_; diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 537cc30c..25fe4c59 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -1200,8 +1200,6 @@ Downstream *Http2Upstream::find_downstream(int32_t stream_id) { return downstream_queue_.find(stream_id); } -nghttp2_session *Http2Upstream::get_http2_session() { return session_; } - // WARNING: Never call directly or indirectly nghttp2_session_send or // nghttp2_session_recv. These calls may delete downstream. int Http2Upstream::on_downstream_header_complete(Downstream *downstream) { diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index 077297df..5b4c087c 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -64,8 +64,6 @@ public: void remove_downstream(Downstream *downstream); Downstream *find_downstream(int32_t stream_id); - nghttp2_session *get_http2_session(); - int rst_stream(Downstream *downstream, uint32_t error_code); int terminate_session(uint32_t error_code); int error_reply(Downstream *downstream, unsigned int status_code); diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 4b5c06f8..11aa90e4 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -130,7 +130,7 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { } if (conn_.fd == -1) { - auto connect_blocker = client_handler_->get_http1_connect_blocker(); + auto connect_blocker = client_handler_->get_connect_blocker(); if (connect_blocker->blocked()) { if (LOG_ENABLED(INFO)) { @@ -768,7 +768,7 @@ int HttpDownstreamConnection::on_write() { } int HttpDownstreamConnection::on_connect() { - auto connect_blocker = client_handler_->get_http1_connect_blocker(); + auto connect_blocker = client_handler_->get_connect_blocker(); if (!util::check_socket_connected(conn_.fd)) { conn_.wlimit.stopw(); diff --git a/src/shrpx_http_downstream_connection.h b/src/shrpx_http_downstream_connection.h index 2eec5def..87354a9e 100644 --- a/src/shrpx_http_downstream_connection.h +++ b/src/shrpx_http_downstream_connection.h @@ -59,6 +59,8 @@ public: virtual void on_upstream_change(Upstream *upstream); virtual int on_priority_change(int32_t pri) { return 0; } + virtual bool poolable() const { return true; } + int on_connect(); void signal_write(); diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 095e3b24..5e35808f 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -821,8 +821,6 @@ Downstream *SpdyUpstream::find_downstream(int32_t stream_id) { return downstream_queue_.find(stream_id); } -spdylay_session *SpdyUpstream::get_http2_session() { return session_; } - // WARNING: Never call directly or indirectly spdylay_session_send or // spdylay_session_recv. These calls may delete downstream. int SpdyUpstream::on_downstream_header_complete(Downstream *downstream) { diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index 9d83fc67..d35fadc1 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -60,8 +60,6 @@ public: void remove_downstream(Downstream *downstream); Downstream *find_downstream(int32_t stream_id); - spdylay_session *get_http2_session(); - int rst_stream(Downstream *downstream, int status_code); int error_reply(Downstream *downstream, unsigned int status_code); diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 99b5c6e6..f7d4d489 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -51,17 +51,20 @@ void eventcb(struct ev_loop *loop, ev_async *w, int revents) { Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, ssl::CertLookupTree *cert_tree, const std::shared_ptr &ticket_keys) - : loop_(loop), sv_ssl_ctx_(sv_ssl_ctx), cl_ssl_ctx_(cl_ssl_ctx), - cert_tree_(cert_tree), ticket_keys_(ticket_keys), + : next_http2session_(0), loop_(loop), sv_ssl_ctx_(sv_ssl_ctx), + cl_ssl_ctx_(cl_ssl_ctx), cert_tree_(cert_tree), ticket_keys_(ticket_keys), + connect_blocker_(make_unique(loop_)), graceful_shutdown_(false) { ev_async_init(&w_, eventcb); w_.data = this; ev_async_start(loop_, &w_); if (get_config()->downstream_proto == PROTO_HTTP2) { - http2session_ = make_unique(loop_, cl_ssl_ctx, this); - } else { - http1_connect_blocker_ = make_unique(loop_); + auto n = get_config()->http2_downstream_connections_per_worker; + for (; n > 0; --n) { + http2sessions_.push_back(make_unique( + loop_, cl_ssl_ctx, connect_blocker_.get(), this)); + } } } @@ -185,10 +188,22 @@ WorkerStat *Worker::get_worker_stat() { return &worker_stat_; } DownstreamConnectionPool *Worker::get_dconn_pool() { return &dconn_pool_; } -Http2Session *Worker::get_http2_session() const { return http2session_.get(); } +Http2Session *Worker::next_http2_session() { + if (http2sessions_.empty()) { + return nullptr; + } -ConnectBlocker *Worker::get_http1_connect_blocker() const { - return http1_connect_blocker_.get(); + auto res = http2sessions_[next_http2session_].get(); + ++next_http2session_; + if (next_http2session_ >= http2sessions_.size()) { + next_http2session_ = 0; + } + + return res; +} + +ConnectBlocker *Worker::get_connect_blocker() const { + return connect_blocker_.get(); } struct ev_loop *Worker::get_loop() const { diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index ba6073bb..78ed18ac 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -29,6 +29,7 @@ #include #include +#include #include #ifndef NOTHREADS #include @@ -94,8 +95,8 @@ public: void set_ticket_keys(std::shared_ptr ticket_keys); WorkerStat *get_worker_stat(); DownstreamConnectionPool *get_dconn_pool(); - Http2Session *get_http2_session() const; - ConnectBlocker *get_http1_connect_blocker() const; + Http2Session *next_http2_session(); + ConnectBlocker *get_connect_blocker() const; struct ev_loop *get_loop() const; SSL_CTX *get_sv_ssl_ctx() const; SSL_CTX *get_cl_ssl_ctx() const; @@ -104,6 +105,8 @@ public: bool get_graceful_shutdown() const; private: + std::vector> http2sessions_; + size_t next_http2session_; #ifndef NOTHREADS std::future fut_; #endif // NOTHREADS @@ -121,8 +124,7 @@ private: ssl::CertLookupTree *cert_tree_; std::shared_ptr ticket_keys_; - std::unique_ptr http2session_; - std::unique_ptr http1_connect_blocker_; + std::unique_ptr connect_blocker_; bool graceful_shutdown_; };