nghttpx: Add eBPF program to steer QUIC datagram to a correct socket

This commit is contained in:
Tatsuhiro Tsujikawa 2021-08-24 21:13:05 +09:00
parent 579fb478b5
commit 8ac4bee3bc
15 changed files with 767 additions and 159 deletions

View File

@ -106,10 +106,11 @@ jobs:
run: |
PKG_CONFIG_PATH="$PWD/openssl/build/lib/pkgconfig:$PWD/nghttp3/build/lib/pkgconfig:$PWD/ngtcp2/build/lib/pkgconfig:$PKG_CONFIG_PATH"
LDFLAGS="$LDFLAGS -Wl,-rpath,$PWD/openssl/build/lib"
EXTRA_AUTOTOOLS_OPTS="--enable-http3"
echo 'PKG_CONFIG_PATH='"$PKG_CONFIG_PATH" >> $GITHUB_ENV
echo 'LDFLAGS='"$LDFLAGS" >> $GITHUB_ENV
echo 'EXTRA_AUTOTOOLS_OPTS=--enable-http3' >> $GITHUB_ENV
echo 'EXTRA_AUTOTOOLS_OPTS='"$EXTRA_AUTOTOOLS_OPTS" >> $GITHUB_ENV
echo 'EXTRA_CMAKE_OPTS=-DENABLE_HTTP3=ON' >> $GITHUB_ENV
- name: Setup git submodules
run: |

View File

@ -475,6 +475,7 @@ include_directories(
)
# For use in src/CMakeLists.txt
set(PKGDATADIR "${CMAKE_INSTALL_FULL_DATADIR}/${CMAKE_PROJECT_NAME}")
set(PKGLIBDIR "${CMAKE_INSTALL_FULL_LIBDIR}/${CMAKE_PROJECT_NAME}")
install(FILES README.rst DESTINATION "${CMAKE_INSTALL_DOCDIR}")

View File

@ -20,7 +20,7 @@
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
SUBDIRS = lib third-party src examples python tests integration-tests \
SUBDIRS = lib third-party src bpf examples python tests integration-tests \
doc contrib script
# Now with python setuptools, make uninstall will leave many files we

36
bpf/Makefile.am Normal file
View File

@ -0,0 +1,36 @@
# nghttp2 - HTTP/2 C Library
# Copyright (c) 2021 Tatsuhiro Tsujikawa
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# Always distribute the BPF source.  Keeping EXTRA_DIST outside the
# conditional makes "make dist" ship it even from a tree that was
# configured without libbpf.
EXTRA_DIST = reuseport_kern.c

if HAVE_LIBBPF

# The compiled BPF object is installed as a data file into $(pkglibdir).
bpf_pkglibdir = $(pkglibdir)
bpf_pkglib_DATA = reuseport_kern.o

# The object is built by the hand-written rule below rather than by an
# Automake primary, so it has to be listed for "make clean" explicitly.
CLEANFILES = reuseport_kern.o

all: $(builddir)/reuseport_kern.o

# "-target bpf" is a clang option; $(CC) has to be clang for this rule.
$(builddir)/reuseport_kern.o: reuseport_kern.c
	$(CC) @BPFCFLAGS@ @EXTRABPFCFLAGS@ -target bpf -g -c $< -o $@

endif # HAVE_LIBBPF

290
bpf/reuseport_kern.c Normal file
View File

@ -0,0 +1,290 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2021 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <stdlib.h>
#include <string.h>
#include <linux/in.h>
#include <linux/ip.h>
#include <linux/ipv6.h>
#include <linux/tcp.h>
#include <linux/udp.h>
#include <linux/bpf.h>
#include <linux/types.h>
#include <linux/if_ether.h>
#include <bpf/bpf_endian.h>
#include <bpf/bpf_helpers.h>
/*
* How to compile:
*
* clang-12 -O2 -Wall -target bpf -g -c reuseport_kern.c -o reuseport_kern.o \
* -I/path/to/kernel/include
*
* See
* https://www.kernel.org/doc/Documentation/kbuild/headers_install.txt
* for how to install kernel header files.
*/
/*
 * rol32 - rotate a 32-bit value left (taken from the Linux kernel sources)
 * @word: value to rotate
 * @shift: number of bit positions to rotate by
 *
 * Bits shifted out at the most significant end re-enter at the least
 * significant end.
 */
static inline __u32 rol32(__u32 word, unsigned int shift) {
  /* Distance for the complementary right shift, kept in the range 0..31. */
  const unsigned int wrap = (-shift) & 31;
  const __u32 high = word << shift;
  const __u32 low = word >> wrap;

  return high | low;
}
/* jhash.h: Jenkins hash support.
*
* Copyright (C) 2006. Bob Jenkins (bob_jenkins@burtleburtle.net)
*
* https://burtleburtle.net/bob/hash/
*
* These are the credits from Bob's sources:
*
* lookup3.c, by Bob Jenkins, May 2006, Public Domain.
*
* These are functions for producing 32-bit hashes for hash table lookup.
* hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final()
* are externally useful functions. Routines to test the hash are included
* if SELF_TEST is defined. You can use this free for any purpose. It's in
* the public domain. It has no warranty.
*
* Copyright (C) 2009-2010 Jozsef Kadlecsik (kadlec@blackhole.kfki.hu)
*
* I've modified Bob's hash to be useful in the Linux kernel, and
* any bugs present are my fault.
* Jozsef
*/
/* __jhash_final - final mixing of 3 32-bit values (a,b,c) into c
 *
 * Statement-like macro: it expands to a braced block that updates a, b
 * and c in place, so all three arguments must be modifiable lvalues and
 * each of them is evaluated several times.  The mixed result is left in
 * c.  rol32() must be in scope where the macro is expanded.
 */
#define __jhash_final(a, b, c) \
{ \
c ^= b; \
c -= rol32(b, 14); \
a ^= c; \
a -= rol32(c, 11); \
b ^= a; \
b -= rol32(a, 25); \
c ^= b; \
c -= rol32(b, 16); \
a ^= c; \
a -= rol32(c, 4); \
b ^= a; \
b -= rol32(a, 14); \
c ^= b; \
c -= rol32(b, 24); \
}
/*
 * __jhash_nwords - hash exactly 3, 2 or 1 word(s)
 *
 * Adds initval to each of a, b and c, runs the final mixing step over the
 * three values and returns the resulting c.
 */
static inline __u32 __jhash_nwords(__u32 a, __u32 b, __u32 c, __u32 initval) {
  /* The three additions are independent of each other. */
  c += initval;
  b += initval;
  a += initval;

  __jhash_final(a, b, c);

  return c;
}
/* An arbitrary initial parameter */
#define JHASH_INITVAL 0xdeadbeef
/*
 * jhash_2words - hash two 32-bit words
 *
 * The seed given to __jhash_nwords() is initval + JHASH_INITVAL + (2 << 2);
 * the third word is fixed to 0.
 */
static inline __u32 jhash_2words(__u32 a, __u32 b, __u32 initval) {
  const __u32 seed = initval + JHASH_INITVAL + (2 << 2);

  return __jhash_nwords(a, b, 0, seed);
}
/* Hash map consulted by select_reuseport().  The key is the first
 * CID_PREFIXLEN (8) bytes of a packet's Destination Connection ID; the
 * value is the index that is subsequently used as the key into
 * reuseport_array.  This program only reads the map; entries have to be
 * installed from user space. */
struct bpf_map_def SEC("maps") cid_prefix_map = {
.type = BPF_MAP_TYPE_HASH,
.max_entries = 255,
.key_size = sizeof(__u64),
.value_size = sizeof(__u32),
};
/* Socket array passed to bpf_sk_select_reuseport() by
 * select_reuseport().  It is keyed by the 32-bit socket index that
 * select_reuseport() obtains either from cid_prefix_map or from hashing
 * the Connection ID prefix. */
struct bpf_map_def SEC("maps") reuseport_array = {
.type = BPF_MAP_TYPE_REUSEPORT_SOCKARRAY,
.max_entries = 255,
.key_size = sizeof(__u32),
.value_size = sizeof(__u32),
};
/* Single-element array (key 0).  select_reuseport() uses the stored
 * value as the divisor when it reduces the hash of an Initial or 0-RTT
 * packet's Connection ID prefix to a socket index, i.e. it is treated as
 * the number of sockets to distribute over. */
struct bpf_map_def SEC("maps") sk_info = {
.type = BPF_MAP_TYPE_ARRAY,
.max_entries = 1,
.key_size = sizeof(__u32),
.value_size = sizeof(__u32),
};
/* Result of parse_quic(): the parts of a QUIC packet header that
 * select_reuseport() needs in order to steer the datagram. */
typedef struct quic_hd {
/* Points at the first byte of the Destination Connection ID inside the
 * packet data. */
__u8 *dcid;
/* Length of the Destination Connection ID in bytes. */
__u32 dcidlen;
/* Offset of the Destination Connection ID from reuse_md->data. */
__u32 dcid_offset;
/* Long header: the 2-bit packet type taken from the first QUIC byte.
 * Short header: NGTCP2_PKT_SHORT. */
__u8 type;
} quic_hd;
/* Destination Connection ID length assumed for short header packets
 * (parse_quic() reads no length field for them).  select_reuseport()
 * also requires Handshake packets to use exactly this length. */
#define SV_DCIDLEN 20
/* Accepted range, in bytes, of the DCID length field of long header
 * packets; parse_quic() rejects anything outside it. */
#define MAX_DCIDLEN 20
#define MIN_DCIDLEN 8
/* Number of leading DCID bytes used as the key of cid_prefix_map. */
#define CID_PREFIXLEN 8
/* Packet types stored in quic_hd.type.  The first three are values of
 * the bits selected by mask 0x30 in a long header's first byte, shifted
 * right by 4.  NGTCP2_PKT_SHORT is a marker that parse_quic() assigns to
 * short header packets; it lies outside the range 0..3 that the 2-bit
 * long header type can take. */
enum {
NGTCP2_PKT_INITIAL = 0x0,
NGTCP2_PKT_0RTT = 0x1,
NGTCP2_PKT_HANDSHAKE = 0x2,
NGTCP2_PKT_SHORT = 0x40,
};
/*
 * parse_quic - locate the Destination Connection ID of a QUIC packet
 * @qhd: output; completely filled in only when 0 is returned
 * @reuse_md: sk_reuseport context; the bytes between data and data_end
 *            are treated as a UDP header followed by the QUIC packet
 *
 * Every read of packet data is preceded by a comparison against
 * reuse_md->data_end.
 *
 * Returns 0 on success, or -1 if the packet is too short or a long
 * header carries a DCID length outside [MIN_DCIDLEN, MAX_DCIDLEN].
 */
static inline int parse_quic(quic_hd *qhd, struct sk_reuseport_md *reuse_md) {
/* Bytes required so far: the UDP header plus the first QUIC byte. */
__u64 len = sizeof(struct udphdr) + 1;
__u8 *p;
__u64 dcidlen;
if (reuse_md->data + len > reuse_md->data_end) {
return -1;
}
/* p points at the first QUIC byte. */
p = reuse_md->data + sizeof(struct udphdr);
if (*p & 0x80) {
/* Top bit set: long header.  Require the 4 bytes that follow the
 * first byte (the Version field per RFC 9000; not inspected here) and
 * the 1-byte DCID length. */
len += 4 + 1;
if (reuse_md->data + len > reuse_md->data_end) {
return -1;
}
/* Step over the first byte and those 4 bytes to the DCID length. */
p += 1 + 4;
dcidlen = *p;
if (dcidlen > MAX_DCIDLEN || dcidlen < MIN_DCIDLEN) {
return -1;
}
/* Require the DCID itself plus one byte following it. */
len += 1 + dcidlen;
if (reuse_md->data + len > reuse_md->data_end) {
return -1;
}
/* Advance to the first DCID byte. */
++p;
/* Packet type: bits 0x30 of the first QUIC byte. */
qhd->type =
(*((__u8 *)(reuse_md->data) + sizeof(struct udphdr)) & 0x30) >> 4;
qhd->dcid = p;
qhd->dcidlen = dcidlen;
/* 6 = first byte (1) + skipped bytes (4) + DCID length byte (1). */
qhd->dcid_offset = sizeof(struct udphdr) + 6;
} else {
/* Short header: the DCID directly follows the first byte and is taken
 * to be SV_DCIDLEN bytes long. */
len += SV_DCIDLEN;
if (reuse_md->data + len > reuse_md->data_end) {
return -1;
}
qhd->type = NGTCP2_PKT_SHORT;
qhd->dcid = (__u8 *)reuse_md->data + sizeof(struct udphdr) + 1;
qhd->dcidlen = SV_DCIDLEN;
qhd->dcid_offset = sizeof(struct udphdr) + 1;
}
return 0;
}
/*
 * select_reuseport - choose the SO_REUSEPORT socket for a QUIC datagram
 * @reuse_md: context supplied to sk_reuseport programs
 *
 * Initial and 0-RTT packets: if the DCID is SV_DCIDLEN bytes long and its
 * first CID_PREFIXLEN bytes are a key of cid_prefix_map, the mapped socket
 * index is used.  Otherwise the prefix is hashed together with
 * reuse_md->hash and reduced modulo the socket count stored in sk_info.
 *
 * Handshake and short header packets: the DCID must be SV_DCIDLEN bytes
 * long and its prefix must be present in cid_prefix_map.
 *
 * Returns SK_PASS once bpf_sk_select_reuseport() has accepted the index.
 * Returns SK_DROP for unparsable packets, unknown packet types or
 * prefixes, a missing or zero socket count, or a failed selection.
 */
SEC("sk_reuseport")
int select_reuseport(struct sk_reuseport_md *reuse_md) {
  __u32 sk_index, *psk_index;
  __u8 sk_prefix[8];
  __u32 *pnum_socks;
  __u32 num_socks;
  __u32 zero = 0;
  int rv;
  quic_hd qhd;
  __u32 a, b;

  rv = parse_quic(&qhd, reuse_md);
  if (rv != 0) {
    return SK_DROP;
  }

  switch (qhd.type) {
  case NGTCP2_PKT_INITIAL:
  case NGTCP2_PKT_0RTT:
    if (reuse_md->data + qhd.dcid_offset + CID_PREFIXLEN > reuse_md->data_end) {
      return SK_DROP;
    }

    /* These types only occur with a long header, for which parse_quic()
     * sets dcid_offset to sizeof(struct udphdr) + 6. */
    memcpy(sk_prefix, reuse_md->data + sizeof(struct udphdr) + 6,
           CID_PREFIXLEN);

    if (qhd.dcidlen == SV_DCIDLEN) {
      psk_index = bpf_map_lookup_elem(&cid_prefix_map, sk_prefix);
      if (psk_index != NULL) {
        sk_index = *psk_index;

        break;
      }
    }

    pnum_socks = bpf_map_lookup_elem(&sk_info, &zero);
    if (pnum_socks == NULL) {
      return SK_DROP;
    }

    /* Read the count once, and refuse to take a remainder by zero when
     * user space has not stored the number of sockets yet. */
    num_socks = *pnum_socks;
    if (num_socks == 0) {
      return SK_DROP;
    }

    /* Widen each byte to an unsigned 32-bit value before shifting.
     * Shifting the promoted (signed) int left by 24 is undefined for
     * bytes >= 0x80. */
    a = ((__u32)sk_prefix[0] << 24) | ((__u32)sk_prefix[1] << 16) |
        ((__u32)sk_prefix[2] << 8) | (__u32)sk_prefix[3];
    b = ((__u32)sk_prefix[4] << 24) | ((__u32)sk_prefix[5] << 16) |
        ((__u32)sk_prefix[6] << 8) | (__u32)sk_prefix[7];

    sk_index = jhash_2words(a, b, reuse_md->hash) % num_socks;

    break;
  case NGTCP2_PKT_HANDSHAKE:
  case NGTCP2_PKT_SHORT:
    if (qhd.dcidlen != SV_DCIDLEN) {
      return SK_DROP;
    }

    if (reuse_md->data + qhd.dcid_offset + SV_DCIDLEN > reuse_md->data_end) {
      return SK_DROP;
    }

    memcpy(sk_prefix, reuse_md->data + qhd.dcid_offset, CID_PREFIXLEN);

    psk_index = bpf_map_lookup_elem(&cid_prefix_map, sk_prefix);
    if (psk_index == NULL) {
      return SK_DROP;
    }

    sk_index = *psk_index;

    break;
  default:
    return SK_DROP;
  }

  rv = bpf_sk_select_reuseport(reuse_md, &reuseport_array, &sk_index, 0);
  if (rv != 0) {
    return SK_DROP;
  }

  return SK_PASS;
}

View File

@ -187,6 +187,11 @@ AC_ARG_WITH([libnghttp3],
[Use libnghttp3 [default=check]])],
[request_libnghttp3=$withval], [request_libnghttp3=check])
dnl libbpf is opt-in: without --with-libbpf the request defaults to "no".
AC_ARG_WITH([libbpf],
    [AS_HELP_STRING([--with-libbpf],
                    [Use libbpf [default=no]])],
    [request_libbpf=$withval], [request_libbpf=no])
dnl Define variables
AC_ARG_VAR([CYTHON], [the Cython executable])
@ -200,6 +205,10 @@ AC_ARG_VAR([JEMALLOC_LIBS], [linker flags for jemalloc, skipping any checks])
AC_ARG_VAR([LIBTOOL_LDFLAGS],
[libtool specific flags (e.g., -static-libtool-libs)])
AC_ARG_VAR([LIBBPF_LIBS], [linker flags for libbpf, skipping any checks])
AC_ARG_VAR([BPFCFLAGS], [C compiler flags for bpf program])
dnl Checks for programs
AC_PROG_CC
AC_PROG_CXX
@ -542,6 +551,50 @@ if test "x${request_libnghttp3}" = "xyes" &&
AC_MSG_ERROR([libnghttp3 was requested (--with-libnghttp3) but not found])
fi
# libbpf (for src)
have_libbpf=no
if test "x${request_libbpf}" != "xno"; then
  if test "x${LIBBPF_LIBS}" = "x"; then
    # Probe by linking a small program.  The library is added to LIBS
    # rather than LDFLAGS so that it follows the object file on the link
    # line and the user's own LDFLAGS (e.g. -L paths) stay in effect.
    save_LIBS=$LIBS
    LIBS="-lbpf $LIBS"

    AC_MSG_CHECKING([for libbpf])
    AC_LINK_IFELSE([AC_LANG_PROGRAM([[
    #include <bpf/bpf.h>
    #include <bpf/libbpf.h>
    ]], [[
    bpf_object__open_file("placeholder.o", NULL);
    ]])],
    [AC_MSG_RESULT([yes]); have_libbpf=yes],
    [AC_MSG_RESULT([no]); have_libbpf=no])

    LIBS=$save_LIBS

    if test "x${have_libbpf}" = "xyes"; then
      LIBBPF_LIBS="-lbpf"
    fi
  else
    # LIBBPF_LIBS was supplied by the user; the link check is skipped.
    have_libbpf=yes
  fi

  # Shared by both detection paths so that a user-supplied LIBBPF_LIBS
  # also defines HAVE_LIBBPF and gets the BPF compiler flags.
  if test "x${have_libbpf}" = "xyes"; then
    AC_DEFINE([HAVE_LIBBPF], [1], [Define to 1 if you have `libbpf` library.])
    AC_SUBST([LIBBPF_LIBS])
    if test "x${BPFCFLAGS}" = "x"; then
      BPFCFLAGS="-Wall -O2"
    fi

    # Add the include path for Debian
    EXTRABPFCFLAGS="-I/usr/include/$host_cpu-$host_os"
    AC_SUBST([EXTRABPFCFLAGS])
  fi
fi

if test "x${request_libbpf}" = "xyes" &&
   test "x${have_libbpf}" != "xyes"; then
  AC_MSG_ERROR([libbpf was requested (--with-libbpf) but not found])
fi

AM_CONDITIONAL([HAVE_LIBBPF], [ test "x${have_libbpf}" = "xyes" ])
# libevent_openssl (for examples)
# 2.0.8 is required because we use evconnlistener_set_error_cb()
have_libevent_openssl=no
@ -1070,6 +1123,7 @@ AC_CONFIG_FILES([
src/Makefile
src/includes/Makefile
src/libnghttp2_asio.pc
bpf/Makefile
examples/Makefile
python/Makefile
python/setup.py
@ -1121,6 +1175,8 @@ AC_MSG_NOTICE([summary of build options:
WARNCXXFLAGS: ${WARNCXXFLAGS}
CXX1XCXXFLAGS: ${CXX1XCXXFLAGS}
EXTRACFLAG: ${EXTRACFLAG}
BPFCFLAGS: ${BPFCFLAGS}
EXTRABPFCFLAGS: ${EXTRABPFCFLAGS}
LIBS: ${LIBS}
DEFS: ${DEFS}
EXTRA_DEFS: ${EXTRA_DEFS}
@ -1147,6 +1203,7 @@ AC_MSG_NOTICE([summary of build options:
libngtcp2: ${have_libngtcp2} (CFLAGS='${LIBNGTCP2_CFLAGS}' LIBS='${LIBNGTCP2_LIBS}')
libngtcp2_crypto_openssl: ${have_libngtcp2_crypto_openssl} (CFLAGS='${LIBNGTCP2_CRYPTO_OPENSSL_CFLAGS}' LIBS='${LIBNGTCP2_CRYPTO_OPENSSL_LIBS}')
libnghttp3: ${have_libnghttp3} (CFLAGS='${LIBNGHTTP3_CFLAGS}' LIBS='${LIBNGHTTP3_LIBS}')
libbpf: ${have_libbpf} (LIBS='${LIBBPF_LIBS}')
Libevent(SSL): ${have_libevent_openssl} (CFLAGS='${LIBEVENT_OPENSSL_CFLAGS}' LIBS='${LIBEVENT_OPENSSL_LIBS}')
Jansson: ${have_jansson} (CFLAGS='${JANSSON_CFLAGS}' LIBS='${JANSSON_LIBS}')
Jemalloc: ${have_jemalloc} (CFLAGS='${JEMALLOC_CFLAGS}' LIBS='${JEMALLOC_LIBS}')

View File

@ -212,7 +212,10 @@ if(ENABLE_APP)
add_executable(nghttpx ${NGHTTPX-bin_SOURCES} $<TARGET_OBJECTS:llhttp>
$<TARGET_OBJECTS:url-parser>
)
target_compile_definitions(nghttpx PRIVATE "-DPKGDATADIR=\"${PKGDATADIR}\"")
target_compile_definitions(nghttpx PRIVATE
"-DPKGDATADIR=\"${PKGDATADIR}\""
"-DPKGLIBDIR=\"${PKGLIBDIR}\""
)
target_link_libraries(nghttpx nghttpx_static)
add_executable(h2load ${H2LOAD_SOURCES} $<TARGET_OBJECTS:llhttp>
$<TARGET_OBJECTS:url-parser>

View File

@ -186,7 +186,7 @@ libnghttpx_a_CPPFLAGS = ${AM_CPPFLAGS}
nghttpx_SOURCES = shrpx.cc shrpx.h
nghttpx_CPPFLAGS = ${libnghttpx_a_CPPFLAGS}
nghttpx_LDADD = libnghttpx.a ${LDADD}
nghttpx_LDADD = libnghttpx.a ${LDADD} @LIBBPF_LIBS@
if HAVE_MRUBY
libnghttpx_a_CPPFLAGS += \
@ -218,7 +218,8 @@ nghttpx_unittest_SOURCES = shrpx-unittest.cc \
base64_test.cc base64_test.h
nghttpx_unittest_CPPFLAGS = ${AM_CPPFLAGS} \
-DNGHTTP2_SRC_DIR=\"$(top_srcdir)/src\"
nghttpx_unittest_LDADD = libnghttpx.a ${LDADD} @CUNIT_LIBS@ @TESTLDADD@
nghttpx_unittest_LDADD = libnghttpx.a ${LDADD} @CUNIT_LIBS@ @TESTLDADD@ \
@LIBBPF_LIBS@
if HAVE_MRUBY
nghttpx_unittest_CPPFLAGS += \

View File

@ -279,7 +279,10 @@ int ConnectionHandler::create_single_worker() {
#ifdef ENABLE_HTTP3
quic_sv_ssl_ctx, quic_cert_tree_.get(), cid_prefix.data(),
cid_prefix.size(),
#endif // ENABLE_HTTP3
# ifdef HAVE_LIBBPF
/* index = */ 0,
# endif // HAVE_LIBBPF
#endif // ENABLE_HTTP3
ticket_keys_, this, config->conn.downstream);
#ifdef HAVE_MRUBY
if (single_worker_->create_mruby_context() != 0) {
@ -337,6 +340,10 @@ int ConnectionHandler::create_worker_thread(size_t num) {
auto &tlsconf = config->tls;
auto &apiconf = config->api;
# if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
quic_bpf_refs_.resize(num);
# endif // ENABLE_HTTP3 && HAVE_LIBBPF
// We have dedicated worker for API request processing.
if (apiconf.enabled) {
++num;
@ -375,7 +382,10 @@ int ConnectionHandler::create_worker_thread(size_t num) {
# ifdef ENABLE_HTTP3
quic_sv_ssl_ctx, quic_cert_tree_.get(), cid_prefix.data(),
cid_prefix.size(),
# endif // ENABLE_HTTP3
# ifdef HAVE_LIBBPF
i,
# endif // HAVE_LIBBPF
# endif // ENABLE_HTTP3
ticket_keys_, this, config->conn.downstream);
# ifdef HAVE_MRUBY
if (worker->create_mruby_context() != 0) {
@ -1024,6 +1034,13 @@ int ConnectionHandler::forward_quic_packet(const UpstreamAddr *faddr,
return -1;
}
# ifdef HAVE_LIBBPF
// Returns a mutable reference to quic_bpf_refs_.
// Worker::create_quic_server_socket() indexes the returned vector by
// UpstreamAddr::index to store and to read the BPF map descriptors.
std::vector<BPFRef> &ConnectionHandler::get_quic_bpf_refs() {
return quic_bpf_refs_;
}
# endif // HAVE_LIBBPF
#endif // ENABLE_HTTP3
} // namespace shrpx

View File

@ -40,6 +40,10 @@
# include <future>
#endif // NOTHREADS
#ifdef HAVE_LIBBPF
# include <bpf/libbpf.h>
#endif // HAVE_LIBBPF
#include <openssl/ssl.h>
#include <ev.h>
@ -99,6 +103,13 @@ struct SerialEvent {
std::shared_ptr<DownstreamConfig> downstreamconf;
};
#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
// Descriptors of the BPF maps of one loaded reuseport_kern.o object, as
// obtained from bpf_map__fd() in Worker::create_quic_server_socket().
struct BPFRef {
// Descriptor of the map named "reuseport_array".
int reuseport_array;
// Descriptor of the map named "cid_prefix_map".
int cid_prefix_map;
};
#endif // ENABLE_HTTP3 && HAVE_LIBBPF
class ConnectionHandler {
public:
ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen);
@ -165,7 +176,11 @@ public:
int forward_quic_packet(const UpstreamAddr *faddr, const Address &remote_addr,
const Address &local_addr, const uint8_t *cid_prefix,
const uint8_t *data, size_t datalen);
#endif // ENABLE_HTTP3
# ifdef HAVE_LIBBPF
std::vector<BPFRef> &get_quic_bpf_refs();
# endif // HAVE_LIBBPF
#endif // ENABLE_HTTP3
#ifdef HAVE_NEVERBLEED
void set_neverbleed(neverbleed_t *nb);
@ -195,6 +210,9 @@ private:
// and signature algorithm presented by client.
std::vector<std::vector<SSL_CTX *>> indexed_ssl_ctx_;
#ifdef ENABLE_HTTP3
# ifdef HAVE_LIBBPF
std::vector<BPFRef> quic_bpf_refs_;
# endif // HAVE_LIBBPF
std::vector<SSL_CTX *> quic_all_ssl_ctx_;
std::vector<std::vector<SSL_CTX *>> quic_indexed_ssl_ctx_;
#endif // ENABLE_HTTP3

View File

@ -53,152 +53,6 @@ ngtcp2_tstamp quic_timestamp() {
.count();
}
int create_quic_server_socket(UpstreamAddr &faddr) {
std::array<char, STRERROR_BUFSIZE> errbuf;
int fd = -1;
int rv;
auto service = util::utos(faddr.port);
addrinfo hints{};
hints.ai_family = faddr.family;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = AI_PASSIVE;
#ifdef AI_ADDRCONFIG
hints.ai_flags |= AI_ADDRCONFIG;
#endif // AI_ADDRCONFIG
auto node =
faddr.host == StringRef::from_lit("*") ? nullptr : faddr.host.c_str();
addrinfo *res, *rp;
rv = getaddrinfo(node, service.c_str(), &hints, &res);
#ifdef AI_ADDRCONFIG
if (rv != 0) {
// Retry without AI_ADDRCONFIG
hints.ai_flags &= ~AI_ADDRCONFIG;
rv = getaddrinfo(node, service.c_str(), &hints, &res);
}
#endif // AI_ADDRCONFIG
if (rv != 0) {
LOG(FATAL) << "Unable to get IPv" << (faddr.family == AF_INET ? "4" : "6")
<< " address for " << faddr.host << ", port " << faddr.port
<< ": " << gai_strerror(rv);
return -1;
}
auto res_d = defer(freeaddrinfo, res);
std::array<char, NI_MAXHOST> host;
for (rp = res; rp; rp = rp->ai_next) {
rv = getnameinfo(rp->ai_addr, rp->ai_addrlen, host.data(), host.size(),
nullptr, 0, NI_NUMERICHOST);
if (rv != 0) {
LOG(WARN) << "getnameinfo() failed: " << gai_strerror(rv);
continue;
}
#ifdef SOCK_NONBLOCK
fd = socket(rp->ai_family, rp->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
rp->ai_protocol);
if (fd == -1) {
auto error = errno;
LOG(WARN) << "socket() syscall failed: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
continue;
}
#else // !SOCK_NONBLOCK
fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (fd == -1) {
auto error = errno;
LOG(WARN) << "socket() syscall failed: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
continue;
}
util::make_socket_nonblocking(fd);
util::make_socket_closeonexec(fd);
#endif // !SOCK_NONBLOCK
int val = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val,
static_cast<socklen_t>(sizeof(val))) == -1) {
auto error = errno;
LOG(WARN) << "Failed to set SO_REUSEADDR option to listener socket: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
close(fd);
continue;
}
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val,
static_cast<socklen_t>(sizeof(val))) == -1) {
auto error = errno;
LOG(WARN) << "Failed to set SO_REUSEPORT option to listener socket: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
close(fd);
continue;
}
if (faddr.family == AF_INET6) {
#ifdef IPV6_V6ONLY
if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val,
static_cast<socklen_t>(sizeof(val))) == -1) {
auto error = errno;
LOG(WARN) << "Failed to set IPV6_V6ONLY option to listener socket: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
close(fd);
continue;
}
#endif // IPV6_V6ONLY
if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &val,
static_cast<socklen_t>(sizeof(val))) == -1) {
auto error = errno;
LOG(WARN)
<< "Failed to set IPV6_RECVPKTINFO option to listener socket: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
close(fd);
continue;
}
} else {
if (setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &val,
static_cast<socklen_t>(sizeof(val))) == -1) {
auto error = errno;
LOG(WARN) << "Failed to set IP_PKTINFO option to listener socket: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
close(fd);
continue;
}
}
// TODO Enable ECN
if (bind(fd, rp->ai_addr, rp->ai_addrlen) == -1) {
auto error = errno;
LOG(WARN) << "bind() syscall failed: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
close(fd);
continue;
}
break;
}
if (!rp) {
LOG(FATAL) << "Listening " << (faddr.family == AF_INET ? "IPv4" : "IPv6")
<< " socket failed";
return -1;
}
faddr.fd = fd;
faddr.hostport = util::make_http_hostport(mod_config()->balloc,
StringRef{host.data()}, faddr.port);
LOG(NOTICE) << "Listening on " << faddr.hostport << ", quic";
return 0;
}
int quic_send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa,
size_t remote_salen, const sockaddr *local_sa,
size_t local_salen, const uint8_t *data, size_t datalen,

View File

@ -41,8 +41,6 @@ constexpr size_t SHRPX_MAX_UDP_PAYLOAD_SIZE = 1280;
ngtcp2_tstamp quic_timestamp();
int create_quic_server_socket(UpstreamAddr &addr);
int quic_send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa,
size_t remote_salen, const sockaddr *local_sa,
size_t local_salen, const uint8_t *data, size_t datalen,

View File

@ -112,6 +112,14 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
return 0;
}
// If we get Initial and it has the CID prefix of this worker, it
// is likely that the client is intentionally using our prefix.
// Just drop it.
if (std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN,
worker_->get_cid_prefix())) {
return 0;
}
handler = handle_new_connection(faddr, remote_addr, local_addr, hd);
if (handler == nullptr) {
return 0;

View File

@ -28,8 +28,14 @@
# include <unistd.h>
#endif // HAVE_UNISTD_H
#include <cstdio>
#include <memory>
#ifdef HAVE_LIBBPF
# include <bpf/bpf.h>
# include <bpf/libbpf.h>
#endif // HAVE_LIBBPF
#include "shrpx_tls.h"
#include "shrpx_log.h"
#include "shrpx_client_handler.h"
@ -42,8 +48,10 @@
#ifdef ENABLE_HTTP3
# include "shrpx_quic_listener.h"
#endif // ENABLE_HTTP3
#include "shrpx_connection_handler.h"
#include "util.h"
#include "template.h"
#include "xsi_strerror.h"
namespace shrpx {
@ -137,11 +145,18 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
#ifdef ENABLE_HTTP3
SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
const uint8_t *cid_prefix, size_t cid_prefixlen,
#endif // ENABLE_HTTP3
# ifdef HAVE_LIBBPF
size_t index,
# endif // HAVE_LIBBPF
#endif // ENABLE_HTTP3
const std::shared_ptr<TicketKeys> &ticket_keys,
ConnectionHandler *conn_handler,
std::shared_ptr<DownstreamConfig> downstreamconf)
: randgen_(util::make_mt19937()),
:
#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
index_{index},
#endif // ENABLE_HTTP3 && HAVE_LIBBPF
randgen_(util::make_mt19937()),
worker_stat_{},
dns_tracker_(loop),
#ifdef ENABLE_HTTP3
@ -626,6 +641,43 @@ QUICConnectionHandler *Worker::get_quic_connection_handler() {
DNSTracker *Worker::get_dns_tracker() { return &dns_tracker_; }
#ifdef ENABLE_HTTP3
# ifdef HAVE_LIBBPF
bool Worker::should_attach_bpf() const {
auto config = get_config();
auto &apiconf = config->api;
if (config->single_thread || config->num_worker == 1) {
return false;
}
if (apiconf.enabled) {
return index_ == 1;
}
return index_ == 0;
}
bool Worker::should_update_bpf_map() const {
auto config = get_config();
return !config->single_thread && config->num_worker > 1;
}
uint32_t Worker::compute_sk_index() const {
auto config = get_config();
auto &apiconf = config->api;
assert(!config->single_thread);
assert(config->num_worker > 1);
if (apiconf.enabled) {
return index_ - 1;
}
return index_;
}
# endif // HAVE_LIBBPF
int Worker::setup_quic_server_socket() {
for (auto &addr : quic_upstream_addrs_) {
assert(!addr.host_unix);
@ -639,6 +691,261 @@ int Worker::setup_quic_server_socket() {
return 0;
}
// Creates the UDP socket on which this worker receives QUIC packets
// for |faddr|.
//
// The address is resolved with getaddrinfo() and the candidates are
// tried in order until one can be created, configured (non-blocking,
// close-on-exec, SO_REUSEADDR, SO_REUSEPORT, packet-info ancillary
// data, plus IPV6_V6ONLY for IPv6 where available) and bound.
// Candidates that fail in one of these steps are skipped with a
// warning.
//
// When built with libbpf, the worker selected by should_attach_bpf()
// additionally loads bpf/reuseport_kern.o, records the descriptors of
// its maps in the ConnectionHandler, stores the number of workers in
// the "sk_info" map and attaches the program to the socket.  Every
// worker for which should_update_bpf_map() holds then registers its
// socket and CID prefix in those maps.  Any failure in this part is
// fatal for the call.
//
// On success, stores the descriptor in faddr.fd and the numeric
// host/port in faddr.hostport, and returns 0.  Returns -1 on failure.
int Worker::create_quic_server_socket(UpstreamAddr &faddr) {
  std::array<char, STRERROR_BUFSIZE> errbuf;
  int fd = -1;
  int rv;

  auto service = util::utos(faddr.port);
  addrinfo hints{};
  hints.ai_family = faddr.family;
  hints.ai_socktype = SOCK_DGRAM;
  hints.ai_flags = AI_PASSIVE;
# ifdef AI_ADDRCONFIG
  hints.ai_flags |= AI_ADDRCONFIG;
# endif // AI_ADDRCONFIG

  // "*" means the wildcard address.
  auto node =
      faddr.host == StringRef::from_lit("*") ? nullptr : faddr.host.c_str();

  addrinfo *res, *rp;
  rv = getaddrinfo(node, service.c_str(), &hints, &res);
# ifdef AI_ADDRCONFIG
  if (rv != 0) {
    // Retry without AI_ADDRCONFIG
    hints.ai_flags &= ~AI_ADDRCONFIG;
    rv = getaddrinfo(node, service.c_str(), &hints, &res);
  }
# endif // AI_ADDRCONFIG
  if (rv != 0) {
    LOG(FATAL) << "Unable to get IPv" << (faddr.family == AF_INET ? "4" : "6")
               << " address for " << faddr.host << ", port " << faddr.port
               << ": " << gai_strerror(rv);
    return -1;
  }

  auto res_d = defer(freeaddrinfo, res);

  std::array<char, NI_MAXHOST> host;

  for (rp = res; rp; rp = rp->ai_next) {
    rv = getnameinfo(rp->ai_addr, rp->ai_addrlen, host.data(), host.size(),
                     nullptr, 0, NI_NUMERICHOST);
    if (rv != 0) {
      LOG(WARN) << "getnameinfo() failed: " << gai_strerror(rv);
      continue;
    }

# ifdef SOCK_NONBLOCK
    fd = socket(rp->ai_family, rp->ai_socktype | SOCK_NONBLOCK | SOCK_CLOEXEC,
                rp->ai_protocol);
    if (fd == -1) {
      auto error = errno;
      LOG(WARN) << "socket() syscall failed: "
                << xsi_strerror(error, errbuf.data(), errbuf.size());
      continue;
    }
# else // !SOCK_NONBLOCK
    fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
    if (fd == -1) {
      auto error = errno;
      LOG(WARN) << "socket() syscall failed: "
                << xsi_strerror(error, errbuf.data(), errbuf.size());
      continue;
    }
    util::make_socket_nonblocking(fd);
    util::make_socket_closeonexec(fd);
# endif // !SOCK_NONBLOCK

    int val = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val,
                   static_cast<socklen_t>(sizeof(val))) == -1) {
      auto error = errno;
      LOG(WARN) << "Failed to set SO_REUSEADDR option to listener socket: "
                << xsi_strerror(error, errbuf.data(), errbuf.size());
      close(fd);
      continue;
    }

    if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val,
                   static_cast<socklen_t>(sizeof(val))) == -1) {
      auto error = errno;
      LOG(WARN) << "Failed to set SO_REUSEPORT option to listener socket: "
                << xsi_strerror(error, errbuf.data(), errbuf.size());
      close(fd);
      continue;
    }

    if (faddr.family == AF_INET6) {
# ifdef IPV6_V6ONLY
      if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val,
                     static_cast<socklen_t>(sizeof(val))) == -1) {
        auto error = errno;
        LOG(WARN) << "Failed to set IPV6_V6ONLY option to listener socket: "
                  << xsi_strerror(error, errbuf.data(), errbuf.size());
        close(fd);
        continue;
      }
# endif // IPV6_V6ONLY

      if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &val,
                     static_cast<socklen_t>(sizeof(val))) == -1) {
        auto error = errno;
        LOG(WARN)
            << "Failed to set IPV6_RECVPKTINFO option to listener socket: "
            << xsi_strerror(error, errbuf.data(), errbuf.size());
        close(fd);
        continue;
      }
    } else {
      if (setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &val,
                     static_cast<socklen_t>(sizeof(val))) == -1) {
        auto error = errno;
        LOG(WARN) << "Failed to set IP_PKTINFO option to listener socket: "
                  << xsi_strerror(error, errbuf.data(), errbuf.size());
        close(fd);
        continue;
      }
    }

    // TODO Enable ECN

    if (bind(fd, rp->ai_addr, rp->ai_addrlen) == -1) {
      auto error = errno;
      LOG(WARN) << "bind() syscall failed: "
                << xsi_strerror(error, errbuf.data(), errbuf.size());
      close(fd);
      continue;
    }

# ifdef HAVE_LIBBPF
    // NOTE(review): the vector below is indexed by faddr.index; confirm
    // that ConnectionHandler sizes it by the number of QUIC listener
    // addresses and not by the number of workers.
    auto &quic_bpf_refs = conn_handler_->get_quic_bpf_refs();
    int err;

    if (should_attach_bpf()) {
      // NOTE(review): obj is never handed to bpf_object__close() here.
      // Presumably it has to stay open because the map descriptors stored
      // in |ref| are used by the other workers afterwards -- confirm.
      auto obj = bpf_object__open_file("bpf/reuseport_kern.o", nullptr);
      err = libbpf_get_error(obj);
      if (err) {
        LOG(FATAL) << "Failed to open bpf object file: "
                   << xsi_strerror(-err, errbuf.data(), errbuf.size());
        close(fd);
        return -1;
      }

      if (bpf_object__load(obj)) {
        // Take a snapshot of errno before logging, as the other error
        // paths of this function do.
        auto error = errno;
        LOG(FATAL) << "Failed to load bpf object file: "
                   << xsi_strerror(error, errbuf.data(), errbuf.size());
        close(fd);
        return -1;
      }

      auto prog = bpf_object__find_program_by_name(obj, "select_reuseport");
      err = libbpf_get_error(prog);
      if (err) {
        LOG(FATAL) << "Failed to find sk_reuseport program: "
                   << xsi_strerror(-err, errbuf.data(), errbuf.size());
        close(fd);
        return -1;
      }

      auto &ref = quic_bpf_refs[faddr.index];

      auto reuseport_array =
          bpf_object__find_map_by_name(obj, "reuseport_array");
      err = libbpf_get_error(reuseport_array);
      if (err) {
        LOG(FATAL) << "Failed to get reuseport_array: "
                   << xsi_strerror(-err, errbuf.data(), errbuf.size());
        close(fd);
        return -1;
      }

      ref.reuseport_array = bpf_map__fd(reuseport_array);

      auto cid_prefix_map = bpf_object__find_map_by_name(obj, "cid_prefix_map");
      err = libbpf_get_error(cid_prefix_map);
      if (err) {
        LOG(FATAL) << "Failed to get cid_prefix_map: "
                   << xsi_strerror(-err, errbuf.data(), errbuf.size());
        close(fd);
        return -1;
      }

      ref.cid_prefix_map = bpf_map__fd(cid_prefix_map);

      auto sk_info = bpf_object__find_map_by_name(obj, "sk_info");
      err = libbpf_get_error(sk_info);
      if (err) {
        LOG(FATAL) << "Failed to get sk_info: "
                   << xsi_strerror(-err, errbuf.data(), errbuf.size());
        close(fd);
        return -1;
      }

      // Tell the BPF program how many sockets it distributes over.
      constexpr uint32_t zero = 0;
      auto config = get_config();
      uint32_t num_socks = config->num_worker;

      if (bpf_map_update_elem(bpf_map__fd(sk_info), &zero, &num_socks,
                              BPF_ANY) != 0) {
        auto error = errno;
        LOG(FATAL) << "Failed to update sk_info: "
                   << xsi_strerror(error, errbuf.data(), errbuf.size());
        close(fd);
        return -1;
      }

      auto prog_fd = bpf_program__fd(prog);

      if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_REUSEPORT_EBPF, &prog_fd,
                     static_cast<socklen_t>(sizeof(prog_fd))) == -1) {
        auto error = errno;
        LOG(FATAL) << "Failed to attach bpf program: "
                   << xsi_strerror(error, errbuf.data(), errbuf.size());
        close(fd);
        return -1;
      }
    }

    if (should_update_bpf_map()) {
      const auto &ref = quic_bpf_refs[faddr.index];
      auto sk_index = compute_sk_index();

      // Register this worker's socket under its index ...
      if (bpf_map_update_elem(ref.reuseport_array, &sk_index, &fd,
                              BPF_NOEXIST) != 0) {
        auto error = errno;
        LOG(FATAL) << "Failed to update reuseport_array: "
                   << xsi_strerror(error, errbuf.data(), errbuf.size());
        close(fd);
        return -1;
      }

      // ... and map this worker's CID prefix to the same index.
      if (bpf_map_update_elem(ref.cid_prefix_map, cid_prefix_.data(), &sk_index,
                              BPF_NOEXIST) != 0) {
        auto error = errno;
        LOG(FATAL) << "Failed to update cid_prefix_map: "
                   << xsi_strerror(error, errbuf.data(), errbuf.size());
        close(fd);
        return -1;
      }
    }
# endif // HAVE_LIBBPF

    break;
  }

  // rp is null when every candidate address failed.
  if (!rp) {
    LOG(FATAL) << "Listening " << (faddr.family == AF_INET ? "IPv4" : "IPv6")
               << " socket failed";
    return -1;
  }

  faddr.fd = fd;
  faddr.hostport = util::make_http_hostport(mod_config()->balloc,
                                            StringRef{host.data()}, faddr.port);

  LOG(NOTICE) << "Listening on " << faddr.hostport << ", quic";

  return 0;
}
// Returns a pointer to the first byte of cid_prefix_, this worker's
// QUIC Connection ID prefix.  The pointer refers to storage owned by
// the Worker.
const uint8_t *Worker::get_cid_prefix() const { return cid_prefix_.data(); }
#endif // ENABLE_HTTP3

View File

@ -301,7 +301,10 @@ public:
#ifdef ENABLE_HTTP3
SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
const uint8_t *cid_prefix, size_t cid_prefixlen,
#endif // ENABLE_HTTP3
# ifdef HAVE_LIBBPF
size_t index,
# endif // HAVE_LIBBPF
#endif // ENABLE_HTTP3
const std::shared_ptr<TicketKeys> &ticket_keys,
ConnectionHandler *conn_handler,
std::shared_ptr<DownstreamConfig> downstreamconf);
@ -363,6 +366,16 @@ public:
int setup_quic_server_socket();
const uint8_t *get_cid_prefix() const;
# ifdef HAVE_LIBBPF
bool should_attach_bpf() const;
bool should_update_bpf_map() const;
uint32_t compute_sk_index() const;
# endif // HAVE_LIBBPF
int create_quic_server_socket(UpstreamAddr &addr);
#endif // ENABLE_HTTP3
DNSTracker *get_dns_tracker();
@ -371,6 +384,10 @@ private:
#ifndef NOTHREADS
std::future<void> fut_;
#endif // NOTHREADS
#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
// Unique index of this worker.
size_t index_;
#endif // ENABLE_HTTP3 && HAVE_LIBBPF
std::mutex m_;
std::deque<WorkerEvent> q_;
std::mt19937 randgen_;