diff --git a/src/shrpx_error.h b/src/shrpx_error.h index eacbcf7c..c89611f1 100644 --- a/src/shrpx_error.h +++ b/src/shrpx_error.h @@ -39,6 +39,7 @@ enum ErrorCode { SHRPX_ERR_DCONN_CANCELED = -103, SHRPX_ERR_RETRY = -104, SHRPX_ERR_TLS_REQUIRED = -105, + SHRPX_ERR_SEND_BLOCKED = -106, }; } // namespace shrpx diff --git a/src/shrpx_http3_upstream.cc b/src/shrpx_http3_upstream.cc index a2906861..4551dfed 100644 --- a/src/shrpx_http3_upstream.cc +++ b/src/shrpx_http3_upstream.cc @@ -127,7 +127,10 @@ Http3Upstream::Http3Upstream(ClientHandler *handler) downstream_queue_{downstream_queue_size(handler->get_worker()), !get_config()->http2_proxy}, idle_close_{false}, - retry_close_{false} { + retry_close_{false}, + tx_{ + .data = std::unique_ptr(new uint8_t[64_k]), + } { ev_timer_init(&timer_, timeoutcb, 0., 0.); timer_.data = this; @@ -700,6 +703,19 @@ int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr, int Http3Upstream::on_read() { return 0; } int Http3Upstream::on_write() { + int rv; + + if (tx_.send_blocked) { + rv = send_blocked_packet(); + if (rv != 0) { + return -1; + } + + if (tx_.send_blocked) { + return 0; + } + } + if (write_streams() != 0) { return -1; } @@ -711,14 +727,13 @@ int Http3Upstream::on_write() { int Http3Upstream::write_streams() { std::array vec; - std::array buf; auto max_udp_payload_size = std::min( max_udp_payload_size_, ngtcp2_conn_get_path_max_udp_payload_size(conn_)); size_t max_pktcnt = std::min(static_cast(64_k), ngtcp2_conn_get_send_quantum(conn_)) / max_udp_payload_size; ngtcp2_pkt_info pi, prev_pi; - uint8_t *bufpos = buf.data(); + uint8_t *bufpos = tx_.data.get(); ngtcp2_path_storage ps, prev_ps; size_t pktcnt = 0; int rv; @@ -803,8 +818,6 @@ int Http3Upstream::write_streams() { last_error_ = quic::err_transport(nwrite); - handler_->get_connection()->wlimit.stopw(); - return handle_error(); } else if (ndatalen >= 0) { rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen); @@ -817,12 +830,26 @@ int Http3Upstream::write_streams() { } if (nwrite == 0) { - if (bufpos - buf.data()) { - send_packet(static_cast(prev_ps.path.user_data), - prev_ps.path.remote.addr, prev_ps.path.remote.addrlen, - prev_ps.path.local.addr, prev_ps.path.local.addrlen, - prev_pi, buf.data(), bufpos - buf.data(), - max_udp_payload_size); + if (bufpos - tx_.data.get()) { + auto faddr = static_cast(prev_ps.path.user_data); + auto data = tx_.data.get(); + auto datalen = bufpos - data; + + rv = send_packet(faddr, prev_ps.path.remote.addr, + prev_ps.path.remote.addrlen, prev_ps.path.local.addr, + prev_ps.path.local.addrlen, prev_pi, data, datalen, + max_udp_payload_size); + if (rv == SHRPX_ERR_SEND_BLOCKED) { + on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, + prev_pi, data, datalen, max_udp_payload_size); + + ngtcp2_conn_update_pkt_tx_time(conn_, ts); + reset_idle_timer(); + + signal_write_upstream_addr(faddr); + + return 0; + } reset_idle_timer(); } @@ -842,54 +869,99 @@ int Http3Upstream::write_streams() { prev_pi = pi; } else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) || prev_pi.ecn != pi.ecn) { - send_packet(static_cast(prev_ps.path.user_data), - prev_ps.path.remote.addr, prev_ps.path.remote.addrlen, - prev_ps.path.local.addr, prev_ps.path.local.addrlen, prev_pi, - buf.data(), bufpos - buf.data() - nwrite, - max_udp_payload_size); + auto faddr = static_cast(prev_ps.path.user_data); + auto data = tx_.data.get(); + auto datalen = bufpos - data - nwrite; - send_packet(static_cast(ps.path.user_data), - ps.path.remote.addr, ps.path.remote.addrlen, - ps.path.local.addr, ps.path.local.addrlen, pi, - bufpos - nwrite, nwrite, max_udp_payload_size); + rv = send_packet(faddr, prev_ps.path.remote.addr, + prev_ps.path.remote.addrlen, prev_ps.path.local.addr, + prev_ps.path.local.addrlen, prev_pi, data, datalen, + max_udp_payload_size); + switch (rv) { + case SHRPX_ERR_SEND_BLOCKED: + on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi, + data, datalen, max_udp_payload_size); + + on_send_blocked(static_cast(ps.path.user_data), + ps.path.remote, ps.path.local, pi, bufpos - nwrite, + nwrite, max_udp_payload_size); + + signal_write_upstream_addr(faddr); + + break; + default: { + auto faddr = static_cast(ps.path.user_data); + auto data = bufpos - nwrite; + + rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen, + ps.path.local.addr, ps.path.local.addrlen, pi, data, + nwrite, max_udp_payload_size); + if (rv == SHRPX_ERR_SEND_BLOCKED) { + on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, + nwrite, max_udp_payload_size); + } + + signal_write_upstream_addr(faddr); + } + } ngtcp2_conn_update_pkt_tx_time(conn_, ts); reset_idle_timer(); - handler_->signal_write(); - return 0; } if (++pktcnt == max_pktcnt || static_cast(nwrite) < max_udp_payload_size) { - send_packet(static_cast(ps.path.user_data), - ps.path.remote.addr, ps.path.remote.addrlen, - ps.path.local.addr, ps.path.local.addrlen, pi, buf.data(), - bufpos - buf.data(), max_udp_payload_size); + auto faddr = static_cast(ps.path.user_data); + auto data = tx_.data.get(); + auto datalen = bufpos - data; + + rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen, + ps.path.local.addr, ps.path.local.addrlen, pi, data, + datalen, max_udp_payload_size); + if (rv == SHRPX_ERR_SEND_BLOCKED) { + on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen, + max_udp_payload_size); + } ngtcp2_conn_update_pkt_tx_time(conn_, ts); reset_idle_timer(); - handler_->signal_write(); + signal_write_upstream_addr(faddr); return 0; } #else // !UDP_SEGMENT - send_packet(static_cast(ps.path.user_data), - ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr, - ps.path.local.addrlen, pi, buf.data(), bufpos - buf.data(), 0); + auto faddr = static_cast(ps.path.user_data); + auto data = tx_.data.get(); + auto datalen = bufpos - data; + + rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen, + ps.path.local.addr, ps.path.local.addrlen, pi, data, + datalen, 0); + if (rv == SHRPX_ERR_SEND_BLOCKED) { + on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen, + 0); + + ngtcp2_conn_update_pkt_tx_time(conn_, ts); + reset_idle_timer(); + + signal_write_upstream_addr(faddr); + + return 0; + } if (++pktcnt == max_pktcnt) { ngtcp2_conn_update_pkt_tx_time(conn_, ts); reset_idle_timer(); - handler_->signal_write(); + signal_write_upstream_addr(faddr); return 0; } - bufpos = buf.data(); + bufpos = tx_.data.get(); #endif // !UDP_SEGMENT } @@ -1760,6 +1832,11 @@ int Http3Upstream::send_packet(const UpstreamAddr *faddr, case -EMSGSIZE: max_udp_payload_size_ = NGTCP2_MAX_UDP_PAYLOAD_SIZE; break; + case -EAGAIN: +#if EAGAIN != EWOULDBLOCK + case -EWOULDBLOCK: +#endif // EAGAIN != EWOULDBLOCK + return SHRPX_ERR_SEND_BLOCKED; default: break; } @@ -1767,6 +1844,70 @@ int Http3Upstream::send_packet(const UpstreamAddr *faddr, return -1; } +void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr, + const ngtcp2_addr &remote_addr, + const ngtcp2_addr &local_addr, + const ngtcp2_pkt_info &pi, + const uint8_t *data, size_t datalen, + size_t max_udp_payload_size) { + assert(tx_.num_blocked || !tx_.send_blocked); + assert(tx_.num_blocked < 2); + + tx_.send_blocked = true; + + auto &p = tx_.blocked[tx_.num_blocked++]; + + memcpy(&p.local_addr.su, local_addr.addr, local_addr.addrlen); + memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen); + + p.local_addr.len = local_addr.addrlen; + p.remote_addr.len = remote_addr.addrlen; + p.faddr = faddr; + p.pi = pi; + p.data = data; + p.datalen = datalen; + p.max_udp_payload_size = max_udp_payload_size; +} + +int Http3Upstream::send_blocked_packet() { + int rv; + + assert(tx_.send_blocked); + + for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) { + auto &p = tx_.blocked[tx_.num_blocked_sent]; + + rv = send_packet(p.faddr, &p.remote_addr.su.sa, p.remote_addr.len, + &p.local_addr.su.sa, p.local_addr.len, p.pi, p.data, + p.datalen, p.max_udp_payload_size); + if (rv == SHRPX_ERR_SEND_BLOCKED) { + signal_write_upstream_addr(p.faddr); + + return 0; + } + } + + tx_.send_blocked = false; + tx_.num_blocked = 0; + tx_.num_blocked_sent = 0; + + return 0; +} + +void Http3Upstream::signal_write_upstream_addr(const UpstreamAddr *faddr) { + auto conn = handler_->get_connection(); + + if (faddr->fd != conn->wev.fd) { + if (ev_is_active(&conn->wev)) { + ev_io_stop(handler_->get_loop(), &conn->wev); + } + + ev_io_set(&conn->wev, faddr->fd, EV_WRITE); + } + + conn->wlimit.startw(); +} + int Http3Upstream::handle_error() { if (ngtcp2_conn_is_in_closing_period(conn_)) { return -1; diff --git a/src/shrpx_http3_upstream.h b/src/shrpx_http3_upstream.h index e8c8dee4..3759ede2 100644 --- a/src/shrpx_http3_upstream.h +++ b/src/shrpx_http3_upstream.h @@ -157,6 +157,14 @@ public: void qlog_write(const void *data, size_t datalen, bool fin); int open_qlog_file(const StringRef &dir, const ngtcp2_cid &scid) const; + void on_send_blocked(const UpstreamAddr *faddr, + const ngtcp2_addr &remote_addr, + const ngtcp2_addr &local_addr, const ngtcp2_pkt_info &pi, + const uint8_t *data, size_t datalen, + size_t max_udp_payload_size); + int send_blocked_packet(); + void signal_write_upstream_addr(const UpstreamAddr *faddr); + private: ClientHandler *handler_; ev_timer timer_; @@ -174,6 +182,23 @@ private: bool idle_close_; bool retry_close_; std::vector conn_close_; + + struct { + bool send_blocked; + size_t num_blocked; + size_t num_blocked_sent; + // blocked field is effective only when send_blocked is true. + struct { + const UpstreamAddr *faddr; + Address local_addr; + Address remote_addr; + ngtcp2_pkt_info pi; + const uint8_t *data; + size_t datalen; + size_t max_udp_payload_size; + } blocked[2]; + std::unique_ptr data; + } tx_; }; } // namespace shrpx