nghttpx: Add QUICListener

This commit is contained in:
Tatsuhiro Tsujikawa 2021-08-15 16:57:39 +09:00
parent 01da060496
commit 8b2746abf1
7 changed files with 201 additions and 11 deletions

View File

@ -154,6 +154,7 @@ NGHTTPX_SRCS = \
shrpx_dual_dns_resolver.cc shrpx_dual_dns_resolver.h \ shrpx_dual_dns_resolver.cc shrpx_dual_dns_resolver.h \
shrpx_dns_tracker.cc shrpx_dns_tracker.h \ shrpx_dns_tracker.cc shrpx_dns_tracker.h \
shrpx_quic.cc shrpx_quic.h \ shrpx_quic.cc shrpx_quic.h \
shrpx_quic_listener.cc shrpx_quic_listener.h \
buffer.h memchunk.h template.h allocator.h \ buffer.h memchunk.h template.h allocator.h \
xsi_strerror.c xsi_strerror.h xsi_strerror.c xsi_strerror.h

View File

@ -0,0 +1,97 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2021 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_quic_listener.h"
#include "shrpx_worker.h"
#include "shrpx_config.h"
#include "shrpx_log.h"
namespace shrpx {
namespace {
void readcb(struct ev_loop *loop, ev_io *w, int revent) {
auto l = static_cast<QUICListener *>(w->data);
l->on_read();
}
} // namespace
QUICListener::QUICListener(const UpstreamAddr *faddr, Worker *worker)
: faddr_(faddr), worker_(worker) {
ev_io_init(&rev_, readcb, faddr_->fd, EV_READ);
rev_.data = this;
ev_io_start(worker_->get_loop(), &rev_);
}
QUICListener::~QUICListener() {
ev_io_stop(worker_->get_loop(), &rev_);
close(faddr_->fd);
}
void QUICListener::on_read() {
sockaddr_union su;
std::array<uint8_t, 64_k> buf;
size_t pktcnt = 0;
iovec msg_iov{buf.data(), buf.size()};
msghdr msg{};
msg.msg_name = &su;
msg.msg_iov = &msg_iov;
msg.msg_iovlen = 1;
uint8_t msg_ctrl[CMSG_SPACE(sizeof(in6_pktinfo))];
msg.msg_control = msg_ctrl;
for (; pktcnt < 10;) {
msg.msg_namelen = sizeof(su);
msg.msg_controllen = sizeof(msg_ctrl);
auto nread = recvmsg(faddr_->fd, &msg, 0);
if (nread == -1) {
return;
}
++pktcnt;
Address local_addr{};
if (util::msghdr_get_local_addr(local_addr, &msg, su.storage.ss_family) !=
0) {
continue;
}
util::set_port(local_addr, faddr_->port);
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "QUIC received packet: local="
<< util::to_numeric_addr(&local_addr)
<< " remote=" << util::to_numeric_addr(&su.sa, msg.msg_namelen)
<< " " << nread << " bytes";
}
if (nread == 0) {
continue;
}
}
}
} // namespace shrpx

51
src/shrpx_quic_listener.h Normal file
View File

@ -0,0 +1,51 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2021 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_QUIC_LISTENER_H
#define SHRPX_QUIC_LISTENER_H
#include "shrpx.h"
#include <ev.h>
namespace shrpx {
struct UpstreamAddr;
class Worker;
class QUICListener {
public:
QUICListener(const UpstreamAddr *faddr, Worker *worker);
~QUICListener();
void on_read();
private:
const UpstreamAddr *faddr_;
Worker *worker_;
ev_io rev_;
};
} // namespace shrpx
#endif // SHRPX_QUIC_LISTENER_H

View File

@ -40,6 +40,7 @@
# include "shrpx_mruby.h" # include "shrpx_mruby.h"
#endif // HAVE_MRUBY #endif // HAVE_MRUBY
#include "shrpx_quic.h" #include "shrpx_quic.h"
#include "shrpx_quic_listener.h"
#include "util.h" #include "util.h"
#include "template.h" #include "template.h"
@ -584,15 +585,12 @@ DNSTracker *Worker::get_dns_tracker() { return &dns_tracker_; }
int Worker::setup_quic_server_socket() { int Worker::setup_quic_server_socket() {
for (auto &addr : quic_upstream_addrs_) { for (auto &addr : quic_upstream_addrs_) {
if (addr.host_unix) { assert(!addr.host_unix);
/* TODO Not implemented */
assert(0);
continue;
}
if (create_quic_server_socket(addr) != 0) { if (create_quic_server_socket(addr) != 0) {
return -1; return -1;
} }
quic_listeners_.emplace_back(std::make_unique<QUICListener>(&addr, this));
} }
return 0; return 0;

View File

@ -61,6 +61,7 @@ class ConnectBlocker;
class MemcachedDispatcher; class MemcachedDispatcher;
struct UpstreamAddr; struct UpstreamAddr;
class ConnectionHandler; class ConnectionHandler;
class QUICListener;
#ifdef HAVE_MRUBY #ifdef HAVE_MRUBY
namespace mruby { namespace mruby {
@ -338,6 +339,7 @@ private:
DNSTracker dns_tracker_; DNSTracker dns_tracker_;
std::vector<UpstreamAddr> quic_upstream_addrs_; std::vector<UpstreamAddr> quic_upstream_addrs_;
std::vector<std::unique_ptr<QUICListener>> quic_listeners_;
std::shared_ptr<DownstreamConfig> downstreamconf_; std::shared_ptr<DownstreamConfig> downstreamconf_;
std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_; std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;

View File

@ -686,18 +686,21 @@ std::string numeric_name(const struct sockaddr *sa, socklen_t salen) {
} }
std::string to_numeric_addr(const Address *addr) { std::string to_numeric_addr(const Address *addr) {
auto family = addr->su.storage.ss_family; return to_numeric_addr(&addr->su.sa, addr->len);
}
std::string to_numeric_addr(const struct sockaddr *sa, socklen_t salen) {
auto family = sa->sa_family;
#ifndef _WIN32 #ifndef _WIN32
if (family == AF_UNIX) { if (family == AF_UNIX) {
return addr->su.un.sun_path; return reinterpret_cast<const sockaddr_un *>(sa)->sun_path;
} }
#endif // !_WIN32 #endif // !_WIN32
std::array<char, NI_MAXHOST> host; std::array<char, NI_MAXHOST> host;
std::array<char, NI_MAXSERV> serv; std::array<char, NI_MAXSERV> serv;
auto rv = auto rv = getnameinfo(sa, salen, host.data(), host.size(), serv.data(),
getnameinfo(&addr->su.sa, addr->len, host.data(), host.size(), serv.size(), NI_NUMERICHOST | NI_NUMERICSERV);
serv.data(), serv.size(), NI_NUMERICHOST | NI_NUMERICSERV);
if (rv != 0) { if (rv != 0) {
return "unknown"; return "unknown";
} }
@ -1667,6 +1670,40 @@ int daemonize(int nochdir, int noclose) {
#endif // !defined(__APPLE__) #endif // !defined(__APPLE__)
} }
int msghdr_get_local_addr(Address &dest, msghdr *msg, int family) {
switch (family) {
case AF_INET:
for (auto cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
auto pktinfo = reinterpret_cast<in_pktinfo *>(CMSG_DATA(cmsg));
dest.len = sizeof(dest.su.in);
auto &sa = dest.su.in;
sa.sin_family = AF_INET;
sa.sin_addr = pktinfo->ipi_addr;
return 0;
}
}
return -1;
case AF_INET6:
for (auto cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) {
auto pktinfo = reinterpret_cast<in6_pktinfo *>(CMSG_DATA(cmsg));
dest.len = sizeof(dest.su.in6);
auto &sa = dest.su.in6;
sa.sin6_family = AF_INET6;
sa.sin6_addr = pktinfo->ipi6_addr;
return 0;
}
}
return -1;
}
return -1;
}
} // namespace util } // namespace util
} // namespace nghttp2 } // namespace nghttp2

View File

@ -505,6 +505,8 @@ std::string numeric_name(const struct sockaddr *sa, socklen_t salen);
// IPv6 address, address is enclosed by square brackets ([]). // IPv6 address, address is enclosed by square brackets ([]).
std::string to_numeric_addr(const Address *addr); std::string to_numeric_addr(const Address *addr);
std::string to_numeric_addr(const struct sockaddr *sa, socklen_t salen);
// Sets |port| to |addr|. // Sets |port| to |addr|.
void set_port(Address &addr, uint16_t port); void set_port(Address &addr, uint16_t port);
@ -786,6 +788,8 @@ std::mt19937 make_mt19937();
// daemon() using fork(). // daemon() using fork().
int daemonize(int nochdir, int noclose); int daemonize(int nochdir, int noclose);
int msghdr_get_local_addr(Address &dest, msghdr *msg, int family);
} // namespace util } // namespace util
} // namespace nghttp2 } // namespace nghttp2