From 0e8419ac378e71b4a0eba0a20eaaee9af5d0d154 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sat, 16 Aug 2014 21:29:20 +0900 Subject: [PATCH] nghttpx: Add backend-connections-per-frontend option This option limits the number of backend connections per frontend. This is meaningful for the combination of HTTP/2 and SPDY frontend and HTTP/1 backend. --- src/shrpx.cc | 14 +++++++ src/shrpx_config.cc | 18 ++++++++ src/shrpx_config.h | 2 + src/shrpx_downstream_queue.cc | 74 +++++++++++++++++++++++++++++---- src/shrpx_downstream_queue.h | 23 ++++++++++- src/shrpx_http2_upstream.cc | 77 ++++++++++++++++++++++++----------- src/shrpx_http2_upstream.h | 2 + src/shrpx_spdy_upstream.cc | 57 +++++++++++++++++++------- src/shrpx_spdy_upstream.h | 3 ++ 9 files changed, 222 insertions(+), 48 deletions(-) diff --git a/src/shrpx.cc b/src/shrpx.cc index e5fa1cd7..45f795c6 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -796,6 +796,7 @@ void fill_default_config() mod_config()->no_location_rewrite = false; mod_config()->argc = 0; mod_config()->argv = nullptr; + mod_config()->max_downstream_connections = 100; } } // namespace @@ -891,6 +892,13 @@ Performance: Set maximum number of simultaneous connections frontend accepts. Setting 0 means unlimited. Default: 0 + --backend-connections-per-frontend= + Set maximum number of backend simultaneous + connections per frontend. This option is + meaningful when the combination of HTTP/2 or SPDY + frontend and HTTP/1 backend is used. + Default: )" + << get_config()->max_downstream_connections << R"( Timeout: --frontend-http2-read-timeout= @@ -1246,6 +1254,7 @@ int main(int argc, char **argv) {"stream-read-timeout", required_argument, &flag, 60}, {"stream-write-timeout", required_argument, &flag, 61}, {"no-location-rewrite", no_argument, &flag, 62}, + {"backend-connections-per-frontend", required_argument, &flag, 63}, {nullptr, 0, nullptr, 0 } }; @@ -1518,6 +1527,11 @@ int main(int argc, char **argv) // --no-location-rewrite cmdcfgs.emplace_back(SHRPX_OPT_NO_LOCATION_REWRITE, "yes"); break; + case 63: + // --backend-connections-per-frontend + cmdcfgs.emplace_back(SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND, + optarg); + break; default: break; } diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index cc4fb9aa..622dd1ae 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -125,6 +125,8 @@ const char SHRPX_OPT_ADD_RESPONSE_HEADER[] = "add-response-header"; const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[] = "worker-frontend-connections"; const char SHRPX_OPT_NO_LOCATION_REWRITE[] = "no-location-rewrite"; +const char SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND[] = + "backend-connections-per-frontend"; namespace { Config *config = nullptr; @@ -840,6 +842,22 @@ int parse_config(const char *opt, const char *optarg) return 0; } + if(util::strieq(opt, SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND)) { + errno = 0; + + auto n = strtoul(optarg, nullptr, 10); + + if(errno != 0 || n < 1) { + LOG(ERROR) << "backend-connections-per-frontend: " + << "specify the integer more than or equal to 1"; + return -1; + } + + mod_config()->max_downstream_connections = n; + + return 0; + } + if(util::strieq(opt, "conf")) { LOG(WARNING) << "conf is ignored"; diff --git a/src/shrpx_config.h b/src/shrpx_config.h index a0a83ba7..e6711ec9 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -115,6 +115,7 @@ extern const char SHRPX_OPT_ALTSVC[]; extern const char SHRPX_OPT_ADD_RESPONSE_HEADER[]; extern const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[]; extern const char SHRPX_OPT_NO_LOCATION_REWRITE[]; +extern const char SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND[]; union sockaddr_union { sockaddr sa; @@ -217,6 +218,7 @@ struct Config { size_t http2_downstream_window_bits; size_t http2_upstream_connection_window_bits; size_t http2_downstream_connection_window_bits; + size_t max_downstream_connections; // actual size of downstream_http_proxy_addr size_t downstream_http_proxy_addrlen; size_t read_rate; diff --git a/src/shrpx_downstream_queue.cc b/src/shrpx_downstream_queue.cc index 1e7ddc6f..dec8b314 100644 --- a/src/shrpx_downstream_queue.cc +++ b/src/shrpx_downstream_queue.cc @@ -35,29 +35,87 @@ DownstreamQueue::DownstreamQueue() DownstreamQueue::~DownstreamQueue() { - for(auto& kv : downstreams_) { + for(auto& kv : pending_downstreams_) { + delete kv.second; + } + + for(auto& kv : active_downstreams_) { + delete kv.second; + } + + for(auto& kv : failure_downstreams_) { delete kv.second; } } -void DownstreamQueue::add(Downstream *downstream) +void DownstreamQueue::add_pending(Downstream *downstream) { - downstreams_[downstream->get_stream_id()] = downstream; + pending_downstreams_[downstream->get_stream_id()] = downstream; +} + +void DownstreamQueue::add_failure(Downstream *downstream) +{ + failure_downstreams_[downstream->get_stream_id()] = downstream; +} + +void DownstreamQueue::add_active(Downstream *downstream) +{ + active_downstreams_[downstream->get_stream_id()] = downstream; } void DownstreamQueue::remove(Downstream *downstream) { - downstreams_.erase(downstream->get_stream_id()); + pending_downstreams_.erase(downstream->get_stream_id()); + active_downstreams_.erase(downstream->get_stream_id()); + failure_downstreams_.erase(downstream->get_stream_id()); } Downstream* DownstreamQueue::find(int32_t stream_id) { - auto kv = downstreams_.find(stream_id); - if(kv == std::end(downstreams_)) { - return nullptr; - } else { + auto kv = pending_downstreams_.find(stream_id); + + if(kv != std::end(pending_downstreams_)) { return (*kv).second; } + + kv = active_downstreams_.find(stream_id); + + if(kv != std::end(active_downstreams_)) { + return (*kv).second; + } + + kv = failure_downstreams_.find(stream_id); + + if(kv != std::end(failure_downstreams_)) { + return (*kv).second; + } + + return nullptr; +} + +Downstream* DownstreamQueue::pop_pending() +{ + auto i = std::begin(pending_downstreams_); + + if(i == std::end(pending_downstreams_)) { + return nullptr; + } + + auto downstream = (*i).second; + + pending_downstreams_.erase(i); + + return downstream; +} + +size_t DownstreamQueue::num_active() const +{ + return active_downstreams_.size(); +} + +bool DownstreamQueue::pending_empty() const +{ + return pending_downstreams_.empty(); } } // namespace shrpx diff --git a/src/shrpx_downstream_queue.h b/src/shrpx_downstream_queue.h index 68b8ecc4..04245b54 100644 --- a/src/shrpx_downstream_queue.h +++ b/src/shrpx_downstream_queue.h @@ -39,11 +39,30 @@ class DownstreamQueue { public: DownstreamQueue(); ~DownstreamQueue(); - void add(Downstream *downstream); + void add_pending(Downstream *downstream); + void add_failure(Downstream *downstream); + void add_active(Downstream *downstream); + // Removes |downstream| from either pending_downstreams_, + // active_downstreams_ or failure_downstreams_. void remove(Downstream *downstream); + // Finds Downstream object denoted by |stream_id| either in + // pending_downstreams_, active_downstreams_ or + // failure_downstreams_. Downstream* find(int32_t stream_id); + // Returns the number of active Downstream objects. + size_t num_active() const; + // Returns true if pending_downstreams_ is empty. + bool pending_empty() const; + // Pops first Downstream object in pending_downstreams_ and returns + // it. + Downstream* pop_pending(); private: - std::map downstreams_; + // Downstream objects, not processed yet + std::map pending_downstreams_; + // Downstream objects in use, consuming downstream concurrency limit + std::map active_downstreams_; + // Downstream objects, failed to connect to downstream server + std::map failure_downstreams_; }; } // namespace shrpx diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 80badb4d..72fa85dd 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -295,8 +295,6 @@ int on_request_headers(Http2Upstream *upstream, nghttp2_session *session, const nghttp2_frame *frame) { - int rv; - if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { return 0; } @@ -370,26 +368,6 @@ int on_request_headers(Http2Upstream *upstream, downstream->inspect_http2_request(); - auto dconn = upstream->get_client_handler()->get_downstream_connection(); - rv = dconn->attach_downstream(downstream); - if(rv != 0) { - // downstream connection fails, send error page - if(upstream->error_reply(downstream, 503) != 0) { - upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); - } - - downstream->set_request_state(Downstream::CONNECT_FAIL); - - return 0; - } - rv = downstream->push_request_headers(); - if(rv != 0) { - if(upstream->error_reply(downstream, 503) != 0) { - upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); - } - - return 0; - } downstream->set_request_state(Downstream::HEADER_COMPLETE); if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { downstream->disable_upstream_rtimer(); @@ -397,11 +375,60 @@ int on_request_headers(Http2Upstream *upstream, downstream->set_request_state(Downstream::MSG_COMPLETE); } + upstream->maintain_downstream_concurrency(); + return 0; } - } // namespace +void Http2Upstream::maintain_downstream_concurrency() +{ + while(get_config()->max_downstream_connections > + downstream_queue_.num_active()) { + auto downstream = downstream_queue_.pop_pending(); + + if(!downstream) { + break; + } + + initiate_downstream(downstream); + } +} + +void Http2Upstream::initiate_downstream(Downstream *downstream) +{ + int rv; + + auto dconn = handler_->get_downstream_connection(); + rv = dconn->attach_downstream(downstream); + if(rv != 0) { + downstream_queue_.add_failure(downstream); + + // downstream connection fails, send error page + if(error_reply(downstream, 503) != 0) { + rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); + } + + downstream->set_request_state(Downstream::CONNECT_FAIL); + + return; + } + rv = downstream->push_request_headers(); + if(rv != 0) { + downstream_queue_.add_failure(downstream); + + if(error_reply(downstream, 503) != 0) { + rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); + } + + return; + } + + downstream_queue_.add_active(downstream); + + return; +} + namespace { int on_frame_recv_callback (nghttp2_session *session, const nghttp2_frame *frame, void *user_data) @@ -1143,12 +1170,14 @@ bufferevent_event_cb Http2Upstream::get_downstream_eventcb() void Http2Upstream::add_downstream(Downstream *downstream) { - downstream_queue_.add(downstream); + downstream_queue_.add_pending(downstream); } void Http2Upstream::remove_downstream(Downstream *downstream) { downstream_queue_.remove(downstream); + + maintain_downstream_concurrency(); } Downstream* Http2Upstream::find_downstream(int32_t stream_id) diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index 44b95bf5..23214494 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -82,6 +82,8 @@ public: int consume(int32_t stream_id, size_t len); void log_response_headers(Downstream *downstream, const std::vector& nva) const; + void maintain_downstream_concurrency(); + void initiate_downstream(Downstream *downstream); private: DownstreamQueue downstream_queue_; std::unique_ptr pre_upstream_; diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index a88aab02..b2bc1033 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -230,24 +230,14 @@ void on_ctrl_recv_callback << "\n" << ss.str(); } - auto dconn = upstream->get_client_handler()->get_downstream_connection(); - int rv = dconn->attach_downstream(downstream); - if(rv != 0) { - // If downstream connection fails, issue RST_STREAM. - upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); - downstream->set_request_state(Downstream::CONNECT_FAIL); - return; - } - rv = downstream->push_request_headers(); - if(rv != 0) { - upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); - return; - } downstream->set_request_state(Downstream::HEADER_COMPLETE); if(frame->syn_stream.hd.flags & SPDYLAY_CTRL_FLAG_FIN) { downstream->disable_upstream_rtimer(); downstream->set_request_state(Downstream::MSG_COMPLETE); } + + upstream->maintain_downstream_concurrency(); + break; } default: @@ -256,6 +246,43 @@ void on_ctrl_recv_callback } } // namespace +void SpdyUpstream::maintain_downstream_concurrency() +{ + while(get_config()->max_downstream_connections > + downstream_queue_.num_active()) { + auto downstream = downstream_queue_.pop_pending(); + + if(!downstream) { + break; + } + + initiate_downstream(downstream); + } +} + +void SpdyUpstream::initiate_downstream(Downstream *downstream) +{ + auto dconn = handler_->get_downstream_connection(); + int rv = dconn->attach_downstream(downstream); + if(rv != 0) { + downstream_queue_.add_failure(downstream); + + // If downstream connection fails, issue RST_STREAM. + rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); + downstream->set_request_state(Downstream::CONNECT_FAIL); + return; + } + rv = downstream->push_request_headers(); + if(rv != 0) { + downstream_queue_.add_failure(downstream); + + rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); + return; + } + + downstream_queue_.add_active(downstream); +} + namespace { void on_data_chunk_recv_callback(spdylay_session *session, uint8_t flags, int32_t stream_id, @@ -876,12 +903,14 @@ bufferevent_event_cb SpdyUpstream::get_downstream_eventcb() void SpdyUpstream::add_downstream(Downstream *downstream) { - downstream_queue_.add(downstream); + downstream_queue_.add_pending(downstream); } void SpdyUpstream::remove_downstream(Downstream *downstream) { downstream_queue_.remove(downstream); + + maintain_downstream_concurrency(); } Downstream* SpdyUpstream::find_downstream(int32_t stream_id) diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index c2155f8f..f42a9839 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -74,6 +74,9 @@ public: int handle_ign_data_chunk(size_t len); + void maintain_downstream_concurrency(); + void initiate_downstream(Downstream *downstream); + nghttp2::util::EvbufferBuffer sendbuf; private: DownstreamQueue downstream_queue_;