diff --git a/bpf/reuseport_kern.c b/bpf/reuseport_kern.c index 00ee05a6..7c1094a5 100644 --- a/bpf/reuseport_kern.c +++ b/bpf/reuseport_kern.c @@ -271,7 +271,19 @@ int select_reuseport(struct sk_reuseport_md *reuse_md) { psk_index = bpf_map_lookup_elem(&cid_prefix_map, sk_prefix); if (psk_index == NULL) { - return SK_DROP; + pnum_socks = bpf_map_lookup_elem(&sk_info, &zero); + if (pnum_socks == NULL) { + return SK_DROP; + } + + a = (sk_prefix[0] << 24) | (sk_prefix[1] << 16) | (sk_prefix[2] << 8) | + sk_prefix[3]; + b = (sk_prefix[4] << 24) | (sk_prefix[5] << 16) | (sk_prefix[6] << 8) | + sk_prefix[7]; + + sk_index = jhash_2words(a, b, reuse_md->hash) % *pnum_socks; + + break; } sk_index = *psk_index; diff --git a/src/shrpx.cc b/src/shrpx.cc index e14036aa..e69505e9 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -132,6 +132,15 @@ constexpr auto ENV_ACCEPT_PREFIX = StringRef::from_lit("NGHTTPX_ACCEPT_"); // the original main process to shut it down gracefully. constexpr auto ENV_ORIG_PID = StringRef::from_lit("NGHTTPX_ORIG_PID"); +// Prefix of environment variables to tell new binary the QUIC IPC +// file descriptor and CID prefix of the lingering worker process. +// The value must be comma separated parameters: +// ,,,... is the file +// descriptor. is the I-th CID prefix in hex encoded +// string. +constexpr auto ENV_QUIC_WORKER_PROCESS_PREFIX = + StringRef::from_lit("NGHTTPX_QUIC_WORKER_PROCESS_"); + #ifndef _KERNEL_FASTOPEN # define _KERNEL_FASTOPEN // conditional define for TCP_FASTOPEN mostly on ubuntu @@ -183,8 +192,23 @@ void worker_process_child_cb(struct ev_loop *loop, ev_child *w, int revents); } // namespace struct WorkerProcess { - WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd) - : loop(loop), worker_pid(worker_pid), ipc_fd(ipc_fd) { + WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd +#ifdef ENABLE_HTTP3 + , + int quic_ipc_fd, + const std::vector> + &cid_prefixes +#endif // ENABLE_HTTP3 + ) + : loop(loop), + worker_pid(worker_pid), + ipc_fd(ipc_fd) +#ifdef ENABLE_HTTP3 + , + quic_ipc_fd(quic_ipc_fd), + cid_prefixes(cid_prefixes) +#endif // ENABLE_HTTP3 + { ev_signal_init(&reopen_log_signalev, signal_cb, REOPEN_LOG_SIGNAL); reopen_log_signalev.data = this; ev_signal_start(loop, &reopen_log_signalev); @@ -213,6 +237,12 @@ struct WorkerProcess { ev_child_stop(loop, &worker_process_childev); +#ifdef ENABLE_HTTP3 + if (quic_ipc_fd != -1) { + close(quic_ipc_fd); + } +#endif // ENABLE_HTTP3 + if (ipc_fd != -1) { shutdown(ipc_fd, SHUT_WR); close(ipc_fd); @@ -234,6 +264,10 @@ struct WorkerProcess { struct ev_loop *loop; pid_t worker_pid; int ipc_fd; +#ifdef ENABLE_HTTP3 + int quic_ipc_fd; + std::vector> cid_prefixes; +#endif // ENABLE_HTTP3 }; namespace { @@ -456,8 +490,8 @@ void exec_binary() { auto &listenerconf = config->conn.listener; // 2 for ENV_ORIG_PID and terminal nullptr. - auto envp = - std::make_unique(envlen + listenerconf.addrs.size() + 2); + auto envp = std::make_unique(envlen + listenerconf.addrs.size() + + worker_processes.size() + 2); size_t envidx = 0; std::vector fd_envs; @@ -485,6 +519,24 @@ void exec_binary() { ipc_fd_str += util::utos(config->pid); envp[envidx++] = const_cast(ipc_fd_str.c_str()); +#ifdef ENABLE_HTTP3 + std::vector quic_lwps; + for (size_t i = 0; i < worker_processes.size(); ++i) { + auto &wp = worker_processes[i]; + auto s = ENV_QUIC_WORKER_PROCESS_PREFIX.str(); + s += util::utos(i + 1); + s += '='; + s += util::utos(wp->quic_ipc_fd); + for (auto &cid_prefix : wp->cid_prefixes) { + s += ','; + s += util::format_hex(cid_prefix); + } + + quic_lwps.emplace_back(s); + envp[envidx++] = const_cast(quic_lwps.back().c_str()); + } +#endif // ENABLE_HTTP3 + for (size_t i = 0; i < envlen; ++i) { auto env = StringRef{environ[i]}; if (util::starts_with(env, ENV_ACCEPT_PREFIX) || @@ -493,7 +545,8 @@ void exec_binary() { util::starts_with(env, ENV_PORT) || util::starts_with(env, ENV_UNIX_FD) || util::starts_with(env, ENV_UNIX_PATH) || - util::starts_with(env, ENV_ORIG_PID)) { + util::starts_with(env, ENV_ORIG_PID) || + util::starts_with(env, ENV_QUIC_WORKER_PROCESS_PREFIX)) { continue; } @@ -1105,6 +1158,85 @@ pid_t get_orig_pid_from_env() { } } // namespace +#ifdef ENABLE_HTTP3 +namespace { +std::vector + inherited_quic_lingering_worker_processes; +} // namespace + +namespace { +std::vector +get_inherited_quic_lingering_worker_process_from_env() { + std::vector iwps; + + for (size_t i = 1;; ++i) { + auto name = ENV_QUIC_WORKER_PROCESS_PREFIX.str(); + name += util::utos(i); + auto env = getenv(name.c_str()); + if (!env) { + break; + } + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Read env " << name << "=" << env; + } + + auto envend = env + strlen(env); + + auto end_fd = std::find(env, envend, ','); + if (end_fd == envend) { + continue; + } + + auto fd = + util::parse_uint(reinterpret_cast(env), end_fd - env); + if (fd == -1) { + LOG(WARN) << "Could not parse file descriptor from " + << StringRef{env, static_cast(end_fd - env)}; + continue; + } + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Inherit worker process QUIC IPC socket fd=" << fd; + } + + util::make_socket_closeonexec(fd); + + std::vector> cid_prefixes; + + auto p = end_fd + 1; + for (;;) { + auto end = std::find(p, envend, ','); + + auto hex_cid_prefix = StringRef{p, end}; + if (hex_cid_prefix.size() != SHRPX_QUIC_CID_PREFIXLEN * 2 || + !util::is_hex_string(hex_cid_prefix)) { + continue; + } + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Inherit worker process CID prefix=" << hex_cid_prefix; + } + + cid_prefixes.emplace_back(); + + util::decode_hex(std::begin(cid_prefixes.back()), hex_cid_prefix); + + if (end == envend) { + break; + } + + p = end + 1; + } + + iwps.emplace_back(std::move(cid_prefixes), fd); + } + + return iwps; +} +} // namespace +#endif // ENABLE_HTTP3 + namespace { int create_acceptor_socket(Config *config, std::vector &iaddrs) { std::array errbuf; @@ -1182,14 +1314,97 @@ int create_ipc_socket(std::array &ipc_fd) { } } // namespace +#ifdef ENABLE_HTTP3 +namespace { +int create_quic_ipc_socket(std::array &quic_ipc_fd) { + std::array errbuf; + int rv; + + rv = socketpair(AF_UNIX, SOCK_DGRAM, 0, quic_ipc_fd.data()); + if (rv == -1) { + auto error = errno; + LOG(WARN) << "Failed to create socket pair to communicate worker process: " + << xsi_strerror(error, errbuf.data(), errbuf.size()); + return -1; + } + + for (auto fd : quic_ipc_fd) { + util::make_socket_nonblocking(fd); + } + + return 0; +} +} // namespace + +namespace { +int generate_cid_prefix( + std::vector> &cid_prefixes, + const Config *config) { + auto &apiconf = config->api; + + size_t num_cid_prefix; + if (config->single_thread) { + num_cid_prefix = 1; + } else { + num_cid_prefix = config->num_worker; + + // API endpoint occupies the one dedicated worker thread. + // Although such worker never gets QUIC traffic, we create CID + // prefix for it to make code a bit simpler. + if (apiconf.enabled) { + ++num_cid_prefix; + } + } + + cid_prefixes.resize(num_cid_prefix); + + for (auto &cid_prefix : cid_prefixes) { + if (create_cid_prefix(cid_prefix.data()) != 0) { + return -1; + } + } + + return 0; +} +} // namespace + +namespace { +std::vector +collect_quic_lingering_worker_processes() { + std::vector quic_lwps{ + std::begin(inherited_quic_lingering_worker_processes), + std::end(inherited_quic_lingering_worker_processes)}; + + for (auto &wp : worker_processes) { + quic_lwps.emplace_back(wp->cid_prefixes, wp->quic_ipc_fd); + } + + return quic_lwps; +} +} // namespace +#endif // ENABLE_HTTP3 + namespace { // Creates worker process, and returns PID of worker process. On // success, file descriptor for IPC (send only) is assigned to // |main_ipc_fd|. In child process, we will close file descriptors // which are inherited from previous configuration/process, but not // used in the current configuration. -pid_t fork_worker_process(int &main_ipc_fd, - const std::vector &iaddrs) { +pid_t fork_worker_process( + int &main_ipc_fd +#ifdef ENABLE_HTTP3 + , + int &wp_quic_ipc_fd +#endif // ENABLE_HTTP3 + , + const std::vector &iaddrs +#ifdef ENABLE_HTTP3 + , + const std::vector> + &cid_prefixes, + const std::vector &quic_lwps +#endif // ENABLE_HTTP3 +) { std::array errbuf; int rv; sigset_t oldset; @@ -1201,6 +1416,15 @@ pid_t fork_worker_process(int &main_ipc_fd, return -1; } +#ifdef ENABLE_HTTP3 + std::array quic_ipc_fd; + + rv = create_quic_ipc_socket(quic_ipc_fd); + if (rv != 0) { + return -1; + } +#endif // ENABLE_HTTP3 + rv = shrpx_signal_block_all(&oldset); if (rv != 0) { auto error = errno; @@ -1228,6 +1452,20 @@ pid_t fork_worker_process(int &main_ipc_fd, util::make_socket_closeonexec(addr.fd); } +#ifdef ENABLE_HTTP3 + util::make_socket_closeonexec(quic_ipc_fd[0]); + + for (auto &lwp : quic_lwps) { + util::make_socket_closeonexec(lwp.quic_ipc_fd); + } + + for (auto &wp : worker_processes) { + util::make_socket_closeonexec(wp->quic_ipc_fd); + // Do not close quic_ipc_fd. + wp->quic_ipc_fd = -1; + } +#endif // ENABLE_HTTP3 + // Remove all WorkerProcesses to stop any registered watcher on // default loop. worker_process_remove_all(); @@ -1251,9 +1489,19 @@ pid_t fork_worker_process(int &main_ipc_fd, if (!config->single_process) { close(ipc_fd[1]); +#ifdef ENABLE_HTTP3 + close(quic_ipc_fd[1]); +#endif // ENABLE_HTTP3 } - WorkerProcessConfig wpconf{ipc_fd[0]}; + WorkerProcessConfig wpconf{ + .ipc_fd = ipc_fd[0], +#ifdef ENABLE_HTTP3 + .cid_prefixes = cid_prefixes, + .quic_ipc_fd = quic_ipc_fd[0], + .quic_lingering_worker_processes = quic_lwps, +#endif // ENABLE_HTTP3 + }; rv = worker_process_event_loop(&wpconf); if (rv != 0) { LOG(FATAL) << "Worker process returned error"; @@ -1294,13 +1542,23 @@ pid_t fork_worker_process(int &main_ipc_fd, if (pid == -1) { close(ipc_fd[0]); close(ipc_fd[1]); +#ifdef ENABLE_HTTP3 + close(quic_ipc_fd[0]); + close(quic_ipc_fd[1]); +#endif // ENABLE_HTTP3 return -1; } close(ipc_fd[0]); +#ifdef ENABLE_HTTP3 + close(quic_ipc_fd[0]); +#endif // ENABLE_HTTP3 main_ipc_fd = ipc_fd[1]; +#ifdef ENABLE_HTTP3 + wp_quic_ipc_fd = quic_ipc_fd[1]; +#endif // ENABLE_HTTP3 LOG(NOTICE) << "Worker process [" << pid << "] spawned"; @@ -1347,17 +1605,50 @@ int event_loop() { auto orig_pid = get_orig_pid_from_env(); +#ifdef ENABLE_HTTP3 + inherited_quic_lingering_worker_processes = + get_inherited_quic_lingering_worker_process_from_env(); +#endif // ENABLE_HTTP3 + auto loop = ev_default_loop(config->ev_loop_flags); int ipc_fd = 0; +#ifdef ENABLE_HTTP3 + int quic_ipc_fd = 0; - auto pid = fork_worker_process(ipc_fd, {}); + auto quic_lwps = collect_quic_lingering_worker_processes(); + + std::vector> cid_prefixes; + + if (generate_cid_prefix(cid_prefixes, config) != 0) { + return -1; + } +#endif // ENABLE_HTTP3 + + auto pid = fork_worker_process( + ipc_fd +#ifdef ENABLE_HTTP3 + , + quic_ipc_fd +#endif // ENABLE_HTTP3 + , + {} +#ifdef ENABLE_HTTP3 + , + cid_prefixes, quic_lwps +#endif // ENABLE_HTTP3 + ); if (pid == -1) { return -1; } - worker_process_add(std::make_unique(loop, pid, ipc_fd)); + worker_process_add(std::make_unique(loop, pid, ipc_fd +#ifdef ENABLE_HTTP3 + , + quic_ipc_fd, cid_prefixes +#endif // ENABLE_HTTP3 + )); // Write PID file when we are ready to accept connection from peer. // This makes easier to write restart script for nghttpx. Because @@ -3325,13 +3616,37 @@ void reload_config(WorkerProcess *wp) { auto loop = ev_default_loop(new_config->ev_loop_flags); int ipc_fd = 0; +#ifdef ENABLE_HTTP3 + int quic_ipc_fd = 0; + + auto quic_lwps = collect_quic_lingering_worker_processes(); + + std::vector> cid_prefixes; + + if (generate_cid_prefix(cid_prefixes, new_config.get()) != 0) { + close_not_inherited_fd(new_config.get(), iaddrs); + return; + } +#endif // ENABLE_HTTP3 // fork_worker_process and forked child process assumes new // configuration can be obtained from get_config(). auto old_config = replace_config(std::move(new_config)); - auto pid = fork_worker_process(ipc_fd, iaddrs); + auto pid = fork_worker_process(ipc_fd +#ifdef ENABLE_HTTP3 + , + quic_ipc_fd +#endif // ENABLE_HTTP3 + + , + iaddrs +#ifdef ENABLE_HTTP3 + , + cid_prefixes, quic_lwps +#endif // ENABLE_HTTP3 + ); if (pid == -1) { LOG(ERROR) << "Failed to process new configuration"; @@ -3350,7 +3665,12 @@ void reload_config(WorkerProcess *wp) { // We no longer use signals for this worker. last_wp->shutdown_signal_watchers(); - worker_process_add(std::make_unique(loop, pid, ipc_fd)); + worker_process_add(std::make_unique(loop, pid, ipc_fd +#ifdef ENABLE_HTTP3 + , + quic_ipc_fd, cid_prefixes +#endif // ENABLE_HTTP3 + )); if (!get_config()->pid_file.empty()) { save_pid(); diff --git a/src/shrpx_connection_handler.cc b/src/shrpx_connection_handler.cc index 14e20253..c32eb86f 100644 --- a/src/shrpx_connection_handler.cc +++ b/src/shrpx_connection_handler.cc @@ -45,6 +45,7 @@ #include "shrpx_memcached_dispatcher.h" #include "shrpx_signal.h" #include "shrpx_log.h" +#include "xsi_strerror.h" #include "util.h" #include "template.h" @@ -112,7 +113,11 @@ void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) { } // namespace ConnectionHandler::ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen) - : gen_(gen), + : +#ifdef ENABLE_HTTP3 + quic_ipc_fd_(-1), +#endif // ENABLE_HTTP3 + gen_(gen), single_worker_(nullptr), loop_(loop), #ifdef HAVE_NEVERBLEED @@ -267,11 +272,13 @@ int ConnectionHandler::create_single_worker() { } } +#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF) + quic_bpf_refs_.resize(config->conn.quic_listener.addrs.size()); +#endif // ENABLE_HTTP3 && HAVE_LIBBPF + #ifdef ENABLE_HTTP3 - std::array cid_prefix; - if (create_cid_prefix(cid_prefix.data()) != 0) { - return -1; - } + assert(cid_prefixes_.size() == 1); + const auto &cid_prefix = cid_prefixes_[0]; #endif // ENABLE_HTTP3 single_worker_ = std::make_unique( @@ -343,7 +350,7 @@ int ConnectionHandler::create_worker_thread(size_t num) { auto &apiconf = config->api; # if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF) - quic_bpf_refs_.resize(num); + quic_bpf_refs_.resize(config->conn.quic_listener.addrs.size()); # endif // ENABLE_HTTP3 && HAVE_LIBBPF // We have dedicated worker for API request processing. @@ -369,14 +376,15 @@ int ConnectionHandler::create_worker_thread(size_t num) { } } +# ifdef ENABLE_HTTP3 + assert(cid_prefixes_.size() == num); +# endif // ENABLE_HTTP3 + for (size_t i = 0; i < num; ++i) { auto loop = ev_loop_new(config->ev_loop_flags); # ifdef ENABLE_HTTP3 - std::array cid_prefix; - if (create_cid_prefix(cid_prefix.data()) != 0) { - return -1; - } + const auto &cid_prefix = cid_prefixes_[i]; # endif // ENABLE_HTTP3 auto worker = std::make_unique( @@ -1060,12 +1068,246 @@ int ConnectionHandler::create_quic_secret() { return 0; } +void ConnectionHandler::set_cid_prefixes( + const std::vector> + &cid_prefixes) { + cid_prefixes_ = cid_prefixes; +} + +QUICLingeringWorkerProcess * +ConnectionHandler::match_quic_lingering_worker_process_cid_prefix( + const uint8_t *dcid, size_t dcidlen) { + assert(dcidlen >= SHRPX_QUIC_CID_PREFIXLEN); + + for (auto &lwps : quic_lingering_worker_processes_) { + for (auto &cid_prefix : lwps.cid_prefixes) { + if (std::equal(std::begin(cid_prefix), std::end(cid_prefix), dcid)) { + return &lwps; + } + } + } + + return nullptr; +} + # ifdef HAVE_LIBBPF std::vector &ConnectionHandler::get_quic_bpf_refs() { return quic_bpf_refs_; } # endif // HAVE_LIBBPF +void ConnectionHandler::set_quic_ipc_fd(int fd) { quic_ipc_fd_ = fd; } + +void ConnectionHandler::set_quic_lingering_worker_processes( + const std::vector &quic_lwps) { + quic_lingering_worker_processes_ = quic_lwps; +} + +int ConnectionHandler::forward_quic_packet_to_lingering_worker_process( + QUICLingeringWorkerProcess *quic_lwp, const Address &remote_addr, + const Address &local_addr, const uint8_t *data, size_t datalen) { + std::array header; + + assert(header.size() >= 1 + 1 + 1 + sizeof(sockaddr_storage) * 2); + assert(remote_addr.len > 0); + assert(local_addr.len > 0); + + auto p = header.data(); + + *p++ = static_cast(QUICIPCType::DGRAM_FORWARD); + *p++ = static_cast(remote_addr.len - 1); + p = std::copy_n(reinterpret_cast(&remote_addr.su), + remote_addr.len, p); + *p++ = static_cast(local_addr.len - 1); + p = std::copy_n(reinterpret_cast(&local_addr.su), + local_addr.len, p); + + iovec msg_iov[] = { + { + .iov_base = header.data(), + .iov_len = static_cast(p - header.data()), + }, + { + .iov_base = const_cast(data), + .iov_len = datalen, + }, + }; + + msghdr msg{}; + msg.msg_iov = msg_iov; + msg.msg_iovlen = array_size(msg_iov); + + ssize_t nwrite; + + while ((nwrite = sendmsg(quic_lwp->quic_ipc_fd, &msg, 0)) == -1 && + errno == EINTR) + ; + + if (nwrite == -1) { + std::array errbuf; + + auto error = errno; + LOG(ERROR) << "Failed to send QUIC IPC message: " + << xsi_strerror(error, errbuf.data(), errbuf.size()); + + return -1; + } + + return 0; +} + +int ConnectionHandler::quic_ipc_read() { + std::array buf; + + ssize_t nread; + + while ((nread = recv(quic_ipc_fd_, buf.data(), buf.size(), 0)) == -1 && + errno == EINTR) + ; + + if (nread == -1) { + std::array errbuf; + + auto error = errno; + LOG(ERROR) << "Failed to read data from QUIC IPC channel: " + << xsi_strerror(error, errbuf.data(), errbuf.size()); + + return -1; + } + + if (nread == 0) { + return 0; + } + + size_t len = 1 + 1 + 1; + + // Wire format: + // TYPE(1) REMOTE_ADDRLEN(1) REMOTE_ADDR(N) LOCAL_ADDRLEN(1) REMOTE_ADDR(N) + // DGRAM_PAYLAOD(N) + // + // When encoding, REMOTE_ADDRLEN and LOCAL_ADDRLEN is decremented by + // 1. + if (static_cast(nread) < len) { + return 0; + } + + auto p = buf.data(); + if (*p != static_cast(QUICIPCType::DGRAM_FORWARD)) { + LOG(ERROR) << "Unknown QUICIPCType: " << static_cast(*p); + + return -1; + } + + ++p; + + auto pkt = std::make_unique(); + + auto remote_addrlen = static_cast(*p++) + 1; + if (remote_addrlen > sizeof(sockaddr_storage)) { + LOG(ERROR) << "The length of remote address is too large: " + << remote_addrlen; + + return -1; + } + + len += remote_addrlen; + + if (static_cast(nread) < len) { + LOG(ERROR) << "Insufficient QUIC IPC message length"; + + return -1; + } + + pkt->remote_addr.len = remote_addrlen; + memcpy(&pkt->remote_addr.su, p, remote_addrlen); + + p += remote_addrlen; + + auto local_addrlen = static_cast(*p++) + 1; + if (local_addrlen > sizeof(sockaddr_storage)) { + LOG(ERROR) << "The length of local address is too large: " << local_addrlen; + + return -1; + } + + len += local_addrlen; + + if (static_cast(nread) < len) { + LOG(ERROR) << "Insufficient QUIC IPC message length"; + + return -1; + } + + pkt->local_addr.len = local_addrlen; + memcpy(&pkt->local_addr.su, p, local_addrlen); + + p += local_addrlen; + + auto datalen = nread - (p - buf.data()); + + pkt->data.assign(p, p + datalen); + + // At the moment, UpstreamAddr index is unknown. + pkt->upstream_addr_index = static_cast(-1); + + uint32_t version; + const uint8_t *dcid; + size_t dcidlen; + const uint8_t *scid; + size_t scidlen; + + auto rv = + ngtcp2_pkt_decode_version_cid(&version, &dcid, &dcidlen, &scid, &scidlen, + p, datalen, SHRPX_QUIC_SCIDLEN); + if (rv < 0) { + LOG(ERROR) << "ngtcp2_pkt_decode_version_cid: " << ngtcp2_strerror(rv); + + return -1; + } + + if (dcidlen < SHRPX_QUIC_CID_PREFIXLEN) { + LOG(ERROR) << "DCID is too short"; + return -1; + } + + if (single_worker_) { + auto faddr = single_worker_->find_quic_upstream_addr(pkt->local_addr); + if (faddr == nullptr) { + LOG(ERROR) << "No suitable upstream address found"; + + return 0; + } + + auto quic_conn_handler = single_worker_->get_quic_connection_handler(); + + // Ignore return value + quic_conn_handler->handle_packet(faddr, pkt->remote_addr, pkt->local_addr, + pkt->data.data(), pkt->data.size()); + + return 0; + } + + for (auto &worker : workers_) { + if (!std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN, + worker->get_cid_prefix())) { + continue; + } + + WorkerEvent wev{ + .type = WorkerEventType::QUIC_PKT_FORWARD, + .quic_pkt = std::move(pkt), + }; + worker->send(std::move(wev)); + + return 0; + } + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "No worker to match CID prefix"; + } + + return 0; +} #endif // ENABLE_HTTP3 } // namespace shrpx diff --git a/src/shrpx_connection_handler.h b/src/shrpx_connection_handler.h index 7ffc2112..2c8d9c9a 100644 --- a/src/shrpx_connection_handler.h +++ b/src/shrpx_connection_handler.h @@ -103,12 +103,33 @@ struct SerialEvent { std::shared_ptr downstreamconf; }; -#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF) +#ifdef ENABLE_HTTP3 +# ifdef HAVE_LIBBPF struct BPFRef { int reuseport_array; int cid_prefix_map; }; -#endif // ENABLE_HTTP3 && HAVE_LIBBPF +# endif // HAVE_LIBBPF + +// QUIC IPC message type. +enum class QUICIPCType { + NONE, + // Send forwarded QUIC UDP datagram and its metadata. + DGRAM_FORWARD, +}; + +// WorkerProcesses which are in graceful shutdown period. +struct QUICLingeringWorkerProcess { + QUICLingeringWorkerProcess( + std::vector> cid_prefixes, + int quic_ipc_fd) + : cid_prefixes{std::move(cid_prefixes)}, quic_ipc_fd{quic_ipc_fd} {} + + std::vector> cid_prefixes; + // Socket to send QUIC IPC message to this worker process. + int quic_ipc_fd; +}; +#endif // ENABLE_HTTP3 class ConnectionHandler { public: @@ -179,6 +200,28 @@ public: int create_quic_secret(); + void set_cid_prefixes( + const std::vector> + &cid_prefixes); + + void set_quic_lingering_worker_processes( + const std::vector &quic_lwps); + + // Return matching QUICLingeringWorkerProcess which has a CID prefix + // such that |dcid| starts with it. If no such + // QUICLingeringWorkerProcess, it returns nullptr. + QUICLingeringWorkerProcess * + match_quic_lingering_worker_process_cid_prefix(const uint8_t *dcid, + size_t dcidlen); + + int forward_quic_packet_to_lingering_worker_process( + QUICLingeringWorkerProcess *quic_lwp, const Address &remote_addr, + const Address &local_addr, const uint8_t *data, size_t datalen); + + void set_quic_ipc_fd(int fd); + + int quic_ipc_read(); + # ifdef HAVE_LIBBPF std::vector &get_quic_bpf_refs(); # endif // HAVE_LIBBPF @@ -212,6 +255,11 @@ private: // and signature algorithm presented by client. std::vector> indexed_ssl_ctx_; #ifdef ENABLE_HTTP3 + std::vector> cid_prefixes_; + std::vector> + lingering_cid_prefixes_; + int quic_ipc_fd_; + std::vector quic_lingering_worker_processes_; # ifdef HAVE_LIBBPF std::vector quic_bpf_refs_; # endif // HAVE_LIBBPF diff --git a/src/shrpx_quic_connection_handler.cc b/src/shrpx_quic_connection_handler.cc index 0160799e..b1f7a19f 100644 --- a/src/shrpx_quic_connection_handler.cc +++ b/src/shrpx_quic_connection_handler.cc @@ -77,10 +77,27 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr, auto dcid_key = make_cid_key(dcid, dcidlen); + auto conn_handler = worker_->get_connection_handler(); + ClientHandler *handler; auto it = connections_.find(dcid_key); if (it == std::end(connections_)) { + if (!std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN, + worker_->get_cid_prefix())) { + auto quic_lwp = + conn_handler->match_quic_lingering_worker_process_cid_prefix(dcid, + dcidlen); + if (quic_lwp) { + if (conn_handler->forward_quic_packet_to_lingering_worker_process( + quic_lwp, remote_addr, local_addr, data, datalen) == 0) { + return 0; + } + + return 0; + } + } + // new connection ngtcp2_pkt_hd hd; @@ -90,6 +107,14 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr, switch (ngtcp2_accept(&hd, data, datalen)) { case 0: { + // If we get Initial and it has the CID prefix of this worker, it + // is likely that client is intentionally use the our prefix. + // Just drop it. + if (std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN, + worker_->get_cid_prefix())) { + return 0; + } + if (hd.token.len == 0) { break; } @@ -102,7 +127,9 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr, if (verify_retry_token(&odcid, hd.token.base, hd.token.len, &hd.dcid, &remote_addr.su.sa, remote_addr.len, secret.data()) != 0) { - break; + // 2nd Retry packet is not allowed, so just drop it or send + // CONNECTIONC_CLOE with INVALID_TOKEN. + return 0; } podcid = &odcid; @@ -139,8 +166,6 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr, dcidlen > SHRPX_QUIC_CID_PREFIXLEN && !std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN, worker_->get_cid_prefix())) { - auto conn_handler = worker_->get_connection_handler(); - if (conn_handler->forward_quic_packet(faddr, remote_addr, local_addr, dcid, data, datalen) == 0) { return 0; @@ -152,14 +177,6 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr, return 0; } - // If we get Initial and it has the CID prefix of this worker, it - // is likely that client is intentionally use the our prefix. - // Just drop it. - if (std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN, - worker_->get_cid_prefix())) { - return 0; - } - handler = handle_new_connection(faddr, remote_addr, local_addr, hd, podcid, token, tokenlen); if (handler == nullptr) { diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 2e91b63d..ab5c4bc5 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -531,9 +531,26 @@ void Worker::process_events() { break; #ifdef ENABLE_HTTP3 case WorkerEventType::QUIC_PKT_FORWARD: { + const UpstreamAddr *faddr; + + if (wev.quic_pkt->upstream_addr_index == static_cast(-1)) { + faddr = find_quic_upstream_addr(wev.quic_pkt->local_addr); + if (faddr == nullptr) { + LOG(ERROR) << "No suitable upstream address found"; + + break; + } + } else if (quic_upstream_addrs_.size() <= + wev.quic_pkt->upstream_addr_index) { + LOG(ERROR) << "upstream_addr_index is too large"; + + break; + } else { + faddr = &quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index]; + } + quic_conn_handler_.handle_packet( - &quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index], - wev.quic_pkt->remote_addr, wev.quic_pkt->local_addr, + faddr, wev.quic_pkt->remote_addr, wev.quic_pkt->local_addr, wev.quic_pkt->data.data(), wev.quic_pkt->data.size()); break; @@ -647,12 +664,11 @@ bool Worker::should_attach_bpf() const { auto &quicconf = config->quic; auto &apiconf = config->api; - if (quicconf.bpf.disabled || config->single_thread || - config->num_worker == 1) { + if (quicconf.bpf.disabled) { return false; } - if (apiconf.enabled) { + if (!config->single_thread && apiconf.enabled) { return index_ == 1; } @@ -663,18 +679,14 @@ bool Worker::should_update_bpf_map() const { auto config = get_config(); auto &quicconf = config->quic; - return !quicconf.bpf.disabled && !config->single_thread && - config->num_worker > 1; + return !quicconf.bpf.disabled; } uint32_t Worker::compute_sk_index() const { auto config = get_config(); auto &apiconf = config->api; - assert(!config->single_thread); - assert(config->num_worker > 1); - - if (apiconf.enabled) { + if (!config->single_thread && apiconf.enabled) { return index_ - 1; } @@ -979,6 +991,65 @@ void Worker::set_quic_secret(const std::shared_ptr &secret) { const std::shared_ptr &Worker::get_quic_secret() const { return quic_secret_; } + +const UpstreamAddr *Worker::find_quic_upstream_addr(const Address &local_addr) { + std::array host; + + auto rv = getnameinfo(&local_addr.su.sa, local_addr.len, host.data(), + host.size(), nullptr, 0, NI_NUMERICHOST); + if (rv != 0) { + LOG(ERROR) << "getnameinfo: " << gai_strerror(rv); + + return nullptr; + } + + uint16_t port; + + switch (local_addr.su.sa.sa_family) { + case AF_INET: + port = htons(local_addr.su.in.sin_port); + + break; + case AF_INET6: + port = htons(local_addr.su.in6.sin6_port); + + break; + default: + assert(0); + } + + auto hostport = util::make_hostport(StringRef{host.data()}, port); + const UpstreamAddr *fallback_faddr = nullptr; + + for (auto &faddr : quic_upstream_addrs_) { + if (faddr.hostport == hostport) { + return &faddr; + } + + if (faddr.port != port || faddr.family != local_addr.su.sa.sa_family) { + continue; + } + + switch (faddr.family) { + case AF_INET: + if (util::starts_with(faddr.hostport, StringRef::from_lit("0.0.0.0:"))) { + fallback_faddr = &faddr; + } + + break; + case AF_INET6: + if (util::starts_with(faddr.hostport, StringRef::from_lit("[::]:"))) { + fallback_faddr = &faddr; + } + + break; + default: + assert(0); + } + } + + return fallback_faddr; +} #endif // ENABLE_HTTP3 namespace { diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index 52a75c94..3d58fbe3 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -261,6 +261,7 @@ struct QUICPacket { remote_addr{remote_addr}, local_addr{local_addr}, data{data, data + datalen} {} + QUICPacket() {} size_t upstream_addr_index; Address remote_addr; Address local_addr; @@ -380,6 +381,9 @@ public: # endif // HAVE_LIBBPF int create_quic_server_socket(UpstreamAddr &addr); + + // Returns a pointer to UpstreamAddr which matches |local_addr|. + const UpstreamAddr *find_quic_upstream_addr(const Address &local_addr); #endif // ENABLE_HTTP3 DNSTracker *get_dns_tracker(); diff --git a/src/shrpx_worker_process.cc b/src/shrpx_worker_process.cc index 4d18ffc8..9fc0681e 100644 --- a/src/shrpx_worker_process.cc +++ b/src/shrpx_worker_process.cc @@ -178,6 +178,20 @@ void ipc_readcb(struct ev_loop *loop, ev_io *w, int revents) { } } // namespace +#ifdef ENABLE_HTTP3 +namespace { +void quic_ipc_readcb(struct ev_loop *loop, ev_io *w, int revents) { + auto conn_handler = static_cast(w->data); + + if (conn_handler->quic_ipc_read() != 0) { + LOG(ERROR) << "Failed to read data from QUIC IPC channel"; + + return; + } +} +} // namespace +#endif // ENABLE_HTTP3 + namespace { int generate_ticket_key(TicketKey &ticket_key) { ticket_key.cipher = get_config()->tls.ticket.cipher; @@ -434,6 +448,12 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) { conn_handler->set_neverbleed(nb.get()); #endif // HAVE_NEVERBLEED +#ifdef ENABLE_HTTP3 + conn_handler->set_quic_ipc_fd(wpconf->quic_ipc_fd); + conn_handler->set_quic_lingering_worker_processes( + wpconf->quic_lingering_worker_processes); +#endif // ENABLE_HTTP3 + for (auto &addr : config->conn.listener.addrs) { conn_handler->add_acceptor( std::make_unique(&addr, conn_handler.get())); @@ -500,6 +520,10 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) { if (conn_handler->create_quic_secret() != 0) { return -1; } + + conn_handler->set_cid_prefixes(wpconf->cid_prefixes); + conn_handler->set_quic_lingering_worker_processes( + wpconf->quic_lingering_worker_processes); #endif // ENABLE_HTTP3 if (config->single_thread) { @@ -547,6 +571,13 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) { ipcev.data = conn_handler.get(); ev_io_start(loop, &ipcev); +#ifdef ENABLE_HTTP3 + ev_io quic_ipcev; + ev_io_init(&quic_ipcev, quic_ipc_readcb, wpconf->quic_ipc_fd, EV_READ); + quic_ipcev.data = conn_handler.get(); + ev_io_start(loop, &quic_ipcev); +#endif // ENABLE_HTTP3 + if (tls::upstream_tls_enabled(config->conn) && !config->tls.ocsp.disabled) { if (config->tls.ocsp.startup) { conn_handler->set_enable_acceptor_on_ocsp_completion(true); diff --git a/src/shrpx_worker_process.h b/src/shrpx_worker_process.h index a2a7dc7a..51597a38 100644 --- a/src/shrpx_worker_process.h +++ b/src/shrpx_worker_process.h @@ -27,6 +27,14 @@ #include "shrpx.h" +#include +#include + +#include "shrpx_connection_handler.h" +#ifdef ENABLE_HTTP3 +# include "shrpx_quic.h" +#endif // ENABLE_HTTP3 + namespace shrpx { class ConnectionHandler; @@ -38,6 +46,16 @@ struct WorkerProcessConfig { int server_fd; // IPv6 socket, or -1 if not used int server_fd6; +#ifdef ENABLE_HTTP3 + // CID prefixes for the new worker process. + std::vector> cid_prefixes; + // IPC socket to read forwarded QUIC UDP datagram from the current + // worker process. + int quic_ipc_fd; + // Lingering worker processes which were created before this worker + // process to forward QUIC UDP datagram during reload. + std::vector quic_lingering_worker_processes; +#endif // ENABLE_HTTP3 }; int worker_process_event_loop(WorkerProcessConfig *wpconf); diff --git a/src/util.cc b/src/util.cc index bab90d56..7bc94513 100644 --- a/src/util.cc +++ b/src/util.cc @@ -1609,10 +1609,7 @@ bool is_hex_string(const StringRef &s) { StringRef decode_hex(BlockAllocator &balloc, const StringRef &s) { auto iov = make_byte_ref(balloc, s.size() + 1); - auto p = iov.base; - for (auto it = std::begin(s); it != std::end(s); it += 2) { - *p++ = (hex_to_uint(*it) << 4) | hex_to_uint(*(it + 1)); - } + auto p = decode_hex(iov.base, s); *p = '\0'; return StringRef{iov.base, p}; } diff --git a/src/util.h b/src/util.h index 2470008e..16c26ff1 100644 --- a/src/util.h +++ b/src/util.h @@ -214,6 +214,15 @@ OutputIt format_hex(OutputIt it, const StringRef &s) { // == true. StringRef decode_hex(BlockAllocator &balloc, const StringRef &s); +template +OutputIt decode_hex(OutputIt d_first, const StringRef &s) { + for (auto it = std::begin(s); it != std::end(s); it += 2) { + *d_first++ = (hex_to_uint(*it) << 4) | hex_to_uint(*(it + 1)); + } + + return d_first; +} + // Returns given time |t| from epoch in HTTP Date format (e.g., Mon, // 10 Oct 2016 10:25:58 GMT). std::string http_date(time_t t);