diff --git a/doc/sources/nghttpx-howto.rst b/doc/sources/nghttpx-howto.rst index 90738209..f8796c87 100644 --- a/doc/sources/nghttpx-howto.rst +++ b/doc/sources/nghttpx-howto.rst @@ -262,3 +262,16 @@ used in frontend, and host is replaced with which appears in precedence. If the above conditions are not met with the host value in :authority header field, rewrite is retried with the value in host header field. + +Hot deploy +---------- + +nghttpx supports hot deploy feature using signals. The host deploy in +nghttpx is multi step process. First send USR2 signal to nghttpx +process. It will do fork and execute new executable, using same +command-line arguments and environment variables. At this point, both +current and new processes can accept requests. To gracefully shutdown +current process, send QUIT signal to current nghttpx process. When +all existing frontend connections are done, the current process will +exit. At this point, only new nghttpx process exists and serves +incoming requests. diff --git a/src/shrpx.cc b/src/shrpx.cc index 5834499d..5d636d30 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -55,18 +56,32 @@ #include "shrpx_listen_handler.h" #include "shrpx_ssl.h" #include "shrpx_worker_config.h" +#include "shrpx_worker.h" #include "util.h" #include "app_helper.h" #include "ssl.h" +extern char **environ; + using namespace nghttp2; namespace shrpx { namespace { const int REOPEN_LOG_SIGNAL = SIGUSR1; +const int EXEC_BINARY_SIGNAL = SIGUSR2; +const int GRACEFUL_SHUTDOWN_SIGNAL = SIGQUIT; } // namespace +// Environment variables to tell new binary the listening socket's +// file descriptors. They are not close-on-exec. +#define ENV_LISTENER4_FD "NGHTTPX_LISTENER4_FD" +#define ENV_LISTENER6_FD "NGHTTPX_LISTENER6_FD" + +// Environment variable to tell new binary the port number the current +// binary is listening to. +#define ENV_PORT "NGHTTPX_PORT" + namespace { void ssl_acceptcb(evconnlistener *listener, int fd, sockaddr *addr, int addrlen, void *arg) @@ -139,9 +154,50 @@ void evlistener_errorcb(evconnlistener *listener, void *ptr) } } // namespace +namespace { +evconnlistener* new_evlistener(ListenHandler *handler, int fd) +{ + auto evlistener = evconnlistener_new + (handler->get_evbase(), + ssl_acceptcb, + handler, + LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, + get_config()->backlog, + fd); + evconnlistener_set_error_cb(evlistener, evlistener_errorcb); + return evlistener; +} +} // namespace + namespace { evconnlistener* create_evlistener(ListenHandler *handler, int family) { + { + auto envfd = getenv(family == AF_INET ? + ENV_LISTENER4_FD : ENV_LISTENER6_FD); + auto envport = getenv(ENV_PORT); + + if(envfd && envport) { + auto fd = strtoul(envfd, nullptr, 10); + auto port = strtoul(envport, nullptr, 10); + + // Only do this iff NGHTTPX_PORT == get_config()->port. + // Otherwise, close fd, and create server socket as usual. + + if(port == get_config()->port) { + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "Listening on port " << get_config()->port; + } + + return new_evlistener(handler, fd); + } + + LOG(WARNING) << "Port was changed between old binary (" << port + << ") and new binary (" << get_config()->port << ")"; + close(fd); + } + } + addrinfo hints; int fd = -1; int rv; @@ -222,15 +278,7 @@ evconnlistener* create_evlistener(ListenHandler *handler, int family) LOG(INFO) << "Listening on " << host << ", port " << get_config()->port; } - auto evlistener = evconnlistener_new - (handler->get_evbase(), - ssl_acceptcb, - handler, - LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, - get_config()->backlog, - fd); - evconnlistener_set_error_cb(evlistener, evlistener_errorcb); - return evlistener; + return new_evlistener(handler, fd); } } // namespace @@ -285,6 +333,122 @@ void reopen_log_signal_cb(evutil_socket_t sig, short events, void *arg) } } // namespace +namespace { +void exec_binary_signal_cb(evutil_socket_t sig, short events, void *arg) +{ + auto listener_handler = static_cast(arg); + + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "Executing new binary"; + } + + auto pid = fork(); + + if(pid == -1) { + auto error = errno; + LOG(ERROR) << "fork() failed errno=" << error; + return; + } + + if(pid != 0) { + return; + } + + auto exec_path = util::get_exec_path(get_config()->argc, + get_config()->argv, + get_config()->cwd); + + if(!exec_path) { + LOG(ERROR) << "Could not resolve the executable path"; + return; + } + + auto argv = + static_cast(malloc(sizeof(char*) * (get_config()->argc + 1))); + + argv[0] = exec_path; + for(int i = 1; i < get_config()->argc; ++i) { + argv[i] = strdup(get_config()->argv[i]); + } + argv[get_config()->argc] = nullptr; + + size_t envlen = 0; + for(char **p = environ; *p; ++p, ++envlen); + // 3 for missing fd4, fd6 and port. + auto envp = static_cast(malloc(sizeof(char*) * (envlen + 3 + 1))); + size_t envidx = 0; + + auto evlistener4 = listener_handler->get_evlistener4(); + if(evlistener4) { + std::string fd4 = ENV_LISTENER4_FD "="; + fd4 += util::utos(evconnlistener_get_fd(evlistener4)); + envp[envidx++] = strdup(fd4.c_str()); + } + + auto evlistener6 = listener_handler->get_evlistener6(); + if(evlistener6) { + std::string fd6 = ENV_LISTENER6_FD "="; + fd6 += util::utos(evconnlistener_get_fd(evlistener6)); + envp[envidx++] = strdup(fd6.c_str()); + } + + std::string port = ENV_PORT "="; + port += util::utos(get_config()->port); + envp[envidx++] = strdup(port.c_str()); + + for(size_t i = 0; i < envlen; ++i) { + if(strcmp(ENV_LISTENER4_FD, environ[i]) == 0 || + strcmp(ENV_LISTENER6_FD, environ[i]) == 0 || + strcmp(ENV_PORT, environ[i]) == 0) { + continue; + } + + envp[envidx++] = environ[i]; + } + + envp[envidx++] = nullptr; + + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "cmdline"; + for(int i = 0; argv[i]; ++i) { + LOG(INFO) << i << ": " << argv[i]; + } + LOG(INFO) << "environ"; + for(int i = 0; envp[i]; ++i) { + LOG(INFO) << i << ": " << envp[i]; + } + } + + if(execve(argv[0], argv, envp) == -1) { + auto error = errno; + LOG(ERROR) << "execve failed: errno=" << error; + exit(1); + } +} +} // namespace + +namespace { +void graceful_shutdown_signal_cb(evutil_socket_t sig, short events, void *arg) +{ + auto listener_handler = static_cast(arg); + + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "Graceful shutdown signal received"; + } + + listener_handler->disable_evlistener(); + + // After disabling accepting new connection, disptach incoming + // connection in backlog. + + listener_handler->accept_pending_connection(); + + worker_config.graceful_shutdown = true; + + listener_handler->graceful_shutdown_worker(); +} +} // namespace + namespace { std::unique_ptr generate_time() { @@ -310,7 +474,18 @@ std::unique_ptr generate_time() namespace { void refresh_cb(evutil_socket_t sig, short events, void *arg) { + auto listener_handler = static_cast(arg); + auto worker_stat = listener_handler->get_worker_stat(); + mod_config()->cached_time = generate_time(); + + // In multi threaded mode (get_config()->num_worker > 1), we have to + // wait for event notification to workers to finish. + if(get_config()->num_worker == 1 && + worker_config.graceful_shutdown && + (!worker_stat || worker_stat->num_connections == 0)) { + event_base_loopbreak(listener_handler->get_evbase()); + } } } // namespace @@ -358,6 +533,9 @@ int event_loop() exit(EXIT_FAILURE); } + listener_handler->set_evlistener4(evlistener4); + listener_handler->set_evlistener6(evlistener6); + // ListenHandler loads private key, and we listen on a priveleged port. // After that, we drop the root privileges if needed. drop_privileges(); @@ -366,10 +544,11 @@ int event_loop() sigset_t signals; sigemptyset(&signals); sigaddset(&signals, REOPEN_LOG_SIGNAL); - + sigaddset(&signals, EXEC_BINARY_SIGNAL); + sigaddset(&signals, GRACEFUL_SHUTDOWN_SIGNAL); rv = pthread_sigmask(SIG_BLOCK, &signals, nullptr); if(rv != 0) { - LOG(ERROR) << "Blocking REOPEN_LOG_SIGNAL failed: " << strerror(rv); + LOG(ERROR) << "Blocking signals failed: " << strerror(rv); } #endif // !NOTHREADS @@ -382,7 +561,7 @@ int event_loop() #ifndef NOTHREADS rv = pthread_sigmask(SIG_UNBLOCK, &signals, nullptr); if(rv != 0) { - LOG(ERROR) << "Unblocking REOPEN_LOG_SIGNAL failed: " << strerror(rv); + LOG(ERROR) << "Unblocking signals failed: " << strerror(rv); } #endif // !NOTHREADS @@ -399,8 +578,31 @@ int event_loop() } } + auto exec_binary_signal_event = evsignal_new(evbase, EXEC_BINARY_SIGNAL, + exec_binary_signal_cb, + listener_handler); + rv = event_add(exec_binary_signal_event, nullptr); + + if(rv == -1) { + LOG(FATAL) << "event_add for exec_binary_signal_event failed"; + + exit(EXIT_FAILURE); + } + + auto graceful_shutdown_signal_event = evsignal_new + (evbase, GRACEFUL_SHUTDOWN_SIGNAL, graceful_shutdown_signal_cb, + listener_handler); + + rv = event_add(graceful_shutdown_signal_event, nullptr); + + if(rv == -1) { + LOG(FATAL) << "event_add for graceful_shutdown_signal_event failed"; + + exit(EXIT_FAILURE); + } + auto refresh_event = event_new(evbase, -1, EV_PERSIST, refresh_cb, - nullptr); + listener_handler); if(!refresh_event) { LOG(ERROR) << "event_new failed"; @@ -422,9 +624,17 @@ int event_loop() } event_base_loop(evbase, 0); + listener_handler->join_worker(); + if(refresh_event) { event_free(refresh_event); } + if(graceful_shutdown_signal_event) { + event_free(graceful_shutdown_signal_event); + } + if(exec_binary_signal_event) { + event_free(exec_binary_signal_event); + } if(reopen_log_signal_event) { event_free(reopen_log_signal_event); } @@ -570,6 +780,8 @@ void fill_default_config() mod_config()->tls_proto_mask = 0; mod_config()->cached_time = generate_time(); mod_config()->no_location_rewrite = false; + mod_config()->argc = 0; + mod_config()->argv = nullptr; } } // namespace @@ -933,6 +1145,23 @@ int main(int argc, char **argv) create_config(); fill_default_config(); + // We have to copy argv, since getopt_long may change its content. + mod_config()->argc = argc; + mod_config()->argv = new char*[argc]; + + for(int i = 0; i < argc; ++i) { + mod_config()->argv[i] = strdup(argv[i]); + } + + char cwd[PATH_MAX]; + + mod_config()->cwd = getcwd(cwd, sizeof(cwd)); + if(mod_config()->cwd == nullptr) { + auto error = errno; + LOG(FATAL) << "failed to get current working directory: errno=" << error; + exit(EXIT_FAILURE); + } + std::vector > cmdcfgs; while(1) { static int flag = 0; @@ -1488,6 +1717,10 @@ int main(int argc, char **argv) event_loop(); + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "Shutdown momentarily"; + } + return 0; } diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 8bafd537..6c041647 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -35,6 +35,7 @@ #include "shrpx_http2_downstream_connection.h" #include "shrpx_ssl.h" #include "shrpx_worker.h" +#include "shrpx_worker_config.h" #ifdef HAVE_SPDYLAY #include "shrpx_spdy_upstream.h" #endif // HAVE_SPDYLAY @@ -266,6 +267,12 @@ ClientHandler::~ClientHandler() --worker_stat_->num_connections; + // TODO If backend is http/2, and it is in CONNECTED state, signal + // it and make it loopbreak when output is zero. + if(worker_config.graceful_shutdown && worker_stat_->num_connections == 0) { + event_base_loopbreak(get_evbase()); + } + if(reneg_shutdown_timerev_) { event_free(reneg_shutdown_timerev_); } diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index a8166fb1..6500bf6e 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -200,6 +200,9 @@ FILE* open_file_for_write(const char *filename) LOG(ERROR) << "Failed to open " << filename << " for writing. Cause: " << strerror(errno); } + + evutil_make_socket_closeonexec(fileno(f)); + return f; } } // namespace diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 59b907f1..52461399 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -206,6 +206,8 @@ struct Config { FILE *http2_upstream_dump_request_header; FILE *http2_upstream_dump_response_header; nghttp2_option *http2_option; + char **argv; + char *cwd; size_t downstream_addrlen; size_t num_worker; size_t http2_max_concurrent_streams; @@ -232,6 +234,7 @@ struct Config { shrpx_proto downstream_proto; int syslog_facility; int backlog; + int argc; uid_t uid; gid_t gid; uint16_t port; diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index 899dc6a9..54a10157 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -26,6 +26,7 @@ #include #include + #include #include @@ -197,8 +198,10 @@ int Http2Session::init_notification() SSLOG(FATAL, this) << "socketpair() failed: errno=" << errno; return -1; } - evutil_make_socket_nonblocking(sockpair[0]); - evutil_make_socket_nonblocking(sockpair[1]); + for(int i = 0; i < 2; ++i) { + evutil_make_socket_nonblocking(sockpair[i]); + evutil_make_socket_closeonexec(sockpair[i]); + } wrbev_ = bufferevent_socket_new(evbase_, sockpair[0], BEV_OPT_CLOSE_ON_FREE| BEV_OPT_DEFER_CALLBACKS); @@ -459,6 +462,13 @@ int Http2Session::initiate_connection() } // If state_ == PROXY_CONNECTED, we has connected to the proxy // using fd_ and tunnel has been established. + if(state_ == DISCONNECTED) { + assert(fd_ == -1); + + fd_ = socket(get_config()->downstream_addr.storage.ss_family, + SOCK_STREAM | SOCK_CLOEXEC, 0); + } + bev_ = bufferevent_openssl_socket_new(evbase_, fd_, ssl_, BUFFEREVENT_SSL_CONNECTING, BEV_OPT_DEFER_CALLBACKS); @@ -471,33 +481,39 @@ int Http2Session::initiate_connection() // TODO maybe not thread-safe? const_cast(&get_config()->downstream_addr.sa), get_config()->downstream_addrlen); - } else if(state_ == DISCONNECTED) { - // Without TLS and proxy. - bev_ = bufferevent_socket_new(evbase_, -1, BEV_OPT_DEFER_CALLBACKS); - if(!bev_) { - SSLOG(ERROR, this) << "bufferevent_socket_new() failed"; - return SHRPX_ERR_NETWORK; - } - rv = bufferevent_socket_connect - (bev_, - const_cast(&get_config()->downstream_addr.sa), - get_config()->downstream_addrlen); } else { - assert(state_ == PROXY_CONNECTED); - // Without TLS but with proxy. + if(state_ == DISCONNECTED) { + // Without TLS and proxy. + assert(fd_ == -1); + + fd_ = socket(get_config()->downstream_addr.storage.ss_family, + SOCK_STREAM | SOCK_CLOEXEC, 0); + } + bev_ = bufferevent_socket_new(evbase_, fd_, BEV_OPT_DEFER_CALLBACKS); if(!bev_) { SSLOG(ERROR, this) << "bufferevent_socket_new() failed"; return SHRPX_ERR_NETWORK; } - // Connection already established. - eventcb(bev_, BEV_EVENT_CONNECTED, this); - // eventcb() has no return value. Check state_ to whether it was - // failed or not. + if(state_ == DISCONNECTED) { - return -1; + rv = bufferevent_socket_connect + (bev_, + const_cast(&get_config()->downstream_addr.sa), + get_config()->downstream_addrlen); + } else { + // Without TLS but with proxy. + + // Connection already established. + eventcb(bev_, BEV_EVENT_CONNECTED, this); + // eventcb() has no return value. Check state_ to whether it was + // failed or not. + if(state_ == DISCONNECTED) { + return -1; + } } } + if(rv != 0) { return SHRPX_ERR_NETWORK; } diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 592dabcf..3e73dc19 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -77,11 +77,17 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) auto upstream = downstream->get_upstream(); if(!bev_) { auto evbase = client_handler_->get_evbase(); + + auto fd = socket(get_config()->downstream_addr.storage.ss_family, + SOCK_STREAM | SOCK_CLOEXEC, 0); + bev_ = bufferevent_socket_new - (evbase, -1, + (evbase, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS); if(!bev_) { DCLOG(INFO, this) << "bufferevent_socket_new() failed"; + close(fd); + return SHRPX_ERR_NETWORK; } int rv = bufferevent_socket_connect diff --git a/src/shrpx_listen_handler.cc b/src/shrpx_listen_handler.cc index c6903122..4ce67eee 100644 --- a/src/shrpx_listen_handler.cc +++ b/src/shrpx_listen_handler.cc @@ -28,7 +28,6 @@ #include #include -#include #include @@ -36,6 +35,7 @@ #include "shrpx_thread_event_receiver.h" #include "shrpx_ssl.h" #include "shrpx_worker.h" +#include "shrpx_worker_config.h" #include "shrpx_config.h" #include "shrpx_http2_session.h" #include "util.h" @@ -51,8 +51,11 @@ ListenHandler::ListenHandler(event_base *evbase, SSL_CTX *sv_ssl_ctx, cl_ssl_ctx_(cl_ssl_ctx), rate_limit_group_(bufferevent_rate_limit_group_new (evbase, get_config()->worker_rate_limit_cfg)), + evlistener4_(nullptr), + evlistener6_(nullptr), worker_stat_(util::make_unique()), - worker_round_robin_cnt_(0) + worker_round_robin_cnt_(0), + num_worker_shutdown_(0) {} ListenHandler::~ListenHandler() @@ -68,48 +71,70 @@ void ListenHandler::worker_reopen_log_files() wev.type = REOPEN_LOG; for(auto& info : workers_) { - bufferevent_write(info.bev, &wev, sizeof(wev)); + bufferevent_write(info->bev, &wev, sizeof(wev)); } } +namespace { +void worker_writecb(bufferevent *bev, void *ptr) +{ + auto listener_handler = static_cast(ptr); + auto output = bufferevent_get_output(bev); + + if(!worker_config.graceful_shutdown || + evbuffer_get_length(output) != 0) { + return; + } + + // If graceful_shutdown is true and nothing left to send, we sent + // graceful shutdown event to worker successfully. The worker is + // now doing shutdown. + listener_handler->notify_worker_shutdown(); + + // Disable bev so that this won' be called accidentally in the + // future. + bufferevent_disable(bev, EV_READ | EV_WRITE); +} +} // namespace + void ListenHandler::create_worker_thread(size_t num) { workers_.resize(0); for(size_t i = 0; i < num; ++i) { int rv; - auto info = WorkerInfo(); - rv = socketpair(AF_UNIX, SOCK_STREAM, 0, info.sv); + auto info = util::make_unique(); + rv = socketpair(AF_UNIX, SOCK_STREAM, 0, info->sv); if(rv == -1) { LLOG(ERROR, this) << "socketpair() failed: errno=" << errno; continue; } - evutil_make_socket_nonblocking(info.sv[0]); - evutil_make_socket_nonblocking(info.sv[1]); - info.sv_ssl_ctx = sv_ssl_ctx_; - info.cl_ssl_ctx = cl_ssl_ctx_; - try { - auto thread = std::thread{start_threaded_worker, info}; - thread.detach(); - } catch(const std::system_error& error) { - LLOG(ERROR, this) << "Could not start thread: code=" << error.code() - << " msg=" << error.what(); - for(size_t j = 0; j < 2; ++j) { - close(info.sv[j]); - } - continue; + + for(int j = 0; j < 2; ++j) { + evutil_make_socket_nonblocking(info->sv[j]); + evutil_make_socket_closeonexec(info->sv[j]); } - auto bev = bufferevent_socket_new(evbase_, info.sv[0], + + info->sv_ssl_ctx = sv_ssl_ctx_; + info->cl_ssl_ctx = cl_ssl_ctx_; + + info->fut = std::async(std::launch::async, start_threaded_worker, + info.get()); + + auto bev = bufferevent_socket_new(evbase_, info->sv[0], BEV_OPT_DEFER_CALLBACKS); if(!bev) { LLOG(ERROR, this) << "bufferevent_socket_new() failed"; for(size_t j = 0; j < 2; ++j) { - close(info.sv[j]); + close(info->sv[j]); } continue; } - info.bev = bev; - workers_.push_back(info); + bufferevent_setcb(bev, nullptr, worker_writecb, nullptr, this); + + info->bev = bev; + + workers_.push_back(std::move(info)); if(LOG_ENABLED(INFO)) { LLOG(INFO, this) << "Created thread #" << workers_.size() - 1; @@ -117,12 +142,56 @@ void ListenHandler::create_worker_thread(size_t num) } } +void ListenHandler::join_worker() +{ + int n = 0; + + if(LOG_ENABLED(INFO)) { + LLOG(INFO, this) << "Waiting for worker thread to join: n=" + << workers_.size(); + } + + for(auto& worker : workers_) { + worker->fut.get(); + if(LOG_ENABLED(INFO)) { + LLOG(INFO, this) << "Thread #" << n << " joined"; + } + ++n; + } +} + +void ListenHandler::graceful_shutdown_worker() +{ + if(get_config()->num_worker == 1) { + return; + } + + for(auto& worker : workers_) { + WorkerEvent wev; + memset(&wev, 0, sizeof(wev)); + wev.type = GRACEFUL_SHUTDOWN; + + if(LOG_ENABLED(INFO)) { + LLOG(INFO, this) << "Sending graceful shutdown signal to worker"; + } + + auto output = bufferevent_get_output(worker->bev); + + if(evbuffer_add(output, &wev, sizeof(wev)) != 0) { + LLOG(FATAL, this) << "evbuffer_add() failed"; + } + } +} + int ListenHandler::accept_connection(evutil_socket_t fd, sockaddr *addr, int addrlen) { if(LOG_ENABLED(INFO)) { LLOG(INFO, this) << "Accepted connection. fd=" << fd; } + + evutil_make_socket_closeonexec(fd); + if(get_config()->num_worker == 1) { if(worker_stat_->num_connections >= @@ -158,7 +227,7 @@ int ListenHandler::accept_connection(evutil_socket_t fd, wev.client_fd = fd; memcpy(&wev.client_addr, addr, addrlen); wev.client_addrlen = addrlen; - auto output = bufferevent_get_output(workers_[idx].bev); + auto output = bufferevent_get_output(workers_[idx]->bev); if(evbuffer_add(output, &wev, sizeof(wev)) != 0) { LLOG(FATAL, this) << "evbuffer_add() failed"; close(fd); @@ -181,4 +250,92 @@ int ListenHandler::create_http2_session() return rv; } +const WorkerStat* ListenHandler::get_worker_stat() const +{ + return worker_stat_.get(); +} + +void ListenHandler::set_evlistener4(evconnlistener *evlistener4) +{ + evlistener4_ = evlistener4; +} + +evconnlistener* ListenHandler::get_evlistener4() const +{ + return evlistener4_; +} + +void ListenHandler::set_evlistener6(evconnlistener *evlistener6) +{ + evlistener6_ = evlistener6; +} + +evconnlistener* ListenHandler::get_evlistener6() const +{ + return evlistener6_; +} + +void ListenHandler::disable_evlistener() +{ + if(evlistener4_) { + evconnlistener_disable(evlistener4_); + } + + if(evlistener6_) { + evconnlistener_disable(evlistener6_); + } +} + +namespace { +void perform_accept_pending_connection(ListenHandler *listener_handler, + evconnlistener *listener) +{ + if(!listener) { + return; + } + + auto server_fd = evconnlistener_get_fd(listener); + + for(;;) { + sockaddr_union sockaddr; + socklen_t addrlen = sizeof(sockaddr); + + auto fd = accept(server_fd, &sockaddr.sa, &addrlen); + + if(fd == -1) { + if(errno == EINTR || + errno == ENETDOWN || + errno == EPROTO || + errno == ENOPROTOOPT || + errno == EHOSTDOWN || + errno == ENONET || + errno == EHOSTUNREACH || + errno == EOPNOTSUPP || + errno == ENETUNREACH) { + continue; + } + + return; + } + + evutil_make_socket_nonblocking(fd); + + listener_handler->accept_connection(fd, &sockaddr.sa, addrlen); + } +} +} // namespace + +void ListenHandler::accept_pending_connection() +{ + perform_accept_pending_connection(this, evlistener4_); + perform_accept_pending_connection(this, evlistener6_); +} + +void ListenHandler::notify_worker_shutdown() +{ + if(++num_worker_shutdown_ == workers_.size()) { + event_base_loopbreak(evbase_); + } +} + } // namespace shrpx diff --git a/src/shrpx_listen_handler.h b/src/shrpx_listen_handler.h index 97426e4d..8a1e7fb7 100644 --- a/src/shrpx_listen_handler.h +++ b/src/shrpx_listen_handler.h @@ -32,15 +32,18 @@ #include #include +#include #include #include #include +#include namespace shrpx { struct WorkerInfo { + std::future fut; SSL_CTX *sv_ssl_ctx; SSL_CTX *cl_ssl_ctx; bufferevent *bev; @@ -59,8 +62,18 @@ public: void worker_reopen_log_files(); event_base* get_evbase() const; int create_http2_session(); + const WorkerStat* get_worker_stat() const; + void set_evlistener4(evconnlistener *evlistener4); + evconnlistener* get_evlistener4() const; + void set_evlistener6(evconnlistener *evlistener6); + evconnlistener* get_evlistener6() const; + void disable_evlistener(); + void accept_pending_connection(); + void graceful_shutdown_worker(); + void join_worker(); + void notify_worker_shutdown(); private: - std::vector workers_; + std::vector> workers_; event_base *evbase_; // The frontend server SSL_CTX SSL_CTX *sv_ssl_ctx_; @@ -70,8 +83,11 @@ private: // multi-threaded case, see shrpx_worker.cc. std::unique_ptr http2session_; bufferevent_rate_limit_group *rate_limit_group_; + evconnlistener *evlistener4_; + evconnlistener *evlistener6_; std::unique_ptr worker_stat_; unsigned int worker_round_robin_cnt_; + int num_worker_shutdown_; }; } // namespace shrpx diff --git a/src/shrpx_thread_event_receiver.cc b/src/shrpx_thread_event_receiver.cc index 23c352c2..b1587c9e 100644 --- a/src/shrpx_thread_event_receiver.cc +++ b/src/shrpx_thread_event_receiver.cc @@ -81,6 +81,22 @@ void ThreadEventReceiver::on_read(bufferevent *bev) continue; } + if(wev.type == GRACEFUL_SHUTDOWN) { + if(LOG_ENABLED(INFO)) { + LOG(INFO) << "Graceful shutdown commencing"; + } + + worker_config.graceful_shutdown = true; + + if(worker_stat_->num_connections == 0) { + event_base_loopbreak(evbase_); + + break; + } + + continue; + } + if(LOG_ENABLED(INFO)) { TLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd << ", addrlen=" << wev.client_addrlen; diff --git a/src/shrpx_thread_event_receiver.h b/src/shrpx_thread_event_receiver.h index a5e02f5a..54809cef 100644 --- a/src/shrpx_thread_event_receiver.h +++ b/src/shrpx_thread_event_receiver.h @@ -42,7 +42,8 @@ struct WorkerStat; enum WorkerEventType { NEW_CONNECTION = 0x01, - REOPEN_LOG = 0x02 + REOPEN_LOG = 0x02, + GRACEFUL_SHUTDOWN = 0x03, }; struct WorkerEvent { diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 5219fa0a..982b947e 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -43,10 +43,10 @@ using namespace nghttp2; namespace shrpx { -Worker::Worker(const WorkerInfo& info) - : sv_ssl_ctx_(info.sv_ssl_ctx), - cl_ssl_ctx_(info.cl_ssl_ctx), - fd_(info.sv[1]) +Worker::Worker(const WorkerInfo *info) + : sv_ssl_ctx_(info->sv_ssl_ctx), + cl_ssl_ctx_(info->cl_ssl_ctx), + fd_(info->sv[1]) {} Worker::~Worker() @@ -108,7 +108,7 @@ void Worker::run() event_base_loop(evbase.get(), 0); } -void start_threaded_worker(WorkerInfo info) +void start_threaded_worker(WorkerInfo *info) { Worker worker(info); worker.run(); diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index a766e192..07084c63 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -42,7 +42,7 @@ struct WorkerStat { class Worker { public: - Worker(const WorkerInfo& info); + Worker(const WorkerInfo *info); ~Worker(); void run(); private: @@ -52,7 +52,7 @@ private: int fd_; }; -void start_threaded_worker(WorkerInfo info); +void start_threaded_worker(WorkerInfo *info); } // namespace shrpx diff --git a/src/shrpx_worker_config.cc b/src/shrpx_worker_config.cc index 6db91f04..866b5158 100644 --- a/src/shrpx_worker_config.cc +++ b/src/shrpx_worker_config.cc @@ -29,7 +29,8 @@ namespace shrpx { WorkerConfig::WorkerConfig() : accesslog_fd(-1), errorlog_fd(-1), - errorlog_tty(false) + errorlog_tty(false), + graceful_shutdown(false) {} #ifndef NOTHREADS diff --git a/src/shrpx_worker_config.h b/src/shrpx_worker_config.h index 02567d65..9a2643bd 100644 --- a/src/shrpx_worker_config.h +++ b/src/shrpx_worker_config.h @@ -34,6 +34,7 @@ struct WorkerConfig { int errorlog_fd; // true if errorlog_fd is referring to a terminal. bool errorlog_tty; + bool graceful_shutdown; WorkerConfig(); }; diff --git a/src/util.cc b/src/util.cc index 744f57b9..8f59a061 100644 --- a/src/util.cc +++ b/src/util.cc @@ -589,7 +589,7 @@ bool numeric_host(const char *hostname) int reopen_log_file(const char *path) { - auto fd = open(path, O_WRONLY | O_APPEND | O_CREAT, + auto fd = open(path, O_WRONLY | O_APPEND | O_CREAT | O_CLOEXEC, S_IRUSR | S_IWUSR | S_IRGRP); if(fd == -1) { @@ -616,6 +616,31 @@ std::string ascii_dump(const uint8_t *data, size_t len) return res; } +char* get_exec_path(int argc, char **const argv, const char *cwd) +{ + if(argc == 0 || cwd == nullptr) { + return nullptr; + } + + auto argv0 = argv[0]; + auto len = strlen(argv0); + + char *path; + + if(argv0[0] == '/') { + path = static_cast(malloc(len + 1)); + memcpy(path, argv0, len + 1); + } else { + auto cwdlen = strlen(cwd); + path = static_cast(malloc(len + 1 + cwdlen + 1)); + memcpy(path, cwd, cwdlen); + path[cwdlen] = '/'; + memcpy(path + cwdlen + 1, argv0, len + 1); + } + + return path; +} + } // namespace util } // namespace nghttp2 diff --git a/src/util.h b/src/util.h index c7cd501f..74b4d037 100644 --- a/src/util.h +++ b/src/util.h @@ -487,6 +487,14 @@ int reopen_log_file(const char *path); // characters are preserved. Other characters are replaced with ".". std::string ascii_dump(const uint8_t *data, size_t len); +// Returns absolute path of executable path. If argc == 0 or |cwd| is +// nullptr, this function returns nullptr. If argv[0] starts with +// '/', this function returns argv[0]. Oterwise return cwd + "/" + +// argv[0]. If non-null is returned, it is NULL-terminated string and +// dynamically allocated by malloc. The caller is responsible to free +// it. +char* get_exec_path(int argc, char **const argv, const char *cwd); + } // namespace util } // namespace nghttp2