From 6f025619debf5294f2db5ab844528fab37aebae7 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Thu, 16 Jun 2016 21:22:36 +0900 Subject: [PATCH] nghttpx: Use dedicated worker for API processing Some API processing is very slow (e.g., getaddrinfo). To avoid to slow down regular request handling, if multi threaded configuration is enabled, we allocate dedicated worker for API. --- src/shrpx_config.cc | 4 ++++ src/shrpx_config.h | 2 ++ src/shrpx_connection_handler.cc | 40 ++++++++++++++++++++++++++++----- src/shrpx_connection_handler.h | 2 ++ 4 files changed, 42 insertions(+), 6 deletions(-) diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index 1a21e34d..4d044609 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -1738,6 +1738,10 @@ int parse_config(Config *config, int optid, const StringRef &opt, addr.tls = params.tls; addr.api = params.api; + if (addr.api) { + listenerconf.api = true; + } + if (util::istarts_with(optarg, SHRPX_UNIX_PATH_PREFIX)) { auto path = std::begin(optarg) + SHRPX_UNIX_PATH_PREFIX.size(); addr.host = ImmutableString{path, addr_end}; diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 1165de4a..aa9522ac 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -652,6 +652,8 @@ struct ConnectionConfig { // TCP fastopen. If this is positive, it is passed to // setsockopt() along with TCP_FASTOPEN. int fastopen; + // true if at least one of UpstreamAddr has api enabled + bool api; } listener; struct { diff --git a/src/shrpx_connection_handler.cc b/src/shrpx_connection_handler.cc index e5cbe9a2..bc76ad47 100644 --- a/src/shrpx_connection_handler.cc +++ b/src/shrpx_connection_handler.cc @@ -120,7 +120,7 @@ ConnectionHandler::ConnectionHandler(struct ev_loop *loop) loop_(loop), tls_ticket_key_memcached_get_retry_count_(0), tls_ticket_key_memcached_fail_count_(0), - worker_round_robin_cnt_(0), + worker_round_robin_cnt_(get_config()->conn.listener.api ? 1 : 0), graceful_shutdown_(false) { ev_timer_init(&disable_acceptor_timer_, acceptor_disable_cb, 0., 0.); disable_acceptor_timer_.data = this; @@ -268,6 +268,12 @@ int ConnectionHandler::create_worker_thread(size_t num) { auto &tlsconf = get_config()->tls; auto &memcachedconf = get_config()->tls.session_cache.memcached; + auto &listenerconf = get_config()->conn.listener; + + // We have dedicated worker for API request processing. + if (listenerconf.api) { + ++num; + } for (size_t i = 0; i < num; ++i) { auto loop = ev_loop_new(get_config()->ev_loop_flags); @@ -300,6 +306,7 @@ int ConnectionHandler::create_worker_thread(size_t num) { for (auto &worker : workers_) { worker->run_async(); } + #endif // NOTHREADS return 0; @@ -384,11 +391,32 @@ int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen, return 0; } - size_t idx = worker_round_robin_cnt_ % workers_.size(); - if (LOG_ENABLED(INFO)) { - LOG(INFO) << "Dispatch connection to worker #" << idx; + Worker *worker; + + if (faddr->api) { + worker = workers_[0].get(); + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Dispatch connection to API worker #0"; + } + } else { + worker = workers_[worker_round_robin_cnt_].get(); + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Dispatch connection to worker #" << worker_round_robin_cnt_; + } + + if (++worker_round_robin_cnt_ == workers_.size()) { + auto &listenerconf = get_config()->conn.listener; + + if (listenerconf.api) { + worker_round_robin_cnt_ = 1; + } else { + worker_round_robin_cnt_ = 0; + } + } } - ++worker_round_robin_cnt_; + WorkerEvent wev{}; wev.type = NEW_CONNECTION; wev.client_fd = fd; @@ -396,7 +424,7 @@ int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen, wev.client_addrlen = addrlen; wev.faddr = faddr; - workers_[idx]->send(wev); + worker->send(wev); return 0; } diff --git a/src/shrpx_connection_handler.h b/src/shrpx_connection_handler.h index cd805f57..019a2545 100644 --- a/src/shrpx_connection_handler.h +++ b/src/shrpx_connection_handler.h @@ -171,6 +171,8 @@ private: // ev_loop for each worker std::vector worker_loops_; // Worker instances when multi threaded mode (-nN, N >= 2) is used. + // If at least one frontend enables API request, we allocate 1 + // additional worker dedicated to API request . std::vector> workers_; // mutex for serial event resive buffer handling std::mutex serial_event_mu_;