[WIP] Gracefully shutdown old process with SIGUSR2
This commit is contained in:
parent
8f888b29bd
commit
8daf4575f5
132
src/shrpx.cc
132
src/shrpx.cc
|
@ -120,6 +120,12 @@ constexpr auto ENV_UNIX_PATH = StringRef::from_lit("NGHTTP2_UNIX_PATH");
|
|||
// descriptor. <PATH> is a path to UNIX domain socket.
|
||||
constexpr auto ENV_ACCEPT_PREFIX = StringRef::from_lit("NGHTTPX_ACCEPT_");
|
||||
|
||||
// This environment variable contains writable end of pipe(2) created
|
||||
// by parent process. The new process is expected to write single
|
||||
// byte to this fd to notify the parent process that this process is
|
||||
// ready to serve requests.
|
||||
constexpr auto ENV_IPC_FD = StringRef::from_lit("NGHTTPX_IPC");
|
||||
|
||||
#ifndef _KERNEL_FASTOPEN
|
||||
#define _KERNEL_FASTOPEN
|
||||
// conditional define for TCP_FASTOPEN mostly on ubuntu
|
||||
|
@ -174,6 +180,8 @@ namespace {
|
|||
void worker_process_child_cb(struct ev_loop *loop, ev_child *w, int revents);
|
||||
} // namespace
|
||||
|
||||
class SyncExecHandler;
|
||||
|
||||
struct WorkerProcess {
|
||||
WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd)
|
||||
: loop(loop), worker_pid(worker_pid), ipc_fd(ipc_fd) {
|
||||
|
@ -226,6 +234,7 @@ struct WorkerProcess {
|
|||
struct ev_loop *loop;
|
||||
pid_t worker_pid;
|
||||
int ipc_fd;
|
||||
std::unique_ptr<SyncExecHandler> sync_exec_handler;
|
||||
};
|
||||
|
||||
namespace {
|
||||
|
@ -364,7 +373,75 @@ int save_pid() {
|
|||
} // namespace
|
||||
|
||||
namespace {
|
||||
void exec_binary() {
|
||||
void sync_exec_notifycb(struct ev_loop *loop, ev_io *w, int revents);
|
||||
} // namespace
|
||||
|
||||
class SyncExecHandler {
|
||||
public:
|
||||
SyncExecHandler(WorkerProcess *wp, struct ev_loop *loop, int fd)
|
||||
: loop_(loop), fd_(fd) {
|
||||
ev_io_init(&rev_, sync_exec_notifycb, fd, EV_READ);
|
||||
rev_.data = wp;
|
||||
|
||||
ev_io_start(loop, &rev_);
|
||||
}
|
||||
|
||||
~SyncExecHandler() {
|
||||
ev_io_stop(loop_, &rev_);
|
||||
close(fd_);
|
||||
}
|
||||
|
||||
int on_read() {
|
||||
ssize_t nread;
|
||||
uint8_t b;
|
||||
while ((nread = read(fd_, &b, 1)) == -1 && errno == EINTR)
|
||||
;
|
||||
if (nread == -1) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
LOG(ERROR) << "New process is not ready; probably aborted";
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
LOG(NOTICE) << "New process is ready to serve request";
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
private:
|
||||
struct ev_loop *loop_;
|
||||
ev_io rev_;
|
||||
int fd_;
|
||||
};
|
||||
|
||||
namespace {
|
||||
void ipc_send(WorkerProcess *wp, uint8_t ipc_event);
|
||||
} // namespace
|
||||
|
||||
namespace {
|
||||
void sync_exec_notifycb(struct ev_loop *loop, ev_io *w, int revents) {
|
||||
int rv;
|
||||
auto wp = static_cast<WorkerProcess *>(w->data);
|
||||
|
||||
rv = wp->sync_exec_handler->on_read();
|
||||
if (rv == 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
wp->sync_exec_handler.reset();
|
||||
if (rv == -1) {
|
||||
return;
|
||||
}
|
||||
|
||||
ipc_send(wp, SHRPX_IPC_GRACEFUL_SHUTDOWN);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace {
|
||||
void exec_binary(WorkerProcess *wp) {
|
||||
int rv;
|
||||
sigset_t oldset;
|
||||
std::array<char, STRERROR_BUFSIZE> errbuf;
|
||||
|
@ -380,12 +457,29 @@ void exec_binary() {
|
|||
return;
|
||||
}
|
||||
|
||||
std::array<int, 2> ipc_fds;
|
||||
if (pipe(ipc_fds.data()) == -1) {
|
||||
auto error = errno;
|
||||
LOG(ERROR) << "Could not create IPC file descriptors: "
|
||||
<< xsi_strerror(error, errbuf.data(), errbuf.size());
|
||||
}
|
||||
|
||||
util::make_socket_closeonexec(ipc_fds[0]);
|
||||
for (auto fd : ipc_fds) {
|
||||
util::make_socket_nonblocking(fd);
|
||||
}
|
||||
|
||||
auto pid = fork();
|
||||
|
||||
if (pid != 0) {
|
||||
if (pid == -1) {
|
||||
auto error = errno;
|
||||
LOG(ERROR) << "fork() failed errno=" << error;
|
||||
} else {
|
||||
close(ipc_fds[1]);
|
||||
|
||||
wp->sync_exec_handler =
|
||||
make_unique<SyncExecHandler>(wp, wp->loop, ipc_fds[0]);
|
||||
}
|
||||
|
||||
rv = shrpx_signal_set(&oldset);
|
||||
|
@ -436,7 +530,8 @@ void exec_binary() {
|
|||
|
||||
auto &listenerconf = get_config()->conn.listener;
|
||||
|
||||
auto envp = make_unique<char *[]>(envlen + listenerconf.addrs.size() + 1);
|
||||
// 2 for ENV_IPC_FD and terminal nullptr.
|
||||
auto envp = make_unique<char *[]>(envlen + listenerconf.addrs.size() + 2);
|
||||
size_t envidx = 0;
|
||||
|
||||
std::vector<ImmutableString> fd_envs;
|
||||
|
@ -459,6 +554,11 @@ void exec_binary() {
|
|||
envp[envidx++] = const_cast<char *>(fd_envs.back().c_str());
|
||||
}
|
||||
|
||||
auto ipc_fd_str = ENV_IPC_FD.str();
|
||||
ipc_fd_str += '=';
|
||||
ipc_fd_str += util::utos(ipc_fds[1]);
|
||||
envp[envidx++] = const_cast<char *>(ipc_fd_str.c_str());
|
||||
|
||||
for (size_t i = 0; i < envlen; ++i) {
|
||||
auto env = StringRef{environ[i]};
|
||||
if (util::starts_with(env, ENV_ACCEPT_PREFIX) ||
|
||||
|
@ -466,7 +566,8 @@ void exec_binary() {
|
|||
util::starts_with(env, ENV_LISTENER6_FD) ||
|
||||
util::starts_with(env, ENV_PORT) ||
|
||||
util::starts_with(env, ENV_UNIX_FD) ||
|
||||
util::starts_with(env, ENV_UNIX_PATH)) {
|
||||
util::starts_with(env, ENV_UNIX_PATH) ||
|
||||
util::starts_with(env, ENV_IPC_FD)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -541,7 +642,7 @@ void signal_cb(struct ev_loop *loop, ev_signal *w, int revents) {
|
|||
reopen_log(wp);
|
||||
return;
|
||||
case EXEC_BINARY_SIGNAL:
|
||||
exec_binary();
|
||||
exec_binary(wp);
|
||||
return;
|
||||
case GRACEFUL_SHUTDOWN_SIGNAL:
|
||||
ipc_send(wp, SHRPX_IPC_GRACEFUL_SHUTDOWN);
|
||||
|
@ -1050,6 +1151,17 @@ void close_unused_inherited_addr(const std::vector<InheritedAddr> &iaddrs) {
|
|||
}
|
||||
} // namespace
|
||||
|
||||
namespace {
|
||||
// Returns IPC file descriptor from environment variable ENV_IPC_FD.
|
||||
int get_ipc_fd_from_env() {
|
||||
auto s = getenv(ENV_IPC_FD.c_str());
|
||||
if (s == nullptr) {
|
||||
return -1;
|
||||
}
|
||||
return util::parse_uint(s);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace {
|
||||
int create_acceptor_socket(Config *config, std::vector<InheritedAddr> &iaddrs) {
|
||||
std::array<char, STRERROR_BUFSIZE> errbuf;
|
||||
|
@ -1255,6 +1367,11 @@ int event_loop() {
|
|||
close_unused_inherited_addr(iaddrs);
|
||||
}
|
||||
|
||||
auto ipc_notify_fd = get_ipc_fd_from_env();
|
||||
if (ipc_notify_fd != -1) {
|
||||
util::make_socket_closeonexec(ipc_notify_fd);
|
||||
}
|
||||
|
||||
auto loop = ev_default_loop(config->ev_loop_flags);
|
||||
|
||||
int ipc_fd;
|
||||
|
@ -1275,6 +1392,13 @@ int event_loop() {
|
|||
save_pid();
|
||||
}
|
||||
|
||||
if (ipc_notify_fd != -1) {
|
||||
LOG(NOTICE) << "Notify parent process that we are ready to server requests";
|
||||
write(ipc_notify_fd, "\0", 1);
|
||||
shutdown(ipc_notify_fd, SHUT_WR);
|
||||
close(ipc_notify_fd);
|
||||
}
|
||||
|
||||
ev_run(loop, 0);
|
||||
|
||||
return 0;
|
||||
|
|
Loading…
Reference in New Issue