nghttpx: Implement closing and draining state
This commit is contained in:
parent
72702a042e
commit
b743ee21f0
|
@ -546,7 +546,8 @@ ClientHandler::~ClientHandler() {
|
||||||
|
|
||||||
// TODO If backend is http/2, and it is in CONNECTED state, signal
|
// TODO If backend is http/2, and it is in CONNECTED state, signal
|
||||||
// it and make it loopbreak when output is zero.
|
// it and make it loopbreak when output is zero.
|
||||||
if (worker_->get_graceful_shutdown() && worker_stat->num_connections == 0) {
|
if (worker_->get_graceful_shutdown() && worker_stat->num_connections == 0 &&
|
||||||
|
worker_stat->num_close_waits == 0) {
|
||||||
ev_break(conn_.loop);
|
ev_break(conn_.loop);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -147,19 +147,6 @@ Http3Upstream::~Http3Upstream() {
|
||||||
nghttp3_conn_del(httpconn_);
|
nghttp3_conn_del(httpconn_);
|
||||||
|
|
||||||
if (conn_) {
|
if (conn_) {
|
||||||
auto worker = handler_->get_worker();
|
|
||||||
auto quic_client_handler = worker->get_quic_connection_handler();
|
|
||||||
|
|
||||||
quic_client_handler->remove_connection_id(
|
|
||||||
ngtcp2_conn_get_client_initial_dcid(conn_));
|
|
||||||
|
|
||||||
std::vector<ngtcp2_cid> scids(ngtcp2_conn_get_num_scid(conn_));
|
|
||||||
ngtcp2_conn_get_scid(conn_, scids.data());
|
|
||||||
|
|
||||||
for (auto &cid : scids) {
|
|
||||||
quic_client_handler->remove_connection_id(&cid);
|
|
||||||
}
|
|
||||||
|
|
||||||
ngtcp2_conn_del(conn_);
|
ngtcp2_conn_del(conn_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1275,33 +1262,66 @@ void Http3Upstream::on_handler_delete() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this is not idle close, send APPLICATION_CLOSE since this
|
auto worker = handler_->get_worker();
|
||||||
// might come before idle close.
|
auto quic_conn_handler = worker->get_quic_connection_handler();
|
||||||
if (!idle_close_ && !ngtcp2_conn_is_in_closing_period(conn_) &&
|
|
||||||
|
quic_conn_handler->remove_connection_id(
|
||||||
|
ngtcp2_conn_get_client_initial_dcid(conn_));
|
||||||
|
|
||||||
|
std::vector<ngtcp2_cid> scids(ngtcp2_conn_get_num_scid(conn_));
|
||||||
|
ngtcp2_conn_get_scid(conn_, scids.data());
|
||||||
|
|
||||||
|
for (auto &cid : scids) {
|
||||||
|
quic_conn_handler->remove_connection_id(&cid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (idle_close_) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this is not idle close, send CONNECTION_CLOSE.
|
||||||
|
if (!ngtcp2_conn_is_in_closing_period(conn_) &&
|
||||||
!ngtcp2_conn_is_in_draining_period(conn_)) {
|
!ngtcp2_conn_is_in_draining_period(conn_)) {
|
||||||
ngtcp2_path_storage ps;
|
ngtcp2_path_storage ps;
|
||||||
ngtcp2_pkt_info pi;
|
ngtcp2_pkt_info pi;
|
||||||
std::array<uint8_t, NGTCP2_DEFAULT_MAX_PKTLEN> buf;
|
conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
|
||||||
|
|
||||||
ngtcp2_path_storage_zero(&ps);
|
ngtcp2_path_storage_zero(&ps);
|
||||||
|
|
||||||
auto nwrite = ngtcp2_conn_write_application_close(
|
auto nwrite = ngtcp2_conn_write_connection_close(
|
||||||
conn_, &ps.path, &pi, buf.data(), buf.size(), NGHTTP3_H3_NO_ERROR,
|
conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(),
|
||||||
quic_timestamp());
|
NGTCP2_NO_ERROR, quic_timestamp());
|
||||||
if (nwrite < 0) {
|
if (nwrite < 0) {
|
||||||
if (nwrite != NGTCP2_ERR_INVALID_STATE) {
|
if (nwrite != NGTCP2_ERR_INVALID_STATE) {
|
||||||
ULOG(ERROR, this) << "ngtcp2_conn_write_application_close: "
|
ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
|
||||||
<< ngtcp2_strerror(nwrite);
|
<< ngtcp2_strerror(nwrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
conn_close_.resize(nwrite);
|
||||||
|
|
||||||
quic_send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
|
quic_send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
|
||||||
ps.path.remote.addr, ps.path.remote.addrlen,
|
ps.path.remote.addr, ps.path.remote.addrlen,
|
||||||
ps.path.local.addr, ps.path.local.addrlen, buf.data(),
|
ps.path.local.addr, ps.path.local.addrlen,
|
||||||
nwrite, 0);
|
conn_close_.data(), nwrite, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto d =
|
||||||
|
static_cast<ev_tstamp>(ngtcp2_conn_get_pto(conn_) * 3) / NGTCP2_SECONDS;
|
||||||
|
|
||||||
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
ULOG(INFO, this) << "Enter close-wait period " << d << "s with "
|
||||||
|
<< conn_close_.size() << " bytes sentinel packet";
|
||||||
|
}
|
||||||
|
|
||||||
|
auto cw = std::make_unique<CloseWait>(worker, std::move(scids),
|
||||||
|
std::move(conn_close_), d);
|
||||||
|
|
||||||
|
quic_conn_handler->add_close_wait(cw.get());
|
||||||
|
|
||||||
|
cw.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
|
int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
|
||||||
|
@ -1569,12 +1589,14 @@ int Http3Upstream::handle_error() {
|
||||||
|
|
||||||
auto ts = quic_timestamp();
|
auto ts = quic_timestamp();
|
||||||
|
|
||||||
std::array<uint8_t, NGTCP2_DEFAULT_MAX_PKTLEN> buf;
|
conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
|
||||||
|
|
||||||
ngtcp2_ssize nwrite;
|
ngtcp2_ssize nwrite;
|
||||||
|
|
||||||
if (last_error_.type == quic::ErrorType::Transport) {
|
if (last_error_.type == quic::ErrorType::Transport) {
|
||||||
nwrite = ngtcp2_conn_write_connection_close(
|
nwrite = ngtcp2_conn_write_connection_close(
|
||||||
conn_, &ps.path, &pi, buf.data(), buf.size(), last_error_.code, ts);
|
conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(),
|
||||||
|
last_error_.code, ts);
|
||||||
if (nwrite < 0) {
|
if (nwrite < 0) {
|
||||||
ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
|
ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
|
||||||
<< ngtcp2_strerror(nwrite);
|
<< ngtcp2_strerror(nwrite);
|
||||||
|
@ -1582,7 +1604,8 @@ int Http3Upstream::handle_error() {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
nwrite = ngtcp2_conn_write_application_close(
|
nwrite = ngtcp2_conn_write_application_close(
|
||||||
conn_, &ps.path, &pi, buf.data(), buf.size(), last_error_.code, ts);
|
conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(),
|
||||||
|
last_error_.code, ts);
|
||||||
if (nwrite < 0) {
|
if (nwrite < 0) {
|
||||||
ULOG(ERROR, this) << "ngtcp2_conn_write_application_close: "
|
ULOG(ERROR, this) << "ngtcp2_conn_write_application_close: "
|
||||||
<< ngtcp2_strerror(nwrite);
|
<< ngtcp2_strerror(nwrite);
|
||||||
|
@ -1590,10 +1613,12 @@ int Http3Upstream::handle_error() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
conn_close_.resize(nwrite);
|
||||||
|
|
||||||
quic_send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
|
quic_send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
|
||||||
ps.path.remote.addr, ps.path.remote.addrlen,
|
ps.path.remote.addr, ps.path.remote.addrlen,
|
||||||
ps.path.local.addr, ps.path.local.addrlen, buf.data(),
|
ps.path.local.addr, ps.path.local.addrlen,
|
||||||
nwrite, 0);
|
conn_close_.data(), nwrite, 0);
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,6 +161,7 @@ private:
|
||||||
nghttp3_conn *httpconn_;
|
nghttp3_conn *httpconn_;
|
||||||
DownstreamQueue downstream_queue_;
|
DownstreamQueue downstream_queue_;
|
||||||
bool idle_close_;
|
bool idle_close_;
|
||||||
|
std::vector<uint8_t> conn_close_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -41,6 +41,7 @@ constexpr size_t SHRPX_QUIC_MAX_UDP_PAYLOAD_SIZE = 1472;
|
||||||
constexpr size_t SHRPX_QUIC_STATELESS_RESET_SECRETLEN = 32;
|
constexpr size_t SHRPX_QUIC_STATELESS_RESET_SECRETLEN = 32;
|
||||||
constexpr size_t SHRPX_QUIC_TOKEN_SECRETLEN = 32;
|
constexpr size_t SHRPX_QUIC_TOKEN_SECRETLEN = 32;
|
||||||
constexpr size_t SHRPX_QUIC_TOKEN_RAND_DATALEN = 16;
|
constexpr size_t SHRPX_QUIC_TOKEN_RAND_DATALEN = 16;
|
||||||
|
constexpr size_t SHRPX_QUIC_CONN_CLOSE_PKTLEN = 256;
|
||||||
|
|
||||||
// SHRPX_QUIC_RETRY_TOKEN_MAGIC is the magic byte of Retry token.
|
// SHRPX_QUIC_RETRY_TOKEN_MAGIC is the magic byte of Retry token.
|
||||||
// Sent in plaintext.
|
// Sent in plaintext.
|
||||||
|
|
|
@ -103,6 +103,15 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto it = close_waits_.find(dcid_key);
|
||||||
|
if (it != std::end(close_waits_)) {
|
||||||
|
auto cw = (*it).second;
|
||||||
|
|
||||||
|
cw->handle_packet(faddr, remote_addr, local_addr, data, datalen);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// new connection
|
// new connection
|
||||||
|
|
||||||
auto &upstreamconf = config->conn.upstream;
|
auto &upstreamconf = config->conn.upstream;
|
||||||
|
@ -462,4 +471,89 @@ void QUICConnectionHandler::remove_connection_id(const ngtcp2_cid *cid) {
|
||||||
connections_.erase(key);
|
connections_.erase(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void QUICConnectionHandler::add_close_wait(CloseWait *cw) {
|
||||||
|
for (auto &cid : cw->scids) {
|
||||||
|
close_waits_.emplace(make_cid_key(&cid), cw);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void QUICConnectionHandler::remove_close_wait(const CloseWait *cw) {
|
||||||
|
for (auto &cid : cw->scids) {
|
||||||
|
close_waits_.erase(make_cid_key(&cid));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void close_wait_timeoutcb(struct ev_loop *loop, ev_timer *w,
|
||||||
|
int revents) {
|
||||||
|
auto cw = static_cast<CloseWait *>(w->data);
|
||||||
|
|
||||||
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
LOG(INFO) << "close-wait period finished";
|
||||||
|
}
|
||||||
|
|
||||||
|
auto quic_conn_handler = cw->worker->get_quic_connection_handler();
|
||||||
|
quic_conn_handler->remove_close_wait(cw);
|
||||||
|
|
||||||
|
delete cw;
|
||||||
|
}
|
||||||
|
|
||||||
|
CloseWait::CloseWait(Worker *worker, std::vector<ngtcp2_cid> scids,
|
||||||
|
std::vector<uint8_t> conn_close, ev_tstamp period)
|
||||||
|
: worker{worker},
|
||||||
|
scids{std::move(scids)},
|
||||||
|
conn_close{std::move(conn_close)},
|
||||||
|
bytes_recv{0},
|
||||||
|
bytes_sent{0},
|
||||||
|
num_pkts_recv{0},
|
||||||
|
next_pkts_recv{1} {
|
||||||
|
++worker->get_worker_stat()->num_close_waits;
|
||||||
|
|
||||||
|
ev_timer_init(&timer, close_wait_timeoutcb, period, 0.);
|
||||||
|
timer.data = this;
|
||||||
|
|
||||||
|
ev_timer_start(worker->get_loop(), &timer);
|
||||||
|
}
|
||||||
|
|
||||||
|
CloseWait::~CloseWait() {
|
||||||
|
auto loop = worker->get_loop();
|
||||||
|
|
||||||
|
ev_timer_stop(loop, &timer);
|
||||||
|
|
||||||
|
auto worker_stat = worker->get_worker_stat();
|
||||||
|
--worker_stat->num_close_waits;
|
||||||
|
|
||||||
|
if (worker->get_graceful_shutdown() && worker_stat->num_connections == 0 &&
|
||||||
|
worker_stat->num_close_waits) {
|
||||||
|
ev_break(loop);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int CloseWait::handle_packet(const UpstreamAddr *faddr,
|
||||||
|
const Address &remote_addr,
|
||||||
|
const Address &local_addr, const uint8_t *data,
|
||||||
|
size_t datalen) {
|
||||||
|
if (conn_close.empty()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
++num_pkts_recv;
|
||||||
|
bytes_recv += datalen;
|
||||||
|
|
||||||
|
if (bytes_sent + conn_close.size() > 3 * bytes_recv ||
|
||||||
|
next_pkts_recv > num_pkts_recv) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (quic_send_packet(faddr, &remote_addr.su.sa, remote_addr.len,
|
||||||
|
&local_addr.su.sa, local_addr.len, conn_close.data(),
|
||||||
|
conn_close.size(), 0) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
next_pkts_recv *= 2;
|
||||||
|
bytes_sent += conn_close.size();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -30,9 +30,12 @@
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
#include <ngtcp2/ngtcp2.h>
|
#include <ngtcp2/ngtcp2.h>
|
||||||
|
|
||||||
|
#include <ev.h>
|
||||||
|
|
||||||
#include "network.h"
|
#include "network.h"
|
||||||
|
|
||||||
using namespace nghttp2;
|
using namespace nghttp2;
|
||||||
|
@ -43,6 +46,36 @@ struct UpstreamAddr;
|
||||||
class ClientHandler;
|
class ClientHandler;
|
||||||
class Worker;
|
class Worker;
|
||||||
|
|
||||||
|
// CloseWait handles packets received in close-wait (draining or
|
||||||
|
// closing period).
|
||||||
|
struct CloseWait {
|
||||||
|
CloseWait(Worker *worker, std::vector<ngtcp2_cid> scids,
|
||||||
|
std::vector<uint8_t> conn_close, ev_tstamp period);
|
||||||
|
~CloseWait();
|
||||||
|
|
||||||
|
int handle_packet(const UpstreamAddr *faddr, const Address &remote_addr,
|
||||||
|
const Address &local_addr, const uint8_t *data,
|
||||||
|
size_t datalen);
|
||||||
|
|
||||||
|
Worker *worker;
|
||||||
|
// Source Connection IDs of the connection.
|
||||||
|
std::vector<ngtcp2_cid> scids;
|
||||||
|
// QUIC packet containing CONNECTION_CLOSE. It is empty when a
|
||||||
|
// connection entered in draining state.
|
||||||
|
std::vector<uint8_t> conn_close;
|
||||||
|
// Close-wait (draining or closing period) timer.
|
||||||
|
ev_timer timer;
|
||||||
|
// The number of bytes received during close-wait period.
|
||||||
|
size_t bytes_recv;
|
||||||
|
// The number of bytes sent during close-wait period.
|
||||||
|
size_t bytes_sent;
|
||||||
|
// The number of packets received during close-wait period.
|
||||||
|
size_t num_pkts_recv;
|
||||||
|
// If the number of packets received reaches this number, send a
|
||||||
|
// QUIC packet.
|
||||||
|
size_t next_pkts_recv;
|
||||||
|
};
|
||||||
|
|
||||||
class QUICConnectionHandler {
|
class QUICConnectionHandler {
|
||||||
public:
|
public:
|
||||||
QUICConnectionHandler(Worker *worker);
|
QUICConnectionHandler(Worker *worker);
|
||||||
|
@ -89,9 +122,13 @@ public:
|
||||||
void add_connection_id(const ngtcp2_cid *cid, ClientHandler *handler);
|
void add_connection_id(const ngtcp2_cid *cid, ClientHandler *handler);
|
||||||
void remove_connection_id(const ngtcp2_cid *cid);
|
void remove_connection_id(const ngtcp2_cid *cid);
|
||||||
|
|
||||||
|
void add_close_wait(CloseWait *cw);
|
||||||
|
void remove_close_wait(const CloseWait *cw);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Worker *worker_;
|
Worker *worker_;
|
||||||
std::unordered_map<std::string, ClientHandler *> connections_;
|
std::unordered_map<std::string, ClientHandler *> connections_;
|
||||||
|
std::unordered_map<std::string, CloseWait *> close_waits_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -518,7 +518,8 @@ void Worker::process_events() {
|
||||||
|
|
||||||
graceful_shutdown_ = true;
|
graceful_shutdown_ = true;
|
||||||
|
|
||||||
if (worker_stat_.num_connections == 0) {
|
if (worker_stat_.num_connections == 0 &&
|
||||||
|
worker_stat_.num_close_waits == 0) {
|
||||||
ev_break(loop_);
|
ev_break(loop_);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -251,6 +251,7 @@ struct DownstreamAddrGroup {
|
||||||
|
|
||||||
struct WorkerStat {
|
struct WorkerStat {
|
||||||
size_t num_connections;
|
size_t num_connections;
|
||||||
|
size_t num_close_waits;
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifdef ENABLE_HTTP3
|
#ifdef ENABLE_HTTP3
|
||||||
|
|
|
@ -123,7 +123,9 @@ void graceful_shutdown(ConnectionHandler *conn_handler) {
|
||||||
|
|
||||||
auto single_worker = conn_handler->get_single_worker();
|
auto single_worker = conn_handler->get_single_worker();
|
||||||
if (single_worker) {
|
if (single_worker) {
|
||||||
if (single_worker->get_worker_stat()->num_connections == 0) {
|
auto worker_stat = single_worker->get_worker_stat();
|
||||||
|
if (worker_stat->num_connections == 0 &&
|
||||||
|
worker_stat->num_close_waits == 0) {
|
||||||
ev_break(conn_handler->get_loop());
|
ev_break(conn_handler->get_loop());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue