diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index c9327397..234c2b8c 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -546,7 +546,8 @@ ClientHandler::~ClientHandler() { // TODO If backend is http/2, and it is in CONNECTED state, signal // it and make it loopbreak when output is zero. - if (worker_->get_graceful_shutdown() && worker_stat->num_connections == 0) { + if (worker_->get_graceful_shutdown() && worker_stat->num_connections == 0 && + worker_stat->num_close_waits == 0) { ev_break(conn_.loop); } diff --git a/src/shrpx_http3_upstream.cc b/src/shrpx_http3_upstream.cc index 81f48a34..762a5a69 100644 --- a/src/shrpx_http3_upstream.cc +++ b/src/shrpx_http3_upstream.cc @@ -147,19 +147,6 @@ Http3Upstream::~Http3Upstream() { nghttp3_conn_del(httpconn_); if (conn_) { - auto worker = handler_->get_worker(); - auto quic_client_handler = worker->get_quic_connection_handler(); - - quic_client_handler->remove_connection_id( - ngtcp2_conn_get_client_initial_dcid(conn_)); - - std::vector scids(ngtcp2_conn_get_num_scid(conn_)); - ngtcp2_conn_get_scid(conn_, scids.data()); - - for (auto &cid : scids) { - quic_client_handler->remove_connection_id(&cid); - } - ngtcp2_conn_del(conn_); } } @@ -1275,33 +1262,66 @@ void Http3Upstream::on_handler_delete() { } } - // If this is not idle close, send APPLICATION_CLOSE since this - // might come before idle close. - if (!idle_close_ && !ngtcp2_conn_is_in_closing_period(conn_) && + auto worker = handler_->get_worker(); + auto quic_conn_handler = worker->get_quic_connection_handler(); + + quic_conn_handler->remove_connection_id( + ngtcp2_conn_get_client_initial_dcid(conn_)); + + std::vector scids(ngtcp2_conn_get_num_scid(conn_)); + ngtcp2_conn_get_scid(conn_, scids.data()); + + for (auto &cid : scids) { + quic_conn_handler->remove_connection_id(&cid); + } + + if (idle_close_) { + return; + } + + // If this is not idle close, send CONNECTION_CLOSE. + if (!ngtcp2_conn_is_in_closing_period(conn_) && !ngtcp2_conn_is_in_draining_period(conn_)) { ngtcp2_path_storage ps; ngtcp2_pkt_info pi; - std::array buf; + conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN); ngtcp2_path_storage_zero(&ps); - auto nwrite = ngtcp2_conn_write_application_close( - conn_, &ps.path, &pi, buf.data(), buf.size(), NGHTTP3_H3_NO_ERROR, - quic_timestamp()); + auto nwrite = ngtcp2_conn_write_connection_close( + conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), + NGTCP2_NO_ERROR, quic_timestamp()); if (nwrite < 0) { if (nwrite != NGTCP2_ERR_INVALID_STATE) { - ULOG(ERROR, this) << "ngtcp2_conn_write_application_close: " + ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: " << ngtcp2_strerror(nwrite); } return; } + conn_close_.resize(nwrite); + quic_send_packet(static_cast(ps.path.user_data), ps.path.remote.addr, ps.path.remote.addrlen, - ps.path.local.addr, ps.path.local.addrlen, buf.data(), - nwrite, 0); + ps.path.local.addr, ps.path.local.addrlen, + conn_close_.data(), nwrite, 0); } + + auto d = + static_cast(ngtcp2_conn_get_pto(conn_) * 3) / NGTCP2_SECONDS; + + if (LOG_ENABLED(INFO)) { + ULOG(INFO, this) << "Enter close-wait period " << d << "s with " + << conn_close_.size() << " bytes sentinel packet"; + } + + auto cw = std::make_unique(worker, std::move(scids), + std::move(conn_close_), d); + + quic_conn_handler->add_close_wait(cw.get()); + + cw.release(); } int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) { @@ -1569,12 +1589,14 @@ int Http3Upstream::handle_error() { auto ts = quic_timestamp(); - std::array buf; + conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN); + ngtcp2_ssize nwrite; if (last_error_.type == quic::ErrorType::Transport) { nwrite = ngtcp2_conn_write_connection_close( - conn_, &ps.path, &pi, buf.data(), buf.size(), last_error_.code, ts); + conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), + last_error_.code, ts); if (nwrite < 0) { ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: " << ngtcp2_strerror(nwrite); @@ -1582,7 +1604,8 @@ int Http3Upstream::handle_error() { } } else { nwrite = ngtcp2_conn_write_application_close( - conn_, &ps.path, &pi, buf.data(), buf.size(), last_error_.code, ts); + conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), + last_error_.code, ts); if (nwrite < 0) { ULOG(ERROR, this) << "ngtcp2_conn_write_application_close: " << ngtcp2_strerror(nwrite); @@ -1590,10 +1613,12 @@ int Http3Upstream::handle_error() { } } + conn_close_.resize(nwrite); + quic_send_packet(static_cast(ps.path.user_data), ps.path.remote.addr, ps.path.remote.addrlen, - ps.path.local.addr, ps.path.local.addrlen, buf.data(), - nwrite, 0); + ps.path.local.addr, ps.path.local.addrlen, + conn_close_.data(), nwrite, 0); return -1; } diff --git a/src/shrpx_http3_upstream.h b/src/shrpx_http3_upstream.h index c2675e4e..db907eee 100644 --- a/src/shrpx_http3_upstream.h +++ b/src/shrpx_http3_upstream.h @@ -161,6 +161,7 @@ private: nghttp3_conn *httpconn_; DownstreamQueue downstream_queue_; bool idle_close_; + std::vector conn_close_; }; } // namespace shrpx diff --git a/src/shrpx_quic.h b/src/shrpx_quic.h index a03d260e..94483194 100644 --- a/src/shrpx_quic.h +++ b/src/shrpx_quic.h @@ -41,6 +41,7 @@ constexpr size_t SHRPX_QUIC_MAX_UDP_PAYLOAD_SIZE = 1472; constexpr size_t SHRPX_QUIC_STATELESS_RESET_SECRETLEN = 32; constexpr size_t SHRPX_QUIC_TOKEN_SECRETLEN = 32; constexpr size_t SHRPX_QUIC_TOKEN_RAND_DATALEN = 16; +constexpr size_t SHRPX_QUIC_CONN_CLOSE_PKTLEN = 256; // SHRPX_QUIC_RETRY_TOKEN_MAGIC is the magic byte of Retry token. // Sent in plaintext. diff --git a/src/shrpx_quic_connection_handler.cc b/src/shrpx_quic_connection_handler.cc index dc2bd5f0..f583738d 100644 --- a/src/shrpx_quic_connection_handler.cc +++ b/src/shrpx_quic_connection_handler.cc @@ -103,6 +103,15 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr, } } + auto it = close_waits_.find(dcid_key); + if (it != std::end(close_waits_)) { + auto cw = (*it).second; + + cw->handle_packet(faddr, remote_addr, local_addr, data, datalen); + + return 0; + } + // new connection auto &upstreamconf = config->conn.upstream; @@ -462,4 +471,89 @@ void QUICConnectionHandler::remove_connection_id(const ngtcp2_cid *cid) { connections_.erase(key); } +void QUICConnectionHandler::add_close_wait(CloseWait *cw) { + for (auto &cid : cw->scids) { + close_waits_.emplace(make_cid_key(&cid), cw); + } +} + +void QUICConnectionHandler::remove_close_wait(const CloseWait *cw) { + for (auto &cid : cw->scids) { + close_waits_.erase(make_cid_key(&cid)); + } +} + +static void close_wait_timeoutcb(struct ev_loop *loop, ev_timer *w, + int revents) { + auto cw = static_cast(w->data); + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "close-wait period finished"; + } + + auto quic_conn_handler = cw->worker->get_quic_connection_handler(); + quic_conn_handler->remove_close_wait(cw); + + delete cw; +} + +CloseWait::CloseWait(Worker *worker, std::vector scids, + std::vector conn_close, ev_tstamp period) + : worker{worker}, + scids{std::move(scids)}, + conn_close{std::move(conn_close)}, + bytes_recv{0}, + bytes_sent{0}, + num_pkts_recv{0}, + next_pkts_recv{1} { + ++worker->get_worker_stat()->num_close_waits; + + ev_timer_init(&timer, close_wait_timeoutcb, period, 0.); + timer.data = this; + + ev_timer_start(worker->get_loop(), &timer); +} + +CloseWait::~CloseWait() { + auto loop = worker->get_loop(); + + ev_timer_stop(loop, &timer); + + auto worker_stat = worker->get_worker_stat(); + --worker_stat->num_close_waits; + + if (worker->get_graceful_shutdown() && worker_stat->num_connections == 0 && + worker_stat->num_close_waits) { + ev_break(loop); + } +} + +int CloseWait::handle_packet(const UpstreamAddr *faddr, + const Address &remote_addr, + const Address &local_addr, const uint8_t *data, + size_t datalen) { + if (conn_close.empty()) { + return 0; + } + + ++num_pkts_recv; + bytes_recv += datalen; + + if (bytes_sent + conn_close.size() > 3 * bytes_recv || + next_pkts_recv > num_pkts_recv) { + return 0; + } + + if (quic_send_packet(faddr, &remote_addr.su.sa, remote_addr.len, + &local_addr.su.sa, local_addr.len, conn_close.data(), + conn_close.size(), 0) != 0) { + return -1; + } + + next_pkts_recv *= 2; + bytes_sent += conn_close.size(); + + return 0; +} + } // namespace shrpx diff --git a/src/shrpx_quic_connection_handler.h b/src/shrpx_quic_connection_handler.h index f045a839..b1888ef7 100644 --- a/src/shrpx_quic_connection_handler.h +++ b/src/shrpx_quic_connection_handler.h @@ -30,9 +30,12 @@ #include #include #include +#include #include +#include + #include "network.h" using namespace nghttp2; @@ -43,6 +46,36 @@ struct UpstreamAddr; class ClientHandler; class Worker; +// CloseWait handles packets received in close-wait (draining or +// closing period). +struct CloseWait { + CloseWait(Worker *worker, std::vector scids, + std::vector conn_close, ev_tstamp period); + ~CloseWait(); + + int handle_packet(const UpstreamAddr *faddr, const Address &remote_addr, + const Address &local_addr, const uint8_t *data, + size_t datalen); + + Worker *worker; + // Source Connection IDs of the connection. + std::vector scids; + // QUIC packet containing CONNECTION_CLOSE. It is empty when a + // connection entered in draining state. + std::vector conn_close; + // Close-wait (draining or closing period) timer. + ev_timer timer; + // The number of bytes received during close-wait period. + size_t bytes_recv; + // The number of bytes sent during close-wait period. + size_t bytes_sent; + // The number of packets received during close-wait period. + size_t num_pkts_recv; + // If the number of packets received reaches this number, send a + // QUIC packet. + size_t next_pkts_recv; +}; + class QUICConnectionHandler { public: QUICConnectionHandler(Worker *worker); @@ -89,9 +122,13 @@ public: void add_connection_id(const ngtcp2_cid *cid, ClientHandler *handler); void remove_connection_id(const ngtcp2_cid *cid); + void add_close_wait(CloseWait *cw); + void remove_close_wait(const CloseWait *cw); + private: Worker *worker_; std::unordered_map connections_; + std::unordered_map close_waits_; }; } // namespace shrpx diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 89e324d8..e128badb 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -518,7 +518,8 @@ void Worker::process_events() { graceful_shutdown_ = true; - if (worker_stat_.num_connections == 0) { + if (worker_stat_.num_connections == 0 && + worker_stat_.num_close_waits == 0) { ev_break(loop_); return; diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index 3d58fbe3..de053100 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -251,6 +251,7 @@ struct DownstreamAddrGroup { struct WorkerStat { size_t num_connections; + size_t num_close_waits; }; #ifdef ENABLE_HTTP3 diff --git a/src/shrpx_worker_process.cc b/src/shrpx_worker_process.cc index 9fc0681e..7bcdecd1 100644 --- a/src/shrpx_worker_process.cc +++ b/src/shrpx_worker_process.cc @@ -123,7 +123,9 @@ void graceful_shutdown(ConnectionHandler *conn_handler) { auto single_worker = conn_handler->get_single_worker(); if (single_worker) { - if (single_worker->get_worker_stat()->num_connections == 0) { + auto worker_stat = single_worker->get_worker_stat(); + if (worker_stat->num_connections == 0 && + worker_stat->num_close_waits == 0) { ev_break(conn_handler->get_loop()); }