diff --git a/src/shrpx.cc b/src/shrpx.cc index 088eb23d..cefb720d 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -120,6 +120,12 @@ constexpr auto ENV_UNIX_PATH = StringRef::from_lit("NGHTTP2_UNIX_PATH"); // descriptor. is a path to UNIX domain socket. constexpr auto ENV_ACCEPT_PREFIX = StringRef::from_lit("NGHTTPX_ACCEPT_"); +// This environment variable contains writable end of pipe(2) created +// by parent process. The new process is expected to write single +// byte to this fd to notify the parent process that this process is +// ready to serve requests. +constexpr auto ENV_IPC_FD = StringRef::from_lit("NGHTTPX_IPC"); + #ifndef _KERNEL_FASTOPEN #define _KERNEL_FASTOPEN // conditional define for TCP_FASTOPEN mostly on ubuntu @@ -174,6 +180,8 @@ namespace { void worker_process_child_cb(struct ev_loop *loop, ev_child *w, int revents); } // namespace +class SyncExecHandler; + struct WorkerProcess { WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd) : loop(loop), worker_pid(worker_pid), ipc_fd(ipc_fd) { @@ -226,6 +234,7 @@ struct WorkerProcess { struct ev_loop *loop; pid_t worker_pid; int ipc_fd; + std::unique_ptr sync_exec_handler; }; namespace { @@ -364,7 +373,75 @@ int save_pid() { } // namespace namespace { -void exec_binary() { +void sync_exec_notifycb(struct ev_loop *loop, ev_io *w, int revents); +} // namespace + +class SyncExecHandler { +public: + SyncExecHandler(WorkerProcess *wp, struct ev_loop *loop, int fd) + : loop_(loop), fd_(fd) { + ev_io_init(&rev_, sync_exec_notifycb, fd, EV_READ); + rev_.data = wp; + + ev_io_start(loop, &rev_); + } + + ~SyncExecHandler() { + ev_io_stop(loop_, &rev_); + close(fd_); + } + + int on_read() { + ssize_t nread; + uint8_t b; + while ((nread = read(fd_, &b, 1)) == -1 && errno == EINTR) + ; + if (nread == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return 1; + } + + LOG(ERROR) << "New process is not ready; probably aborted"; + + return -1; + } + + LOG(NOTICE) << "New process is ready to serve request"; + + return 0; + } + +private: + struct ev_loop *loop_; + ev_io rev_; + int fd_; +}; + +namespace { +void ipc_send(WorkerProcess *wp, uint8_t ipc_event); +} // namespace + +namespace { +void sync_exec_notifycb(struct ev_loop *loop, ev_io *w, int revents) { + int rv; + auto wp = static_cast(w->data); + + rv = wp->sync_exec_handler->on_read(); + if (rv == 1) { + return; + } + + wp->sync_exec_handler.reset(); + if (rv == -1) { + return; + } + + ipc_send(wp, SHRPX_IPC_GRACEFUL_SHUTDOWN); +} +} // namespace + +namespace { +void exec_binary(WorkerProcess *wp) { int rv; sigset_t oldset; std::array errbuf; @@ -380,12 +457,29 @@ void exec_binary() { return; } + std::array ipc_fds; + if (pipe(ipc_fds.data()) == -1) { + auto error = errno; + LOG(ERROR) << "Could not create IPC file descriptors: " + << xsi_strerror(error, errbuf.data(), errbuf.size()); + } + + util::make_socket_closeonexec(ipc_fds[0]); + for (auto fd : ipc_fds) { + util::make_socket_nonblocking(fd); + } + auto pid = fork(); if (pid != 0) { if (pid == -1) { auto error = errno; LOG(ERROR) << "fork() failed errno=" << error; + } else { + close(ipc_fds[1]); + + wp->sync_exec_handler = + make_unique(wp, wp->loop, ipc_fds[0]); } rv = shrpx_signal_set(&oldset); @@ -436,7 +530,8 @@ void exec_binary() { auto &listenerconf = get_config()->conn.listener; - auto envp = make_unique(envlen + listenerconf.addrs.size() + 1); + // 2 for ENV_IPC_FD and terminal nullptr. + auto envp = make_unique(envlen + listenerconf.addrs.size() + 2); size_t envidx = 0; std::vector fd_envs; @@ -459,6 +554,11 @@ void exec_binary() { envp[envidx++] = const_cast(fd_envs.back().c_str()); } + auto ipc_fd_str = ENV_IPC_FD.str(); + ipc_fd_str += '='; + ipc_fd_str += util::utos(ipc_fds[1]); + envp[envidx++] = const_cast(ipc_fd_str.c_str()); + for (size_t i = 0; i < envlen; ++i) { auto env = StringRef{environ[i]}; if (util::starts_with(env, ENV_ACCEPT_PREFIX) || @@ -466,7 +566,8 @@ void exec_binary() { util::starts_with(env, ENV_LISTENER6_FD) || util::starts_with(env, ENV_PORT) || util::starts_with(env, ENV_UNIX_FD) || - util::starts_with(env, ENV_UNIX_PATH)) { + util::starts_with(env, ENV_UNIX_PATH) || + util::starts_with(env, ENV_IPC_FD)) { continue; } @@ -541,7 +642,7 @@ void signal_cb(struct ev_loop *loop, ev_signal *w, int revents) { reopen_log(wp); return; case EXEC_BINARY_SIGNAL: - exec_binary(); + exec_binary(wp); return; case GRACEFUL_SHUTDOWN_SIGNAL: ipc_send(wp, SHRPX_IPC_GRACEFUL_SHUTDOWN); @@ -1050,6 +1151,17 @@ void close_unused_inherited_addr(const std::vector &iaddrs) { } } // namespace +namespace { +// Returns IPC file descriptor from environment variable ENV_IPC_FD. +int get_ipc_fd_from_env() { + auto s = getenv(ENV_IPC_FD.c_str()); + if (s == nullptr) { + return -1; + } + return util::parse_uint(s); +} +} // namespace + namespace { int create_acceptor_socket(Config *config, std::vector &iaddrs) { std::array errbuf; @@ -1255,6 +1367,11 @@ int event_loop() { close_unused_inherited_addr(iaddrs); } + auto ipc_notify_fd = get_ipc_fd_from_env(); + if (ipc_notify_fd != -1) { + util::make_socket_closeonexec(ipc_notify_fd); + } + auto loop = ev_default_loop(config->ev_loop_flags); int ipc_fd; @@ -1275,6 +1392,13 @@ int event_loop() { save_pid(); } + if (ipc_notify_fd != -1) { + LOG(NOTICE) << "Notify parent process that we are ready to server requests"; + write(ipc_notify_fd, "\0", 1); + shutdown(ipc_notify_fd, SHUT_WR); + close(ipc_notify_fd); + } + ev_run(loop, 0); return 0;