nghttpx: Use dedicated worker for API processing
Some API processing is very slow (e.g., getaddrinfo). To avoid to slow down regular request handling, if multi threaded configuration is enabled, we allocate dedicated worker for API.
This commit is contained in:
parent
7e31340045
commit
6f025619de
|
@ -1738,6 +1738,10 @@ int parse_config(Config *config, int optid, const StringRef &opt,
|
||||||
addr.tls = params.tls;
|
addr.tls = params.tls;
|
||||||
addr.api = params.api;
|
addr.api = params.api;
|
||||||
|
|
||||||
|
if (addr.api) {
|
||||||
|
listenerconf.api = true;
|
||||||
|
}
|
||||||
|
|
||||||
if (util::istarts_with(optarg, SHRPX_UNIX_PATH_PREFIX)) {
|
if (util::istarts_with(optarg, SHRPX_UNIX_PATH_PREFIX)) {
|
||||||
auto path = std::begin(optarg) + SHRPX_UNIX_PATH_PREFIX.size();
|
auto path = std::begin(optarg) + SHRPX_UNIX_PATH_PREFIX.size();
|
||||||
addr.host = ImmutableString{path, addr_end};
|
addr.host = ImmutableString{path, addr_end};
|
||||||
|
|
|
@ -652,6 +652,8 @@ struct ConnectionConfig {
|
||||||
// TCP fastopen. If this is positive, it is passed to
|
// TCP fastopen. If this is positive, it is passed to
|
||||||
// setsockopt() along with TCP_FASTOPEN.
|
// setsockopt() along with TCP_FASTOPEN.
|
||||||
int fastopen;
|
int fastopen;
|
||||||
|
// true if at least one of UpstreamAddr has api enabled
|
||||||
|
bool api;
|
||||||
} listener;
|
} listener;
|
||||||
|
|
||||||
struct {
|
struct {
|
||||||
|
|
|
@ -120,7 +120,7 @@ ConnectionHandler::ConnectionHandler(struct ev_loop *loop)
|
||||||
loop_(loop),
|
loop_(loop),
|
||||||
tls_ticket_key_memcached_get_retry_count_(0),
|
tls_ticket_key_memcached_get_retry_count_(0),
|
||||||
tls_ticket_key_memcached_fail_count_(0),
|
tls_ticket_key_memcached_fail_count_(0),
|
||||||
worker_round_robin_cnt_(0),
|
worker_round_robin_cnt_(get_config()->conn.listener.api ? 1 : 0),
|
||||||
graceful_shutdown_(false) {
|
graceful_shutdown_(false) {
|
||||||
ev_timer_init(&disable_acceptor_timer_, acceptor_disable_cb, 0., 0.);
|
ev_timer_init(&disable_acceptor_timer_, acceptor_disable_cb, 0., 0.);
|
||||||
disable_acceptor_timer_.data = this;
|
disable_acceptor_timer_.data = this;
|
||||||
|
@ -268,6 +268,12 @@ int ConnectionHandler::create_worker_thread(size_t num) {
|
||||||
|
|
||||||
auto &tlsconf = get_config()->tls;
|
auto &tlsconf = get_config()->tls;
|
||||||
auto &memcachedconf = get_config()->tls.session_cache.memcached;
|
auto &memcachedconf = get_config()->tls.session_cache.memcached;
|
||||||
|
auto &listenerconf = get_config()->conn.listener;
|
||||||
|
|
||||||
|
// We have dedicated worker for API request processing.
|
||||||
|
if (listenerconf.api) {
|
||||||
|
++num;
|
||||||
|
}
|
||||||
|
|
||||||
for (size_t i = 0; i < num; ++i) {
|
for (size_t i = 0; i < num; ++i) {
|
||||||
auto loop = ev_loop_new(get_config()->ev_loop_flags);
|
auto loop = ev_loop_new(get_config()->ev_loop_flags);
|
||||||
|
@ -300,6 +306,7 @@ int ConnectionHandler::create_worker_thread(size_t num) {
|
||||||
for (auto &worker : workers_) {
|
for (auto &worker : workers_) {
|
||||||
worker->run_async();
|
worker->run_async();
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif // NOTHREADS
|
#endif // NOTHREADS
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -384,11 +391,32 @@ int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t idx = worker_round_robin_cnt_ % workers_.size();
|
Worker *worker;
|
||||||
if (LOG_ENABLED(INFO)) {
|
|
||||||
LOG(INFO) << "Dispatch connection to worker #" << idx;
|
if (faddr->api) {
|
||||||
|
worker = workers_[0].get();
|
||||||
|
|
||||||
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
LOG(INFO) << "Dispatch connection to API worker #0";
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
worker = workers_[worker_round_robin_cnt_].get();
|
||||||
|
|
||||||
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
LOG(INFO) << "Dispatch connection to worker #" << worker_round_robin_cnt_;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (++worker_round_robin_cnt_ == workers_.size()) {
|
||||||
|
auto &listenerconf = get_config()->conn.listener;
|
||||||
|
|
||||||
|
if (listenerconf.api) {
|
||||||
|
worker_round_robin_cnt_ = 1;
|
||||||
|
} else {
|
||||||
|
worker_round_robin_cnt_ = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
++worker_round_robin_cnt_;
|
|
||||||
WorkerEvent wev{};
|
WorkerEvent wev{};
|
||||||
wev.type = NEW_CONNECTION;
|
wev.type = NEW_CONNECTION;
|
||||||
wev.client_fd = fd;
|
wev.client_fd = fd;
|
||||||
|
@ -396,7 +424,7 @@ int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen,
|
||||||
wev.client_addrlen = addrlen;
|
wev.client_addrlen = addrlen;
|
||||||
wev.faddr = faddr;
|
wev.faddr = faddr;
|
||||||
|
|
||||||
workers_[idx]->send(wev);
|
worker->send(wev);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -171,6 +171,8 @@ private:
|
||||||
// ev_loop for each worker
|
// ev_loop for each worker
|
||||||
std::vector<struct ev_loop *> worker_loops_;
|
std::vector<struct ev_loop *> worker_loops_;
|
||||||
// Worker instances when multi threaded mode (-nN, N >= 2) is used.
|
// Worker instances when multi threaded mode (-nN, N >= 2) is used.
|
||||||
|
// If at least one frontend enables API request, we allocate 1
|
||||||
|
// additional worker dedicated to API request .
|
||||||
std::vector<std::unique_ptr<Worker>> workers_;
|
std::vector<std::unique_ptr<Worker>> workers_;
|
||||||
// mutex for serial event resive buffer handling
|
// mutex for serial event resive buffer handling
|
||||||
std::mutex serial_event_mu_;
|
std::mutex serial_event_mu_;
|
||||||
|
|
Loading…
Reference in New Issue