From bf13d912644352c06c426019e89ea857acb2b76b Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Tue, 12 Aug 2014 22:22:02 +0900 Subject: [PATCH] nghttpx: Add hot deploy feature nghttpx supports hot deploy feature using signals. The host deploy in nghttpx is multi step process. First send USR2 signal to nghttpx process. It will do fork and execute new executable, using same command-line arguments and environment variables. At this point, both current and new processes can accept requests. To gracefully shutdown current process, send QUIT signal to current nghttpx process. When all existing frontend connections are done, the current process will exit. At this point, only new nghttpx process exists and serves incoming requests. --- doc/sources/nghttpx-howto.rst | 13 ++ src/shrpx.cc | 259 ++++++++++++++++++++++-- src/shrpx_client_handler.cc | 7 + src/shrpx_config.cc | 3 + src/shrpx_config.h | 3 + src/shrpx_http2_session.cc | 56 +++-- src/shrpx_http_downstream_connection.cc | 8 +- src/shrpx_listen_handler.cc | 205 ++++++++++++++++--- src/shrpx_listen_handler.h | 18 +- src/shrpx_thread_event_receiver.cc | 16 ++ src/shrpx_thread_event_receiver.h | 3 +- src/shrpx_worker.cc | 10 +- src/shrpx_worker.h | 4 +- src/shrpx_worker_config.cc | 3 +- src/shrpx_worker_config.h | 1 + src/util.cc | 27 ++- src/util.h | 8 + 17 files changed, 575 insertions(+), 69 deletions(-) diff --git a/doc/sources/nghttpx-howto.rst b/doc/sources/nghttpx-howto.rst index 90738209..f8796c87 100644 --- a/doc/sources/nghttpx-howto.rst +++ b/doc/sources/nghttpx-howto.rst @@ -262,3 +262,16 @@ used in frontend, and host is replaced with which appears in precedence. If the above conditions are not met with the host value in :authority header field, rewrite is retried with the value in host header field. + +Hot deploy +---------- + +nghttpx supports hot deploy feature using signals. The host deploy in +nghttpx is multi step process. First send USR2 signal to nghttpx +process. It will do fork and execute new executable, using same +command-line arguments and environment variables. At this point, both +current and new processes can accept requests. To gracefully shutdown +current process, send QUIT signal to current nghttpx process. When +all existing frontend connections are done, the current process will +exit. At this point, only new nghttpx process exists and serves +incoming requests. diff --git a/src/shrpx.cc b/src/shrpx.cc index 5834499d..5d636d30 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -55,18 +56,32 @@ #include "shrpx_listen_handler.h" #include "shrpx_ssl.h" #include "shrpx_worker_config.h" +#include "shrpx_worker.h" #include "util.h" #include "app_helper.h" #include "ssl.h" +extern char **environ; + using namespace nghttp2; namespace shrpx { namespace { const int REOPEN_LOG_SIGNAL = SIGUSR1; +const int EXEC_BINARY_SIGNAL = SIGUSR2; +const int GRACEFUL_SHUTDOWN_SIGNAL = SIGQUIT; } // namespace +// Environment variables to tell new binary the listening socket's +// file descriptors. They are not close-on-exec. +#define ENV_LISTENER4_FD "NGHTTPX_LISTENER4_FD" +#define ENV_LISTENER6_FD "NGHTTPX_LISTENER6_FD" + +// Environment variable to tell new binary the port number the current +// binary is listening to. +#define ENV_PORT "NGHTTPX_PORT" + namespace { void ssl_acceptcb(evconnlistener *listener, int fd, sockaddr *addr, int addrlen, void *arg) @@ -139,9 +154,50 @@ void evlistener_errorcb(evconnlistener *listener, void *ptr) } } // namespace +namespace { +evconnlistener* new_evlistener(ListenHandler *handler, int fd) +{ + auto evlistener = evconnlistener_new + (handler->get_evbase(), + ssl_acceptcb, + handler, + LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, + get_config()->backlog, + fd); + evconnlistener_set_error_cb(evlistener, evlistener_errorcb); + return evlistener; +} +} // namespace + namespace { evconnlistener* create_evlistener(ListenHandler *handler, int family) { + { + auto envfd = getenv(family == AF_INET ? + ENV_LISTENER4_FD : ENV_LISTENER6_FD); + auto envport = getenv(ENV_PORT); + + if(envfd && envport) { + auto fd = strtoul(envfd, nullptr, 10); + auto port = strtoul(envport, nullptr, 10); + + // Only do this iff NGHTTPX_PORT == get_config()->port. + // Otherwise, close fd, and create server socket as usual. + + if(port == get_config()->port) { + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "Listening on port " << get_config()->port; + } + + return new_evlistener(handler, fd); + } + + LOG(WARNING) << "Port was changed between old binary (" << port + << ") and new binary (" << get_config()->port << ")"; + close(fd); + } + } + addrinfo hints; int fd = -1; int rv; @@ -222,15 +278,7 @@ evconnlistener* create_evlistener(ListenHandler *handler, int family) LOG(INFO) << "Listening on " << host << ", port " << get_config()->port; } - auto evlistener = evconnlistener_new - (handler->get_evbase(), - ssl_acceptcb, - handler, - LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, - get_config()->backlog, - fd); - evconnlistener_set_error_cb(evlistener, evlistener_errorcb); - return evlistener; + return new_evlistener(handler, fd); } } // namespace @@ -285,6 +333,122 @@ void reopen_log_signal_cb(evutil_socket_t sig, short events, void *arg) } } // namespace +namespace { +void exec_binary_signal_cb(evutil_socket_t sig, short events, void *arg) +{ + auto listener_handler = static_cast(arg); + + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "Executing new binary"; + } + + auto pid = fork(); + + if(pid == -1) { + auto error = errno; + LOG(ERROR) << "fork() failed errno=" << error; + return; + } + + if(pid != 0) { + return; + } + + auto exec_path = util::get_exec_path(get_config()->argc, + get_config()->argv, + get_config()->cwd); + + if(!exec_path) { + LOG(ERROR) << "Could not resolve the executable path"; + return; + } + + auto argv = + static_cast(malloc(sizeof(char*) * (get_config()->argc + 1))); + + argv[0] = exec_path; + for(int i = 1; i < get_config()->argc; ++i) { + argv[i] = strdup(get_config()->argv[i]); + } + argv[get_config()->argc] = nullptr; + + size_t envlen = 0; + for(char **p = environ; *p; ++p, ++envlen); + // 3 for missing fd4, fd6 and port. + auto envp = static_cast(malloc(sizeof(char*) * (envlen + 3 + 1))); + size_t envidx = 0; + + auto evlistener4 = listener_handler->get_evlistener4(); + if(evlistener4) { + std::string fd4 = ENV_LISTENER4_FD "="; + fd4 += util::utos(evconnlistener_get_fd(evlistener4)); + envp[envidx++] = strdup(fd4.c_str()); + } + + auto evlistener6 = listener_handler->get_evlistener6(); + if(evlistener6) { + std::string fd6 = ENV_LISTENER6_FD "="; + fd6 += util::utos(evconnlistener_get_fd(evlistener6)); + envp[envidx++] = strdup(fd6.c_str()); + } + + std::string port = ENV_PORT "="; + port += util::utos(get_config()->port); + envp[envidx++] = strdup(port.c_str()); + + for(size_t i = 0; i < envlen; ++i) { + if(strcmp(ENV_LISTENER4_FD, environ[i]) == 0 || + strcmp(ENV_LISTENER6_FD, environ[i]) == 0 || + strcmp(ENV_PORT, environ[i]) == 0) { + continue; + } + + envp[envidx++] = environ[i]; + } + + envp[envidx++] = nullptr; + + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "cmdline"; + for(int i = 0; argv[i]; ++i) { + LOG(INFO) << i << ": " << argv[i]; + } + LOG(INFO) << "environ"; + for(int i = 0; envp[i]; ++i) { + LOG(INFO) << i << ": " << envp[i]; + } + } + + if(execve(argv[0], argv, envp) == -1) { + auto error = errno; + LOG(ERROR) << "execve failed: errno=" << error; + exit(1); + } +} +} // namespace + +namespace { +void graceful_shutdown_signal_cb(evutil_socket_t sig, short events, void *arg) +{ + auto listener_handler = static_cast(arg); + + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "Graceful shutdown signal received"; + } + + listener_handler->disable_evlistener(); + + // After disabling accepting new connection, disptach incoming + // connection in backlog. + + listener_handler->accept_pending_connection(); + + worker_config.graceful_shutdown = true; + + listener_handler->graceful_shutdown_worker(); +} +} // namespace + namespace { std::unique_ptr generate_time() { @@ -310,7 +474,18 @@ std::unique_ptr generate_time() namespace { void refresh_cb(evutil_socket_t sig, short events, void *arg) { + auto listener_handler = static_cast(arg); + auto worker_stat = listener_handler->get_worker_stat(); + mod_config()->cached_time = generate_time(); + + // In multi threaded mode (get_config()->num_worker > 1), we have to + // wait for event notification to workers to finish. + if(get_config()->num_worker == 1 && + worker_config.graceful_shutdown && + (!worker_stat || worker_stat->num_connections == 0)) { + event_base_loopbreak(listener_handler->get_evbase()); + } } } // namespace @@ -358,6 +533,9 @@ int event_loop() exit(EXIT_FAILURE); } + listener_handler->set_evlistener4(evlistener4); + listener_handler->set_evlistener6(evlistener6); + // ListenHandler loads private key, and we listen on a priveleged port. // After that, we drop the root privileges if needed. drop_privileges(); @@ -366,10 +544,11 @@ int event_loop() sigset_t signals; sigemptyset(&signals); sigaddset(&signals, REOPEN_LOG_SIGNAL); - + sigaddset(&signals, EXEC_BINARY_SIGNAL); + sigaddset(&signals, GRACEFUL_SHUTDOWN_SIGNAL); rv = pthread_sigmask(SIG_BLOCK, &signals, nullptr); if(rv != 0) { - LOG(ERROR) << "Blocking REOPEN_LOG_SIGNAL failed: " << strerror(rv); + LOG(ERROR) << "Blocking signals failed: " << strerror(rv); } #endif // !NOTHREADS @@ -382,7 +561,7 @@ int event_loop() #ifndef NOTHREADS rv = pthread_sigmask(SIG_UNBLOCK, &signals, nullptr); if(rv != 0) { - LOG(ERROR) << "Unblocking REOPEN_LOG_SIGNAL failed: " << strerror(rv); + LOG(ERROR) << "Unblocking signals failed: " << strerror(rv); } #endif // !NOTHREADS @@ -399,8 +578,31 @@ int event_loop() } } + auto exec_binary_signal_event = evsignal_new(evbase, EXEC_BINARY_SIGNAL, + exec_binary_signal_cb, + listener_handler); + rv = event_add(exec_binary_signal_event, nullptr); + + if(rv == -1) { + LOG(FATAL) << "event_add for exec_binary_signal_event failed"; + + exit(EXIT_FAILURE); + } + + auto graceful_shutdown_signal_event = evsignal_new + (evbase, GRACEFUL_SHUTDOWN_SIGNAL, graceful_shutdown_signal_cb, + listener_handler); + + rv = event_add(graceful_shutdown_signal_event, nullptr); + + if(rv == -1) { + LOG(FATAL) << "event_add for graceful_shutdown_signal_event failed"; + + exit(EXIT_FAILURE); + } + auto refresh_event = event_new(evbase, -1, EV_PERSIST, refresh_cb, - nullptr); + listener_handler); if(!refresh_event) { LOG(ERROR) << "event_new failed"; @@ -422,9 +624,17 @@ int event_loop() } event_base_loop(evbase, 0); + listener_handler->join_worker(); + if(refresh_event) { event_free(refresh_event); } + if(graceful_shutdown_signal_event) { + event_free(graceful_shutdown_signal_event); + } + if(exec_binary_signal_event) { + event_free(exec_binary_signal_event); + } if(reopen_log_signal_event) { event_free(reopen_log_signal_event); } @@ -570,6 +780,8 @@ void fill_default_config() mod_config()->tls_proto_mask = 0; mod_config()->cached_time = generate_time(); mod_config()->no_location_rewrite = false; + mod_config()->argc = 0; + mod_config()->argv = nullptr; } } // namespace @@ -933,6 +1145,23 @@ int main(int argc, char **argv) create_config(); fill_default_config(); + // We have to copy argv, since getopt_long may change its content. + mod_config()->argc = argc; + mod_config()->argv = new char*[argc]; + + for(int i = 0; i < argc; ++i) { + mod_config()->argv[i] = strdup(argv[i]); + } + + char cwd[PATH_MAX]; + + mod_config()->cwd = getcwd(cwd, sizeof(cwd)); + if(mod_config()->cwd == nullptr) { + auto error = errno; + LOG(FATAL) << "failed to get current working directory: errno=" << error; + exit(EXIT_FAILURE); + } + std::vector > cmdcfgs; while(1) { static int flag = 0; @@ -1488,6 +1717,10 @@ int main(int argc, char **argv) event_loop(); + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "Shutdown momentarily"; + } + return 0; } diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 8bafd537..6c041647 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -35,6 +35,7 @@ #include "shrpx_http2_downstream_connection.h" #include "shrpx_ssl.h" #include "shrpx_worker.h" +#include "shrpx_worker_config.h" #ifdef HAVE_SPDYLAY #include "shrpx_spdy_upstream.h" #endif // HAVE_SPDYLAY @@ -266,6 +267,12 @@ ClientHandler::~ClientHandler() --worker_stat_->num_connections; + // TODO If backend is http/2, and it is in CONNECTED state, signal + // it and make it loopbreak when output is zero. + if(worker_config.graceful_shutdown && worker_stat_->num_connections == 0) { + event_base_loopbreak(get_evbase()); + } + if(reneg_shutdown_timerev_) { event_free(reneg_shutdown_timerev_); } diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index a8166fb1..6500bf6e 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -200,6 +200,9 @@ FILE* open_file_for_write(const char *filename) LOG(ERROR) << "Failed to open " << filename << " for writing. Cause: " << strerror(errno); } + + evutil_make_socket_closeonexec(fileno(f)); + return f; } } // namespace diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 59b907f1..52461399 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -206,6 +206,8 @@ struct Config { FILE *http2_upstream_dump_request_header; FILE *http2_upstream_dump_response_header; nghttp2_option *http2_option; + char **argv; + char *cwd; size_t downstream_addrlen; size_t num_worker; size_t http2_max_concurrent_streams; @@ -232,6 +234,7 @@ struct Config { shrpx_proto downstream_proto; int syslog_facility; int backlog; + int argc; uid_t uid; gid_t gid; uint16_t port; diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index 899dc6a9..54a10157 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -26,6 +26,7 @@ #include #include + #include #include @@ -197,8 +198,10 @@ int Http2Session::init_notification() SSLOG(FATAL, this) << "socketpair() failed: errno=" << errno; return -1; } - evutil_make_socket_nonblocking(sockpair[0]); - evutil_make_socket_nonblocking(sockpair[1]); + for(int i = 0; i < 2; ++i) { + evutil_make_socket_nonblocking(sockpair[i]); + evutil_make_socket_closeonexec(sockpair[i]); + } wrbev_ = bufferevent_socket_new(evbase_, sockpair[0], BEV_OPT_CLOSE_ON_FREE| BEV_OPT_DEFER_CALLBACKS); @@ -459,6 +462,13 @@ int Http2Session::initiate_connection() } // If state_ == PROXY_CONNECTED, we has connected to the proxy // using fd_ and tunnel has been established. + if(state_ == DISCONNECTED) { + assert(fd_ == -1); + + fd_ = socket(get_config()->downstream_addr.storage.ss_family, + SOCK_STREAM | SOCK_CLOEXEC, 0); + } + bev_ = bufferevent_openssl_socket_new(evbase_, fd_, ssl_, BUFFEREVENT_SSL_CONNECTING, BEV_OPT_DEFER_CALLBACKS); @@ -471,33 +481,39 @@ int Http2Session::initiate_connection() // TODO maybe not thread-safe? const_cast(&get_config()->downstream_addr.sa), get_config()->downstream_addrlen); - } else if(state_ == DISCONNECTED) { - // Without TLS and proxy. - bev_ = bufferevent_socket_new(evbase_, -1, BEV_OPT_DEFER_CALLBACKS); - if(!bev_) { - SSLOG(ERROR, this) << "bufferevent_socket_new() failed"; - return SHRPX_ERR_NETWORK; - } - rv = bufferevent_socket_connect - (bev_, - const_cast(&get_config()->downstream_addr.sa), - get_config()->downstream_addrlen); } else { - assert(state_ == PROXY_CONNECTED); - // Without TLS but with proxy. + if(state_ == DISCONNECTED) { + // Without TLS and proxy. + assert(fd_ == -1); + + fd_ = socket(get_config()->downstream_addr.storage.ss_family, + SOCK_STREAM | SOCK_CLOEXEC, 0); + } + bev_ = bufferevent_socket_new(evbase_, fd_, BEV_OPT_DEFER_CALLBACKS); if(!bev_) { SSLOG(ERROR, this) << "bufferevent_socket_new() failed"; return SHRPX_ERR_NETWORK; } - // Connection already established. - eventcb(bev_, BEV_EVENT_CONNECTED, this); - // eventcb() has no return value. Check state_ to whether it was - // failed or not. + if(state_ == DISCONNECTED) { - return -1; + rv = bufferevent_socket_connect + (bev_, + const_cast(&get_config()->downstream_addr.sa), + get_config()->downstream_addrlen); + } else { + // Without TLS but with proxy. + + // Connection already established. + eventcb(bev_, BEV_EVENT_CONNECTED, this); + // eventcb() has no return value. Check state_ to whether it was + // failed or not. + if(state_ == DISCONNECTED) { + return -1; + } } } + if(rv != 0) { return SHRPX_ERR_NETWORK; } diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 592dabcf..3e73dc19 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -77,11 +77,17 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) auto upstream = downstream->get_upstream(); if(!bev_) { auto evbase = client_handler_->get_evbase(); + + auto fd = socket(get_config()->downstream_addr.storage.ss_family, + SOCK_STREAM | SOCK_CLOEXEC, 0); + bev_ = bufferevent_socket_new - (evbase, -1, + (evbase, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS); if(!bev_) { DCLOG(INFO, this) << "bufferevent_socket_new() failed"; + close(fd); + return SHRPX_ERR_NETWORK; } int rv = bufferevent_socket_connect diff --git a/src/shrpx_listen_handler.cc b/src/shrpx_listen_handler.cc index c6903122..4ce67eee 100644 --- a/src/shrpx_listen_handler.cc +++ b/src/shrpx_listen_handler.cc @@ -28,7 +28,6 @@ #include #include -#include #include @@ -36,6 +35,7 @@ #include "shrpx_thread_event_receiver.h" #include "shrpx_ssl.h" #include "shrpx_worker.h" +#include "shrpx_worker_config.h" #include "shrpx_config.h" #include "shrpx_http2_session.h" #include "util.h" @@ -51,8 +51,11 @@ ListenHandler::ListenHandler(event_base *evbase, SSL_CTX *sv_ssl_ctx, cl_ssl_ctx_(cl_ssl_ctx), rate_limit_group_(bufferevent_rate_limit_group_new (evbase, get_config()->worker_rate_limit_cfg)), + evlistener4_(nullptr), + evlistener6_(nullptr), worker_stat_(util::make_unique()), - worker_round_robin_cnt_(0) + worker_round_robin_cnt_(0), + num_worker_shutdown_(0) {} ListenHandler::~ListenHandler() @@ -68,48 +71,70 @@ void ListenHandler::worker_reopen_log_files() wev.type = REOPEN_LOG; for(auto& info : workers_) { - bufferevent_write(info.bev, &wev, sizeof(wev)); + bufferevent_write(info->bev, &wev, sizeof(wev)); } } +namespace { +void worker_writecb(bufferevent *bev, void *ptr) +{ + auto listener_handler = static_cast(ptr); + auto output = bufferevent_get_output(bev); + + if(!worker_config.graceful_shutdown || + evbuffer_get_length(output) != 0) { + return; + } + + // If graceful_shutdown is true and nothing left to send, we sent + // graceful shutdown event to worker successfully. The worker is + // now doing shutdown. + listener_handler->notify_worker_shutdown(); + + // Disable bev so that this won' be called accidentally in the + // future. + bufferevent_disable(bev, EV_READ | EV_WRITE); +} +} // namespace + void ListenHandler::create_worker_thread(size_t num) { workers_.resize(0); for(size_t i = 0; i < num; ++i) { int rv; - auto info = WorkerInfo(); - rv = socketpair(AF_UNIX, SOCK_STREAM, 0, info.sv); + auto info = util::make_unique(); + rv = socketpair(AF_UNIX, SOCK_STREAM, 0, info->sv); if(rv == -1) { LLOG(ERROR, this) << "socketpair() failed: errno=" << errno; continue; } - evutil_make_socket_nonblocking(info.sv[0]); - evutil_make_socket_nonblocking(info.sv[1]); - info.sv_ssl_ctx = sv_ssl_ctx_; - info.cl_ssl_ctx = cl_ssl_ctx_; - try { - auto thread = std::thread{start_threaded_worker, info}; - thread.detach(); - } catch(const std::system_error& error) { - LLOG(ERROR, this) << "Could not start thread: code=" << error.code() - << " msg=" << error.what(); - for(size_t j = 0; j < 2; ++j) { - close(info.sv[j]); - } - continue; + + for(int j = 0; j < 2; ++j) { + evutil_make_socket_nonblocking(info->sv[j]); + evutil_make_socket_closeonexec(info->sv[j]); } - auto bev = bufferevent_socket_new(evbase_, info.sv[0], + + info->sv_ssl_ctx = sv_ssl_ctx_; + info->cl_ssl_ctx = cl_ssl_ctx_; + + info->fut = std::async(std::launch::async, start_threaded_worker, + info.get()); + + auto bev = bufferevent_socket_new(evbase_, info->sv[0], BEV_OPT_DEFER_CALLBACKS); if(!bev) { LLOG(ERROR, this) << "bufferevent_socket_new() failed"; for(size_t j = 0; j < 2; ++j) { - close(info.sv[j]); + close(info->sv[j]); } continue; } - info.bev = bev; - workers_.push_back(info); + bufferevent_setcb(bev, nullptr, worker_writecb, nullptr, this); + + info->bev = bev; + + workers_.push_back(std::move(info)); if(LOG_ENABLED(INFO)) { LLOG(INFO, this) << "Created thread #" << workers_.size() - 1; @@ -117,12 +142,56 @@ void ListenHandler::create_worker_thread(size_t num) } } +void ListenHandler::join_worker() +{ + int n = 0; + + if(LOG_ENABLED(INFO)) { + LLOG(INFO, this) << "Waiting for worker thread to join: n=" + << workers_.size(); + } + + for(auto& worker : workers_) { + worker->fut.get(); + if(LOG_ENABLED(INFO)) { + LLOG(INFO, this) << "Thread #" << n << " joined"; + } + ++n; + } +} + +void ListenHandler::graceful_shutdown_worker() +{ + if(get_config()->num_worker == 1) { + return; + } + + for(auto& worker : workers_) { + WorkerEvent wev; + memset(&wev, 0, sizeof(wev)); + wev.type = GRACEFUL_SHUTDOWN; + + if(LOG_ENABLED(INFO)) { + LLOG(INFO, this) << "Sending graceful shutdown signal to worker"; + } + + auto output = bufferevent_get_output(worker->bev); + + if(evbuffer_add(output, &wev, sizeof(wev)) != 0) { + LLOG(FATAL, this) << "evbuffer_add() failed"; + } + } +} + int ListenHandler::accept_connection(evutil_socket_t fd, sockaddr *addr, int addrlen) { if(LOG_ENABLED(INFO)) { LLOG(INFO, this) << "Accepted connection. fd=" << fd; } + + evutil_make_socket_closeonexec(fd); + if(get_config()->num_worker == 1) { if(worker_stat_->num_connections >= @@ -158,7 +227,7 @@ int ListenHandler::accept_connection(evutil_socket_t fd, wev.client_fd = fd; memcpy(&wev.client_addr, addr, addrlen); wev.client_addrlen = addrlen; - auto output = bufferevent_get_output(workers_[idx].bev); + auto output = bufferevent_get_output(workers_[idx]->bev); if(evbuffer_add(output, &wev, sizeof(wev)) != 0) { LLOG(FATAL, this) << "evbuffer_add() failed"; close(fd); @@ -181,4 +250,92 @@ int ListenHandler::create_http2_session() return rv; } +const WorkerStat* ListenHandler::get_worker_stat() const +{ + return worker_stat_.get(); +} + +void ListenHandler::set_evlistener4(evconnlistener *evlistener4) +{ + evlistener4_ = evlistener4; +} + +evconnlistener* ListenHandler::get_evlistener4() const +{ + return evlistener4_; +} + +void ListenHandler::set_evlistener6(evconnlistener *evlistener6) +{ + evlistener6_ = evlistener6; +} + +evconnlistener* ListenHandler::get_evlistener6() const +{ + return evlistener6_; +} + +void ListenHandler::disable_evlistener() +{ + if(evlistener4_) { + evconnlistener_disable(evlistener4_); + } + + if(evlistener6_) { + evconnlistener_disable(evlistener6_); + } +} + +namespace { +void perform_accept_pending_connection(ListenHandler *listener_handler, + evconnlistener *listener) +{ + if(!listener) { + return; + } + + auto server_fd = evconnlistener_get_fd(listener); + + for(;;) { + sockaddr_union sockaddr; + socklen_t addrlen = sizeof(sockaddr); + + auto fd = accept(server_fd, &sockaddr.sa, &addrlen); + + if(fd == -1) { + if(errno == EINTR || + errno == ENETDOWN || + errno == EPROTO || + errno == ENOPROTOOPT || + errno == EHOSTDOWN || + errno == ENONET || + errno == EHOSTUNREACH || + errno == EOPNOTSUPP || + errno == ENETUNREACH) { + continue; + } + + return; + } + + evutil_make_socket_nonblocking(fd); + + listener_handler->accept_connection(fd, &sockaddr.sa, addrlen); + } +} +} // namespace + +void ListenHandler::accept_pending_connection() +{ + perform_accept_pending_connection(this, evlistener4_); + perform_accept_pending_connection(this, evlistener6_); +} + +void ListenHandler::notify_worker_shutdown() +{ + if(++num_worker_shutdown_ == workers_.size()) { + event_base_loopbreak(evbase_); + } +} + } // namespace shrpx diff --git a/src/shrpx_listen_handler.h b/src/shrpx_listen_handler.h index 97426e4d..8a1e7fb7 100644 --- a/src/shrpx_listen_handler.h +++ b/src/shrpx_listen_handler.h @@ -32,15 +32,18 @@ #include #include +#include #include #include #include +#include namespace shrpx { struct WorkerInfo { + std::future fut; SSL_CTX *sv_ssl_ctx; SSL_CTX *cl_ssl_ctx; bufferevent *bev; @@ -59,8 +62,18 @@ public: void worker_reopen_log_files(); event_base* get_evbase() const; int create_http2_session(); + const WorkerStat* get_worker_stat() const; + void set_evlistener4(evconnlistener *evlistener4); + evconnlistener* get_evlistener4() const; + void set_evlistener6(evconnlistener *evlistener6); + evconnlistener* get_evlistener6() const; + void disable_evlistener(); + void accept_pending_connection(); + void graceful_shutdown_worker(); + void join_worker(); + void notify_worker_shutdown(); private: - std::vector workers_; + std::vector> workers_; event_base *evbase_; // The frontend server SSL_CTX SSL_CTX *sv_ssl_ctx_; @@ -70,8 +83,11 @@ private: // multi-threaded case, see shrpx_worker.cc. std::unique_ptr http2session_; bufferevent_rate_limit_group *rate_limit_group_; + evconnlistener *evlistener4_; + evconnlistener *evlistener6_; std::unique_ptr worker_stat_; unsigned int worker_round_robin_cnt_; + int num_worker_shutdown_; }; } // namespace shrpx diff --git a/src/shrpx_thread_event_receiver.cc b/src/shrpx_thread_event_receiver.cc index 23c352c2..b1587c9e 100644 --- a/src/shrpx_thread_event_receiver.cc +++ b/src/shrpx_thread_event_receiver.cc @@ -81,6 +81,22 @@ void ThreadEventReceiver::on_read(bufferevent *bev) continue; } + if(wev.type == GRACEFUL_SHUTDOWN) { + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "Graceful shutdown commencing"; + } + + worker_config.graceful_shutdown = true; + + if(worker_stat_->num_connections == 0) { + event_base_loopbreak(evbase_); + + break; + } + + continue; + } + if(LOG_ENABLED(INFO)) { TLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd << ", addrlen=" << wev.client_addrlen; diff --git a/src/shrpx_thread_event_receiver.h b/src/shrpx_thread_event_receiver.h index a5e02f5a..54809cef 100644 --- a/src/shrpx_thread_event_receiver.h +++ b/src/shrpx_thread_event_receiver.h @@ -42,7 +42,8 @@ struct WorkerStat; enum WorkerEventType { NEW_CONNECTION = 0x01, - REOPEN_LOG = 0x02 + REOPEN_LOG = 0x02, + GRACEFUL_SHUTDOWN = 0x03, }; struct WorkerEvent { diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 5219fa0a..982b947e 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -43,10 +43,10 @@ using namespace nghttp2; namespace shrpx { -Worker::Worker(const WorkerInfo& info) - : sv_ssl_ctx_(info.sv_ssl_ctx), - cl_ssl_ctx_(info.cl_ssl_ctx), - fd_(info.sv[1]) +Worker::Worker(const WorkerInfo *info) + : sv_ssl_ctx_(info->sv_ssl_ctx), + cl_ssl_ctx_(info->cl_ssl_ctx), + fd_(info->sv[1]) {} Worker::~Worker() @@ -108,7 +108,7 @@ void Worker::run() event_base_loop(evbase.get(), 0); } -void start_threaded_worker(WorkerInfo info) +void start_threaded_worker(WorkerInfo *info) { Worker worker(info); worker.run(); diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index a766e192..07084c63 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -42,7 +42,7 @@ struct WorkerStat { class Worker { public: - Worker(const WorkerInfo& info); + Worker(const WorkerInfo *info); ~Worker(); void run(); private: @@ -52,7 +52,7 @@ private: int fd_; }; -void start_threaded_worker(WorkerInfo info); +void start_threaded_worker(WorkerInfo *info); } // namespace shrpx diff --git a/src/shrpx_worker_config.cc b/src/shrpx_worker_config.cc index 6db91f04..866b5158 100644 --- a/src/shrpx_worker_config.cc +++ b/src/shrpx_worker_config.cc @@ -29,7 +29,8 @@ namespace shrpx { WorkerConfig::WorkerConfig() : accesslog_fd(-1), errorlog_fd(-1), - errorlog_tty(false) + errorlog_tty(false), + graceful_shutdown(false) {} #ifndef NOTHREADS diff --git a/src/shrpx_worker_config.h b/src/shrpx_worker_config.h index 02567d65..9a2643bd 100644 --- a/src/shrpx_worker_config.h +++ b/src/shrpx_worker_config.h @@ -34,6 +34,7 @@ struct WorkerConfig { int errorlog_fd; // true if errorlog_fd is referring to a terminal. bool errorlog_tty; + bool graceful_shutdown; WorkerConfig(); }; diff --git a/src/util.cc b/src/util.cc index 744f57b9..8f59a061 100644 --- a/src/util.cc +++ b/src/util.cc @@ -589,7 +589,7 @@ bool numeric_host(const char *hostname) int reopen_log_file(const char *path) { - auto fd = open(path, O_WRONLY | O_APPEND | O_CREAT, + auto fd = open(path, O_WRONLY | O_APPEND | O_CREAT | O_CLOEXEC, S_IRUSR | S_IWUSR | S_IRGRP); if(fd == -1) { @@ -616,6 +616,31 @@ std::string ascii_dump(const uint8_t *data, size_t len) return res; } +char* get_exec_path(int argc, char **const argv, const char *cwd) +{ + if(argc == 0 || cwd == nullptr) { + return nullptr; + } + + auto argv0 = argv[0]; + auto len = strlen(argv0); + + char *path; + + if(argv0[0] == '/') { + path = static_cast(malloc(len + 1)); + memcpy(path, argv0, len + 1); + } else { + auto cwdlen = strlen(cwd); + path = static_cast(malloc(len + 1 + cwdlen + 1)); + memcpy(path, cwd, cwdlen); + path[cwdlen] = '/'; + memcpy(path + cwdlen + 1, argv0, len + 1); + } + + return path; +} + } // namespace util } // namespace nghttp2 diff --git a/src/util.h b/src/util.h index c7cd501f..74b4d037 100644 --- a/src/util.h +++ b/src/util.h @@ -487,6 +487,14 @@ int reopen_log_file(const char *path); // characters are preserved. Other characters are replaced with ".". std::string ascii_dump(const uint8_t *data, size_t len); +// Returns absolute path of executable path. If argc == 0 or |cwd| is +// nullptr, this function returns nullptr. If argv[0] starts with +// '/', this function returns argv[0]. Oterwise return cwd + "/" + +// argv[0]. If non-null is returned, it is NULL-terminated string and +// dynamically allocated by malloc. The caller is responsible to free +// it. +char* get_exec_path(int argc, char **const argv, const char *cwd); + } // namespace util } // namespace nghttp2