diff --git a/src/shrpx_accept_handler.cc b/src/shrpx_accept_handler.cc index 7cfa2d9f..02c59a14 100644 --- a/src/shrpx_accept_handler.cc +++ b/src/shrpx_accept_handler.cc @@ -58,51 +58,49 @@ AcceptHandler::~AcceptHandler() { } void AcceptHandler::accept_connection() { - for (;;) { - sockaddr_union sockaddr; - socklen_t addrlen = sizeof(sockaddr); + sockaddr_union sockaddr; + socklen_t addrlen = sizeof(sockaddr); #ifdef HAVE_ACCEPT4 - auto cfd = accept4(faddr_->fd, &sockaddr.sa, &addrlen, - SOCK_NONBLOCK | SOCK_CLOEXEC); -#else // !HAVE_ACCEPT4 - auto cfd = accept(faddr_->fd, &sockaddr.sa, &addrlen); + auto cfd = + accept4(faddr_->fd, &sockaddr.sa, &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC); +#else // !HAVE_ACCEPT4 + auto cfd = accept(faddr_->fd, &sockaddr.sa, &addrlen); #endif // !HAVE_ACCEPT4 - if (cfd == -1) { - switch (errno) { - case EINTR: - case ENETDOWN: - case EPROTO: - case ENOPROTOOPT: - case EHOSTDOWN: + if (cfd == -1) { + switch (errno) { + case EINTR: + case ENETDOWN: + case EPROTO: + case ENOPROTOOPT: + case EHOSTDOWN: #ifdef ENONET - case ENONET: + case ENONET: #endif // ENONET - case EHOSTUNREACH: - case EOPNOTSUPP: - case ENETUNREACH: - continue; - case EMFILE: - case ENFILE: - LOG(WARN) << "acceptor: running out file descriptor; disable acceptor " - "temporarily"; - conn_hnr_->sleep_acceptor(get_config()->conn.listener.timeout.sleep); - break; - } - - break; + case EHOSTUNREACH: + case EOPNOTSUPP: + case ENETUNREACH: + return; + case EMFILE: + case ENFILE: + LOG(WARN) << "acceptor: running out file descriptor; disable acceptor " + "temporarily"; + conn_hnr_->sleep_acceptor(get_config()->conn.listener.timeout.sleep); + return; + default: + return; } + } #ifndef HAVE_ACCEPT4 - util::make_socket_nonblocking(cfd); - util::make_socket_closeonexec(cfd); + util::make_socket_nonblocking(cfd); + util::make_socket_closeonexec(cfd); #endif // !HAVE_ACCEPT4 - util::make_socket_nodelay(cfd); + util::make_socket_nodelay(cfd); - conn_hnr_->handle_connection(cfd, &sockaddr.sa, addrlen, faddr_); - } + conn_hnr_->handle_connection(cfd, &sockaddr.sa, addrlen, faddr_); } void AcceptHandler::enable() { ev_io_start(conn_hnr_->get_loop(), &wev_); } diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 88a1084d..f815149b 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -63,6 +63,13 @@ void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) { } } // namespace +namespace { +void proc_wev_cb(struct ev_loop *loop, ev_timer *w, int revents) { + auto worker = static_cast(w->data); + worker->process_events(); +} +} // namespace + namespace { bool match_shared_downstream_addr( const std::shared_ptr &lhs, @@ -131,6 +138,9 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, ev_timer_init(&mcpool_clear_timer_, mcpool_clear_cb, 0., 0.); mcpool_clear_timer_.data = this; + ev_timer_init(&proc_wev_timer_, proc_wev_cb, 0., 0.); + proc_wev_timer_.data = this; + auto &session_cacheconf = get_config()->tls.session_cache; if (!session_cacheconf.memcached.host.empty()) { @@ -279,6 +289,7 @@ void Worker::replace_downstream_config( Worker::~Worker() { ev_async_stop(loop_, &w_); ev_timer_stop(loop_, &mcpool_clear_timer_); + ev_timer_stop(loop_, &proc_wev_timer_); } void Worker::schedule_clear_mcpool() { @@ -315,79 +326,91 @@ void Worker::send(const WorkerEvent &event) { } void Worker::process_events() { - std::vector q; + WorkerEvent wev; { std::lock_guard g(m_); - q.swap(q_); + + // Process event one at a time. This is important for + // NEW_CONNECTION event since accepting large number of new + // connections at once may delay time to 1st byte for existing + // connections. + + if (q_.empty()) { + ev_timer_stop(loop_, &proc_wev_timer_); + return; + } + + wev = q_.front(); + q_.pop_front(); } + ev_timer_start(loop_, &proc_wev_timer_); + auto worker_connections = get_config()->conn.upstream.worker_connections; - for (auto &wev : q) { - switch (wev.type) { - case NEW_CONNECTION: { - if (LOG_ENABLED(INFO)) { - WLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd - << ", addrlen=" << wev.client_addrlen; - } + switch (wev.type) { + case NEW_CONNECTION: { + if (LOG_ENABLED(INFO)) { + WLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd + << ", addrlen=" << wev.client_addrlen; + } - if (worker_stat_.num_connections >= worker_connections) { - - if (LOG_ENABLED(INFO)) { - WLOG(INFO, this) << "Too many connections >= " << worker_connections; - } - - close(wev.client_fd); - - break; - } - - auto client_handler = - ssl::accept_connection(this, wev.client_fd, &wev.client_addr.sa, - wev.client_addrlen, wev.faddr); - if (!client_handler) { - if (LOG_ENABLED(INFO)) { - WLOG(ERROR, this) << "ClientHandler creation failed"; - } - close(wev.client_fd); - break; - } + if (worker_stat_.num_connections >= worker_connections) { if (LOG_ENABLED(INFO)) { - WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created "; + WLOG(INFO, this) << "Too many connections >= " << worker_connections; } + close(wev.client_fd); + break; } - case REOPEN_LOG: - WLOG(NOTICE, this) << "Reopening log files: worker process (thread " - << this << ")"; - reopen_log_files(); - - break; - case GRACEFUL_SHUTDOWN: - WLOG(NOTICE, this) << "Graceful shutdown commencing"; - - graceful_shutdown_ = true; - - if (worker_stat_.num_connections == 0) { - ev_break(loop_); - - return; - } - - break; - case REPLACE_DOWNSTREAM: - WLOG(NOTICE, this) << "Replace downstream"; - - replace_downstream_config(wev.downstreamconf); - - break; - default: + auto client_handler = + ssl::accept_connection(this, wev.client_fd, &wev.client_addr.sa, + wev.client_addrlen, wev.faddr); + if (!client_handler) { if (LOG_ENABLED(INFO)) { - WLOG(INFO, this) << "unknown event type " << wev.type; + WLOG(ERROR, this) << "ClientHandler creation failed"; } + close(wev.client_fd); + break; + } + + if (LOG_ENABLED(INFO)) { + WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created "; + } + + break; + } + case REOPEN_LOG: + WLOG(NOTICE, this) << "Reopening log files: worker process (thread " << this + << ")"; + + reopen_log_files(); + + break; + case GRACEFUL_SHUTDOWN: + WLOG(NOTICE, this) << "Graceful shutdown commencing"; + + graceful_shutdown_ = true; + + if (worker_stat_.num_connections == 0) { + ev_break(loop_); + + return; + } + + break; + case REPLACE_DOWNSTREAM: + WLOG(NOTICE, this) << "Replace downstream"; + + replace_downstream_config(wev.downstreamconf); + + break; + default: + if (LOG_ENABLED(INFO)) { + WLOG(INFO, this) << "unknown event type " << wev.type; } } } diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index c729af1e..d187743c 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -243,10 +243,11 @@ private: std::future fut_; #endif // NOTHREADS std::mutex m_; - std::vector q_; + std::deque q_; std::mt19937 randgen_; ev_async w_; ev_timer mcpool_clear_timer_; + ev_timer proc_wev_timer_; MemchunkPool mcpool_; WorkerStat worker_stat_;