Compare commits

...

1 Commits

Author SHA1 Message Date
Tatsuhiro Tsujikawa 8daf4575f5 [WIP] Gracefully shutdown old process with SIGUSR2 2017-02-09 22:51:17 +09:00
1 changed files with 128 additions and 4 deletions

View File

@ -120,6 +120,12 @@ constexpr auto ENV_UNIX_PATH = StringRef::from_lit("NGHTTP2_UNIX_PATH");
// descriptor. <PATH> is a path to UNIX domain socket. // descriptor. <PATH> is a path to UNIX domain socket.
constexpr auto ENV_ACCEPT_PREFIX = StringRef::from_lit("NGHTTPX_ACCEPT_"); constexpr auto ENV_ACCEPT_PREFIX = StringRef::from_lit("NGHTTPX_ACCEPT_");
// This environment variable contains writable end of pipe(2) created
// by parent process. The new process is expected to write single
// byte to this fd to notify the parent process that this process is
// ready to serve requests.
constexpr auto ENV_IPC_FD = StringRef::from_lit("NGHTTPX_IPC");
#ifndef _KERNEL_FASTOPEN #ifndef _KERNEL_FASTOPEN
#define _KERNEL_FASTOPEN #define _KERNEL_FASTOPEN
// conditional define for TCP_FASTOPEN mostly on ubuntu // conditional define for TCP_FASTOPEN mostly on ubuntu
@ -174,6 +180,8 @@ namespace {
void worker_process_child_cb(struct ev_loop *loop, ev_child *w, int revents); void worker_process_child_cb(struct ev_loop *loop, ev_child *w, int revents);
} // namespace } // namespace
class SyncExecHandler;
struct WorkerProcess { struct WorkerProcess {
WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd) WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd)
: loop(loop), worker_pid(worker_pid), ipc_fd(ipc_fd) { : loop(loop), worker_pid(worker_pid), ipc_fd(ipc_fd) {
@ -226,6 +234,7 @@ struct WorkerProcess {
struct ev_loop *loop; struct ev_loop *loop;
pid_t worker_pid; pid_t worker_pid;
int ipc_fd; int ipc_fd;
std::unique_ptr<SyncExecHandler> sync_exec_handler;
}; };
namespace { namespace {
@ -364,7 +373,75 @@ int save_pid() {
} // namespace } // namespace
namespace { namespace {
void exec_binary() { void sync_exec_notifycb(struct ev_loop *loop, ev_io *w, int revents);
} // namespace
class SyncExecHandler {
public:
SyncExecHandler(WorkerProcess *wp, struct ev_loop *loop, int fd)
: loop_(loop), fd_(fd) {
ev_io_init(&rev_, sync_exec_notifycb, fd, EV_READ);
rev_.data = wp;
ev_io_start(loop, &rev_);
}
~SyncExecHandler() {
ev_io_stop(loop_, &rev_);
close(fd_);
}
int on_read() {
ssize_t nread;
uint8_t b;
while ((nread = read(fd_, &b, 1)) == -1 && errno == EINTR)
;
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 1;
}
LOG(ERROR) << "New process is not ready; probably aborted";
return -1;
}
LOG(NOTICE) << "New process is ready to serve request";
return 0;
}
private:
struct ev_loop *loop_;
ev_io rev_;
int fd_;
};
namespace {
void ipc_send(WorkerProcess *wp, uint8_t ipc_event);
} // namespace
namespace {
void sync_exec_notifycb(struct ev_loop *loop, ev_io *w, int revents) {
int rv;
auto wp = static_cast<WorkerProcess *>(w->data);
rv = wp->sync_exec_handler->on_read();
if (rv == 1) {
return;
}
wp->sync_exec_handler.reset();
if (rv == -1) {
return;
}
ipc_send(wp, SHRPX_IPC_GRACEFUL_SHUTDOWN);
}
} // namespace
namespace {
void exec_binary(WorkerProcess *wp) {
int rv; int rv;
sigset_t oldset; sigset_t oldset;
std::array<char, STRERROR_BUFSIZE> errbuf; std::array<char, STRERROR_BUFSIZE> errbuf;
@ -380,12 +457,29 @@ void exec_binary() {
return; return;
} }
std::array<int, 2> ipc_fds;
if (pipe(ipc_fds.data()) == -1) {
auto error = errno;
LOG(ERROR) << "Could not create IPC file descriptors: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
}
util::make_socket_closeonexec(ipc_fds[0]);
for (auto fd : ipc_fds) {
util::make_socket_nonblocking(fd);
}
auto pid = fork(); auto pid = fork();
if (pid != 0) { if (pid != 0) {
if (pid == -1) { if (pid == -1) {
auto error = errno; auto error = errno;
LOG(ERROR) << "fork() failed errno=" << error; LOG(ERROR) << "fork() failed errno=" << error;
} else {
close(ipc_fds[1]);
wp->sync_exec_handler =
make_unique<SyncExecHandler>(wp, wp->loop, ipc_fds[0]);
} }
rv = shrpx_signal_set(&oldset); rv = shrpx_signal_set(&oldset);
@ -436,7 +530,8 @@ void exec_binary() {
auto &listenerconf = get_config()->conn.listener; auto &listenerconf = get_config()->conn.listener;
auto envp = make_unique<char *[]>(envlen + listenerconf.addrs.size() + 1); // 2 for ENV_IPC_FD and terminal nullptr.
auto envp = make_unique<char *[]>(envlen + listenerconf.addrs.size() + 2);
size_t envidx = 0; size_t envidx = 0;
std::vector<ImmutableString> fd_envs; std::vector<ImmutableString> fd_envs;
@ -459,6 +554,11 @@ void exec_binary() {
envp[envidx++] = const_cast<char *>(fd_envs.back().c_str()); envp[envidx++] = const_cast<char *>(fd_envs.back().c_str());
} }
auto ipc_fd_str = ENV_IPC_FD.str();
ipc_fd_str += '=';
ipc_fd_str += util::utos(ipc_fds[1]);
envp[envidx++] = const_cast<char *>(ipc_fd_str.c_str());
for (size_t i = 0; i < envlen; ++i) { for (size_t i = 0; i < envlen; ++i) {
auto env = StringRef{environ[i]}; auto env = StringRef{environ[i]};
if (util::starts_with(env, ENV_ACCEPT_PREFIX) || if (util::starts_with(env, ENV_ACCEPT_PREFIX) ||
@ -466,7 +566,8 @@ void exec_binary() {
util::starts_with(env, ENV_LISTENER6_FD) || util::starts_with(env, ENV_LISTENER6_FD) ||
util::starts_with(env, ENV_PORT) || util::starts_with(env, ENV_PORT) ||
util::starts_with(env, ENV_UNIX_FD) || util::starts_with(env, ENV_UNIX_FD) ||
util::starts_with(env, ENV_UNIX_PATH)) { util::starts_with(env, ENV_UNIX_PATH) ||
util::starts_with(env, ENV_IPC_FD)) {
continue; continue;
} }
@ -541,7 +642,7 @@ void signal_cb(struct ev_loop *loop, ev_signal *w, int revents) {
reopen_log(wp); reopen_log(wp);
return; return;
case EXEC_BINARY_SIGNAL: case EXEC_BINARY_SIGNAL:
exec_binary(); exec_binary(wp);
return; return;
case GRACEFUL_SHUTDOWN_SIGNAL: case GRACEFUL_SHUTDOWN_SIGNAL:
ipc_send(wp, SHRPX_IPC_GRACEFUL_SHUTDOWN); ipc_send(wp, SHRPX_IPC_GRACEFUL_SHUTDOWN);
@ -1050,6 +1151,17 @@ void close_unused_inherited_addr(const std::vector<InheritedAddr> &iaddrs) {
} }
} // namespace } // namespace
namespace {
// Returns IPC file descriptor from environment variable ENV_IPC_FD.
int get_ipc_fd_from_env() {
auto s = getenv(ENV_IPC_FD.c_str());
if (s == nullptr) {
return -1;
}
return util::parse_uint(s);
}
} // namespace
namespace { namespace {
int create_acceptor_socket(Config *config, std::vector<InheritedAddr> &iaddrs) { int create_acceptor_socket(Config *config, std::vector<InheritedAddr> &iaddrs) {
std::array<char, STRERROR_BUFSIZE> errbuf; std::array<char, STRERROR_BUFSIZE> errbuf;
@ -1255,6 +1367,11 @@ int event_loop() {
close_unused_inherited_addr(iaddrs); close_unused_inherited_addr(iaddrs);
} }
auto ipc_notify_fd = get_ipc_fd_from_env();
if (ipc_notify_fd != -1) {
util::make_socket_closeonexec(ipc_notify_fd);
}
auto loop = ev_default_loop(config->ev_loop_flags); auto loop = ev_default_loop(config->ev_loop_flags);
int ipc_fd; int ipc_fd;
@ -1275,6 +1392,13 @@ int event_loop() {
save_pid(); save_pid();
} }
if (ipc_notify_fd != -1) {
LOG(NOTICE) << "Notify parent process that we are ready to server requests";
write(ipc_notify_fd, "\0", 1);
shutdown(ipc_notify_fd, SHUT_WR);
close(ipc_notify_fd);
}
ev_run(loop, 0); ev_run(loop, 0);
return 0; return 0;