Forward QUIC packet to the correct worker

This commit is contained in:
Tatsuhiro Tsujikawa 2021-08-23 21:40:24 +09:00
parent ff389b3e97
commit 33c580ebbf
11 changed files with 179 additions and 28 deletions

View File

@ -3051,8 +3051,10 @@ int process_options(Config *config,
addr.port = 3000; addr.port = 3000;
addr.tls = true; addr.tls = true;
addr.family = AF_INET; addr.family = AF_INET;
addr.index = 0;
listenerconf.addrs.push_back(addr); listenerconf.addrs.push_back(addr);
addr.family = AF_INET6; addr.family = AF_INET6;
addr.index = 1;
listenerconf.addrs.push_back(std::move(addr)); listenerconf.addrs.push_back(std::move(addr));
} }

View File

@ -2689,6 +2689,7 @@ int parse_config(Config *config, int optid, const StringRef &opt,
auto path = std::begin(optarg) + SHRPX_UNIX_PATH_PREFIX.size(); auto path = std::begin(optarg) + SHRPX_UNIX_PATH_PREFIX.size();
addr.host = make_string_ref(config->balloc, StringRef{path, addr_end}); addr.host = make_string_ref(config->balloc, StringRef{path, addr_end});
addr.host_unix = true; addr.host_unix = true;
addr.index = addrs.size();
addrs.push_back(std::move(addr)); addrs.push_back(std::move(addr));
@ -2705,20 +2706,24 @@ int parse_config(Config *config, int optid, const StringRef &opt,
if (util::numeric_host(host, AF_INET)) { if (util::numeric_host(host, AF_INET)) {
addr.family = AF_INET; addr.family = AF_INET;
addr.index = addrs.size();
addrs.push_back(std::move(addr)); addrs.push_back(std::move(addr));
return 0; return 0;
} }
if (util::numeric_host(host, AF_INET6)) { if (util::numeric_host(host, AF_INET6)) {
addr.family = AF_INET6; addr.family = AF_INET6;
addr.index = addrs.size();
addrs.push_back(std::move(addr)); addrs.push_back(std::move(addr));
return 0; return 0;
} }
addr.family = AF_INET; addr.family = AF_INET;
addr.index = addrs.size();
addrs.push_back(addr); addrs.push_back(addr);
addr.family = AF_INET6; addr.family = AF_INET6;
addr.index = addrs.size();
addrs.push_back(std::move(addr)); addrs.push_back(std::move(addr));
return 0; return 0;

View File

@ -435,6 +435,8 @@ enum class UpstreamAltMode {
}; };
struct UpstreamAddr { struct UpstreamAddr {
// The unique index of this address.
size_t index;
// The frontend address (e.g., FQDN, hostname, IP address). If // The frontend address (e.g., FQDN, hostname, IP address). If
// |host_unix| is true, this is UNIX domain socket path. This must // |host_unix| is true, this is UNIX domain socket path. This must
// be NULL terminated string. // be NULL terminated string.

View File

@ -192,24 +192,24 @@ void ConnectionHandler::set_ticket_keys_to_worker(
} }
void ConnectionHandler::worker_reopen_log_files() { void ConnectionHandler::worker_reopen_log_files() {
WorkerEvent wev{};
wev.type = WorkerEventType::REOPEN_LOG;
for (auto &worker : workers_) { for (auto &worker : workers_) {
worker->send(wev); WorkerEvent wev{};
wev.type = WorkerEventType::REOPEN_LOG;
worker->send(std::move(wev));
} }
} }
void ConnectionHandler::worker_replace_downstream( void ConnectionHandler::worker_replace_downstream(
std::shared_ptr<DownstreamConfig> downstreamconf) { std::shared_ptr<DownstreamConfig> downstreamconf) {
WorkerEvent wev{};
wev.type = WorkerEventType::REPLACE_DOWNSTREAM;
wev.downstreamconf = std::move(downstreamconf);
for (auto &worker : workers_) { for (auto &worker : workers_) {
worker->send(wev); WorkerEvent wev{};
wev.type = WorkerEventType::REPLACE_DOWNSTREAM;
wev.downstreamconf = downstreamconf;
worker->send(std::move(wev));
} }
} }
@ -267,10 +267,18 @@ int ConnectionHandler::create_single_worker() {
} }
} }
#ifdef ENABLE_HTTP3
std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN> cid_prefix;
if (create_cid_prefix(cid_prefix.data()) != 0) {
return -1;
}
#endif // ENABLE_HTTP3
single_worker_ = std::make_unique<Worker>( single_worker_ = std::make_unique<Worker>(
loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(), loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(),
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
quic_sv_ssl_ctx, quic_cert_tree_.get(), quic_sv_ssl_ctx, quic_cert_tree_.get(), cid_prefix.data(),
cid_prefix.size(),
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
ticket_keys_, this, config->conn.downstream); ticket_keys_, this, config->conn.downstream);
#ifdef HAVE_MRUBY #ifdef HAVE_MRUBY
@ -355,10 +363,18 @@ int ConnectionHandler::create_worker_thread(size_t num) {
for (size_t i = 0; i < num; ++i) { for (size_t i = 0; i < num; ++i) {
auto loop = ev_loop_new(config->ev_loop_flags); auto loop = ev_loop_new(config->ev_loop_flags);
# ifdef ENABLE_HTTP3
std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN> cid_prefix;
if (create_cid_prefix(cid_prefix.data()) != 0) {
return -1;
}
# endif // ENABLE_HTTP3
auto worker = std::make_unique<Worker>( auto worker = std::make_unique<Worker>(
loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(), loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(),
# ifdef ENABLE_HTTP3 # ifdef ENABLE_HTTP3
quic_sv_ssl_ctx, quic_cert_tree_.get(), quic_sv_ssl_ctx, quic_cert_tree_.get(), cid_prefix.data(),
cid_prefix.size(),
# endif // ENABLE_HTTP3 # endif // ENABLE_HTTP3
ticket_keys_, this, config->conn.downstream); ticket_keys_, this, config->conn.downstream);
# ifdef HAVE_MRUBY # ifdef HAVE_MRUBY
@ -413,15 +429,15 @@ void ConnectionHandler::graceful_shutdown_worker() {
return; return;
} }
WorkerEvent wev{};
wev.type = WorkerEventType::GRACEFUL_SHUTDOWN;
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
LLOG(INFO, this) << "Sending graceful shutdown signal to worker"; LLOG(INFO, this) << "Sending graceful shutdown signal to worker";
} }
for (auto &worker : workers_) { for (auto &worker : workers_) {
worker->send(wev); WorkerEvent wev{};
wev.type = WorkerEventType::GRACEFUL_SHUTDOWN;
worker->send(std::move(wev));
} }
#ifndef NOTHREADS #ifndef NOTHREADS
@ -504,7 +520,7 @@ int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen,
wev.client_addrlen = addrlen; wev.client_addrlen = addrlen;
wev.faddr = faddr; wev.faddr = faddr;
worker->send(wev); worker->send(std::move(wev));
return 0; return 0;
} }
@ -981,4 +997,33 @@ void ConnectionHandler::set_enable_acceptor_on_ocsp_completion(bool f) {
enable_acceptor_on_ocsp_completion_ = f; enable_acceptor_on_ocsp_completion_ = f;
} }
#ifdef ENABLE_HTTP3
int ConnectionHandler::forward_quic_packet(const UpstreamAddr *faddr,
const Address &remote_addr,
const Address &local_addr,
const uint8_t *cid_prefix,
const uint8_t *data,
size_t datalen) {
assert(!get_config()->single_thread);
for (auto &worker : workers_) {
if (!std::equal(cid_prefix, cid_prefix + SHRPX_QUIC_CID_PREFIXLEN,
worker->get_cid_prefix())) {
continue;
}
WorkerEvent wev{};
wev.type = WorkerEventType::QUIC_PKT_FORWARD;
wev.quic_pkt = std::make_unique<QUICPacket>(faddr->index, remote_addr,
local_addr, data, datalen);
worker->send(std::move(wev));
return 0;
}
return -1;
}
#endif // ENABLE_HTTP3
} // namespace shrpx } // namespace shrpx

View File

@ -161,6 +161,10 @@ public:
const std::vector<SSL_CTX *> &get_indexed_ssl_ctx(size_t idx) const; const std::vector<SSL_CTX *> &get_indexed_ssl_ctx(size_t idx) const;
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
const std::vector<SSL_CTX *> &get_quic_indexed_ssl_ctx(size_t idx) const; const std::vector<SSL_CTX *> &get_quic_indexed_ssl_ctx(size_t idx) const;
int forward_quic_packet(const UpstreamAddr *faddr, const Address &remote_addr,
const Address &local_addr, const uint8_t *cid_prefix,
const uint8_t *data, size_t datalen);
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
#ifdef HAVE_NEVERBLEED #ifdef HAVE_NEVERBLEED

View File

@ -160,7 +160,11 @@ void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
namespace { namespace {
int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token, int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
size_t cidlen, void *user_data) { size_t cidlen, void *user_data) {
if (generate_quic_connection_id(cid, cidlen) != 0) { auto upstream = static_cast<Http3Upstream *>(user_data);
auto handler = upstream->get_client_handler();
auto worker = handler->get_worker();
if (generate_quic_connection_id(cid, cidlen, worker->get_cid_prefix()) != 0) {
return NGTCP2_ERR_CALLBACK_FAILURE; return NGTCP2_ERR_CALLBACK_FAILURE;
} }
@ -173,9 +177,6 @@ int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
return NGTCP2_ERR_CALLBACK_FAILURE; return NGTCP2_ERR_CALLBACK_FAILURE;
} }
auto upstream = static_cast<Http3Upstream *>(user_data);
auto handler = upstream->get_client_handler();
auto worker = handler->get_worker();
auto quic_connection_handler = worker->get_quic_connection_handler(); auto quic_connection_handler = worker->get_quic_connection_handler();
quic_connection_handler->add_connection_id(cid, handler); quic_connection_handler->add_connection_id(cid, handler);
@ -451,7 +452,8 @@ int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr,
ngtcp2_cid scid; ngtcp2_cid scid;
if (generate_quic_connection_id(&scid, SHRPX_QUIC_SCIDLEN) != 0) { if (generate_quic_connection_id(&scid, SHRPX_QUIC_SCIDLEN,
worker->get_cid_prefix()) != 0) {
return -1; return -1;
} }

View File

@ -297,6 +297,21 @@ int generate_quic_connection_id(ngtcp2_cid *cid, size_t cidlen) {
return 0; return 0;
} }
int generate_quic_connection_id(ngtcp2_cid *cid, size_t cidlen,
const uint8_t *cid_prefix) {
assert(cidlen > SHRPX_QUIC_CID_PREFIXLEN);
auto p = std::copy_n(cid_prefix, SHRPX_QUIC_CID_PREFIXLEN, cid->data);
if (RAND_bytes(p, cidlen - SHRPX_QUIC_CID_PREFIXLEN) != 1) {
return -1;
}
cid->datalen = cidlen;
return 0;
}
int generate_quic_stateless_reset_token(uint8_t *token, const ngtcp2_cid *cid, int generate_quic_stateless_reset_token(uint8_t *token, const ngtcp2_cid *cid,
const uint8_t *secret, const uint8_t *secret,
size_t secretlen) { size_t secretlen) {

View File

@ -36,6 +36,7 @@ namespace shrpx {
struct UpstreamAddr; struct UpstreamAddr;
constexpr size_t SHRPX_QUIC_SCIDLEN = 20; constexpr size_t SHRPX_QUIC_SCIDLEN = 20;
constexpr size_t SHRPX_QUIC_CID_PREFIXLEN = 8;
constexpr size_t SHRPX_MAX_UDP_PAYLOAD_SIZE = 1280; constexpr size_t SHRPX_MAX_UDP_PAYLOAD_SIZE = 1280;
ngtcp2_tstamp quic_timestamp(); ngtcp2_tstamp quic_timestamp();
@ -49,6 +50,9 @@ int quic_send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa,
int generate_quic_connection_id(ngtcp2_cid *cid, size_t cidlen); int generate_quic_connection_id(ngtcp2_cid *cid, size_t cidlen);
int generate_quic_connection_id(ngtcp2_cid *cid, size_t cidlen,
const uint8_t *cid_prefix);
int generate_quic_stateless_reset_token(uint8_t *token, const ngtcp2_cid *cid, int generate_quic_stateless_reset_token(uint8_t *token, const ngtcp2_cid *cid,
const uint8_t *secret, const uint8_t *secret,
size_t secretlen); size_t secretlen);

View File

@ -33,6 +33,7 @@
#include "shrpx_log.h" #include "shrpx_log.h"
#include "shrpx_quic.h" #include "shrpx_quic.h"
#include "shrpx_http3_upstream.h" #include "shrpx_http3_upstream.h"
#include "shrpx_connection_handler.h"
namespace shrpx { namespace shrpx {
@ -94,6 +95,18 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
remote_addr, local_addr); remote_addr, local_addr);
return 0; return 0;
default: default:
if (!get_config()->single_thread && !(data[0] & 0x80) &&
dcidlen > SHRPX_QUIC_CID_PREFIXLEN &&
!std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN,
worker_->get_cid_prefix())) {
auto conn_handler = worker_->get_connection_handler();
if (conn_handler->forward_quic_packet(faddr, remote_addr, local_addr,
dcid, data, datalen) == 0) {
return 0;
}
}
// TODO Must be rate limited // TODO Must be rate limited
send_stateless_reset(faddr, dcid, dcidlen, remote_addr, local_addr); send_stateless_reset(faddr, dcid, dcidlen, remote_addr, local_addr);
return 0; return 0;

View File

@ -40,7 +40,6 @@
# include "shrpx_mruby.h" # include "shrpx_mruby.h"
#endif // HAVE_MRUBY #endif // HAVE_MRUBY
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
# include "shrpx_quic.h"
# include "shrpx_quic_listener.h" # include "shrpx_quic_listener.h"
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
#include "util.h" #include "util.h"
@ -137,6 +136,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
tls::CertLookupTree *cert_tree, tls::CertLookupTree *cert_tree,
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree, SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
const uint8_t *cid_prefix, size_t cid_prefixlen,
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
const std::shared_ptr<TicketKeys> &ticket_keys, const std::shared_ptr<TicketKeys> &ticket_keys,
ConnectionHandler *conn_handler, ConnectionHandler *conn_handler,
@ -161,6 +161,10 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
connect_blocker_( connect_blocker_(
std::make_unique<ConnectBlocker>(randgen_, loop_, nullptr, nullptr)), std::make_unique<ConnectBlocker>(randgen_, loop_, nullptr, nullptr)),
graceful_shutdown_(false) { graceful_shutdown_(false) {
#ifdef ENABLE_HTTP3
std::copy_n(cid_prefix, cid_prefixlen, std::begin(cid_prefix_));
#endif // ENABLE_HTTP3
ev_async_init(&w_, eventcb); ev_async_init(&w_, eventcb);
w_.data = this; w_.data = this;
ev_async_start(loop_, &w_); ev_async_start(loop_, &w_);
@ -415,11 +419,11 @@ void Worker::run_async() {
#endif // !NOTHREADS #endif // !NOTHREADS
} }
void Worker::send(const WorkerEvent &event) { void Worker::send(WorkerEvent event) {
{ {
std::lock_guard<std::mutex> g(m_); std::lock_guard<std::mutex> g(m_);
q_.push_back(event); q_.emplace_back(std::move(event));
} }
ev_async_send(loop_, &w_); ev_async_send(loop_, &w_);
@ -440,7 +444,7 @@ void Worker::process_events() {
return; return;
} }
wev = q_.front(); wev = std::move(q_.front());
q_.pop_front(); q_.pop_front();
} }
@ -510,6 +514,16 @@ void Worker::process_events() {
replace_downstream_config(wev.downstreamconf); replace_downstream_config(wev.downstreamconf);
break; break;
#ifdef ENABLE_HTTP3
case WorkerEventType::QUIC_PKT_FORWARD: {
quic_conn_handler_.handle_packet(
&quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index],
wev.quic_pkt->remote_addr, wev.quic_pkt->local_addr,
wev.quic_pkt->data.data(), wev.quic_pkt->data.size());
break;
}
#endif // ENABLE_HTTP3
default: default:
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
WLOG(INFO, this) << "unknown event type " << static_cast<int>(wev.type); WLOG(INFO, this) << "unknown event type " << static_cast<int>(wev.type);
@ -624,6 +638,8 @@ int Worker::setup_quic_server_socket() {
return 0; return 0;
} }
const uint8_t *Worker::get_cid_prefix() const { return cid_prefix_.data(); }
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
namespace { namespace {
@ -797,4 +813,14 @@ void downstream_failure(DownstreamAddr *addr, const Address *raddr) {
} }
} }
#ifdef ENABLE_HTTP3
int create_cid_prefix(uint8_t *cid_prefix) {
if (RAND_bytes(cid_prefix, SHRPX_QUIC_CID_PREFIXLEN) != 1) {
return -1;
}
return 0;
}
#endif // ENABLE_HTTP3
} // namespace shrpx } // namespace shrpx

View File

@ -52,6 +52,7 @@
#include "shrpx_dns_tracker.h" #include "shrpx_dns_tracker.h"
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
# include "shrpx_quic_connection_handler.h" # include "shrpx_quic_connection_handler.h"
# include "shrpx_quic.h"
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
#include "allocator.h" #include "allocator.h"
@ -252,11 +253,29 @@ struct WorkerStat {
size_t num_connections; size_t num_connections;
}; };
#ifdef ENABLE_HTTP3
struct QUICPacket {
QUICPacket(size_t upstream_addr_index, const Address &remote_addr,
const Address &local_addr, const uint8_t *data, size_t datalen)
: upstream_addr_index{upstream_addr_index},
remote_addr{remote_addr},
local_addr{local_addr},
data{data, data + datalen} {}
size_t upstream_addr_index;
Address remote_addr;
Address local_addr;
std::vector<uint8_t> data;
};
#endif // ENABLE_HTTP3
enum class WorkerEventType { enum class WorkerEventType {
NEW_CONNECTION = 0x01, NEW_CONNECTION = 0x01,
REOPEN_LOG = 0x02, REOPEN_LOG = 0x02,
GRACEFUL_SHUTDOWN = 0x03, GRACEFUL_SHUTDOWN = 0x03,
REPLACE_DOWNSTREAM = 0x04, REPLACE_DOWNSTREAM = 0x04,
#ifdef ENABLE_HTTP3
QUIC_PKT_FORWARD = 0x05,
#endif // ENABLE_HTTP3
}; };
struct WorkerEvent { struct WorkerEvent {
@ -269,6 +288,9 @@ struct WorkerEvent {
}; };
std::shared_ptr<TicketKeys> ticket_keys; std::shared_ptr<TicketKeys> ticket_keys;
std::shared_ptr<DownstreamConfig> downstreamconf; std::shared_ptr<DownstreamConfig> downstreamconf;
#ifdef ENABLE_HTTP3
std::unique_ptr<QUICPacket> quic_pkt;
#endif // ENABLE_HTTP3
}; };
class Worker { class Worker {
@ -278,6 +300,7 @@ public:
tls::CertLookupTree *cert_tree, tls::CertLookupTree *cert_tree,
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree, SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
const uint8_t *cid_prefix, size_t cid_prefixlen,
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
const std::shared_ptr<TicketKeys> &ticket_keys, const std::shared_ptr<TicketKeys> &ticket_keys,
ConnectionHandler *conn_handler, ConnectionHandler *conn_handler,
@ -286,7 +309,7 @@ public:
void run_async(); void run_async();
void wait(); void wait();
void process_events(); void process_events();
void send(const WorkerEvent &event); void send(WorkerEvent event);
tls::CertLookupTree *get_cert_lookup_tree() const; tls::CertLookupTree *get_cert_lookup_tree() const;
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
@ -338,6 +361,8 @@ public:
QUICConnectionHandler *get_quic_connection_handler(); QUICConnectionHandler *get_quic_connection_handler();
int setup_quic_server_socket(); int setup_quic_server_socket();
const uint8_t *get_cid_prefix() const;
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
DNSTracker *get_dns_tracker(); DNSTracker *get_dns_tracker();
@ -357,6 +382,7 @@ private:
DNSTracker dns_tracker_; DNSTracker dns_tracker_;
#ifdef ENABLE_HTTP3 #ifdef ENABLE_HTTP3
std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN> cid_prefix_;
std::vector<UpstreamAddr> quic_upstream_addrs_; std::vector<UpstreamAddr> quic_upstream_addrs_;
std::vector<std::unique_ptr<QUICListener>> quic_listeners_; std::vector<std::unique_ptr<QUICListener>> quic_listeners_;
#endif // ENABLE_HTTP3 #endif // ENABLE_HTTP3
@ -410,6 +436,13 @@ size_t match_downstream_addr_group(
// nullptr. This function may schedule live check. // nullptr. This function may schedule live check.
void downstream_failure(DownstreamAddr *addr, const Address *raddr); void downstream_failure(DownstreamAddr *addr, const Address *raddr);
#ifdef ENABLE_HTTP3
// Creates unpredictable SHRPX_QUIC_CID_PREFIXLEN bytes sequence which
// is used as a prefix of QUIC Connection ID. This function returns
// -1 on failure.
int create_cid_prefix(uint8_t *cid_prefix);
#endif // ENABLE_HTTP3
} // namespace shrpx } // namespace shrpx
#endif // SHRPX_WORKER_H #endif // SHRPX_WORKER_H