From 2b4dc4496f61bed1d47d789c4d62d2f61bdb43fb Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sun, 29 Aug 2021 10:18:59 +0900 Subject: [PATCH] nghttpx: Forward QUIC UDP datagram to lingering worker in graceful shutdown Forward QUIC UDP datagram to lingering worker process which is in graceful shutdown. Both SIGHUP and SIGUSR2 work. To make this work correctly, eBPF is required. --- bpf/reuseport_kern.c | 14 +- src/shrpx.cc | 344 ++++++++++++++++++++++++++- src/shrpx_connection_handler.cc | 262 +++++++++++++++++++- src/shrpx_connection_handler.h | 52 +++- src/shrpx_quic_connection_handler.cc | 39 ++- src/shrpx_worker.cc | 93 +++++++- src/shrpx_worker.h | 4 + src/shrpx_worker_process.cc | 31 +++ src/shrpx_worker_process.h | 18 ++ src/util.cc | 5 +- src/util.h | 9 + 11 files changed, 820 insertions(+), 51 deletions(-) diff --git a/bpf/reuseport_kern.c b/bpf/reuseport_kern.c index 00ee05a6..7c1094a5 100644 --- a/bpf/reuseport_kern.c +++ b/bpf/reuseport_kern.c @@ -271,7 +271,19 @@ int select_reuseport(struct sk_reuseport_md *reuse_md) { psk_index = bpf_map_lookup_elem(&cid_prefix_map, sk_prefix); if (psk_index == NULL) { - return SK_DROP; + pnum_socks = bpf_map_lookup_elem(&sk_info, &zero); + if (pnum_socks == NULL) { + return SK_DROP; + } + + a = (sk_prefix[0] << 24) | (sk_prefix[1] << 16) | (sk_prefix[2] << 8) | + sk_prefix[3]; + b = (sk_prefix[4] << 24) | (sk_prefix[5] << 16) | (sk_prefix[6] << 8) | + sk_prefix[7]; + + sk_index = jhash_2words(a, b, reuse_md->hash) % *pnum_socks; + + break; } sk_index = *psk_index; diff --git a/src/shrpx.cc b/src/shrpx.cc index e14036aa..e69505e9 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -132,6 +132,15 @@ constexpr auto ENV_ACCEPT_PREFIX = StringRef::from_lit("NGHTTPX_ACCEPT_"); // the original main process to shut it down gracefully. constexpr auto ENV_ORIG_PID = StringRef::from_lit("NGHTTPX_ORIG_PID"); +// Prefix of environment variables to tell new binary the QUIC IPC +// file descriptor and CID prefix of the lingering worker process. +// The value must be comma separated parameters: +// ,,,... is the file +// descriptor. is the I-th CID prefix in hex encoded +// string. +constexpr auto ENV_QUIC_WORKER_PROCESS_PREFIX = + StringRef::from_lit("NGHTTPX_QUIC_WORKER_PROCESS_"); + #ifndef _KERNEL_FASTOPEN # define _KERNEL_FASTOPEN // conditional define for TCP_FASTOPEN mostly on ubuntu @@ -183,8 +192,23 @@ void worker_process_child_cb(struct ev_loop *loop, ev_child *w, int revents); } // namespace struct WorkerProcess { - WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd) - : loop(loop), worker_pid(worker_pid), ipc_fd(ipc_fd) { + WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd +#ifdef ENABLE_HTTP3 + , + int quic_ipc_fd, + const std::vector> + &cid_prefixes +#endif // ENABLE_HTTP3 + ) + : loop(loop), + worker_pid(worker_pid), + ipc_fd(ipc_fd) +#ifdef ENABLE_HTTP3 + , + quic_ipc_fd(quic_ipc_fd), + cid_prefixes(cid_prefixes) +#endif // ENABLE_HTTP3 + { ev_signal_init(&reopen_log_signalev, signal_cb, REOPEN_LOG_SIGNAL); reopen_log_signalev.data = this; ev_signal_start(loop, &reopen_log_signalev); @@ -213,6 +237,12 @@ struct WorkerProcess { ev_child_stop(loop, &worker_process_childev); +#ifdef ENABLE_HTTP3 + if (quic_ipc_fd != -1) { + close(quic_ipc_fd); + } +#endif // ENABLE_HTTP3 + if (ipc_fd != -1) { shutdown(ipc_fd, SHUT_WR); close(ipc_fd); @@ -234,6 +264,10 @@ struct WorkerProcess { struct ev_loop *loop; pid_t worker_pid; int ipc_fd; +#ifdef ENABLE_HTTP3 + int quic_ipc_fd; + std::vector> cid_prefixes; +#endif // ENABLE_HTTP3 }; namespace { @@ -456,8 +490,8 @@ void exec_binary() { auto &listenerconf = config->conn.listener; // 2 for ENV_ORIG_PID and terminal nullptr. - auto envp = - std::make_unique(envlen + listenerconf.addrs.size() + 2); + auto envp = std::make_unique(envlen + listenerconf.addrs.size() + + worker_processes.size() + 2); size_t envidx = 0; std::vector fd_envs; @@ -485,6 +519,24 @@ void exec_binary() { ipc_fd_str += util::utos(config->pid); envp[envidx++] = const_cast(ipc_fd_str.c_str()); +#ifdef ENABLE_HTTP3 + std::vector quic_lwps; + for (size_t i = 0; i < worker_processes.size(); ++i) { + auto &wp = worker_processes[i]; + auto s = ENV_QUIC_WORKER_PROCESS_PREFIX.str(); + s += util::utos(i + 1); + s += '='; + s += util::utos(wp->quic_ipc_fd); + for (auto &cid_prefix : wp->cid_prefixes) { + s += ','; + s += util::format_hex(cid_prefix); + } + + quic_lwps.emplace_back(s); + envp[envidx++] = const_cast(quic_lwps.back().c_str()); + } +#endif // ENABLE_HTTP3 + for (size_t i = 0; i < envlen; ++i) { auto env = StringRef{environ[i]}; if (util::starts_with(env, ENV_ACCEPT_PREFIX) || @@ -493,7 +545,8 @@ void exec_binary() { util::starts_with(env, ENV_PORT) || util::starts_with(env, ENV_UNIX_FD) || util::starts_with(env, ENV_UNIX_PATH) || - util::starts_with(env, ENV_ORIG_PID)) { + util::starts_with(env, ENV_ORIG_PID) || + util::starts_with(env, ENV_QUIC_WORKER_PROCESS_PREFIX)) { continue; } @@ -1105,6 +1158,85 @@ pid_t get_orig_pid_from_env() { } } // namespace +#ifdef ENABLE_HTTP3 +namespace { +std::vector + inherited_quic_lingering_worker_processes; +} // namespace + +namespace { +std::vector +get_inherited_quic_lingering_worker_process_from_env() { + std::vector iwps; + + for (size_t i = 1;; ++i) { + auto name = ENV_QUIC_WORKER_PROCESS_PREFIX.str(); + name += util::utos(i); + auto env = getenv(name.c_str()); + if (!env) { + break; + } + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Read env " << name << "=" << env; + } + + auto envend = env + strlen(env); + + auto end_fd = std::find(env, envend, ','); + if (end_fd == envend) { + continue; + } + + auto fd = + util::parse_uint(reinterpret_cast(env), end_fd - env); + if (fd == -1) { + LOG(WARN) << "Could not parse file descriptor from " + << StringRef{env, static_cast(end_fd - env)}; + continue; + } + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Inherit worker process QUIC IPC socket fd=" << fd; + } + + util::make_socket_closeonexec(fd); + + std::vector> cid_prefixes; + + auto p = end_fd + 1; + for (;;) { + auto end = std::find(p, envend, ','); + + auto hex_cid_prefix = StringRef{p, end}; + if (hex_cid_prefix.size() != SHRPX_QUIC_CID_PREFIXLEN * 2 || + !util::is_hex_string(hex_cid_prefix)) { + continue; + } + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Inherit worker process CID prefix=" << hex_cid_prefix; + } + + cid_prefixes.emplace_back(); + + util::decode_hex(std::begin(cid_prefixes.back()), hex_cid_prefix); + + if (end == envend) { + break; + } + + p = end + 1; + } + + iwps.emplace_back(std::move(cid_prefixes), fd); + } + + return iwps; +} +} // namespace +#endif // ENABLE_HTTP3 + namespace { int create_acceptor_socket(Config *config, std::vector &iaddrs) { std::array errbuf; @@ -1182,14 +1314,97 @@ int create_ipc_socket(std::array &ipc_fd) { } } // namespace +#ifdef ENABLE_HTTP3 +namespace { +int create_quic_ipc_socket(std::array &quic_ipc_fd) { + std::array errbuf; + int rv; + + rv = socketpair(AF_UNIX, SOCK_DGRAM, 0, quic_ipc_fd.data()); + if (rv == -1) { + auto error = errno; + LOG(WARN) << "Failed to create socket pair to communicate worker process: " + << xsi_strerror(error, errbuf.data(), errbuf.size()); + return -1; + } + + for (auto fd : quic_ipc_fd) { + util::make_socket_nonblocking(fd); + } + + return 0; +} +} // namespace + +namespace { +int generate_cid_prefix( + std::vector> &cid_prefixes, + const Config *config) { + auto &apiconf = config->api; + + size_t num_cid_prefix; + if (config->single_thread) { + num_cid_prefix = 1; + } else { + num_cid_prefix = config->num_worker; + + // API endpoint occupies the one dedicated worker thread. + // Although such worker never gets QUIC traffic, we create CID + // prefix for it to make code a bit simpler. + if (apiconf.enabled) { + ++num_cid_prefix; + } + } + + cid_prefixes.resize(num_cid_prefix); + + for (auto &cid_prefix : cid_prefixes) { + if (create_cid_prefix(cid_prefix.data()) != 0) { + return -1; + } + } + + return 0; +} +} // namespace + +namespace { +std::vector +collect_quic_lingering_worker_processes() { + std::vector quic_lwps{ + std::begin(inherited_quic_lingering_worker_processes), + std::end(inherited_quic_lingering_worker_processes)}; + + for (auto &wp : worker_processes) { + quic_lwps.emplace_back(wp->cid_prefixes, wp->quic_ipc_fd); + } + + return quic_lwps; +} +} // namespace +#endif // ENABLE_HTTP3 + namespace { // Creates worker process, and returns PID of worker process. On // success, file descriptor for IPC (send only) is assigned to // |main_ipc_fd|. In child process, we will close file descriptors // which are inherited from previous configuration/process, but not // used in the current configuration. -pid_t fork_worker_process(int &main_ipc_fd, - const std::vector &iaddrs) { +pid_t fork_worker_process( + int &main_ipc_fd +#ifdef ENABLE_HTTP3 + , + int &wp_quic_ipc_fd +#endif // ENABLE_HTTP3 + , + const std::vector &iaddrs +#ifdef ENABLE_HTTP3 + , + const std::vector> + &cid_prefixes, + const std::vector &quic_lwps +#endif // ENABLE_HTTP3 +) { std::array errbuf; int rv; sigset_t oldset; @@ -1201,6 +1416,15 @@ pid_t fork_worker_process(int &main_ipc_fd, return -1; } +#ifdef ENABLE_HTTP3 + std::array quic_ipc_fd; + + rv = create_quic_ipc_socket(quic_ipc_fd); + if (rv != 0) { + return -1; + } +#endif // ENABLE_HTTP3 + rv = shrpx_signal_block_all(&oldset); if (rv != 0) { auto error = errno; @@ -1228,6 +1452,20 @@ pid_t fork_worker_process(int &main_ipc_fd, util::make_socket_closeonexec(addr.fd); } +#ifdef ENABLE_HTTP3 + util::make_socket_closeonexec(quic_ipc_fd[0]); + + for (auto &lwp : quic_lwps) { + util::make_socket_closeonexec(lwp.quic_ipc_fd); + } + + for (auto &wp : worker_processes) { + util::make_socket_closeonexec(wp->quic_ipc_fd); + // Do not close quic_ipc_fd. + wp->quic_ipc_fd = -1; + } +#endif // ENABLE_HTTP3 + // Remove all WorkerProcesses to stop any registered watcher on // default loop. worker_process_remove_all(); @@ -1251,9 +1489,19 @@ pid_t fork_worker_process(int &main_ipc_fd, if (!config->single_process) { close(ipc_fd[1]); +#ifdef ENABLE_HTTP3 + close(quic_ipc_fd[1]); +#endif // ENABLE_HTTP3 } - WorkerProcessConfig wpconf{ipc_fd[0]}; + WorkerProcessConfig wpconf{ + .ipc_fd = ipc_fd[0], +#ifdef ENABLE_HTTP3 + .cid_prefixes = cid_prefixes, + .quic_ipc_fd = quic_ipc_fd[0], + .quic_lingering_worker_processes = quic_lwps, +#endif // ENABLE_HTTP3 + }; rv = worker_process_event_loop(&wpconf); if (rv != 0) { LOG(FATAL) << "Worker process returned error"; @@ -1294,13 +1542,23 @@ pid_t fork_worker_process(int &main_ipc_fd, if (pid == -1) { close(ipc_fd[0]); close(ipc_fd[1]); +#ifdef ENABLE_HTTP3 + close(quic_ipc_fd[0]); + close(quic_ipc_fd[1]); +#endif // ENABLE_HTTP3 return -1; } close(ipc_fd[0]); +#ifdef ENABLE_HTTP3 + close(quic_ipc_fd[0]); +#endif // ENABLE_HTTP3 main_ipc_fd = ipc_fd[1]; +#ifdef ENABLE_HTTP3 + wp_quic_ipc_fd = quic_ipc_fd[1]; +#endif // ENABLE_HTTP3 LOG(NOTICE) << "Worker process [" << pid << "] spawned"; @@ -1347,17 +1605,50 @@ int event_loop() { auto orig_pid = get_orig_pid_from_env(); +#ifdef ENABLE_HTTP3 + inherited_quic_lingering_worker_processes = + get_inherited_quic_lingering_worker_process_from_env(); +#endif // ENABLE_HTTP3 + auto loop = ev_default_loop(config->ev_loop_flags); int ipc_fd = 0; +#ifdef ENABLE_HTTP3 + int quic_ipc_fd = 0; - auto pid = fork_worker_process(ipc_fd, {}); + auto quic_lwps = collect_quic_lingering_worker_processes(); + + std::vector> cid_prefixes; + + if (generate_cid_prefix(cid_prefixes, config) != 0) { + return -1; + } +#endif // ENABLE_HTTP3 + + auto pid = fork_worker_process( + ipc_fd +#ifdef ENABLE_HTTP3 + , + quic_ipc_fd +#endif // ENABLE_HTTP3 + , + {} +#ifdef ENABLE_HTTP3 + , + cid_prefixes, quic_lwps +#endif // ENABLE_HTTP3 + ); if (pid == -1) { return -1; } - worker_process_add(std::make_unique(loop, pid, ipc_fd)); + worker_process_add(std::make_unique(loop, pid, ipc_fd +#ifdef ENABLE_HTTP3 + , + quic_ipc_fd, cid_prefixes +#endif // ENABLE_HTTP3 + )); // Write PID file when we are ready to accept connection from peer. // This makes easier to write restart script for nghttpx. Because @@ -3325,13 +3616,37 @@ void reload_config(WorkerProcess *wp) { auto loop = ev_default_loop(new_config->ev_loop_flags); int ipc_fd = 0; +#ifdef ENABLE_HTTP3 + int quic_ipc_fd = 0; + + auto quic_lwps = collect_quic_lingering_worker_processes(); + + std::vector> cid_prefixes; + + if (generate_cid_prefix(cid_prefixes, new_config.get()) != 0) { + close_not_inherited_fd(new_config.get(), iaddrs); + return; + } +#endif // ENABLE_HTTP3 // fork_worker_process and forked child process assumes new // configuration can be obtained from get_config(). auto old_config = replace_config(std::move(new_config)); - auto pid = fork_worker_process(ipc_fd, iaddrs); + auto pid = fork_worker_process(ipc_fd +#ifdef ENABLE_HTTP3 + , + quic_ipc_fd +#endif // ENABLE_HTTP3 + + , + iaddrs +#ifdef ENABLE_HTTP3 + , + cid_prefixes, quic_lwps +#endif // ENABLE_HTTP3 + ); if (pid == -1) { LOG(ERROR) << "Failed to process new configuration"; @@ -3350,7 +3665,12 @@ void reload_config(WorkerProcess *wp) { // We no longer use signals for this worker. last_wp->shutdown_signal_watchers(); - worker_process_add(std::make_unique(loop, pid, ipc_fd)); + worker_process_add(std::make_unique(loop, pid, ipc_fd +#ifdef ENABLE_HTTP3 + , + quic_ipc_fd, cid_prefixes +#endif // ENABLE_HTTP3 + )); if (!get_config()->pid_file.empty()) { save_pid(); diff --git a/src/shrpx_connection_handler.cc b/src/shrpx_connection_handler.cc index 14e20253..c32eb86f 100644 --- a/src/shrpx_connection_handler.cc +++ b/src/shrpx_connection_handler.cc @@ -45,6 +45,7 @@ #include "shrpx_memcached_dispatcher.h" #include "shrpx_signal.h" #include "shrpx_log.h" +#include "xsi_strerror.h" #include "util.h" #include "template.h" @@ -112,7 +113,11 @@ void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) { } // namespace ConnectionHandler::ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen) - : gen_(gen), + : +#ifdef ENABLE_HTTP3 + quic_ipc_fd_(-1), +#endif // ENABLE_HTTP3 + gen_(gen), single_worker_(nullptr), loop_(loop), #ifdef HAVE_NEVERBLEED @@ -267,11 +272,13 @@ int ConnectionHandler::create_single_worker() { } } +#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF) + quic_bpf_refs_.resize(config->conn.quic_listener.addrs.size()); +#endif // ENABLE_HTTP3 && HAVE_LIBBPF + #ifdef ENABLE_HTTP3 - std::array cid_prefix; - if (create_cid_prefix(cid_prefix.data()) != 0) { - return -1; - } + assert(cid_prefixes_.size() == 1); + const auto &cid_prefix = cid_prefixes_[0]; #endif // ENABLE_HTTP3 single_worker_ = std::make_unique( @@ -343,7 +350,7 @@ int ConnectionHandler::create_worker_thread(size_t num) { auto &apiconf = config->api; # if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF) - quic_bpf_refs_.resize(num); + quic_bpf_refs_.resize(config->conn.quic_listener.addrs.size()); # endif // ENABLE_HTTP3 && HAVE_LIBBPF // We have dedicated worker for API request processing. @@ -369,14 +376,15 @@ int ConnectionHandler::create_worker_thread(size_t num) { } } +# ifdef ENABLE_HTTP3 + assert(cid_prefixes_.size() == num); +# endif // ENABLE_HTTP3 + for (size_t i = 0; i < num; ++i) { auto loop = ev_loop_new(config->ev_loop_flags); # ifdef ENABLE_HTTP3 - std::array cid_prefix; - if (create_cid_prefix(cid_prefix.data()) != 0) { - return -1; - } + const auto &cid_prefix = cid_prefixes_[i]; # endif // ENABLE_HTTP3 auto worker = std::make_unique( @@ -1060,12 +1068,246 @@ int ConnectionHandler::create_quic_secret() { return 0; } +void ConnectionHandler::set_cid_prefixes( + const std::vector> + &cid_prefixes) { + cid_prefixes_ = cid_prefixes; +} + +QUICLingeringWorkerProcess * +ConnectionHandler::match_quic_lingering_worker_process_cid_prefix( + const uint8_t *dcid, size_t dcidlen) { + assert(dcidlen >= SHRPX_QUIC_CID_PREFIXLEN); + + for (auto &lwps : quic_lingering_worker_processes_) { + for (auto &cid_prefix : lwps.cid_prefixes) { + if (std::equal(std::begin(cid_prefix), std::end(cid_prefix), dcid)) { + return &lwps; + } + } + } + + return nullptr; +} + # ifdef HAVE_LIBBPF std::vector &ConnectionHandler::get_quic_bpf_refs() { return quic_bpf_refs_; } # endif // HAVE_LIBBPF +void ConnectionHandler::set_quic_ipc_fd(int fd) { quic_ipc_fd_ = fd; } + +void ConnectionHandler::set_quic_lingering_worker_processes( + const std::vector &quic_lwps) { + quic_lingering_worker_processes_ = quic_lwps; +} + +int ConnectionHandler::forward_quic_packet_to_lingering_worker_process( + QUICLingeringWorkerProcess *quic_lwp, const Address &remote_addr, + const Address &local_addr, const uint8_t *data, size_t datalen) { + std::array header; + + assert(header.size() >= 1 + 1 + 1 + sizeof(sockaddr_storage) * 2); + assert(remote_addr.len > 0); + assert(local_addr.len > 0); + + auto p = header.data(); + + *p++ = static_cast(QUICIPCType::DGRAM_FORWARD); + *p++ = static_cast(remote_addr.len - 1); + p = std::copy_n(reinterpret_cast(&remote_addr.su), + remote_addr.len, p); + *p++ = static_cast(local_addr.len - 1); + p = std::copy_n(reinterpret_cast(&local_addr.su), + local_addr.len, p); + + iovec msg_iov[] = { + { + .iov_base = header.data(), + .iov_len = static_cast(p - header.data()), + }, + { + .iov_base = const_cast(data), + .iov_len = datalen, + }, + }; + + msghdr msg{}; + msg.msg_iov = msg_iov; + msg.msg_iovlen = array_size(msg_iov); + + ssize_t nwrite; + + while ((nwrite = sendmsg(quic_lwp->quic_ipc_fd, &msg, 0)) == -1 && + errno == EINTR) + ; + + if (nwrite == -1) { + std::array errbuf; + + auto error = errno; + LOG(ERROR) << "Failed to send QUIC IPC message: " + << xsi_strerror(error, errbuf.data(), errbuf.size()); + + return -1; + } + + return 0; +} + +int ConnectionHandler::quic_ipc_read() { + std::array buf; + + ssize_t nread; + + while ((nread = recv(quic_ipc_fd_, buf.data(), buf.size(), 0)) == -1 && + errno == EINTR) + ; + + if (nread == -1) { + std::array errbuf; + + auto error = errno; + LOG(ERROR) << "Failed to read data from QUIC IPC channel: " + << xsi_strerror(error, errbuf.data(), errbuf.size()); + + return -1; + } + + if (nread == 0) { + return 0; + } + + size_t len = 1 + 1 + 1; + + // Wire format: + // TYPE(1) REMOTE_ADDRLEN(1) REMOTE_ADDR(N) LOCAL_ADDRLEN(1) REMOTE_ADDR(N) + // DGRAM_PAYLAOD(N) + // + // When encoding, REMOTE_ADDRLEN and LOCAL_ADDRLEN is decremented by + // 1. + if (static_cast(nread) < len) { + return 0; + } + + auto p = buf.data(); + if (*p != static_cast(QUICIPCType::DGRAM_FORWARD)) { + LOG(ERROR) << "Unknown QUICIPCType: " << static_cast(*p); + + return -1; + } + + ++p; + + auto pkt = std::make_unique(); + + auto remote_addrlen = static_cast(*p++) + 1; + if (remote_addrlen > sizeof(sockaddr_storage)) { + LOG(ERROR) << "The length of remote address is too large: " + << remote_addrlen; + + return -1; + } + + len += remote_addrlen; + + if (static_cast(nread) < len) { + LOG(ERROR) << "Insufficient QUIC IPC message length"; + + return -1; + } + + pkt->remote_addr.len = remote_addrlen; + memcpy(&pkt->remote_addr.su, p, remote_addrlen); + + p += remote_addrlen; + + auto local_addrlen = static_cast(*p++) + 1; + if (local_addrlen > sizeof(sockaddr_storage)) { + LOG(ERROR) << "The length of local address is too large: " << local_addrlen; + + return -1; + } + + len += local_addrlen; + + if (static_cast(nread) < len) { + LOG(ERROR) << "Insufficient QUIC IPC message length"; + + return -1; + } + + pkt->local_addr.len = local_addrlen; + memcpy(&pkt->local_addr.su, p, local_addrlen); + + p += local_addrlen; + + auto datalen = nread - (p - buf.data()); + + pkt->data.assign(p, p + datalen); + + // At the moment, UpstreamAddr index is unknown. + pkt->upstream_addr_index = static_cast(-1); + + uint32_t version; + const uint8_t *dcid; + size_t dcidlen; + const uint8_t *scid; + size_t scidlen; + + auto rv = + ngtcp2_pkt_decode_version_cid(&version, &dcid, &dcidlen, &scid, &scidlen, + p, datalen, SHRPX_QUIC_SCIDLEN); + if (rv < 0) { + LOG(ERROR) << "ngtcp2_pkt_decode_version_cid: " << ngtcp2_strerror(rv); + + return -1; + } + + if (dcidlen < SHRPX_QUIC_CID_PREFIXLEN) { + LOG(ERROR) << "DCID is too short"; + return -1; + } + + if (single_worker_) { + auto faddr = single_worker_->find_quic_upstream_addr(pkt->local_addr); + if (faddr == nullptr) { + LOG(ERROR) << "No suitable upstream address found"; + + return 0; + } + + auto quic_conn_handler = single_worker_->get_quic_connection_handler(); + + // Ignore return value + quic_conn_handler->handle_packet(faddr, pkt->remote_addr, pkt->local_addr, + pkt->data.data(), pkt->data.size()); + + return 0; + } + + for (auto &worker : workers_) { + if (!std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN, + worker->get_cid_prefix())) { + continue; + } + + WorkerEvent wev{ + .type = WorkerEventType::QUIC_PKT_FORWARD, + .quic_pkt = std::move(pkt), + }; + worker->send(std::move(wev)); + + return 0; + } + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "No worker to match CID prefix"; + } + + return 0; +} #endif // ENABLE_HTTP3 } // namespace shrpx diff --git a/src/shrpx_connection_handler.h b/src/shrpx_connection_handler.h index 7ffc2112..2c8d9c9a 100644 --- a/src/shrpx_connection_handler.h +++ b/src/shrpx_connection_handler.h @@ -103,12 +103,33 @@ struct SerialEvent { std::shared_ptr downstreamconf; }; -#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF) +#ifdef ENABLE_HTTP3 +# ifdef HAVE_LIBBPF struct BPFRef { int reuseport_array; int cid_prefix_map; }; -#endif // ENABLE_HTTP3 && HAVE_LIBBPF +# endif // HAVE_LIBBPF + +// QUIC IPC message type. +enum class QUICIPCType { + NONE, + // Send forwarded QUIC UDP datagram and its metadata. + DGRAM_FORWARD, +}; + +// WorkerProcesses which are in graceful shutdown period. +struct QUICLingeringWorkerProcess { + QUICLingeringWorkerProcess( + std::vector> cid_prefixes, + int quic_ipc_fd) + : cid_prefixes{std::move(cid_prefixes)}, quic_ipc_fd{quic_ipc_fd} {} + + std::vector> cid_prefixes; + // Socket to send QUIC IPC message to this worker process. + int quic_ipc_fd; +}; +#endif // ENABLE_HTTP3 class ConnectionHandler { public: @@ -179,6 +200,28 @@ public: int create_quic_secret(); + void set_cid_prefixes( + const std::vector> + &cid_prefixes); + + void set_quic_lingering_worker_processes( + const std::vector &quic_lwps); + + // Return matching QUICLingeringWorkerProcess which has a CID prefix + // such that |dcid| starts with it. If no such + // QUICLingeringWorkerProcess, it returns nullptr. + QUICLingeringWorkerProcess * + match_quic_lingering_worker_process_cid_prefix(const uint8_t *dcid, + size_t dcidlen); + + int forward_quic_packet_to_lingering_worker_process( + QUICLingeringWorkerProcess *quic_lwp, const Address &remote_addr, + const Address &local_addr, const uint8_t *data, size_t datalen); + + void set_quic_ipc_fd(int fd); + + int quic_ipc_read(); + # ifdef HAVE_LIBBPF std::vector &get_quic_bpf_refs(); # endif // HAVE_LIBBPF @@ -212,6 +255,11 @@ private: // and signature algorithm presented by client. std::vector> indexed_ssl_ctx_; #ifdef ENABLE_HTTP3 + std::vector> cid_prefixes_; + std::vector> + lingering_cid_prefixes_; + int quic_ipc_fd_; + std::vector quic_lingering_worker_processes_; # ifdef HAVE_LIBBPF std::vector quic_bpf_refs_; # endif // HAVE_LIBBPF diff --git a/src/shrpx_quic_connection_handler.cc b/src/shrpx_quic_connection_handler.cc index 0160799e..b1f7a19f 100644 --- a/src/shrpx_quic_connection_handler.cc +++ b/src/shrpx_quic_connection_handler.cc @@ -77,10 +77,27 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr, auto dcid_key = make_cid_key(dcid, dcidlen); + auto conn_handler = worker_->get_connection_handler(); + ClientHandler *handler; auto it = connections_.find(dcid_key); if (it == std::end(connections_)) { + if (!std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN, + worker_->get_cid_prefix())) { + auto quic_lwp = + conn_handler->match_quic_lingering_worker_process_cid_prefix(dcid, + dcidlen); + if (quic_lwp) { + if (conn_handler->forward_quic_packet_to_lingering_worker_process( + quic_lwp, remote_addr, local_addr, data, datalen) == 0) { + return 0; + } + + return 0; + } + } + // new connection ngtcp2_pkt_hd hd; @@ -90,6 +107,14 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr, switch (ngtcp2_accept(&hd, data, datalen)) { case 0: { + // If we get Initial and it has the CID prefix of this worker, it + // is likely that client is intentionally use the our prefix. + // Just drop it. + if (std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN, + worker_->get_cid_prefix())) { + return 0; + } + if (hd.token.len == 0) { break; } @@ -102,7 +127,9 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr, if (verify_retry_token(&odcid, hd.token.base, hd.token.len, &hd.dcid, &remote_addr.su.sa, remote_addr.len, secret.data()) != 0) { - break; + // 2nd Retry packet is not allowed, so just drop it or send + // CONNECTIONC_CLOE with INVALID_TOKEN. + return 0; } podcid = &odcid; @@ -139,8 +166,6 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr, dcidlen > SHRPX_QUIC_CID_PREFIXLEN && !std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN, worker_->get_cid_prefix())) { - auto conn_handler = worker_->get_connection_handler(); - if (conn_handler->forward_quic_packet(faddr, remote_addr, local_addr, dcid, data, datalen) == 0) { return 0; @@ -152,14 +177,6 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr, return 0; } - // If we get Initial and it has the CID prefix of this worker, it - // is likely that client is intentionally use the our prefix. - // Just drop it. - if (std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN, - worker_->get_cid_prefix())) { - return 0; - } - handler = handle_new_connection(faddr, remote_addr, local_addr, hd, podcid, token, tokenlen); if (handler == nullptr) { diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 2e91b63d..ab5c4bc5 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -531,9 +531,26 @@ void Worker::process_events() { break; #ifdef ENABLE_HTTP3 case WorkerEventType::QUIC_PKT_FORWARD: { + const UpstreamAddr *faddr; + + if (wev.quic_pkt->upstream_addr_index == static_cast(-1)) { + faddr = find_quic_upstream_addr(wev.quic_pkt->local_addr); + if (faddr == nullptr) { + LOG(ERROR) << "No suitable upstream address found"; + + break; + } + } else if (quic_upstream_addrs_.size() <= + wev.quic_pkt->upstream_addr_index) { + LOG(ERROR) << "upstream_addr_index is too large"; + + break; + } else { + faddr = &quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index]; + } + quic_conn_handler_.handle_packet( - &quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index], - wev.quic_pkt->remote_addr, wev.quic_pkt->local_addr, + faddr, wev.quic_pkt->remote_addr, wev.quic_pkt->local_addr, wev.quic_pkt->data.data(), wev.quic_pkt->data.size()); break; @@ -647,12 +664,11 @@ bool Worker::should_attach_bpf() const { auto &quicconf = config->quic; auto &apiconf = config->api; - if (quicconf.bpf.disabled || config->single_thread || - config->num_worker == 1) { + if (quicconf.bpf.disabled) { return false; } - if (apiconf.enabled) { + if (!config->single_thread && apiconf.enabled) { return index_ == 1; } @@ -663,18 +679,14 @@ bool Worker::should_update_bpf_map() const { auto config = get_config(); auto &quicconf = config->quic; - return !quicconf.bpf.disabled && !config->single_thread && - config->num_worker > 1; + return !quicconf.bpf.disabled; } uint32_t Worker::compute_sk_index() const { auto config = get_config(); auto &apiconf = config->api; - assert(!config->single_thread); - assert(config->num_worker > 1); - - if (apiconf.enabled) { + if (!config->single_thread && apiconf.enabled) { return index_ - 1; } @@ -979,6 +991,65 @@ void Worker::set_quic_secret(const std::shared_ptr &secret) { const std::shared_ptr &Worker::get_quic_secret() const { return quic_secret_; } + +const UpstreamAddr *Worker::find_quic_upstream_addr(const Address &local_addr) { + std::array host; + + auto rv = getnameinfo(&local_addr.su.sa, local_addr.len, host.data(), + host.size(), nullptr, 0, NI_NUMERICHOST); + if (rv != 0) { + LOG(ERROR) << "getnameinfo: " << gai_strerror(rv); + + return nullptr; + } + + uint16_t port; + + switch (local_addr.su.sa.sa_family) { + case AF_INET: + port = htons(local_addr.su.in.sin_port); + + break; + case AF_INET6: + port = htons(local_addr.su.in6.sin6_port); + + break; + default: + assert(0); + } + + auto hostport = util::make_hostport(StringRef{host.data()}, port); + const UpstreamAddr *fallback_faddr = nullptr; + + for (auto &faddr : quic_upstream_addrs_) { + if (faddr.hostport == hostport) { + return &faddr; + } + + if (faddr.port != port || faddr.family != local_addr.su.sa.sa_family) { + continue; + } + + switch (faddr.family) { + case AF_INET: + if (util::starts_with(faddr.hostport, StringRef::from_lit("0.0.0.0:"))) { + fallback_faddr = &faddr; + } + + break; + case AF_INET6: + if (util::starts_with(faddr.hostport, StringRef::from_lit("[::]:"))) { + fallback_faddr = &faddr; + } + + break; + default: + assert(0); + } + } + + return fallback_faddr; +} #endif // ENABLE_HTTP3 namespace { diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index 52a75c94..3d58fbe3 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -261,6 +261,7 @@ struct QUICPacket { remote_addr{remote_addr}, local_addr{local_addr}, data{data, data + datalen} {} + QUICPacket() {} size_t upstream_addr_index; Address remote_addr; Address local_addr; @@ -380,6 +381,9 @@ public: # endif // HAVE_LIBBPF int create_quic_server_socket(UpstreamAddr &addr); + + // Returns a pointer to UpstreamAddr which matches |local_addr|. + const UpstreamAddr *find_quic_upstream_addr(const Address &local_addr); #endif // ENABLE_HTTP3 DNSTracker *get_dns_tracker(); diff --git a/src/shrpx_worker_process.cc b/src/shrpx_worker_process.cc index 4d18ffc8..9fc0681e 100644 --- a/src/shrpx_worker_process.cc +++ b/src/shrpx_worker_process.cc @@ -178,6 +178,20 @@ void ipc_readcb(struct ev_loop *loop, ev_io *w, int revents) { } } // namespace +#ifdef ENABLE_HTTP3 +namespace { +void quic_ipc_readcb(struct ev_loop *loop, ev_io *w, int revents) { + auto conn_handler = static_cast(w->data); + + if (conn_handler->quic_ipc_read() != 0) { + LOG(ERROR) << "Failed to read data from QUIC IPC channel"; + + return; + } +} +} // namespace +#endif // ENABLE_HTTP3 + namespace { int generate_ticket_key(TicketKey &ticket_key) { ticket_key.cipher = get_config()->tls.ticket.cipher; @@ -434,6 +448,12 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) { conn_handler->set_neverbleed(nb.get()); #endif // HAVE_NEVERBLEED +#ifdef ENABLE_HTTP3 + conn_handler->set_quic_ipc_fd(wpconf->quic_ipc_fd); + conn_handler->set_quic_lingering_worker_processes( + wpconf->quic_lingering_worker_processes); +#endif // ENABLE_HTTP3 + for (auto &addr : config->conn.listener.addrs) { conn_handler->add_acceptor( std::make_unique(&addr, conn_handler.get())); @@ -500,6 +520,10 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) { if (conn_handler->create_quic_secret() != 0) { return -1; } + + conn_handler->set_cid_prefixes(wpconf->cid_prefixes); + conn_handler->set_quic_lingering_worker_processes( + wpconf->quic_lingering_worker_processes); #endif // ENABLE_HTTP3 if (config->single_thread) { @@ -547,6 +571,13 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) { ipcev.data = conn_handler.get(); ev_io_start(loop, &ipcev); +#ifdef ENABLE_HTTP3 + ev_io quic_ipcev; + ev_io_init(&quic_ipcev, quic_ipc_readcb, wpconf->quic_ipc_fd, EV_READ); + quic_ipcev.data = conn_handler.get(); + ev_io_start(loop, &quic_ipcev); +#endif // ENABLE_HTTP3 + if (tls::upstream_tls_enabled(config->conn) && !config->tls.ocsp.disabled) { if (config->tls.ocsp.startup) { conn_handler->set_enable_acceptor_on_ocsp_completion(true); diff --git a/src/shrpx_worker_process.h b/src/shrpx_worker_process.h index a2a7dc7a..51597a38 100644 --- a/src/shrpx_worker_process.h +++ b/src/shrpx_worker_process.h @@ -27,6 +27,14 @@ #include "shrpx.h" +#include +#include + +#include "shrpx_connection_handler.h" +#ifdef ENABLE_HTTP3 +# include "shrpx_quic.h" +#endif // ENABLE_HTTP3 + namespace shrpx { class ConnectionHandler; @@ -38,6 +46,16 @@ struct WorkerProcessConfig { int server_fd; // IPv6 socket, or -1 if not used int server_fd6; +#ifdef ENABLE_HTTP3 + // CID prefixes for the new worker process. + std::vector> cid_prefixes; + // IPC socket to read forwarded QUIC UDP datagram from the current + // worker process. + int quic_ipc_fd; + // Lingering worker processes which were created before this worker + // process to forward QUIC UDP datagram during reload. + std::vector quic_lingering_worker_processes; +#endif // ENABLE_HTTP3 }; int worker_process_event_loop(WorkerProcessConfig *wpconf); diff --git a/src/util.cc b/src/util.cc index bab90d56..7bc94513 100644 --- a/src/util.cc +++ b/src/util.cc @@ -1609,10 +1609,7 @@ bool is_hex_string(const StringRef &s) { StringRef decode_hex(BlockAllocator &balloc, const StringRef &s) { auto iov = make_byte_ref(balloc, s.size() + 1); - auto p = iov.base; - for (auto it = std::begin(s); it != std::end(s); it += 2) { - *p++ = (hex_to_uint(*it) << 4) | hex_to_uint(*(it + 1)); - } + auto p = decode_hex(iov.base, s); *p = '\0'; return StringRef{iov.base, p}; } diff --git a/src/util.h b/src/util.h index 2470008e..16c26ff1 100644 --- a/src/util.h +++ b/src/util.h @@ -214,6 +214,15 @@ OutputIt format_hex(OutputIt it, const StringRef &s) { // == true. StringRef decode_hex(BlockAllocator &balloc, const StringRef &s); +template +OutputIt decode_hex(OutputIt d_first, const StringRef &s) { + for (auto it = std::begin(s); it != std::end(s); it += 2) { + *d_first++ = (hex_to_uint(*it) << 4) | hex_to_uint(*(it + 1)); + } + + return d_first; +} + // Returns given time |t| from epoch in HTTP Date format (e.g., Mon, // 10 Oct 2016 10:25:58 GMT). std::string http_date(time_t t);