From 33c580ebbfc70cc2328a73f7fd7dbc1ddc6ae2f9 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Mon, 23 Aug 2021 21:40:24 +0900 Subject: [PATCH] Forward QUIC packet to the correct worker --- src/shrpx.cc | 2 + src/shrpx_config.cc | 5 ++ src/shrpx_config.h | 2 + src/shrpx_connection_handler.cc | 81 +++++++++++++++++++++------- src/shrpx_connection_handler.h | 4 ++ src/shrpx_http3_upstream.cc | 12 +++-- src/shrpx_quic.cc | 15 ++++++ src/shrpx_quic.h | 4 ++ src/shrpx_quic_connection_handler.cc | 13 +++++ src/shrpx_worker.cc | 34 ++++++++++-- src/shrpx_worker.h | 35 +++++++++++- 11 files changed, 179 insertions(+), 28 deletions(-) diff --git a/src/shrpx.cc b/src/shrpx.cc index f9fc4739..23260d32 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -3051,8 +3051,10 @@ int process_options(Config *config, addr.port = 3000; addr.tls = true; addr.family = AF_INET; + addr.index = 0; listenerconf.addrs.push_back(addr); addr.family = AF_INET6; + addr.index = 1; listenerconf.addrs.push_back(std::move(addr)); } diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index c3eb218a..aadbe916 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -2689,6 +2689,7 @@ int parse_config(Config *config, int optid, const StringRef &opt, auto path = std::begin(optarg) + SHRPX_UNIX_PATH_PREFIX.size(); addr.host = make_string_ref(config->balloc, StringRef{path, addr_end}); addr.host_unix = true; + addr.index = addrs.size(); addrs.push_back(std::move(addr)); @@ -2705,20 +2706,24 @@ int parse_config(Config *config, int optid, const StringRef &opt, if (util::numeric_host(host, AF_INET)) { addr.family = AF_INET; + addr.index = addrs.size(); addrs.push_back(std::move(addr)); return 0; } if (util::numeric_host(host, AF_INET6)) { addr.family = AF_INET6; + addr.index = addrs.size(); addrs.push_back(std::move(addr)); return 0; } addr.family = AF_INET; + addr.index = addrs.size(); addrs.push_back(addr); addr.family = AF_INET6; + addr.index = addrs.size(); addrs.push_back(std::move(addr)); return 0; diff --git a/src/shrpx_config.h b/src/shrpx_config.h index d3a22baa..16a6dbce 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -435,6 +435,8 @@ enum class UpstreamAltMode { }; struct UpstreamAddr { + // The unique index of this address. + size_t index; // The frontend address (e.g., FQDN, hostname, IP address). If // |host_unix| is true, this is UNIX domain socket path. This must // be NULL terminated string. diff --git a/src/shrpx_connection_handler.cc b/src/shrpx_connection_handler.cc index 39ea08cd..ffa1ae32 100644 --- a/src/shrpx_connection_handler.cc +++ b/src/shrpx_connection_handler.cc @@ -192,24 +192,24 @@ void ConnectionHandler::set_ticket_keys_to_worker( } void ConnectionHandler::worker_reopen_log_files() { - WorkerEvent wev{}; - - wev.type = WorkerEventType::REOPEN_LOG; - for (auto &worker : workers_) { - worker->send(wev); + WorkerEvent wev{}; + + wev.type = WorkerEventType::REOPEN_LOG; + + worker->send(std::move(wev)); } } void ConnectionHandler::worker_replace_downstream( std::shared_ptr downstreamconf) { - WorkerEvent wev{}; - - wev.type = WorkerEventType::REPLACE_DOWNSTREAM; - wev.downstreamconf = std::move(downstreamconf); - for (auto &worker : workers_) { - worker->send(wev); + WorkerEvent wev{}; + + wev.type = WorkerEventType::REPLACE_DOWNSTREAM; + wev.downstreamconf = downstreamconf; + + worker->send(std::move(wev)); } } @@ -267,10 +267,18 @@ int ConnectionHandler::create_single_worker() { } } +#ifdef ENABLE_HTTP3 + std::array cid_prefix; + if (create_cid_prefix(cid_prefix.data()) != 0) { + return -1; + } +#endif // ENABLE_HTTP3 + single_worker_ = std::make_unique( loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(), #ifdef ENABLE_HTTP3 - quic_sv_ssl_ctx, quic_cert_tree_.get(), + quic_sv_ssl_ctx, quic_cert_tree_.get(), cid_prefix.data(), + cid_prefix.size(), #endif // ENABLE_HTTP3 ticket_keys_, this, config->conn.downstream); #ifdef HAVE_MRUBY @@ -355,10 +363,18 @@ int ConnectionHandler::create_worker_thread(size_t num) { for (size_t i = 0; i < num; ++i) { auto loop = ev_loop_new(config->ev_loop_flags); +# ifdef ENABLE_HTTP3 + std::array cid_prefix; + if (create_cid_prefix(cid_prefix.data()) != 0) { + return -1; + } +# endif // ENABLE_HTTP3 + auto worker = std::make_unique( loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(), # ifdef ENABLE_HTTP3 - quic_sv_ssl_ctx, quic_cert_tree_.get(), + quic_sv_ssl_ctx, quic_cert_tree_.get(), cid_prefix.data(), + cid_prefix.size(), # endif // ENABLE_HTTP3 ticket_keys_, this, config->conn.downstream); # ifdef HAVE_MRUBY @@ -413,15 +429,15 @@ void ConnectionHandler::graceful_shutdown_worker() { return; } - WorkerEvent wev{}; - wev.type = WorkerEventType::GRACEFUL_SHUTDOWN; - if (LOG_ENABLED(INFO)) { LLOG(INFO, this) << "Sending graceful shutdown signal to worker"; } for (auto &worker : workers_) { - worker->send(wev); + WorkerEvent wev{}; + wev.type = WorkerEventType::GRACEFUL_SHUTDOWN; + + worker->send(std::move(wev)); } #ifndef NOTHREADS @@ -504,7 +520,7 @@ int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen, wev.client_addrlen = addrlen; wev.faddr = faddr; - worker->send(wev); + worker->send(std::move(wev)); return 0; } @@ -981,4 +997,33 @@ void ConnectionHandler::set_enable_acceptor_on_ocsp_completion(bool f) { enable_acceptor_on_ocsp_completion_ = f; } +#ifdef ENABLE_HTTP3 +int ConnectionHandler::forward_quic_packet(const UpstreamAddr *faddr, + const Address &remote_addr, + const Address &local_addr, + const uint8_t *cid_prefix, + const uint8_t *data, + size_t datalen) { + assert(!get_config()->single_thread); + + for (auto &worker : workers_) { + if (!std::equal(cid_prefix, cid_prefix + SHRPX_QUIC_CID_PREFIXLEN, + worker->get_cid_prefix())) { + continue; + } + + WorkerEvent wev{}; + wev.type = WorkerEventType::QUIC_PKT_FORWARD; + wev.quic_pkt = std::make_unique(faddr->index, remote_addr, + local_addr, data, datalen); + + worker->send(std::move(wev)); + + return 0; + } + + return -1; +} +#endif // ENABLE_HTTP3 + } // namespace shrpx diff --git a/src/shrpx_connection_handler.h b/src/shrpx_connection_handler.h index 3e8a5406..9054d182 100644 --- a/src/shrpx_connection_handler.h +++ b/src/shrpx_connection_handler.h @@ -161,6 +161,10 @@ public: const std::vector &get_indexed_ssl_ctx(size_t idx) const; #ifdef ENABLE_HTTP3 const std::vector &get_quic_indexed_ssl_ctx(size_t idx) const; + + int forward_quic_packet(const UpstreamAddr *faddr, const Address &remote_addr, + const Address &local_addr, const uint8_t *cid_prefix, + const uint8_t *data, size_t datalen); #endif // ENABLE_HTTP3 #ifdef HAVE_NEVERBLEED diff --git a/src/shrpx_http3_upstream.cc b/src/shrpx_http3_upstream.cc index c4dbbd86..dab44029 100644 --- a/src/shrpx_http3_upstream.cc +++ b/src/shrpx_http3_upstream.cc @@ -160,7 +160,11 @@ void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) { namespace { int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token, size_t cidlen, void *user_data) { - if (generate_quic_connection_id(cid, cidlen) != 0) { + auto upstream = static_cast(user_data); + auto handler = upstream->get_client_handler(); + auto worker = handler->get_worker(); + + if (generate_quic_connection_id(cid, cidlen, worker->get_cid_prefix()) != 0) { return NGTCP2_ERR_CALLBACK_FAILURE; } @@ -173,9 +177,6 @@ int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token, return NGTCP2_ERR_CALLBACK_FAILURE; } - auto upstream = static_cast(user_data); - auto handler = upstream->get_client_handler(); - auto worker = handler->get_worker(); auto quic_connection_handler = worker->get_quic_connection_handler(); quic_connection_handler->add_connection_id(cid, handler); @@ -451,7 +452,8 @@ int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr, ngtcp2_cid scid; - if (generate_quic_connection_id(&scid, SHRPX_QUIC_SCIDLEN) != 0) { + if (generate_quic_connection_id(&scid, SHRPX_QUIC_SCIDLEN, + worker->get_cid_prefix()) != 0) { return -1; } diff --git a/src/shrpx_quic.cc b/src/shrpx_quic.cc index a1ea2a32..9bb86f63 100644 --- a/src/shrpx_quic.cc +++ b/src/shrpx_quic.cc @@ -297,6 +297,21 @@ int generate_quic_connection_id(ngtcp2_cid *cid, size_t cidlen) { return 0; } +int generate_quic_connection_id(ngtcp2_cid *cid, size_t cidlen, + const uint8_t *cid_prefix) { + assert(cidlen > SHRPX_QUIC_CID_PREFIXLEN); + + auto p = std::copy_n(cid_prefix, SHRPX_QUIC_CID_PREFIXLEN, cid->data); + + if (RAND_bytes(p, cidlen - SHRPX_QUIC_CID_PREFIXLEN) != 1) { + return -1; + } + + cid->datalen = cidlen; + + return 0; +} + int generate_quic_stateless_reset_token(uint8_t *token, const ngtcp2_cid *cid, const uint8_t *secret, size_t secretlen) { diff --git a/src/shrpx_quic.h b/src/shrpx_quic.h index 4fa329e8..791b6fd6 100644 --- a/src/shrpx_quic.h +++ b/src/shrpx_quic.h @@ -36,6 +36,7 @@ namespace shrpx { struct UpstreamAddr; constexpr size_t SHRPX_QUIC_SCIDLEN = 20; +constexpr size_t SHRPX_QUIC_CID_PREFIXLEN = 8; constexpr size_t SHRPX_MAX_UDP_PAYLOAD_SIZE = 1280; ngtcp2_tstamp quic_timestamp(); @@ -49,6 +50,9 @@ int quic_send_packet(const UpstreamAddr *faddr, const sockaddr *remote_sa, int generate_quic_connection_id(ngtcp2_cid *cid, size_t cidlen); +int generate_quic_connection_id(ngtcp2_cid *cid, size_t cidlen, + const uint8_t *cid_prefix); + int generate_quic_stateless_reset_token(uint8_t *token, const ngtcp2_cid *cid, const uint8_t *secret, size_t secretlen); diff --git a/src/shrpx_quic_connection_handler.cc b/src/shrpx_quic_connection_handler.cc index d83f0822..46b203d8 100644 --- a/src/shrpx_quic_connection_handler.cc +++ b/src/shrpx_quic_connection_handler.cc @@ -33,6 +33,7 @@ #include "shrpx_log.h" #include "shrpx_quic.h" #include "shrpx_http3_upstream.h" +#include "shrpx_connection_handler.h" namespace shrpx { @@ -94,6 +95,18 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr, remote_addr, local_addr); return 0; default: + if (!get_config()->single_thread && !(data[0] & 0x80) && + dcidlen > SHRPX_QUIC_CID_PREFIXLEN && + !std::equal(dcid, dcid + SHRPX_QUIC_CID_PREFIXLEN, + worker_->get_cid_prefix())) { + auto conn_handler = worker_->get_connection_handler(); + + if (conn_handler->forward_quic_packet(faddr, remote_addr, local_addr, + dcid, data, datalen) == 0) { + return 0; + } + } + // TODO Must be rate limited send_stateless_reset(faddr, dcid, dcidlen, remote_addr, local_addr); return 0; diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index b153b0e6..b30a6a5b 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -40,7 +40,6 @@ # include "shrpx_mruby.h" #endif // HAVE_MRUBY #ifdef ENABLE_HTTP3 -# include "shrpx_quic.h" # include "shrpx_quic_listener.h" #endif // ENABLE_HTTP3 #include "util.h" @@ -137,6 +136,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, tls::CertLookupTree *cert_tree, #ifdef ENABLE_HTTP3 SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree, + const uint8_t *cid_prefix, size_t cid_prefixlen, #endif // ENABLE_HTTP3 const std::shared_ptr &ticket_keys, ConnectionHandler *conn_handler, @@ -161,6 +161,10 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, connect_blocker_( std::make_unique(randgen_, loop_, nullptr, nullptr)), graceful_shutdown_(false) { +#ifdef ENABLE_HTTP3 + std::copy_n(cid_prefix, cid_prefixlen, std::begin(cid_prefix_)); +#endif // ENABLE_HTTP3 + ev_async_init(&w_, eventcb); w_.data = this; ev_async_start(loop_, &w_); @@ -415,11 +419,11 @@ void Worker::run_async() { #endif // !NOTHREADS } -void Worker::send(const WorkerEvent &event) { +void Worker::send(WorkerEvent event) { { std::lock_guard g(m_); - q_.push_back(event); + q_.emplace_back(std::move(event)); } ev_async_send(loop_, &w_); @@ -440,7 +444,7 @@ void Worker::process_events() { return; } - wev = q_.front(); + wev = std::move(q_.front()); q_.pop_front(); } @@ -510,6 +514,16 @@ void Worker::process_events() { replace_downstream_config(wev.downstreamconf); break; +#ifdef ENABLE_HTTP3 + case WorkerEventType::QUIC_PKT_FORWARD: { + quic_conn_handler_.handle_packet( + &quic_upstream_addrs_[wev.quic_pkt->upstream_addr_index], + wev.quic_pkt->remote_addr, wev.quic_pkt->local_addr, + wev.quic_pkt->data.data(), wev.quic_pkt->data.size()); + + break; + } +#endif // ENABLE_HTTP3 default: if (LOG_ENABLED(INFO)) { WLOG(INFO, this) << "unknown event type " << static_cast(wev.type); @@ -624,6 +638,8 @@ int Worker::setup_quic_server_socket() { return 0; } + +const uint8_t *Worker::get_cid_prefix() const { return cid_prefix_.data(); } #endif // ENABLE_HTTP3 namespace { @@ -797,4 +813,14 @@ void downstream_failure(DownstreamAddr *addr, const Address *raddr) { } } +#ifdef ENABLE_HTTP3 +int create_cid_prefix(uint8_t *cid_prefix) { + if (RAND_bytes(cid_prefix, SHRPX_QUIC_CID_PREFIXLEN) != 1) { + return -1; + } + + return 0; +} +#endif // ENABLE_HTTP3 + } // namespace shrpx diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index e4e83e6e..ec57f580 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -52,6 +52,7 @@ #include "shrpx_dns_tracker.h" #ifdef ENABLE_HTTP3 # include "shrpx_quic_connection_handler.h" +# include "shrpx_quic.h" #endif // ENABLE_HTTP3 #include "allocator.h" @@ -252,11 +253,29 @@ struct WorkerStat { size_t num_connections; }; +#ifdef ENABLE_HTTP3 +struct QUICPacket { + QUICPacket(size_t upstream_addr_index, const Address &remote_addr, + const Address &local_addr, const uint8_t *data, size_t datalen) + : upstream_addr_index{upstream_addr_index}, + remote_addr{remote_addr}, + local_addr{local_addr}, + data{data, data + datalen} {} + size_t upstream_addr_index; + Address remote_addr; + Address local_addr; + std::vector data; +}; +#endif // ENABLE_HTTP3 + enum class WorkerEventType { NEW_CONNECTION = 0x01, REOPEN_LOG = 0x02, GRACEFUL_SHUTDOWN = 0x03, REPLACE_DOWNSTREAM = 0x04, +#ifdef ENABLE_HTTP3 + QUIC_PKT_FORWARD = 0x05, +#endif // ENABLE_HTTP3 }; struct WorkerEvent { @@ -269,6 +288,9 @@ struct WorkerEvent { }; std::shared_ptr ticket_keys; std::shared_ptr downstreamconf; +#ifdef ENABLE_HTTP3 + std::unique_ptr quic_pkt; +#endif // ENABLE_HTTP3 }; class Worker { @@ -278,6 +300,7 @@ public: tls::CertLookupTree *cert_tree, #ifdef ENABLE_HTTP3 SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree, + const uint8_t *cid_prefix, size_t cid_prefixlen, #endif // ENABLE_HTTP3 const std::shared_ptr &ticket_keys, ConnectionHandler *conn_handler, @@ -286,7 +309,7 @@ public: void run_async(); void wait(); void process_events(); - void send(const WorkerEvent &event); + void send(WorkerEvent event); tls::CertLookupTree *get_cert_lookup_tree() const; #ifdef ENABLE_HTTP3 @@ -338,6 +361,8 @@ public: QUICConnectionHandler *get_quic_connection_handler(); int setup_quic_server_socket(); + + const uint8_t *get_cid_prefix() const; #endif // ENABLE_HTTP3 DNSTracker *get_dns_tracker(); @@ -357,6 +382,7 @@ private: DNSTracker dns_tracker_; #ifdef ENABLE_HTTP3 + std::array cid_prefix_; std::vector quic_upstream_addrs_; std::vector> quic_listeners_; #endif // ENABLE_HTTP3 @@ -410,6 +436,13 @@ size_t match_downstream_addr_group( // nullptr. This function may schedule live check. void downstream_failure(DownstreamAddr *addr, const Address *raddr); +#ifdef ENABLE_HTTP3 +// Creates unpredictable SHRPX_QUIC_CID_PREFIXLEN bytes sequence which +// is used as a prefix of QUIC Connection ID. This function returns +// -1 on failure. +int create_cid_prefix(uint8_t *cid_prefix); +#endif // ENABLE_HTTP3 + } // namespace shrpx #endif // SHRPX_WORKER_H