nghttpx: Reduce TTFB with large number of incoming connections

To reduce TTFB with large number of incoming connections, we now
intentionally accept one connection at a time, so that it does not
delay the TTFB of the existing connection.  This is significant
especially for TLS connections.
This commit is contained in:
Tatsuhiro Tsujikawa 2016-06-25 11:50:33 +09:00
parent 3c1efeff55
commit 2a4733857f
3 changed files with 113 additions and 91 deletions

View File

@ -58,51 +58,49 @@ AcceptHandler::~AcceptHandler() {
} }
void AcceptHandler::accept_connection() { void AcceptHandler::accept_connection() {
for (;;) { sockaddr_union sockaddr;
sockaddr_union sockaddr; socklen_t addrlen = sizeof(sockaddr);
socklen_t addrlen = sizeof(sockaddr);
#ifdef HAVE_ACCEPT4 #ifdef HAVE_ACCEPT4
auto cfd = accept4(faddr_->fd, &sockaddr.sa, &addrlen, auto cfd =
SOCK_NONBLOCK | SOCK_CLOEXEC); accept4(faddr_->fd, &sockaddr.sa, &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
#else // !HAVE_ACCEPT4 #else // !HAVE_ACCEPT4
auto cfd = accept(faddr_->fd, &sockaddr.sa, &addrlen); auto cfd = accept(faddr_->fd, &sockaddr.sa, &addrlen);
#endif // !HAVE_ACCEPT4 #endif // !HAVE_ACCEPT4
if (cfd == -1) { if (cfd == -1) {
switch (errno) { switch (errno) {
case EINTR: case EINTR:
case ENETDOWN: case ENETDOWN:
case EPROTO: case EPROTO:
case ENOPROTOOPT: case ENOPROTOOPT:
case EHOSTDOWN: case EHOSTDOWN:
#ifdef ENONET #ifdef ENONET
case ENONET: case ENONET:
#endif // ENONET #endif // ENONET
case EHOSTUNREACH: case EHOSTUNREACH:
case EOPNOTSUPP: case EOPNOTSUPP:
case ENETUNREACH: case ENETUNREACH:
continue; return;
case EMFILE: case EMFILE:
case ENFILE: case ENFILE:
LOG(WARN) << "acceptor: running out file descriptor; disable acceptor " LOG(WARN) << "acceptor: running out file descriptor; disable acceptor "
"temporarily"; "temporarily";
conn_hnr_->sleep_acceptor(get_config()->conn.listener.timeout.sleep); conn_hnr_->sleep_acceptor(get_config()->conn.listener.timeout.sleep);
break; return;
} default:
return;
break;
} }
}
#ifndef HAVE_ACCEPT4 #ifndef HAVE_ACCEPT4
util::make_socket_nonblocking(cfd); util::make_socket_nonblocking(cfd);
util::make_socket_closeonexec(cfd); util::make_socket_closeonexec(cfd);
#endif // !HAVE_ACCEPT4 #endif // !HAVE_ACCEPT4
util::make_socket_nodelay(cfd); util::make_socket_nodelay(cfd);
conn_hnr_->handle_connection(cfd, &sockaddr.sa, addrlen, faddr_); conn_hnr_->handle_connection(cfd, &sockaddr.sa, addrlen, faddr_);
}
} }
void AcceptHandler::enable() { ev_io_start(conn_hnr_->get_loop(), &wev_); } void AcceptHandler::enable() { ev_io_start(conn_hnr_->get_loop(), &wev_); }

View File

@ -63,6 +63,13 @@ void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) {
} }
} // namespace } // namespace
namespace {
void proc_wev_cb(struct ev_loop *loop, ev_timer *w, int revents) {
auto worker = static_cast<Worker *>(w->data);
worker->process_events();
}
} // namespace
namespace { namespace {
bool match_shared_downstream_addr( bool match_shared_downstream_addr(
const std::shared_ptr<SharedDownstreamAddr> &lhs, const std::shared_ptr<SharedDownstreamAddr> &lhs,
@ -131,6 +138,9 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
ev_timer_init(&mcpool_clear_timer_, mcpool_clear_cb, 0., 0.); ev_timer_init(&mcpool_clear_timer_, mcpool_clear_cb, 0., 0.);
mcpool_clear_timer_.data = this; mcpool_clear_timer_.data = this;
ev_timer_init(&proc_wev_timer_, proc_wev_cb, 0., 0.);
proc_wev_timer_.data = this;
auto &session_cacheconf = get_config()->tls.session_cache; auto &session_cacheconf = get_config()->tls.session_cache;
if (!session_cacheconf.memcached.host.empty()) { if (!session_cacheconf.memcached.host.empty()) {
@ -279,6 +289,7 @@ void Worker::replace_downstream_config(
Worker::~Worker() { Worker::~Worker() {
ev_async_stop(loop_, &w_); ev_async_stop(loop_, &w_);
ev_timer_stop(loop_, &mcpool_clear_timer_); ev_timer_stop(loop_, &mcpool_clear_timer_);
ev_timer_stop(loop_, &proc_wev_timer_);
} }
void Worker::schedule_clear_mcpool() { void Worker::schedule_clear_mcpool() {
@ -315,79 +326,91 @@ void Worker::send(const WorkerEvent &event) {
} }
void Worker::process_events() { void Worker::process_events() {
std::vector<WorkerEvent> q; WorkerEvent wev;
{ {
std::lock_guard<std::mutex> g(m_); std::lock_guard<std::mutex> g(m_);
q.swap(q_);
// Process event one at a time. This is important for
// NEW_CONNECTION event since accepting large number of new
// connections at once may delay time to 1st byte for existing
// connections.
if (q_.empty()) {
ev_timer_stop(loop_, &proc_wev_timer_);
return;
}
wev = q_.front();
q_.pop_front();
} }
ev_timer_start(loop_, &proc_wev_timer_);
auto worker_connections = get_config()->conn.upstream.worker_connections; auto worker_connections = get_config()->conn.upstream.worker_connections;
for (auto &wev : q) { switch (wev.type) {
switch (wev.type) { case NEW_CONNECTION: {
case NEW_CONNECTION: { if (LOG_ENABLED(INFO)) {
if (LOG_ENABLED(INFO)) { WLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd
WLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd << ", addrlen=" << wev.client_addrlen;
<< ", addrlen=" << wev.client_addrlen; }
}
if (worker_stat_.num_connections >= worker_connections) { if (worker_stat_.num_connections >= worker_connections) {
if (LOG_ENABLED(INFO)) {
WLOG(INFO, this) << "Too many connections >= " << worker_connections;
}
close(wev.client_fd);
break;
}
auto client_handler =
ssl::accept_connection(this, wev.client_fd, &wev.client_addr.sa,
wev.client_addrlen, wev.faddr);
if (!client_handler) {
if (LOG_ENABLED(INFO)) {
WLOG(ERROR, this) << "ClientHandler creation failed";
}
close(wev.client_fd);
break;
}
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created "; WLOG(INFO, this) << "Too many connections >= " << worker_connections;
} }
close(wev.client_fd);
break; break;
} }
case REOPEN_LOG:
WLOG(NOTICE, this) << "Reopening log files: worker process (thread "
<< this << ")";
reopen_log_files(); auto client_handler =
ssl::accept_connection(this, wev.client_fd, &wev.client_addr.sa,
break; wev.client_addrlen, wev.faddr);
case GRACEFUL_SHUTDOWN: if (!client_handler) {
WLOG(NOTICE, this) << "Graceful shutdown commencing";
graceful_shutdown_ = true;
if (worker_stat_.num_connections == 0) {
ev_break(loop_);
return;
}
break;
case REPLACE_DOWNSTREAM:
WLOG(NOTICE, this) << "Replace downstream";
replace_downstream_config(wev.downstreamconf);
break;
default:
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
WLOG(INFO, this) << "unknown event type " << wev.type; WLOG(ERROR, this) << "ClientHandler creation failed";
} }
close(wev.client_fd);
break;
}
if (LOG_ENABLED(INFO)) {
WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created ";
}
break;
}
case REOPEN_LOG:
WLOG(NOTICE, this) << "Reopening log files: worker process (thread " << this
<< ")";
reopen_log_files();
break;
case GRACEFUL_SHUTDOWN:
WLOG(NOTICE, this) << "Graceful shutdown commencing";
graceful_shutdown_ = true;
if (worker_stat_.num_connections == 0) {
ev_break(loop_);
return;
}
break;
case REPLACE_DOWNSTREAM:
WLOG(NOTICE, this) << "Replace downstream";
replace_downstream_config(wev.downstreamconf);
break;
default:
if (LOG_ENABLED(INFO)) {
WLOG(INFO, this) << "unknown event type " << wev.type;
} }
} }
} }

View File

@ -243,10 +243,11 @@ private:
std::future<void> fut_; std::future<void> fut_;
#endif // NOTHREADS #endif // NOTHREADS
std::mutex m_; std::mutex m_;
std::vector<WorkerEvent> q_; std::deque<WorkerEvent> q_;
std::mt19937 randgen_; std::mt19937 randgen_;
ev_async w_; ev_async w_;
ev_timer mcpool_clear_timer_; ev_timer mcpool_clear_timer_;
ev_timer proc_wev_timer_;
MemchunkPool mcpool_; MemchunkPool mcpool_;
WorkerStat worker_stat_; WorkerStat worker_stat_;