diff --git a/src/shrpx.cc b/src/shrpx.cc index bf16178b..0876b336 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -776,7 +776,7 @@ void fill_default_config() { mod_config()->no_location_rewrite = false; mod_config()->argc = 0; mod_config()->argv = nullptr; - mod_config()->max_downstream_connections = 100; + mod_config()->downstream_connections_per_host = 8; mod_config()->listener_disable_timeout = {0, 0}; } } // namespace @@ -899,13 +899,12 @@ Performance: Set maximum number of simultaneous connections frontend accepts. Setting 0 means unlimited. Default: 0 - --backend-connections-per-frontend= - Set maximum number of backend simultaneous - connections per frontend. This option is - meaningful when the combination of HTTP/2 or SPDY - frontend and HTTP/1 backend is used. - Default: )" << get_config()->max_downstream_connections - << R"( + --backend-http1-connections-per-host= + Set maximum number of backend concurrent HTTP/1 + connections per host. This option is meaningful + when -s option is used. + Default: )" + << get_config()->downstream_connections_per_host << R"( Timeout: --frontend-http2-read-timeout= @@ -1276,7 +1275,7 @@ int main(int argc, char **argv) { {"stream-read-timeout", required_argument, &flag, 60}, {"stream-write-timeout", required_argument, &flag, 61}, {"no-location-rewrite", no_argument, &flag, 62}, - {"backend-connections-per-frontend", required_argument, &flag, 63}, + {"backend-http1-connections-per-host", required_argument, &flag, 63}, {"listener-disable-timeout", required_argument, &flag, 64}, {"strip-incoming-x-forwarded-for", no_argument, &flag, 65}, {"accesslog-format", required_argument, &flag, 66}, @@ -1568,8 +1567,8 @@ int main(int argc, char **argv) { cmdcfgs.emplace_back(SHRPX_OPT_NO_LOCATION_REWRITE, "yes"); break; case 63: - // --backend-connections-per-frontend - cmdcfgs.emplace_back(SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND, + // --backend-http1-connections-per-host + cmdcfgs.emplace_back(SHRPX_OPT_BACKEND_HTTP1_CONNECTIONS_PER_HOST, optarg); break; case 64: diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index 2b20ffb1..9b7a9322 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -132,8 +132,8 @@ const char SHRPX_OPT_ADD_RESPONSE_HEADER[] = "add-response-header"; const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[] = "worker-frontend-connections"; const char SHRPX_OPT_NO_LOCATION_REWRITE[] = "no-location-rewrite"; -const char SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND[] = - "backend-connections-per-frontend"; +const char SHRPX_OPT_BACKEND_HTTP1_CONNECTIONS_PER_HOST[] = + "backend-http1-connections-per-host"; const char SHRPX_OPT_LISTENER_DISABLE_TIMEOUT[] = "listener-disable-timeout"; namespace { @@ -1006,7 +1006,7 @@ int parse_config(const char *opt, const char *optarg) { return 0; } - if (util::strieq(opt, SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND)) { + if (util::strieq(opt, SHRPX_OPT_BACKEND_HTTP1_CONNECTIONS_PER_HOST)) { int n; if (parse_uint(&n, opt, optarg) != 0) { @@ -1019,7 +1019,7 @@ int parse_config(const char *opt, const char *optarg) { return -1; } - mod_config()->max_downstream_connections = n; + mod_config()->downstream_connections_per_host = n; return 0; } diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 74690a07..0db64d54 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -123,7 +123,7 @@ extern const char SHRPX_OPT_ALTSVC[]; extern const char SHRPX_OPT_ADD_RESPONSE_HEADER[]; extern const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[]; extern const char SHRPX_OPT_NO_LOCATION_REWRITE[]; -extern const char SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND[]; +extern const char SHRPX_OPT_BACKEND_HTTP1_CONNECTIONS_PER_HOST[]; extern const char SHRPX_OPT_LISTENER_DISABLE_TIMEOUT[]; union sockaddr_union { @@ -222,7 +222,7 @@ struct Config { size_t http2_downstream_window_bits; size_t http2_upstream_connection_window_bits; size_t http2_downstream_connection_window_bits; - size_t max_downstream_connections; + size_t downstream_connections_per_host; // actual size of downstream_http_proxy_addr size_t downstream_http_proxy_addrlen; size_t read_rate; diff --git a/src/shrpx_downstream_queue.cc b/src/shrpx_downstream_queue.cc index 91b47779..9c46a18d 100644 --- a/src/shrpx_downstream_queue.cc +++ b/src/shrpx_downstream_queue.cc @@ -25,12 +25,18 @@ #include "shrpx_downstream_queue.h" #include +#include #include "shrpx_downstream.h" namespace shrpx { -DownstreamQueue::DownstreamQueue() {} +DownstreamQueue::HostEntry::HostEntry() : num_active(0) {} + +DownstreamQueue::DownstreamQueue(size_t conn_max_per_host) + : conn_max_per_host_(conn_max_per_host == 0 + ? std::numeric_limits::max() + : conn_max_per_host) {} DownstreamQueue::~DownstreamQueue() {} @@ -44,49 +50,146 @@ void DownstreamQueue::add_failure(std::unique_ptr downstream) { failure_downstreams_[stream_id] = std::move(downstream); } +DownstreamQueue::HostEntry & +DownstreamQueue::find_host_entry(const std::string &host) { + auto itr = host_entries_.find(host); + if (itr == std::end(host_entries_)) { + std::tie(itr, std::ignore) = host_entries_.emplace(host, HostEntry()); + } + return (*itr).second; +} + void DownstreamQueue::add_active(std::unique_ptr downstream) { + auto &ent = find_host_entry(downstream->get_request_http2_authority()); + ++ent.num_active; + auto stream_id = downstream->get_stream_id(); active_downstreams_[stream_id] = std::move(downstream); } -std::unique_ptr DownstreamQueue::remove(int32_t stream_id) { - auto kv = pending_downstreams_.find(stream_id); +void DownstreamQueue::add_blocked(std::unique_ptr downstream) { + auto &ent = find_host_entry(downstream->get_request_http2_authority()); + auto stream_id = downstream->get_stream_id(); + ent.blocked.insert(stream_id); + blocked_downstreams_[stream_id] = std::move(downstream); +} - if (kv != std::end(pending_downstreams_)) { - auto downstream = std::move((*kv).second); - pending_downstreams_.erase(kv); - return downstream; +bool DownstreamQueue::can_activate(const std::string &host) const { + auto itr = host_entries_.find(host); + if (itr == std::end(host_entries_)) { + return true; } + auto &ent = (*itr).second; + return ent.num_active < conn_max_per_host_; +} - kv = active_downstreams_.find(stream_id); +namespace { +std::unique_ptr +pop_downstream(DownstreamQueue::DownstreamMap::iterator i, + DownstreamQueue::DownstreamMap &downstreams) { + auto downstream = std::move((*i).second); + downstreams.erase(i); + return downstream; +} +} // namespace + +namespace { +bool remove_host_entry_if_empty(const DownstreamQueue::HostEntry &ent, + DownstreamQueue::HostEntryMap &host_entries, + const std::string &host) { + if (ent.blocked.empty() && ent.num_active == 0) { + host_entries.erase(host); + return true; + } + return false; +} +} // namespace + +std::unique_ptr DownstreamQueue::pop_pending(int32_t stream_id) { + auto itr = pending_downstreams_.find(stream_id); + if (itr == std::end(pending_downstreams_)) { + return nullptr; + } + return pop_downstream(itr, pending_downstreams_); +} + +std::unique_ptr +DownstreamQueue::remove_and_pop_blocked(int32_t stream_id) { + auto kv = active_downstreams_.find(stream_id); if (kv != std::end(active_downstreams_)) { - auto downstream = std::move((*kv).second); - active_downstreams_.erase(kv); - return downstream; + auto downstream = pop_downstream(kv, active_downstreams_); + auto &host = downstream->get_request_http2_authority(); + auto &ent = find_host_entry(host); + --ent.num_active; + + if (remove_host_entry_if_empty(ent, host_entries_, host)) { + return nullptr; + } + + if (ent.blocked.empty() || ent.num_active >= conn_max_per_host_) { + return nullptr; + } + + auto next_stream_id = *std::begin(ent.blocked); + ent.blocked.erase(std::begin(ent.blocked)); + + auto itr = blocked_downstreams_.find(next_stream_id); + assert(itr != std::end(blocked_downstreams_)); + + auto next_downstream = pop_downstream(itr, blocked_downstreams_); + + remove_host_entry_if_empty(ent, host_entries_, host); + + return next_downstream; + } + + kv = blocked_downstreams_.find(stream_id); + + if (kv != std::end(blocked_downstreams_)) { + auto downstream = pop_downstream(kv, blocked_downstreams_); + auto &host = downstream->get_request_http2_authority(); + auto &ent = find_host_entry(host); + ent.blocked.erase(stream_id); + + remove_host_entry_if_empty(ent, host_entries_, host); + + return nullptr; + } + + kv = pending_downstreams_.find(stream_id); + + if (kv != std::end(pending_downstreams_)) { + pop_downstream(kv, pending_downstreams_); + return nullptr; } kv = failure_downstreams_.find(stream_id); if (kv != std::end(failure_downstreams_)) { - auto downstream = std::move((*kv).second); - failure_downstreams_.erase(kv); - return downstream; + pop_downstream(kv, failure_downstreams_); + return nullptr; } return nullptr; } Downstream *DownstreamQueue::find(int32_t stream_id) { - auto kv = pending_downstreams_.find(stream_id); + auto kv = active_downstreams_.find(stream_id); - if (kv != std::end(pending_downstreams_)) { + if (kv != std::end(active_downstreams_)) { return (*kv).second.get(); } - kv = active_downstreams_.find(stream_id); + kv = blocked_downstreams_.find(stream_id); - if (kv != std::end(active_downstreams_)) { + if (kv != std::end(blocked_downstreams_)) { + return (*kv).second.get(); + } + + kv = pending_downstreams_.find(stream_id); + + if (kv != std::end(pending_downstreams_)) { return (*kv).second.get(); } @@ -99,41 +202,14 @@ Downstream *DownstreamQueue::find(int32_t stream_id) { return nullptr; } -std::unique_ptr DownstreamQueue::pop_pending() { - auto i = std::begin(pending_downstreams_); - - if (i == std::end(pending_downstreams_)) { - return nullptr; - } - - auto downstream = std::move((*i).second); - - pending_downstreams_.erase(i); - - return downstream; -} - -Downstream *DownstreamQueue::pending_top() const { - auto i = std::begin(pending_downstreams_); - - if (i == std::end(pending_downstreams_)) { - return nullptr; - } - - return (*i).second.get(); -} - -size_t DownstreamQueue::num_active() const { - return active_downstreams_.size(); -} - -bool DownstreamQueue::pending_empty() const { - return pending_downstreams_.empty(); -} - -const std::map> & +const DownstreamQueue::DownstreamMap & DownstreamQueue::get_active_downstreams() const { return active_downstreams_; } +const DownstreamQueue::DownstreamMap & +DownstreamQueue::get_blocked_downstreams() const { + return blocked_downstreams_; +} + } // namespace shrpx diff --git a/src/shrpx_downstream_queue.h b/src/shrpx_downstream_queue.h index 0149407d..dfc3d61c 100644 --- a/src/shrpx_downstream_queue.h +++ b/src/shrpx_downstream_queue.h @@ -30,6 +30,7 @@ #include #include +#include #include namespace shrpx { @@ -38,39 +39,67 @@ class Downstream; class DownstreamQueue { public: - DownstreamQueue(); + typedef std::map> DownstreamMap; + + struct HostEntry { + // Set of stream ID that blocked by conn_max_per_host_. + std::set blocked; + // The number of connections currently made to this host. + size_t num_active; + HostEntry(); + }; + + typedef std::map HostEntryMap; + + // conn_max_per_host == 0 means no limit for downstream connection. + DownstreamQueue(size_t conn_max_per_host = 0); ~DownstreamQueue(); void add_pending(std::unique_ptr downstream); void add_failure(std::unique_ptr downstream); + // Adds |downstream| to active_downstreams_, which means that + // downstream connection has been started. void add_active(std::unique_ptr downstream); - // Removes |downstream| from either pending_downstreams_, - // active_downstreams_ or failure_downstreams_ and returns it - // wrapped in std::unique_ptr. - std::unique_ptr remove(int32_t stream_id); + // Adds |downstream| to blocked_downstreams_, which means that + // download connection was blocked because conn_max_per_host_ limit. + void add_blocked(std::unique_ptr downstream); + // Returns true if we can make downstream connection to given + // |host|. + bool can_activate(const std::string &host) const; + // Removes pending Downstream object whose stream ID is |stream_id| + // from pending_downstreams_ and returns it. + std::unique_ptr pop_pending(int32_t stream_id); + // Removes Downstream object whose stream ID is |stream_id| from + // either pending_downstreams_, active_downstreams_, + // blocked_downstreams_ or failure_downstreams_. If a Downstream + // object is removed from active_downstreams_, this function may + // return Downstream object with the same target host in + // blocked_downstreams_ if its connection is now not blocked by + // conn_max_per_host_ limit. + std::unique_ptr remove_and_pop_blocked(int32_t stream_id); // Finds Downstream object denoted by |stream_id| either in - // pending_downstreams_, active_downstreams_ or - // failure_downstreams_. + // pending_downstreams_, active_downstreams_, blocked_downstreams_ + // or failure_downstreams_. Downstream *find(int32_t stream_id); - // Returns the number of active Downstream objects. - size_t num_active() const; - // Returns true if pending_downstreams_ is empty. - bool pending_empty() const; - // Pops first Downstream object in pending_downstreams_ and returns - // it. - std::unique_ptr pop_pending(); - // Returns first Downstream object in pending_downstreams_. This - // does not pop the first one. If queue is empty, returns nullptr. - Downstream *pending_top() const; - const std::map> & - get_active_downstreams() const; + const DownstreamMap &get_active_downstreams() const; + const DownstreamMap &get_blocked_downstreams() const; + + HostEntry &find_host_entry(const std::string &host); + + // Maximum number of concurrent connections to the same host. + size_t conn_max_per_host_; private: + // Per target host structure to keep track of the number of + // connections to the same host. + std::map host_entries_; // Downstream objects, not processed yet - std::map> pending_downstreams_; - // Downstream objects in use, consuming downstream concurrency limit - std::map> active_downstreams_; + DownstreamMap pending_downstreams_; // Downstream objects, failed to connect to downstream server - std::map> failure_downstreams_; + DownstreamMap failure_downstreams_; + // Downstream objects, downstream connection started + DownstreamMap active_downstreams_; + // Downstream objects, blocked by conn_max_per_host_ + DownstreamMap blocked_downstreams_; }; } // namespace shrpx diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 416fd4dc..e95dc5c6 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -352,35 +352,24 @@ int on_request_headers(Http2Upstream *upstream, Downstream *downstream, downstream->set_request_state(Downstream::MSG_COMPLETE); } - upstream->maintain_downstream_concurrency(); + upstream->start_downstream(downstream); return 0; } } // namespace -void Http2Upstream::maintain_downstream_concurrency() { - while (get_config()->max_downstream_connections > - downstream_queue_.num_active()) { - if (downstream_queue_.pending_empty()) { - break; - } +void Http2Upstream::start_downstream(Downstream *downstream) { + auto next_downstream = + downstream_queue_.pop_pending(downstream->get_stream_id()); + assert(next_downstream); - { - auto downstream = downstream_queue_.pending_top(); - if (downstream->get_request_state() != Downstream::HEADER_COMPLETE && - downstream->get_request_state() != Downstream::MSG_COMPLETE) { - break; - } - } - - auto downstream = downstream_queue_.pop_pending(); - - if (!downstream) { - break; - } - - initiate_downstream(std::move(downstream)); + if (downstream_queue_.can_activate( + downstream->get_request_http2_authority())) { + initiate_downstream(std::move(next_downstream)); + return; } + + downstream_queue_.add_blocked(std::move(next_downstream)); } void @@ -610,7 +599,10 @@ uint32_t infer_upstream_rst_stream_error_code(uint32_t downstream_error_code) { } // namespace Http2Upstream::Http2Upstream(ClientHandler *handler) - : handler_(handler), session_(nullptr), settings_timerev_(nullptr) { + : downstream_queue_(get_config()->http2_proxy + ? get_config()->downstream_connections_per_host + : 0), + handler_(handler), session_(nullptr), settings_timerev_(nullptr) { reset_timeouts(); int rv; @@ -1154,9 +1146,12 @@ void Http2Upstream::remove_downstream(Downstream *downstream) { handler_->write_accesslog(downstream); } - downstream_queue_.remove(downstream->get_stream_id()); + auto next_downstream = + downstream_queue_.remove_and_pop_blocked(downstream->get_stream_id()); - maintain_downstream_concurrency(); + if (next_downstream) { + initiate_downstream(std::move(next_downstream)); + } } Downstream *Http2Upstream::find_downstream(int32_t stream_id) { @@ -1400,6 +1395,11 @@ void Http2Upstream::on_handler_delete() { handler_->write_accesslog(ent.second.get()); } } + for (auto &ent : downstream_queue_.get_blocked_downstreams()) { + if (ent.second->accesslog_ready()) { + handler_->write_accesslog(ent.second.get()); + } + } } } // namespace shrpx diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index 6789047e..6cfa10b1 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -88,7 +88,7 @@ public: int consume(int32_t stream_id, size_t len); void log_response_headers(Downstream *downstream, const std::vector &nva) const; - void maintain_downstream_concurrency(); + void start_downstream(Downstream *downstream); void initiate_downstream(std::unique_ptr downstream); nghttp2::util::EvbufferBuffer sendbuf; diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index c76ea67d..2c95b913 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -232,7 +232,7 @@ void on_ctrl_recv_callback(spdylay_session *session, spdylay_frame_type type, downstream->set_request_state(Downstream::MSG_COMPLETE); } - upstream->maintain_downstream_concurrency(); + upstream->start_downstream(downstream); break; } @@ -242,29 +242,18 @@ void on_ctrl_recv_callback(spdylay_session *session, spdylay_frame_type type, } } // namespace -void SpdyUpstream::maintain_downstream_concurrency() { - while (get_config()->max_downstream_connections > - downstream_queue_.num_active()) { - if (downstream_queue_.pending_empty()) { - break; - } +void SpdyUpstream::start_downstream(Downstream *downstream) { + auto next_downstream = + downstream_queue_.pop_pending(downstream->get_stream_id()); + assert(next_downstream); - { - auto downstream = downstream_queue_.pending_top(); - if (downstream->get_request_state() != Downstream::HEADER_COMPLETE && - downstream->get_request_state() != Downstream::MSG_COMPLETE) { - break; - } - } - - auto downstream = downstream_queue_.pop_pending(); - - if (!downstream) { - break; - } - - initiate_downstream(std::move(downstream)); + if (downstream_queue_.can_activate( + downstream->get_request_http2_authority())) { + initiate_downstream(std::move(next_downstream)); + return; } + + downstream_queue_.add_blocked(std::move(next_downstream)); } void SpdyUpstream::initiate_downstream(std::unique_ptr downstream) { @@ -425,7 +414,10 @@ uint32_t infer_upstream_rst_stream_status_code(uint32_t downstream_error_code) { } // namespace SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler) - : handler_(handler), session_(nullptr) { + : downstream_queue_(get_config()->http2_proxy + ? get_config()->downstream_connections_per_host + : 0), + handler_(handler), session_(nullptr) { // handler->set_bev_cb(spdy_readcb, 0, spdy_eventcb); reset_timeouts(); @@ -875,9 +867,12 @@ void SpdyUpstream::remove_downstream(Downstream *downstream) { handler_->write_accesslog(downstream); } - downstream_queue_.remove(downstream->get_stream_id()); + auto next_downstream = + downstream_queue_.remove_and_pop_blocked(downstream->get_stream_id()); - maintain_downstream_concurrency(); + if (next_downstream) { + initiate_downstream(std::move(next_downstream)); + } } Downstream *SpdyUpstream::find_downstream(int32_t stream_id) { @@ -1094,6 +1089,11 @@ void SpdyUpstream::on_handler_delete() { handler_->write_accesslog(ent.second.get()); } } + for (auto &ent : downstream_queue_.get_blocked_downstreams()) { + if (ent.second->accesslog_ready()) { + handler_->write_accesslog(ent.second.get()); + } + } } } // namespace shrpx diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index 9c57e07d..86b6c2ac 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -81,7 +81,7 @@ public: int consume(int32_t stream_id, size_t len); - void maintain_downstream_concurrency(); + void start_downstream(Downstream *downstream); void initiate_downstream(std::unique_ptr downstream); nghttp2::util::EvbufferBuffer sendbuf;