nghttpx: Add hot deploy feature

nghttpx supports hot deploy feature using signals.  The host deploy in
nghttpx is multi step process.  First send USR2 signal to nghttpx
process.  It will do fork and execute new executable, using same
command-line arguments and environment variables.  At this point, both
current and new processes can accept requests.  To gracefully shutdown
current process, send QUIT signal to current nghttpx process.  When
all existing frontend connections are done, the current process will
exit.  At this point, only new nghttpx process exists and serves
incoming requests.
This commit is contained in:
Tatsuhiro Tsujikawa 2014-08-12 22:22:02 +09:00
parent 8aa6580d89
commit bf13d91264
17 changed files with 575 additions and 69 deletions

View File

@ -262,3 +262,16 @@ used in frontend, and host is replaced with which appears in
precedence. If the above conditions are not met with the host value
in :authority header field, rewrite is retried with the value in host
header field.
Hot deploy
----------
nghttpx supports hot deploy feature using signals. The host deploy in
nghttpx is multi step process. First send USR2 signal to nghttpx
process. It will do fork and execute new executable, using same
command-line arguments and environment variables. At this point, both
current and new processes can accept requests. To gracefully shutdown
current process, send QUIT signal to current nghttpx process. When
all existing frontend connections are done, the current process will
exit. At this point, only new nghttpx process exists and serves
incoming requests.

View File

@ -36,6 +36,7 @@
#include <getopt.h>
#include <syslog.h>
#include <signal.h>
#include <limits.h>
#include <limits>
#include <cstdlib>
@ -55,18 +56,32 @@
#include "shrpx_listen_handler.h"
#include "shrpx_ssl.h"
#include "shrpx_worker_config.h"
#include "shrpx_worker.h"
#include "util.h"
#include "app_helper.h"
#include "ssl.h"
extern char **environ;
using namespace nghttp2;
namespace shrpx {
namespace {
const int REOPEN_LOG_SIGNAL = SIGUSR1;
const int EXEC_BINARY_SIGNAL = SIGUSR2;
const int GRACEFUL_SHUTDOWN_SIGNAL = SIGQUIT;
} // namespace
// Environment variables to tell new binary the listening socket's
// file descriptors. They are not close-on-exec.
#define ENV_LISTENER4_FD "NGHTTPX_LISTENER4_FD"
#define ENV_LISTENER6_FD "NGHTTPX_LISTENER6_FD"
// Environment variable to tell new binary the port number the current
// binary is listening to.
#define ENV_PORT "NGHTTPX_PORT"
namespace {
void ssl_acceptcb(evconnlistener *listener, int fd,
sockaddr *addr, int addrlen, void *arg)
@ -139,9 +154,50 @@ void evlistener_errorcb(evconnlistener *listener, void *ptr)
}
} // namespace
namespace {
evconnlistener* new_evlistener(ListenHandler *handler, int fd)
{
auto evlistener = evconnlistener_new
(handler->get_evbase(),
ssl_acceptcb,
handler,
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
get_config()->backlog,
fd);
evconnlistener_set_error_cb(evlistener, evlistener_errorcb);
return evlistener;
}
} // namespace
namespace {
evconnlistener* create_evlistener(ListenHandler *handler, int family)
{
{
auto envfd = getenv(family == AF_INET ?
ENV_LISTENER4_FD : ENV_LISTENER6_FD);
auto envport = getenv(ENV_PORT);
if(envfd && envport) {
auto fd = strtoul(envfd, nullptr, 10);
auto port = strtoul(envport, nullptr, 10);
// Only do this iff NGHTTPX_PORT == get_config()->port.
// Otherwise, close fd, and create server socket as usual.
if(port == get_config()->port) {
if(LOG_ENABLED(INFO)) {
LOG(INFO) << "Listening on port " << get_config()->port;
}
return new_evlistener(handler, fd);
}
LOG(WARNING) << "Port was changed between old binary (" << port
<< ") and new binary (" << get_config()->port << ")";
close(fd);
}
}
addrinfo hints;
int fd = -1;
int rv;
@ -222,15 +278,7 @@ evconnlistener* create_evlistener(ListenHandler *handler, int family)
LOG(INFO) << "Listening on " << host << ", port " << get_config()->port;
}
auto evlistener = evconnlistener_new
(handler->get_evbase(),
ssl_acceptcb,
handler,
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
get_config()->backlog,
fd);
evconnlistener_set_error_cb(evlistener, evlistener_errorcb);
return evlistener;
return new_evlistener(handler, fd);
}
} // namespace
@ -285,6 +333,122 @@ void reopen_log_signal_cb(evutil_socket_t sig, short events, void *arg)
}
} // namespace
namespace {
void exec_binary_signal_cb(evutil_socket_t sig, short events, void *arg)
{
auto listener_handler = static_cast<ListenHandler*>(arg);
if(LOG_ENABLED(INFO)) {
LOG(INFO) << "Executing new binary";
}
auto pid = fork();
if(pid == -1) {
auto error = errno;
LOG(ERROR) << "fork() failed errno=" << error;
return;
}
if(pid != 0) {
return;
}
auto exec_path = util::get_exec_path(get_config()->argc,
get_config()->argv,
get_config()->cwd);
if(!exec_path) {
LOG(ERROR) << "Could not resolve the executable path";
return;
}
auto argv =
static_cast<char**>(malloc(sizeof(char*) * (get_config()->argc + 1)));
argv[0] = exec_path;
for(int i = 1; i < get_config()->argc; ++i) {
argv[i] = strdup(get_config()->argv[i]);
}
argv[get_config()->argc] = nullptr;
size_t envlen = 0;
for(char **p = environ; *p; ++p, ++envlen);
// 3 for missing fd4, fd6 and port.
auto envp = static_cast<char**>(malloc(sizeof(char*) * (envlen + 3 + 1)));
size_t envidx = 0;
auto evlistener4 = listener_handler->get_evlistener4();
if(evlistener4) {
std::string fd4 = ENV_LISTENER4_FD "=";
fd4 += util::utos(evconnlistener_get_fd(evlistener4));
envp[envidx++] = strdup(fd4.c_str());
}
auto evlistener6 = listener_handler->get_evlistener6();
if(evlistener6) {
std::string fd6 = ENV_LISTENER6_FD "=";
fd6 += util::utos(evconnlistener_get_fd(evlistener6));
envp[envidx++] = strdup(fd6.c_str());
}
std::string port = ENV_PORT "=";
port += util::utos(get_config()->port);
envp[envidx++] = strdup(port.c_str());
for(size_t i = 0; i < envlen; ++i) {
if(strcmp(ENV_LISTENER4_FD, environ[i]) == 0 ||
strcmp(ENV_LISTENER6_FD, environ[i]) == 0 ||
strcmp(ENV_PORT, environ[i]) == 0) {
continue;
}
envp[envidx++] = environ[i];
}
envp[envidx++] = nullptr;
if(LOG_ENABLED(INFO)) {
LOG(INFO) << "cmdline";
for(int i = 0; argv[i]; ++i) {
LOG(INFO) << i << ": " << argv[i];
}
LOG(INFO) << "environ";
for(int i = 0; envp[i]; ++i) {
LOG(INFO) << i << ": " << envp[i];
}
}
if(execve(argv[0], argv, envp) == -1) {
auto error = errno;
LOG(ERROR) << "execve failed: errno=" << error;
exit(1);
}
}
} // namespace
namespace {
void graceful_shutdown_signal_cb(evutil_socket_t sig, short events, void *arg)
{
auto listener_handler = static_cast<ListenHandler*>(arg);
if(LOG_ENABLED(INFO)) {
LOG(INFO) << "Graceful shutdown signal received";
}
listener_handler->disable_evlistener();
// After disabling accepting new connection, disptach incoming
// connection in backlog.
listener_handler->accept_pending_connection();
worker_config.graceful_shutdown = true;
listener_handler->graceful_shutdown_worker();
}
} // namespace
namespace {
std::unique_ptr<std::string> generate_time()
{
@ -310,7 +474,18 @@ std::unique_ptr<std::string> generate_time()
namespace {
void refresh_cb(evutil_socket_t sig, short events, void *arg)
{
auto listener_handler = static_cast<ListenHandler*>(arg);
auto worker_stat = listener_handler->get_worker_stat();
mod_config()->cached_time = generate_time();
// In multi threaded mode (get_config()->num_worker > 1), we have to
// wait for event notification to workers to finish.
if(get_config()->num_worker == 1 &&
worker_config.graceful_shutdown &&
(!worker_stat || worker_stat->num_connections == 0)) {
event_base_loopbreak(listener_handler->get_evbase());
}
}
} // namespace
@ -358,6 +533,9 @@ int event_loop()
exit(EXIT_FAILURE);
}
listener_handler->set_evlistener4(evlistener4);
listener_handler->set_evlistener6(evlistener6);
// ListenHandler loads private key, and we listen on a priveleged port.
// After that, we drop the root privileges if needed.
drop_privileges();
@ -366,10 +544,11 @@ int event_loop()
sigset_t signals;
sigemptyset(&signals);
sigaddset(&signals, REOPEN_LOG_SIGNAL);
sigaddset(&signals, EXEC_BINARY_SIGNAL);
sigaddset(&signals, GRACEFUL_SHUTDOWN_SIGNAL);
rv = pthread_sigmask(SIG_BLOCK, &signals, nullptr);
if(rv != 0) {
LOG(ERROR) << "Blocking REOPEN_LOG_SIGNAL failed: " << strerror(rv);
LOG(ERROR) << "Blocking signals failed: " << strerror(rv);
}
#endif // !NOTHREADS
@ -382,7 +561,7 @@ int event_loop()
#ifndef NOTHREADS
rv = pthread_sigmask(SIG_UNBLOCK, &signals, nullptr);
if(rv != 0) {
LOG(ERROR) << "Unblocking REOPEN_LOG_SIGNAL failed: " << strerror(rv);
LOG(ERROR) << "Unblocking signals failed: " << strerror(rv);
}
#endif // !NOTHREADS
@ -399,8 +578,31 @@ int event_loop()
}
}
auto exec_binary_signal_event = evsignal_new(evbase, EXEC_BINARY_SIGNAL,
exec_binary_signal_cb,
listener_handler);
rv = event_add(exec_binary_signal_event, nullptr);
if(rv == -1) {
LOG(FATAL) << "event_add for exec_binary_signal_event failed";
exit(EXIT_FAILURE);
}
auto graceful_shutdown_signal_event = evsignal_new
(evbase, GRACEFUL_SHUTDOWN_SIGNAL, graceful_shutdown_signal_cb,
listener_handler);
rv = event_add(graceful_shutdown_signal_event, nullptr);
if(rv == -1) {
LOG(FATAL) << "event_add for graceful_shutdown_signal_event failed";
exit(EXIT_FAILURE);
}
auto refresh_event = event_new(evbase, -1, EV_PERSIST, refresh_cb,
nullptr);
listener_handler);
if(!refresh_event) {
LOG(ERROR) << "event_new failed";
@ -422,9 +624,17 @@ int event_loop()
}
event_base_loop(evbase, 0);
listener_handler->join_worker();
if(refresh_event) {
event_free(refresh_event);
}
if(graceful_shutdown_signal_event) {
event_free(graceful_shutdown_signal_event);
}
if(exec_binary_signal_event) {
event_free(exec_binary_signal_event);
}
if(reopen_log_signal_event) {
event_free(reopen_log_signal_event);
}
@ -570,6 +780,8 @@ void fill_default_config()
mod_config()->tls_proto_mask = 0;
mod_config()->cached_time = generate_time();
mod_config()->no_location_rewrite = false;
mod_config()->argc = 0;
mod_config()->argv = nullptr;
}
} // namespace
@ -933,6 +1145,23 @@ int main(int argc, char **argv)
create_config();
fill_default_config();
// We have to copy argv, since getopt_long may change its content.
mod_config()->argc = argc;
mod_config()->argv = new char*[argc];
for(int i = 0; i < argc; ++i) {
mod_config()->argv[i] = strdup(argv[i]);
}
char cwd[PATH_MAX];
mod_config()->cwd = getcwd(cwd, sizeof(cwd));
if(mod_config()->cwd == nullptr) {
auto error = errno;
LOG(FATAL) << "failed to get current working directory: errno=" << error;
exit(EXIT_FAILURE);
}
std::vector<std::pair<const char*, const char*> > cmdcfgs;
while(1) {
static int flag = 0;
@ -1488,6 +1717,10 @@ int main(int argc, char **argv)
event_loop();
if(LOG_ENABLED(INFO)) {
LOG(INFO) << "Shutdown momentarily";
}
return 0;
}

View File

@ -35,6 +35,7 @@
#include "shrpx_http2_downstream_connection.h"
#include "shrpx_ssl.h"
#include "shrpx_worker.h"
#include "shrpx_worker_config.h"
#ifdef HAVE_SPDYLAY
#include "shrpx_spdy_upstream.h"
#endif // HAVE_SPDYLAY
@ -266,6 +267,12 @@ ClientHandler::~ClientHandler()
--worker_stat_->num_connections;
// TODO If backend is http/2, and it is in CONNECTED state, signal
// it and make it loopbreak when output is zero.
if(worker_config.graceful_shutdown && worker_stat_->num_connections == 0) {
event_base_loopbreak(get_evbase());
}
if(reneg_shutdown_timerev_) {
event_free(reneg_shutdown_timerev_);
}

View File

@ -200,6 +200,9 @@ FILE* open_file_for_write(const char *filename)
LOG(ERROR) << "Failed to open " << filename << " for writing. Cause: "
<< strerror(errno);
}
evutil_make_socket_closeonexec(fileno(f));
return f;
}
} // namespace

View File

@ -206,6 +206,8 @@ struct Config {
FILE *http2_upstream_dump_request_header;
FILE *http2_upstream_dump_response_header;
nghttp2_option *http2_option;
char **argv;
char *cwd;
size_t downstream_addrlen;
size_t num_worker;
size_t http2_max_concurrent_streams;
@ -232,6 +234,7 @@ struct Config {
shrpx_proto downstream_proto;
int syslog_facility;
int backlog;
int argc;
uid_t uid;
gid_t gid;
uint16_t port;

View File

@ -26,6 +26,7 @@
#include <netinet/tcp.h>
#include <unistd.h>
#include <vector>
#include <openssl/err.h>
@ -197,8 +198,10 @@ int Http2Session::init_notification()
SSLOG(FATAL, this) << "socketpair() failed: errno=" << errno;
return -1;
}
evutil_make_socket_nonblocking(sockpair[0]);
evutil_make_socket_nonblocking(sockpair[1]);
for(int i = 0; i < 2; ++i) {
evutil_make_socket_nonblocking(sockpair[i]);
evutil_make_socket_closeonexec(sockpair[i]);
}
wrbev_ = bufferevent_socket_new(evbase_, sockpair[0],
BEV_OPT_CLOSE_ON_FREE|
BEV_OPT_DEFER_CALLBACKS);
@ -459,6 +462,13 @@ int Http2Session::initiate_connection()
}
// If state_ == PROXY_CONNECTED, we has connected to the proxy
// using fd_ and tunnel has been established.
if(state_ == DISCONNECTED) {
assert(fd_ == -1);
fd_ = socket(get_config()->downstream_addr.storage.ss_family,
SOCK_STREAM | SOCK_CLOEXEC, 0);
}
bev_ = bufferevent_openssl_socket_new(evbase_, fd_, ssl_,
BUFFEREVENT_SSL_CONNECTING,
BEV_OPT_DEFER_CALLBACKS);
@ -471,25 +481,29 @@ int Http2Session::initiate_connection()
// TODO maybe not thread-safe?
const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
get_config()->downstream_addrlen);
} else if(state_ == DISCONNECTED) {
// Without TLS and proxy.
bev_ = bufferevent_socket_new(evbase_, -1, BEV_OPT_DEFER_CALLBACKS);
if(!bev_) {
SSLOG(ERROR, this) << "bufferevent_socket_new() failed";
return SHRPX_ERR_NETWORK;
}
rv = bufferevent_socket_connect
(bev_,
const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
get_config()->downstream_addrlen);
} else {
assert(state_ == PROXY_CONNECTED);
// Without TLS but with proxy.
if(state_ == DISCONNECTED) {
// Without TLS and proxy.
assert(fd_ == -1);
fd_ = socket(get_config()->downstream_addr.storage.ss_family,
SOCK_STREAM | SOCK_CLOEXEC, 0);
}
bev_ = bufferevent_socket_new(evbase_, fd_, BEV_OPT_DEFER_CALLBACKS);
if(!bev_) {
SSLOG(ERROR, this) << "bufferevent_socket_new() failed";
return SHRPX_ERR_NETWORK;
}
if(state_ == DISCONNECTED) {
rv = bufferevent_socket_connect
(bev_,
const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
get_config()->downstream_addrlen);
} else {
// Without TLS but with proxy.
// Connection already established.
eventcb(bev_, BEV_EVENT_CONNECTED, this);
// eventcb() has no return value. Check state_ to whether it was
@ -498,6 +512,8 @@ int Http2Session::initiate_connection()
return -1;
}
}
}
if(rv != 0) {
return SHRPX_ERR_NETWORK;
}

View File

@ -77,11 +77,17 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream)
auto upstream = downstream->get_upstream();
if(!bev_) {
auto evbase = client_handler_->get_evbase();
auto fd = socket(get_config()->downstream_addr.storage.ss_family,
SOCK_STREAM | SOCK_CLOEXEC, 0);
bev_ = bufferevent_socket_new
(evbase, -1,
(evbase, fd,
BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
if(!bev_) {
DCLOG(INFO, this) << "bufferevent_socket_new() failed";
close(fd);
return SHRPX_ERR_NETWORK;
}
int rv = bufferevent_socket_connect

View File

@ -28,7 +28,6 @@
#include <cerrno>
#include <thread>
#include <system_error>
#include <event2/bufferevent_ssl.h>
@ -36,6 +35,7 @@
#include "shrpx_thread_event_receiver.h"
#include "shrpx_ssl.h"
#include "shrpx_worker.h"
#include "shrpx_worker_config.h"
#include "shrpx_config.h"
#include "shrpx_http2_session.h"
#include "util.h"
@ -51,8 +51,11 @@ ListenHandler::ListenHandler(event_base *evbase, SSL_CTX *sv_ssl_ctx,
cl_ssl_ctx_(cl_ssl_ctx),
rate_limit_group_(bufferevent_rate_limit_group_new
(evbase, get_config()->worker_rate_limit_cfg)),
evlistener4_(nullptr),
evlistener6_(nullptr),
worker_stat_(util::make_unique<WorkerStat>()),
worker_round_robin_cnt_(0)
worker_round_robin_cnt_(0),
num_worker_shutdown_(0)
{}
ListenHandler::~ListenHandler()
@ -68,48 +71,70 @@ void ListenHandler::worker_reopen_log_files()
wev.type = REOPEN_LOG;
for(auto& info : workers_) {
bufferevent_write(info.bev, &wev, sizeof(wev));
bufferevent_write(info->bev, &wev, sizeof(wev));
}
}
namespace {
void worker_writecb(bufferevent *bev, void *ptr)
{
auto listener_handler = static_cast<ListenHandler*>(ptr);
auto output = bufferevent_get_output(bev);
if(!worker_config.graceful_shutdown ||
evbuffer_get_length(output) != 0) {
return;
}
// If graceful_shutdown is true and nothing left to send, we sent
// graceful shutdown event to worker successfully. The worker is
// now doing shutdown.
listener_handler->notify_worker_shutdown();
// Disable bev so that this won' be called accidentally in the
// future.
bufferevent_disable(bev, EV_READ | EV_WRITE);
}
} // namespace
void ListenHandler::create_worker_thread(size_t num)
{
workers_.resize(0);
for(size_t i = 0; i < num; ++i) {
int rv;
auto info = WorkerInfo();
rv = socketpair(AF_UNIX, SOCK_STREAM, 0, info.sv);
auto info = util::make_unique<WorkerInfo>();
rv = socketpair(AF_UNIX, SOCK_STREAM, 0, info->sv);
if(rv == -1) {
LLOG(ERROR, this) << "socketpair() failed: errno=" << errno;
continue;
}
evutil_make_socket_nonblocking(info.sv[0]);
evutil_make_socket_nonblocking(info.sv[1]);
info.sv_ssl_ctx = sv_ssl_ctx_;
info.cl_ssl_ctx = cl_ssl_ctx_;
try {
auto thread = std::thread{start_threaded_worker, info};
thread.detach();
} catch(const std::system_error& error) {
LLOG(ERROR, this) << "Could not start thread: code=" << error.code()
<< " msg=" << error.what();
for(size_t j = 0; j < 2; ++j) {
close(info.sv[j]);
for(int j = 0; j < 2; ++j) {
evutil_make_socket_nonblocking(info->sv[j]);
evutil_make_socket_closeonexec(info->sv[j]);
}
continue;
}
auto bev = bufferevent_socket_new(evbase_, info.sv[0],
info->sv_ssl_ctx = sv_ssl_ctx_;
info->cl_ssl_ctx = cl_ssl_ctx_;
info->fut = std::async(std::launch::async, start_threaded_worker,
info.get());
auto bev = bufferevent_socket_new(evbase_, info->sv[0],
BEV_OPT_DEFER_CALLBACKS);
if(!bev) {
LLOG(ERROR, this) << "bufferevent_socket_new() failed";
for(size_t j = 0; j < 2; ++j) {
close(info.sv[j]);
close(info->sv[j]);
}
continue;
}
info.bev = bev;
workers_.push_back(info);
bufferevent_setcb(bev, nullptr, worker_writecb, nullptr, this);
info->bev = bev;
workers_.push_back(std::move(info));
if(LOG_ENABLED(INFO)) {
LLOG(INFO, this) << "Created thread #" << workers_.size() - 1;
@ -117,12 +142,56 @@ void ListenHandler::create_worker_thread(size_t num)
}
}
void ListenHandler::join_worker()
{
int n = 0;
if(LOG_ENABLED(INFO)) {
LLOG(INFO, this) << "Waiting for worker thread to join: n="
<< workers_.size();
}
for(auto& worker : workers_) {
worker->fut.get();
if(LOG_ENABLED(INFO)) {
LLOG(INFO, this) << "Thread #" << n << " joined";
}
++n;
}
}
void ListenHandler::graceful_shutdown_worker()
{
if(get_config()->num_worker == 1) {
return;
}
for(auto& worker : workers_) {
WorkerEvent wev;
memset(&wev, 0, sizeof(wev));
wev.type = GRACEFUL_SHUTDOWN;
if(LOG_ENABLED(INFO)) {
LLOG(INFO, this) << "Sending graceful shutdown signal to worker";
}
auto output = bufferevent_get_output(worker->bev);
if(evbuffer_add(output, &wev, sizeof(wev)) != 0) {
LLOG(FATAL, this) << "evbuffer_add() failed";
}
}
}
int ListenHandler::accept_connection(evutil_socket_t fd,
sockaddr *addr, int addrlen)
{
if(LOG_ENABLED(INFO)) {
LLOG(INFO, this) << "Accepted connection. fd=" << fd;
}
evutil_make_socket_closeonexec(fd);
if(get_config()->num_worker == 1) {
if(worker_stat_->num_connections >=
@ -158,7 +227,7 @@ int ListenHandler::accept_connection(evutil_socket_t fd,
wev.client_fd = fd;
memcpy(&wev.client_addr, addr, addrlen);
wev.client_addrlen = addrlen;
auto output = bufferevent_get_output(workers_[idx].bev);
auto output = bufferevent_get_output(workers_[idx]->bev);
if(evbuffer_add(output, &wev, sizeof(wev)) != 0) {
LLOG(FATAL, this) << "evbuffer_add() failed";
close(fd);
@ -181,4 +250,92 @@ int ListenHandler::create_http2_session()
return rv;
}
const WorkerStat* ListenHandler::get_worker_stat() const
{
return worker_stat_.get();
}
void ListenHandler::set_evlistener4(evconnlistener *evlistener4)
{
evlistener4_ = evlistener4;
}
evconnlistener* ListenHandler::get_evlistener4() const
{
return evlistener4_;
}
void ListenHandler::set_evlistener6(evconnlistener *evlistener6)
{
evlistener6_ = evlistener6;
}
evconnlistener* ListenHandler::get_evlistener6() const
{
return evlistener6_;
}
void ListenHandler::disable_evlistener()
{
if(evlistener4_) {
evconnlistener_disable(evlistener4_);
}
if(evlistener6_) {
evconnlistener_disable(evlistener6_);
}
}
namespace {
void perform_accept_pending_connection(ListenHandler *listener_handler,
evconnlistener *listener)
{
if(!listener) {
return;
}
auto server_fd = evconnlistener_get_fd(listener);
for(;;) {
sockaddr_union sockaddr;
socklen_t addrlen = sizeof(sockaddr);
auto fd = accept(server_fd, &sockaddr.sa, &addrlen);
if(fd == -1) {
if(errno == EINTR ||
errno == ENETDOWN ||
errno == EPROTO ||
errno == ENOPROTOOPT ||
errno == EHOSTDOWN ||
errno == ENONET ||
errno == EHOSTUNREACH ||
errno == EOPNOTSUPP ||
errno == ENETUNREACH) {
continue;
}
return;
}
evutil_make_socket_nonblocking(fd);
listener_handler->accept_connection(fd, &sockaddr.sa, addrlen);
}
}
} // namespace
void ListenHandler::accept_pending_connection()
{
perform_accept_pending_connection(this, evlistener4_);
perform_accept_pending_connection(this, evlistener6_);
}
void ListenHandler::notify_worker_shutdown()
{
if(++num_worker_shutdown_ == workers_.size()) {
event_base_loopbreak(evbase_);
}
}
} // namespace shrpx

View File

@ -32,15 +32,18 @@
#include <memory>
#include <vector>
#include <future>
#include <openssl/ssl.h>
#include <event.h>
#include <event2/bufferevent.h>
#include <event2/listener.h>
namespace shrpx {
struct WorkerInfo {
std::future<void> fut;
SSL_CTX *sv_ssl_ctx;
SSL_CTX *cl_ssl_ctx;
bufferevent *bev;
@ -59,8 +62,18 @@ public:
void worker_reopen_log_files();
event_base* get_evbase() const;
int create_http2_session();
const WorkerStat* get_worker_stat() const;
void set_evlistener4(evconnlistener *evlistener4);
evconnlistener* get_evlistener4() const;
void set_evlistener6(evconnlistener *evlistener6);
evconnlistener* get_evlistener6() const;
void disable_evlistener();
void accept_pending_connection();
void graceful_shutdown_worker();
void join_worker();
void notify_worker_shutdown();
private:
std::vector<WorkerInfo> workers_;
std::vector<std::unique_ptr<WorkerInfo>> workers_;
event_base *evbase_;
// The frontend server SSL_CTX
SSL_CTX *sv_ssl_ctx_;
@ -70,8 +83,11 @@ private:
// multi-threaded case, see shrpx_worker.cc.
std::unique_ptr<Http2Session> http2session_;
bufferevent_rate_limit_group *rate_limit_group_;
evconnlistener *evlistener4_;
evconnlistener *evlistener6_;
std::unique_ptr<WorkerStat> worker_stat_;
unsigned int worker_round_robin_cnt_;
int num_worker_shutdown_;
};
} // namespace shrpx

View File

@ -81,6 +81,22 @@ void ThreadEventReceiver::on_read(bufferevent *bev)
continue;
}
if(wev.type == GRACEFUL_SHUTDOWN) {
if(LOG_ENABLED(INFO)) {
LOG(INFO) << "Graceful shutdown commencing";
}
worker_config.graceful_shutdown = true;
if(worker_stat_->num_connections == 0) {
event_base_loopbreak(evbase_);
break;
}
continue;
}
if(LOG_ENABLED(INFO)) {
TLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd
<< ", addrlen=" << wev.client_addrlen;

View File

@ -42,7 +42,8 @@ struct WorkerStat;
enum WorkerEventType {
NEW_CONNECTION = 0x01,
REOPEN_LOG = 0x02
REOPEN_LOG = 0x02,
GRACEFUL_SHUTDOWN = 0x03,
};
struct WorkerEvent {

View File

@ -43,10 +43,10 @@ using namespace nghttp2;
namespace shrpx {
Worker::Worker(const WorkerInfo& info)
: sv_ssl_ctx_(info.sv_ssl_ctx),
cl_ssl_ctx_(info.cl_ssl_ctx),
fd_(info.sv[1])
Worker::Worker(const WorkerInfo *info)
: sv_ssl_ctx_(info->sv_ssl_ctx),
cl_ssl_ctx_(info->cl_ssl_ctx),
fd_(info->sv[1])
{}
Worker::~Worker()
@ -108,7 +108,7 @@ void Worker::run()
event_base_loop(evbase.get(), 0);
}
void start_threaded_worker(WorkerInfo info)
void start_threaded_worker(WorkerInfo *info)
{
Worker worker(info);
worker.run();

View File

@ -42,7 +42,7 @@ struct WorkerStat {
class Worker {
public:
Worker(const WorkerInfo& info);
Worker(const WorkerInfo *info);
~Worker();
void run();
private:
@ -52,7 +52,7 @@ private:
int fd_;
};
void start_threaded_worker(WorkerInfo info);
void start_threaded_worker(WorkerInfo *info);
} // namespace shrpx

View File

@ -29,7 +29,8 @@ namespace shrpx {
WorkerConfig::WorkerConfig()
: accesslog_fd(-1),
errorlog_fd(-1),
errorlog_tty(false)
errorlog_tty(false),
graceful_shutdown(false)
{}
#ifndef NOTHREADS

View File

@ -34,6 +34,7 @@ struct WorkerConfig {
int errorlog_fd;
// true if errorlog_fd is referring to a terminal.
bool errorlog_tty;
bool graceful_shutdown;
WorkerConfig();
};

View File

@ -589,7 +589,7 @@ bool numeric_host(const char *hostname)
int reopen_log_file(const char *path)
{
auto fd = open(path, O_WRONLY | O_APPEND | O_CREAT,
auto fd = open(path, O_WRONLY | O_APPEND | O_CREAT | O_CLOEXEC,
S_IRUSR | S_IWUSR | S_IRGRP);
if(fd == -1) {
@ -616,6 +616,31 @@ std::string ascii_dump(const uint8_t *data, size_t len)
return res;
}
char* get_exec_path(int argc, char **const argv, const char *cwd)
{
if(argc == 0 || cwd == nullptr) {
return nullptr;
}
auto argv0 = argv[0];
auto len = strlen(argv0);
char *path;
if(argv0[0] == '/') {
path = static_cast<char*>(malloc(len + 1));
memcpy(path, argv0, len + 1);
} else {
auto cwdlen = strlen(cwd);
path = static_cast<char*>(malloc(len + 1 + cwdlen + 1));
memcpy(path, cwd, cwdlen);
path[cwdlen] = '/';
memcpy(path + cwdlen + 1, argv0, len + 1);
}
return path;
}
} // namespace util
} // namespace nghttp2

View File

@ -487,6 +487,14 @@ int reopen_log_file(const char *path);
// characters are preserved. Other characters are replaced with ".".
std::string ascii_dump(const uint8_t *data, size_t len);
// Returns absolute path of executable path. If argc == 0 or |cwd| is
// nullptr, this function returns nullptr. If argv[0] starts with
// '/', this function returns argv[0]. Oterwise return cwd + "/" +
// argv[0]. If non-null is returned, it is NULL-terminated string and
// dynamically allocated by malloc. The caller is responsible to free
// it.
char* get_exec_path(int argc, char **const argv, const char *cwd);
} // namespace util
} // namespace nghttp2