diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index 1a21e34d..4d044609 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -1738,6 +1738,10 @@ int parse_config(Config *config, int optid, const StringRef &opt, addr.tls = params.tls; addr.api = params.api; + if (addr.api) { + listenerconf.api = true; + } + if (util::istarts_with(optarg, SHRPX_UNIX_PATH_PREFIX)) { auto path = std::begin(optarg) + SHRPX_UNIX_PATH_PREFIX.size(); addr.host = ImmutableString{path, addr_end}; diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 1165de4a..aa9522ac 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -652,6 +652,8 @@ struct ConnectionConfig { // TCP fastopen. If this is positive, it is passed to // setsockopt() along with TCP_FASTOPEN. int fastopen; + // true if at least one of UpstreamAddr has api enabled + bool api; } listener; struct { diff --git a/src/shrpx_connection_handler.cc b/src/shrpx_connection_handler.cc index e5cbe9a2..bc76ad47 100644 --- a/src/shrpx_connection_handler.cc +++ b/src/shrpx_connection_handler.cc @@ -120,7 +120,7 @@ ConnectionHandler::ConnectionHandler(struct ev_loop *loop) loop_(loop), tls_ticket_key_memcached_get_retry_count_(0), tls_ticket_key_memcached_fail_count_(0), - worker_round_robin_cnt_(0), + worker_round_robin_cnt_(get_config()->conn.listener.api ? 1 : 0), graceful_shutdown_(false) { ev_timer_init(&disable_acceptor_timer_, acceptor_disable_cb, 0., 0.); disable_acceptor_timer_.data = this; @@ -268,6 +268,12 @@ int ConnectionHandler::create_worker_thread(size_t num) { auto &tlsconf = get_config()->tls; auto &memcachedconf = get_config()->tls.session_cache.memcached; + auto &listenerconf = get_config()->conn.listener; + + // We have dedicated worker for API request processing. + if (listenerconf.api) { + ++num; + } for (size_t i = 0; i < num; ++i) { auto loop = ev_loop_new(get_config()->ev_loop_flags); @@ -300,6 +306,7 @@ int ConnectionHandler::create_worker_thread(size_t num) { for (auto &worker : workers_) { worker->run_async(); } + #endif // NOTHREADS return 0; @@ -384,11 +391,32 @@ int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen, return 0; } - size_t idx = worker_round_robin_cnt_ % workers_.size(); - if (LOG_ENABLED(INFO)) { - LOG(INFO) << "Dispatch connection to worker #" << idx; + Worker *worker; + + if (faddr->api) { + worker = workers_[0].get(); + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Dispatch connection to API worker #0"; + } + } else { + worker = workers_[worker_round_robin_cnt_].get(); + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Dispatch connection to worker #" << worker_round_robin_cnt_; + } + + if (++worker_round_robin_cnt_ == workers_.size()) { + auto &listenerconf = get_config()->conn.listener; + + if (listenerconf.api) { + worker_round_robin_cnt_ = 1; + } else { + worker_round_robin_cnt_ = 0; + } + } } - ++worker_round_robin_cnt_; + WorkerEvent wev{}; wev.type = NEW_CONNECTION; wev.client_fd = fd; @@ -396,7 +424,7 @@ int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen, wev.client_addrlen = addrlen; wev.faddr = faddr; - workers_[idx]->send(wev); + worker->send(wev); return 0; } diff --git a/src/shrpx_connection_handler.h b/src/shrpx_connection_handler.h index cd805f57..019a2545 100644 --- a/src/shrpx_connection_handler.h +++ b/src/shrpx_connection_handler.h @@ -171,6 +171,8 @@ private: // ev_loop for each worker std::vector worker_loops_; // Worker instances when multi threaded mode (-nN, N >= 2) is used. + // If at least one frontend enables API request, we allocate 1 + // additional worker dedicated to API request . std::vector> workers_; // mutex for serial event resive buffer handling std::mutex serial_event_mu_;