nghttpx: Forward QUIC UDP datagram to lingering worker in graceful shutdown

Forward QUIC UDP datagram to lingering worker process which is in
graceful shutdown.  Both SIGHUP and SIGUSR2 work.  To make this work
correctly, eBPF is required.
This commit is contained in:
Tatsuhiro Tsujikawa 2021-08-29 10:18:59 +09:00
parent c5e9d0096a
commit 2b4dc4496f
11 changed files with 820 additions and 51 deletions

View File

@ -271,7 +271,19 @@ int select_reuseport(struct sk_reuseport_md *reuse_md) {
psk_index = bpf_map_lookup_elem(&cid_prefix_map, sk_prefix);
if (psk_index == NULL) {
return SK_DROP;
pnum_socks = bpf_map_lookup_elem(&sk_info, &zero);
if (pnum_socks == NULL) {
return SK_DROP;
}
a = (sk_prefix[0] << 24) | (sk_prefix[1] << 16) | (sk_prefix[2] << 8) |
sk_prefix[3];
b = (sk_prefix[4] << 24) | (sk_prefix[5] << 16) | (sk_prefix[6] << 8) |
sk_prefix[7];
sk_index = jhash_2words(a, b, reuse_md->hash) % *pnum_socks;
break;
}
sk_index = *psk_index;

View File

@ -132,6 +132,15 @@ constexpr auto ENV_ACCEPT_PREFIX = StringRef::from_lit("NGHTTPX_ACCEPT_");
// the original main process to shut it down gracefully.
constexpr auto ENV_ORIG_PID = StringRef::from_lit("NGHTTPX_ORIG_PID");
// Prefix of environment variables to tell new binary the QUIC IPC
// file descriptor and CID prefix of the lingering worker process.
// The value must be comma separated parameters:
// <FD>,<CID_PREFIX_0>,<CID_PREFIX_1>,... <FD> is the file
// descriptor. <CID_PREFIX_I> is the I-th CID prefix in hex encoded
// string.
constexpr auto ENV_QUIC_WORKER_PROCESS_PREFIX =
StringRef::from_lit("NGHTTPX_QUIC_WORKER_PROCESS_");
#ifndef _KERNEL_FASTOPEN
# define _KERNEL_FASTOPEN
// conditional define for TCP_FASTOPEN mostly on ubuntu
@ -183,8 +192,23 @@ void worker_process_child_cb(struct ev_loop *loop, ev_child *w, int revents);
} // namespace
struct WorkerProcess {
WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd)
: loop(loop), worker_pid(worker_pid), ipc_fd(ipc_fd) {
WorkerProcess(struct ev_loop *loop, pid_t worker_pid, int ipc_fd
#ifdef ENABLE_HTTP3
,
int quic_ipc_fd,
const std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>>
&cid_prefixes
#endif // ENABLE_HTTP3
)
: loop(loop),
worker_pid(worker_pid),
ipc_fd(ipc_fd)
#ifdef ENABLE_HTTP3
,
quic_ipc_fd(quic_ipc_fd),
cid_prefixes(cid_prefixes)
#endif // ENABLE_HTTP3
{
ev_signal_init(&reopen_log_signalev, signal_cb, REOPEN_LOG_SIGNAL);
reopen_log_signalev.data = this;
ev_signal_start(loop, &reopen_log_signalev);
@ -213,6 +237,12 @@ struct WorkerProcess {
ev_child_stop(loop, &worker_process_childev);
#ifdef ENABLE_HTTP3
if (quic_ipc_fd != -1) {
close(quic_ipc_fd);
}
#endif // ENABLE_HTTP3
if (ipc_fd != -1) {
shutdown(ipc_fd, SHUT_WR);
close(ipc_fd);
@ -234,6 +264,10 @@ struct WorkerProcess {
struct ev_loop *loop;
pid_t worker_pid;
int ipc_fd;
#ifdef ENABLE_HTTP3
int quic_ipc_fd;
std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>> cid_prefixes;
#endif // ENABLE_HTTP3
};
namespace {
@ -456,8 +490,8 @@ void exec_binary() {
auto &listenerconf = config->conn.listener;
// 2 for ENV_ORIG_PID and terminal nullptr.
auto envp =
std::make_unique<char *[]>(envlen + listenerconf.addrs.size() + 2);
auto envp = std::make_unique<char *[]>(envlen + listenerconf.addrs.size() +
worker_processes.size() + 2);
size_t envidx = 0;
std::vector<ImmutableString> fd_envs;
@ -485,6 +519,24 @@ void exec_binary() {
ipc_fd_str += util::utos(config->pid);
envp[envidx++] = const_cast<char *>(ipc_fd_str.c_str());
#ifdef ENABLE_HTTP3
std::vector<ImmutableString> quic_lwps;
for (size_t i = 0; i < worker_processes.size(); ++i) {
auto &wp = worker_processes[i];
auto s = ENV_QUIC_WORKER_PROCESS_PREFIX.str();
s += util::utos(i + 1);
s += '=';
s += util::utos(wp->quic_ipc_fd);
for (auto &cid_prefix : wp->cid_prefixes) {
s += ',';
s += util::format_hex(cid_prefix);
}
quic_lwps.emplace_back(s);
envp[envidx++] = const_cast<char *>(quic_lwps.back().c_str());
}
#endif // ENABLE_HTTP3
for (size_t i = 0; i < envlen; ++i) {
auto env = StringRef{environ[i]};
if (util::starts_with(env, ENV_ACCEPT_PREFIX) ||
@ -493,7 +545,8 @@ void exec_binary() {
util::starts_with(env, ENV_PORT) ||
util::starts_with(env, ENV_UNIX_FD) ||
util::starts_with(env, ENV_UNIX_PATH) ||
util::starts_with(env, ENV_ORIG_PID)) {
util::starts_with(env, ENV_ORIG_PID) ||
util::starts_with(env, ENV_QUIC_WORKER_PROCESS_PREFIX)) {
continue;
}
@ -1105,6 +1158,85 @@ pid_t get_orig_pid_from_env() {
}
} // namespace
#ifdef ENABLE_HTTP3
namespace {
std::vector<QUICLingeringWorkerProcess>
inherited_quic_lingering_worker_processes;
} // namespace
namespace {
std::vector<QUICLingeringWorkerProcess>
get_inherited_quic_lingering_worker_process_from_env() {
std::vector<QUICLingeringWorkerProcess> iwps;
for (size_t i = 1;; ++i) {
auto name = ENV_QUIC_WORKER_PROCESS_PREFIX.str();
name += util::utos(i);
auto env = getenv(name.c_str());
if (!env) {
break;
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Read env " << name << "=" << env;
}
auto envend = env + strlen(env);
auto end_fd = std::find(env, envend, ',');
if (end_fd == envend) {
continue;
}
auto fd =
util::parse_uint(reinterpret_cast<const uint8_t *>(env), end_fd - env);
if (fd == -1) {
LOG(WARN) << "Could not parse file descriptor from "
<< StringRef{env, static_cast<size_t>(end_fd - env)};
continue;
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Inherit worker process QUIC IPC socket fd=" << fd;
}
util::make_socket_closeonexec(fd);
std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>> cid_prefixes;
auto p = end_fd + 1;
for (;;) {
auto end = std::find(p, envend, ',');
auto hex_cid_prefix = StringRef{p, end};
if (hex_cid_prefix.size() != SHRPX_QUIC_CID_PREFIXLEN * 2 ||
!util::is_hex_string(hex_cid_prefix)) {
continue;
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Inherit worker process CID prefix=" << hex_cid_prefix;
}
cid_prefixes.emplace_back();
util::decode_hex(std::begin(cid_prefixes.back()), hex_cid_prefix);
if (end == envend) {
break;
}
p = end + 1;
}
iwps.emplace_back(std::move(cid_prefixes), fd);
}
return iwps;
}
} // namespace
#endif // ENABLE_HTTP3
namespace {
int create_acceptor_socket(Config *config, std::vector<InheritedAddr> &iaddrs) {
std::array<char, STRERROR_BUFSIZE> errbuf;
@ -1182,14 +1314,97 @@ int create_ipc_socket(std::array<int, 2> &ipc_fd) {
}
} // namespace
#ifdef ENABLE_HTTP3
namespace {
int create_quic_ipc_socket(std::array<int, 2> &quic_ipc_fd) {
std::array<char, STRERROR_BUFSIZE> errbuf;
int rv;
rv = socketpair(AF_UNIX, SOCK_DGRAM, 0, quic_ipc_fd.data());
if (rv == -1) {
auto error = errno;
LOG(WARN) << "Failed to create socket pair to communicate worker process: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
return -1;
}
for (auto fd : quic_ipc_fd) {
util::make_socket_nonblocking(fd);
}
return 0;
}
} // namespace
namespace {
int generate_cid_prefix(
std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>> &cid_prefixes,
const Config *config) {
auto &apiconf = config->api;
size_t num_cid_prefix;
if (config->single_thread) {
num_cid_prefix = 1;
} else {
num_cid_prefix = config->num_worker;
// API endpoint occupies the one dedicated worker thread.
// Although such worker never gets QUIC traffic, we create CID
// prefix for it to make code a bit simpler.
if (apiconf.enabled) {
++num_cid_prefix;
}
}
cid_prefixes.resize(num_cid_prefix);
for (auto &cid_prefix : cid_prefixes) {
if (create_cid_prefix(cid_prefix.data()) != 0) {
return -1;
}
}
return 0;
}
} // namespace
namespace {
std::vector<QUICLingeringWorkerProcess>
collect_quic_lingering_worker_processes() {
std::vector<QUICLingeringWorkerProcess> quic_lwps{
std::begin(inherited_quic_lingering_worker_processes),
std::end(inherited_quic_lingering_worker_processes)};
for (auto &wp : worker_processes) {
quic_lwps.emplace_back(wp->cid_prefixes, wp->quic_ipc_fd);
}
return quic_lwps;
}
} // namespace
#endif // ENABLE_HTTP3
namespace {
// Creates worker process, and returns PID of worker process. On
// success, file descriptor for IPC (send only) is assigned to
// |main_ipc_fd|. In child process, we will close file descriptors
// which are inherited from previous configuration/process, but not
// used in the current configuration.
pid_t fork_worker_process(int &main_ipc_fd,
const std::vector<InheritedAddr> &iaddrs) {
pid_t fork_worker_process(
int &main_ipc_fd
#ifdef ENABLE_HTTP3
,
int &wp_quic_ipc_fd
#endif // ENABLE_HTTP3
,
const std::vector<InheritedAddr> &iaddrs
#ifdef ENABLE_HTTP3
,
const std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>>
&cid_prefixes,
const std::vector<QUICLingeringWorkerProcess> &quic_lwps
#endif // ENABLE_HTTP3
) {
std::array<char, STRERROR_BUFSIZE> errbuf;
int rv;
sigset_t oldset;
@ -1201,6 +1416,15 @@ pid_t fork_worker_process(int &main_ipc_fd,
return -1;
}
#ifdef ENABLE_HTTP3
std::array<int, 2> quic_ipc_fd;
rv = create_quic_ipc_socket(quic_ipc_fd);
if (rv != 0) {
return -1;
}
#endif // ENABLE_HTTP3
rv = shrpx_signal_block_all(&oldset);
if (rv != 0) {
auto error = errno;
@ -1228,6 +1452,20 @@ pid_t fork_worker_process(int &main_ipc_fd,
util::make_socket_closeonexec(addr.fd);
}
#ifdef ENABLE_HTTP3
util::make_socket_closeonexec(quic_ipc_fd[0]);
for (auto &lwp : quic_lwps) {
util::make_socket_closeonexec(lwp.quic_ipc_fd);
}
for (auto &wp : worker_processes) {
util::make_socket_closeonexec(wp->quic_ipc_fd);
// Do not close quic_ipc_fd.
wp->quic_ipc_fd = -1;
}
#endif // ENABLE_HTTP3
// Remove all WorkerProcesses to stop any registered watcher on
// default loop.
worker_process_remove_all();
@ -1251,9 +1489,19 @@ pid_t fork_worker_process(int &main_ipc_fd,
if (!config->single_process) {
close(ipc_fd[1]);
#ifdef ENABLE_HTTP3
close(quic_ipc_fd[1]);
#endif // ENABLE_HTTP3
}
WorkerProcessConfig wpconf{ipc_fd[0]};
WorkerProcessConfig wpconf{
.ipc_fd = ipc_fd[0],
#ifdef ENABLE_HTTP3
.cid_prefixes = cid_prefixes,
.quic_ipc_fd = quic_ipc_fd[0],
.quic_lingering_worker_processes = quic_lwps,
#endif // ENABLE_HTTP3
};
rv = worker_process_event_loop(&wpconf);
if (rv != 0) {
LOG(FATAL) << "Worker process returned error";
@ -1294,13 +1542,23 @@ pid_t fork_worker_process(int &main_ipc_fd,
if (pid == -1) {
close(ipc_fd[0]);
close(ipc_fd[1]);
#ifdef ENABLE_HTTP3
close(quic_ipc_fd[0]);
close(quic_ipc_fd[1]);
#endif // ENABLE_HTTP3
return -1;
}
close(ipc_fd[0]);
#ifdef ENABLE_HTTP3
close(quic_ipc_fd[0]);
#endif // ENABLE_HTTP3
main_ipc_fd = ipc_fd[1];
#ifdef ENABLE_HTTP3
wp_quic_ipc_fd = quic_ipc_fd[1];
#endif // ENABLE_HTTP3
LOG(NOTICE) << "Worker process [" << pid << "] spawned";
@ -1347,17 +1605,50 @@ int event_loop() {
auto orig_pid = get_orig_pid_from_env();
#ifdef ENABLE_HTTP3
inherited_quic_lingering_worker_processes =
get_inherited_quic_lingering_worker_process_from_env();
#endif // ENABLE_HTTP3
auto loop = ev_default_loop(config->ev_loop_flags);
int ipc_fd = 0;
#ifdef ENABLE_HTTP3
int quic_ipc_fd = 0;
auto pid = fork_worker_process(ipc_fd, {});
auto quic_lwps = collect_quic_lingering_worker_processes();
std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>> cid_prefixes;
if (generate_cid_prefix(cid_prefixes, config) != 0) {
return -1;
}
#endif // ENABLE_HTTP3
auto pid = fork_worker_process(
ipc_fd
#ifdef ENABLE_HTTP3
,
quic_ipc_fd
#endif // ENABLE_HTTP3
,
{}
#ifdef ENABLE_HTTP3
,
cid_prefixes, quic_lwps
#endif // ENABLE_HTTP3
);
if (pid == -1) {
return -1;
}
worker_process_add(std::make_unique<WorkerProcess>(loop, pid, ipc_fd));
worker_process_add(std::make_unique<WorkerProcess>(loop, pid, ipc_fd
#ifdef ENABLE_HTTP3
,
quic_ipc_fd, cid_prefixes
#endif // ENABLE_HTTP3
));
// Write PID file when we are ready to accept connection from peer.
// This makes easier to write restart script for nghttpx. Because
@ -3325,13 +3616,37 @@ void reload_config(WorkerProcess *wp) {
auto loop = ev_default_loop(new_config->ev_loop_flags);
int ipc_fd = 0;
#ifdef ENABLE_HTTP3
int quic_ipc_fd = 0;
auto quic_lwps = collect_quic_lingering_worker_processes();
std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>> cid_prefixes;
if (generate_cid_prefix(cid_prefixes, new_config.get()) != 0) {
close_not_inherited_fd(new_config.get(), iaddrs);
return;
}
#endif // ENABLE_HTTP3
// fork_worker_process and forked child process assumes new
// configuration can be obtained from get_config().
auto old_config = replace_config(std::move(new_config));
auto pid = fork_worker_process(ipc_fd, iaddrs);
auto pid = fork_worker_process(ipc_fd
#ifdef ENABLE_HTTP3
,
quic_ipc_fd
#endif // ENABLE_HTTP3
,
iaddrs
#ifdef ENABLE_HTTP3
,
cid_prefixes, quic_lwps
#endif // ENABLE_HTTP3
);
if (pid == -1) {
LOG(ERROR) << "Failed to process new configuration";
@ -3350,7 +3665,12 @@ void reload_config(WorkerProcess *wp) {
// We no longer use signals for this worker.
last_wp->shutdown_signal_watchers();
worker_process_add(std::make_unique<WorkerProcess>(loop, pid, ipc_fd));
worker_process_add(std::make_unique<WorkerProcess>(loop, pid, ipc_fd
#ifdef ENABLE_HTTP3
,
quic_ipc_fd, cid_prefixes
#endif // ENABLE_HTTP3
));
if (!get_config()->pid_file.empty()) {
save_pid();

View File

@ -45,6 +45,7 @@
#include "shrpx_memcached_dispatcher.h"
#include "shrpx_signal.h"
#include "shrpx_log.h"
#include "xsi_strerror.h"
#include "util.h"
#include "template.h"
@ -112,7 +113,11 @@ void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
} // namespace
ConnectionHandler::ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen)
: gen_(gen),
:
#ifdef ENABLE_HTTP3
quic_ipc_fd_(-1),
#endif // ENABLE_HTTP3
gen_(gen),
single_worker_(nullptr),
loop_(loop),
#ifdef HAVE_NEVERBLEED
@ -267,11 +272,13 @@ int ConnectionHandler::create_single_worker() {
}
}
#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
quic_bpf_refs_.resize(config->conn.quic_listener.addrs.size());
#endif // ENABLE_HTTP3 && HAVE_LIBBPF
#ifdef ENABLE_HTTP3
std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN> cid_prefix;
if (create_cid_prefix(cid_prefix.data()) != 0) {
return -1;
}
assert(cid_prefixes_.size() == 1);
const auto &cid_prefix = cid_prefixes_[0];
#endif // ENABLE_HTTP3
single_worker_ = std::make_unique<Worker>(
@ -343,7 +350,7 @@ int ConnectionHandler::create_worker_thread(size_t num) {
auto &apiconf = config->api;
# if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
quic_bpf_refs_.resize(num);
quic_bpf_refs_.resize(config->conn.quic_listener.addrs.size());
# endif // ENABLE_HTTP3 && HAVE_LIBBPF
// We have dedicated worker for API request processing.
@ -369,14 +376,15 @@ int ConnectionHandler::create_worker_thread(size_t num) {
}
}
# ifdef ENABLE_HTTP3
assert(cid_prefixes_.size() == num);
# endif // ENABLE_HTTP3
for (size_t i = 0; i < num; ++i) {
auto loop = ev_loop_new(config->ev_loop_flags);
# ifdef ENABLE_HTTP3
std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN> cid_prefix;
if (create_cid_prefix(cid_prefix.data()) != 0) {
return -1;
}
const auto &cid_prefix = cid_prefixes_[i];
# endif // ENABLE_HTTP3
auto worker = std::make_unique<Worker>(
@ -1060,12 +1068,246 @@ int ConnectionHandler::create_quic_secret() {
return 0;
}
void ConnectionHandler::set_cid_prefixes(
const std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>>
&cid_prefixes) {
cid_prefixes_ = cid_prefixes;
}
QUICLingeringWorkerProcess *
ConnectionHandler::match_quic_lingering_worker_process_cid_prefix(
const uint8_t *dcid, size_t dcidlen) {
assert(dcidlen >= SHRPX_QUIC_CID_PREFIXLEN);
for (auto &lwps : quic_lingering_worker_processes_) {
for (auto &cid_prefix : lwps.cid_prefixes) {
if (std::equal(std::begin(cid_prefix), std::end(cid_prefix), dcid)) {
return &lwps;
}
}
}
return nullptr;
}
# ifdef HAVE_LIBBPF
std::vector<BPFRef> &ConnectionHandler::get_quic_bpf_refs() {
return quic_bpf_refs_;
}
# endif // HAVE_LIBBPF
void ConnectionHandler::set_quic_ipc_fd(int fd) { quic_ipc_fd_ = fd; }
void ConnectionHandler::set_quic_lingering_worker_processes(
const std::vector<QUICLingeringWorkerProcess> &quic_lwps) {
quic_lingering_worker_processes_ = quic_lwps;
}
int ConnectionHandler::forward_quic_packet_to_lingering_worker_process(
QUICLingeringWorkerProcess *quic_lwp, const Address &remote_addr,
const Address &local_addr, const uint8_t *data, size_t datalen) {
std::array<uint8_t, 512> header;
assert(header.size() >= 1 + 1 + 1 + sizeof(sockaddr_storage) * 2);
assert(remote_addr.len > 0);
assert(local_addr.len > 0);
auto p = header.data();
*p++ = static_cast<uint8_t>(QUICIPCType::DGRAM_FORWARD);
*p++ = static_cast<uint8_t>(remote_addr.len - 1);
p = std::copy_n(reinterpret_cast<const uint8_t *>(&remote_addr.su),
remote_addr.len, p);
*p++ = static_cast<uint8_t>(local_addr.len - 1);
p = std::copy_n(reinterpret_cast<const uint8_t *>(&local_addr.su),
local_addr.len, p);
iovec msg_iov[] = {
{
.iov_base = header.data(),
.iov_len = static_cast<size_t>(p - header.data()),
},
{
.iov_base = const_cast<uint8_t *>(data),
.iov_len = datalen,
},
};
msghdr msg{};
msg.msg_iov = msg_iov;
msg.msg_iovlen = array_size(msg_iov);
ssize_t nwrite;
while ((nwrite = sendmsg(quic_lwp->quic_ipc_fd, &msg, 0)) == -1 &&
errno == EINTR)
;
if (nwrite == -1) {
std::array<char, STRERROR_BUFSIZE> errbuf;
auto error = errno;
LOG(ERROR) << "Failed to send QUIC IPC message: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
return -1;
}
return 0;
}
int ConnectionHandler::quic_ipc_read() {
std::array<uint8_t, 65536> buf;
ssize_t nread;
while ((nread = recv(quic_ipc_fd_, buf.data(), buf.size(), 0)) == -1 &&
errno == EINTR)
;
if (nread == -1) {
std::array<char, STRERROR_BUFSIZE> errbuf;
auto error = errno;
LOG(ERROR) << "Failed to read data from QUIC IPC channel: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
return -1;
}
if (nread == 0) {
return 0;
}
size_t len = 1 + 1 + 1;
// Wire format:
// TYPE(1) REMOTE_ADDRLEN(1) REMOTE_ADDR(N) LOCAL_ADDRLEN(1) REMOTE_ADDR(N)
// DGRAM_PAYLAOD(N)
//
// When encoding, REMOTE_ADDRLEN and LOCAL_ADDRLEN is decremented by
// 1.
if (static_cast<size_t>(nread) < len) {
return 0;
}
auto p = buf.data();
if (*p != static_cast<uint8_t>(QUICIPCType::DGRAM_FORWARD)) {
LOG(ERROR) << "Unknown QUICIPCType: " << static_cast<uint32_t>(*p);
return -1;
}
++p;
auto pkt = std::make_unique<QUICPacket>();
auto remote_addrlen = static_cast<size_t>(*p++) + 1;
if (remote_addrlen > sizeof(sockaddr_storage)) {
LOG(ERROR) << "The length of remote address is too large: "
<< remote_addrlen;
return -1;
}
len += remote_addrlen;
if (static_cast<size_t>(nread) < len) {
LOG(ERROR) << "Insufficient QUIC IPC message length";
return -1;
}
pkt->remote_addr.len = remote_addrlen;
memcpy(&pkt->remote_addr.su, p, remote_addrlen);
p += remote_addrlen;
auto local_addrlen = static_cast<size_t>(*p++) + 1;
if (local_addrlen > sizeof(sockaddr_storage)) {
LOG(ERROR) << "The length of local address is too large: " << local_addrlen;
return -1;
}
len += local_addrlen;
if (static_cast<size_t>(nread) < len) {
LOG(ERROR) << "Insufficient QUIC IPC message length";
return -1;
}
pkt->local_addr.len = local_addrlen;
memcpy(&pkt->local_addr.su, p, local_addrlen);
p += local_addrlen;
auto datalen = nread - (p - buf.data());
pkt->data.assign(p, p + datalen);
// At the moment, UpstreamAddr index is unknown.
pkt->upstream_addr_index = static_cast<size_t>(-1);
uint32_t version;
const uint8_t *dcid;
size_t dcidlen;
const uint8_t *scid;
size_t scidlen;
auto rv =
ngtcp2_pkt_decode_version_cid(&version, &dcid, &dcidlen, &scid, &scidlen,
p, datalen, SHRPX_QUIC_SCIDLEN);
if (rv < 0) {
LOG(ERROR) << "ngtcp2_pkt_decode_version_cid: " << ngtcp2_strerror(rv);
return -1;
}
if (dcidlen < SHRPX_QUIC_CID_PREFIXLEN) {
LOG(ERROR) << "DCID is too short";
return -1;
}
if (single_worker_) {
auto faddr = single_worker_->find_quic_upstream_addr(pkt->local_addr);
if (faddr == nullptr) {
LOG(ERROR) << "No suitable upstream address found";
return 0;
}
auto quic_conn_handler = single_worker_->get_quic_connection_handler();
// Ignore return value
quic_conn_handler->handle_packet(faddr, pkt->remote_addr, pkt->local_addr,
pkt->data.data(), pkt->data.size());
return 0;
}
for (auto &worker : workers_) {
if (!std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN,
worker->get_cid_prefix())) {
continue;
}
WorkerEvent wev{
.type = WorkerEventType::QUIC_PKT_FORWARD,
.quic_pkt = std::move(pkt),
};
worker->send(std::move(wev));
return 0;
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "No worker to match CID prefix";
}
return 0;
}
#endif // ENABLE_HTTP3
} // namespace shrpx

View File

@ -103,12 +103,33 @@ struct SerialEvent {
std::shared_ptr<DownstreamConfig> downstreamconf;
};
#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
#ifdef ENABLE_HTTP3
# ifdef HAVE_LIBBPF
struct BPFRef {
int reuseport_array;
int cid_prefix_map;
};
#endif // ENABLE_HTTP3 && HAVE_LIBBPF
# endif // HAVE_LIBBPF
// QUIC IPC message type.
enum class QUICIPCType {
NONE,
// Send forwarded QUIC UDP datagram and its metadata.
DGRAM_FORWARD,
};
// WorkerProcesses which are in graceful shutdown period.
struct QUICLingeringWorkerProcess {
QUICLingeringWorkerProcess(
std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>> cid_prefixes,
int quic_ipc_fd)
: cid_prefixes{std::move(cid_prefixes)}, quic_ipc_fd{quic_ipc_fd} {}
std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>> cid_prefixes;
// Socket to send QUIC IPC message to this worker process.
int quic_ipc_fd;
};
#endif // ENABLE_HTTP3
class ConnectionHandler {
public:
@ -179,6 +200,28 @@ public:
int create_quic_secret();
void set_cid_prefixes(
const std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>>
&cid_prefixes);
void set_quic_lingering_worker_processes(
const std::vector<QUICLingeringWorkerProcess> &quic_lwps);
// Return matching QUICLingeringWorkerProcess which has a CID prefix
// such that |dcid| starts with it. If no such
// QUICLingeringWorkerProcess, it returns nullptr.
QUICLingeringWorkerProcess *
match_quic_lingering_worker_process_cid_prefix(const uint8_t *dcid,
size_t dcidlen);
int forward_quic_packet_to_lingering_worker_process(
QUICLingeringWorkerProcess *quic_lwp, const Address &remote_addr,
const Address &local_addr, const uint8_t *data, size_t datalen);
void set_quic_ipc_fd(int fd);
int quic_ipc_read();
# ifdef HAVE_LIBBPF
std::vector<BPFRef> &get_quic_bpf_refs();
# endif // HAVE_LIBBPF
@ -212,6 +255,11 @@ private:
// and signature algorithm presented by client.
std::vector<std::vector<SSL_CTX *>> indexed_ssl_ctx_;
#ifdef ENABLE_HTTP3
std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>> cid_prefixes_;
std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>>
lingering_cid_prefixes_;
int quic_ipc_fd_;
std::vector<QUICLingeringWorkerProcess> quic_lingering_worker_processes_;
# ifdef HAVE_LIBBPF
std::vector<BPFRef> quic_bpf_refs_;
# endif // HAVE_LIBBPF

View File

@ -77,10 +77,27 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
auto dcid_key = make_cid_key(dcid, dcidlen);
auto conn_handler = worker_->get_connection_handler();
ClientHandler *handler;
auto it = connections_.find(dcid_key);
if (it == std::end(connections_)) {
if (!std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN,
worker_->get_cid_prefix())) {
auto quic_lwp =
conn_handler->match_quic_lingering_worker_process_cid_prefix(dcid,
dcidlen);
if (quic_lwp) {
if (conn_handler->forward_quic_packet_to_lingering_worker_process(
quic_lwp, remote_addr, local_addr, data, datalen) == 0) {
return 0;
}
return 0;
}
}
// new connection
ngtcp2_pkt_hd hd;
@ -90,6 +107,14 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
switch (ngtcp2_accept(&hd, data, datalen)) {
case 0: {
// If we get Initial and it has the CID prefix of this worker, it
// is likely that client is intentionally use the our prefix.
// Just drop it.
if (std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN,
worker_->get_cid_prefix())) {
return 0;
}
if (hd.token.len == 0) {
break;
}
@ -102,7 +127,9 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
if (verify_retry_token(&odcid, hd.token.base, hd.token.len, &hd.dcid,
&remote_addr.su.sa, remote_addr.len,
secret.data()) != 0) {
break;
// 2nd Retry packet is not allowed, so just drop it or send
// CONNECTIONC_CLOE with INVALID_TOKEN.
return 0;
}
podcid = &odcid;
@ -139,8 +166,6 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
dcidlen > SHRPX_QUIC_CID_PREFIXLEN &&
!std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN,
worker_->get_cid_prefix())) {
auto conn_handler = worker_->get_connection_handler();
if (conn_handler->forward_quic_packet(faddr, remote_addr, local_addr,
dcid, data, datalen) == 0) {
return 0;
@ -152,14 +177,6 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
return 0;
}
// If we get Initial and it has the CID prefix of this worker, it
// is likely that client is intentionally use the our prefix.
// Just drop it.
if (std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN,
worker_->get_cid_prefix())) {
return 0;
}
handler = handle_new_connection(faddr, remote_addr, local_addr, hd, podcid,
token, tokenlen);
if (handler == nullptr) {

View File

@ -531,9 +531,26 @@ void Worker::process_events() {
break;
#ifdef ENABLE_HTTP3
case WorkerEventType::QUIC_PKT_FORWARD: {
const UpstreamAddr *faddr;
if (wev.quic_pkt->upstream_addr_index == static_cast<size_t>(-1)) {
faddr = find_quic_upstream_addr(wev.quic_pkt->local_addr);
if (faddr == nullptr) {
LOG(ERROR) << "No suitable upstream address found";
break;
}
} else if (quic_upstream_addrs_.size() <=
wev.quic_pkt->upstream_addr_index) {
LOG(ERROR) << "upstream_addr_index is too large";
break;
} else {
faddr = &quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index];
}
quic_conn_handler_.handle_packet(
&quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index],
wev.quic_pkt->remote_addr, wev.quic_pkt->local_addr,
faddr, wev.quic_pkt->remote_addr, wev.quic_pkt->local_addr,
wev.quic_pkt->data.data(), wev.quic_pkt->data.size());
break;
@ -647,12 +664,11 @@ bool Worker::should_attach_bpf() const {
auto &quicconf = config->quic;
auto &apiconf = config->api;
if (quicconf.bpf.disabled || config->single_thread ||
config->num_worker == 1) {
if (quicconf.bpf.disabled) {
return false;
}
if (apiconf.enabled) {
if (!config->single_thread && apiconf.enabled) {
return index_ == 1;
}
@ -663,18 +679,14 @@ bool Worker::should_update_bpf_map() const {
auto config = get_config();
auto &quicconf = config->quic;
return !quicconf.bpf.disabled && !config->single_thread &&
config->num_worker > 1;
return !quicconf.bpf.disabled;
}
uint32_t Worker::compute_sk_index() const {
auto config = get_config();
auto &apiconf = config->api;
assert(!config->single_thread);
assert(config->num_worker > 1);
if (apiconf.enabled) {
if (!config->single_thread && apiconf.enabled) {
return index_ - 1;
}
@ -979,6 +991,65 @@ void Worker::set_quic_secret(const std::shared_ptr<QUICSecret> &secret) {
const std::shared_ptr<QUICSecret> &Worker::get_quic_secret() const {
return quic_secret_;
}
const UpstreamAddr *Worker::find_quic_upstream_addr(const Address &local_addr) {
std::array<char, NI_MAXHOST> host;
auto rv = getnameinfo(&local_addr.su.sa, local_addr.len, host.data(),
host.size(), nullptr, 0, NI_NUMERICHOST);
if (rv != 0) {
LOG(ERROR) << "getnameinfo: " << gai_strerror(rv);
return nullptr;
}
uint16_t port;
switch (local_addr.su.sa.sa_family) {
case AF_INET:
port = htons(local_addr.su.in.sin_port);
break;
case AF_INET6:
port = htons(local_addr.su.in6.sin6_port);
break;
default:
assert(0);
}
auto hostport = util::make_hostport(StringRef{host.data()}, port);
const UpstreamAddr *fallback_faddr = nullptr;
for (auto &faddr : quic_upstream_addrs_) {
if (faddr.hostport == hostport) {
return &faddr;
}
if (faddr.port != port || faddr.family != local_addr.su.sa.sa_family) {
continue;
}
switch (faddr.family) {
case AF_INET:
if (util::starts_with(faddr.hostport, StringRef::from_lit("0.0.0.0:"))) {
fallback_faddr = &faddr;
}
break;
case AF_INET6:
if (util::starts_with(faddr.hostport, StringRef::from_lit("[::]:"))) {
fallback_faddr = &faddr;
}
break;
default:
assert(0);
}
}
return fallback_faddr;
}
#endif // ENABLE_HTTP3
namespace {

View File

@ -261,6 +261,7 @@ struct QUICPacket {
remote_addr{remote_addr},
local_addr{local_addr},
data{data, data + datalen} {}
QUICPacket() {}
size_t upstream_addr_index;
Address remote_addr;
Address local_addr;
@ -380,6 +381,9 @@ public:
# endif // HAVE_LIBBPF
int create_quic_server_socket(UpstreamAddr &addr);
// Returns a pointer to UpstreamAddr which matches |local_addr|.
const UpstreamAddr *find_quic_upstream_addr(const Address &local_addr);
#endif // ENABLE_HTTP3
DNSTracker *get_dns_tracker();

View File

@ -178,6 +178,20 @@ void ipc_readcb(struct ev_loop *loop, ev_io *w, int revents) {
}
} // namespace
#ifdef ENABLE_HTTP3
namespace {
void quic_ipc_readcb(struct ev_loop *loop, ev_io *w, int revents) {
auto conn_handler = static_cast<ConnectionHandler *>(w->data);
if (conn_handler->quic_ipc_read() != 0) {
LOG(ERROR) << "Failed to read data from QUIC IPC channel";
return;
}
}
} // namespace
#endif // ENABLE_HTTP3
namespace {
int generate_ticket_key(TicketKey &ticket_key) {
ticket_key.cipher = get_config()->tls.ticket.cipher;
@ -434,6 +448,12 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) {
conn_handler->set_neverbleed(nb.get());
#endif // HAVE_NEVERBLEED
#ifdef ENABLE_HTTP3
conn_handler->set_quic_ipc_fd(wpconf->quic_ipc_fd);
conn_handler->set_quic_lingering_worker_processes(
wpconf->quic_lingering_worker_processes);
#endif // ENABLE_HTTP3
for (auto &addr : config->conn.listener.addrs) {
conn_handler->add_acceptor(
std::make_unique<AcceptHandler>(&addr, conn_handler.get()));
@ -500,6 +520,10 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) {
if (conn_handler->create_quic_secret() != 0) {
return -1;
}
conn_handler->set_cid_prefixes(wpconf->cid_prefixes);
conn_handler->set_quic_lingering_worker_processes(
wpconf->quic_lingering_worker_processes);
#endif // ENABLE_HTTP3
if (config->single_thread) {
@ -547,6 +571,13 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) {
ipcev.data = conn_handler.get();
ev_io_start(loop, &ipcev);
#ifdef ENABLE_HTTP3
ev_io quic_ipcev;
ev_io_init(&quic_ipcev, quic_ipc_readcb, wpconf->quic_ipc_fd, EV_READ);
quic_ipcev.data = conn_handler.get();
ev_io_start(loop, &quic_ipcev);
#endif // ENABLE_HTTP3
if (tls::upstream_tls_enabled(config->conn) && !config->tls.ocsp.disabled) {
if (config->tls.ocsp.startup) {
conn_handler->set_enable_acceptor_on_ocsp_completion(true);

View File

@ -27,6 +27,14 @@
#include "shrpx.h"
#include <vector>
#include <array>
#include "shrpx_connection_handler.h"
#ifdef ENABLE_HTTP3
# include "shrpx_quic.h"
#endif // ENABLE_HTTP3
namespace shrpx {
class ConnectionHandler;
@ -38,6 +46,16 @@ struct WorkerProcessConfig {
int server_fd;
// IPv6 socket, or -1 if not used
int server_fd6;
#ifdef ENABLE_HTTP3
// CID prefixes for the new worker process.
std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>> cid_prefixes;
// IPC socket to read forwarded QUIC UDP datagram from the current
// worker process.
int quic_ipc_fd;
// Lingering worker processes which were created before this worker
// process to forward QUIC UDP datagram during reload.
std::vector<QUICLingeringWorkerProcess> quic_lingering_worker_processes;
#endif // ENABLE_HTTP3
};
int worker_process_event_loop(WorkerProcessConfig *wpconf);

View File

@ -1609,10 +1609,7 @@ bool is_hex_string(const StringRef &s) {
StringRef decode_hex(BlockAllocator &balloc, const StringRef &s) {
auto iov = make_byte_ref(balloc, s.size() + 1);
auto p = iov.base;
for (auto it = std::begin(s); it != std::end(s); it += 2) {
*p++ = (hex_to_uint(*it) << 4) | hex_to_uint(*(it + 1));
}
auto p = decode_hex(iov.base, s);
*p = '\0';
return StringRef{iov.base, p};
}

View File

@ -214,6 +214,15 @@ OutputIt format_hex(OutputIt it, const StringRef &s) {
// == true.
StringRef decode_hex(BlockAllocator &balloc, const StringRef &s);
template <typename OutputIt>
OutputIt decode_hex(OutputIt d_first, const StringRef &s) {
for (auto it = std::begin(s); it != std::end(s); it += 2) {
*d_first++ = (hex_to_uint(*it) << 4) | hex_to_uint(*(it + 1));
}
return d_first;
}
// Returns given time |t| from epoch in HTTP Date format (e.g., Mon,
// 10 Oct 2016 10:25:58 GMT).
std::string http_date(time_t t);