From b70fdca9acabfd7781e8a05b8b75b117cc284424 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Thu, 10 Feb 2022 19:35:50 +0900 Subject: [PATCH] h2load: Handle EAGAIN/EWOULDBLOCK from sendmsg --- src/h2load.cc | 9 ++++++ src/h2load.h | 13 +++++++++ src/h2load_quic.cc | 71 +++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 86 insertions(+), 7 deletions(-) diff --git a/src/h2load.cc b/src/h2load.cc index 561e58c2..fa7d9268 100644 --- a/src/h2load.cc +++ b/src/h2load.cc @@ -495,6 +495,10 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo) #ifdef ENABLE_HTTP3 ev_timer_init(&quic.pkt_timer, quic_pkt_timeout_cb, 0., 0.); quic.pkt_timer.data = this; + + if (config.is_quic()) { + quic.tx.data = std::make_unique(64_k); + } #endif // ENABLE_HTTP3 } @@ -1419,6 +1423,7 @@ int Client::write_tls() { } #ifdef ENABLE_HTTP3 +// Returns 1 if sendmsg is blocked. int Client::write_udp(const sockaddr *addr, socklen_t addrlen, const uint8_t *data, size_t datalen, size_t gso_size) { iovec msg_iov; @@ -1447,6 +1452,10 @@ int Client::write_udp(const sockaddr *addr, socklen_t addrlen, auto nwrite = sendmsg(fd, &msg, 0); if (nwrite < 0) { + if (nwrite == EAGAIN || nwrite == EWOULDBLOCK) { + return 1; + } + std::cerr << "sendto: errno=" << errno << std::endl; } else { ++worker->stats.udp_dgram_sent; diff --git a/src/h2load.h b/src/h2load.h index f1083a1d..60bcb3c4 100644 --- a/src/h2load.h +++ b/src/h2load.h @@ -342,6 +342,16 @@ struct Client { quic::Error last_error; bool close_requested; FILE *qlog_file; + + struct { + bool send_blocked; + struct { + Address remote_addr; + size_t datalen; + size_t max_udp_payload_size; + } blocked; + std::unique_ptr data; + } tx; } quic; #endif // ENABLE_HTTP3 ev_timer request_timeout_watcher; @@ -465,6 +475,9 @@ struct Client { int write_quic(); int write_udp(const sockaddr *addr, socklen_t addrlen, const uint8_t *data, size_t datalen, size_t gso_size); + void on_send_blocked(const ngtcp2_addr &remote_addr, size_t datalen, + size_t max_udp_payload_size); + int send_blocked_packet(); void quic_close_connection(); int quic_handshake_completed(); diff --git a/src/h2load_quic.cc b/src/h2load_quic.cc index d7b71159..86579496 100644 --- a/src/h2load_quic.cc +++ b/src/h2load_quic.cc @@ -684,12 +684,25 @@ int Client::read_quic() { } int Client::write_quic() { + int rv; + ev_io_stop(worker->loop, &wev); if (quic.close_requested) { return -1; } + if (quic.tx.send_blocked) { + rv = send_blocked_packet(); + if (rv != 0) { + return -1; + } + + if (quic.tx.send_blocked) { + return 0; + } + } + std::array vec; size_t pktcnt = 0; auto max_udp_payload_size = @@ -703,8 +716,7 @@ int Client::write_quic() { #else // !UDP_SEGMENT 1; #endif // !UDP_SEGMENT - std::array buf; - uint8_t *bufpos = buf.data(); + uint8_t *bufpos = quic.tx.data.get(); ngtcp2_path_storage ps; ngtcp2_path_storage_zero(&ps); @@ -767,9 +779,15 @@ int Client::write_quic() { quic_restart_pkt_timer(); if (nwrite == 0) { - if (bufpos - buf.data()) { - write_udp(ps.path.remote.addr, ps.path.remote.addrlen, buf.data(), - bufpos - buf.data(), max_udp_payload_size); + if (bufpos - quic.tx.data.get()) { + auto datalen = bufpos - quic.tx.data.get(); + rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen, + quic.tx.data.get(), datalen, max_udp_payload_size); + if (rv == 1) { + on_send_blocked(ps.path.remote, datalen, max_udp_payload_size); + signal_write(); + return 0; + } } return 0; } @@ -779,12 +797,51 @@ int Client::write_quic() { // Assume that the path does not change. if (++pktcnt == max_pktcnt || static_cast(nwrite) < max_udp_payload_size) { - write_udp(ps.path.remote.addr, ps.path.remote.addrlen, buf.data(), - bufpos - buf.data(), max_udp_payload_size); + auto datalen = bufpos - quic.tx.data.get(); + rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen, + quic.tx.data.get(), datalen, max_udp_payload_size); + if (rv == 1) { + on_send_blocked(ps.path.remote, datalen, max_udp_payload_size); + } signal_write(); return 0; } } } +void Client::on_send_blocked(const ngtcp2_addr &remote_addr, size_t datalen, + size_t max_udp_payload_size) { + assert(!quic.tx.send_blocked); + + quic.tx.send_blocked = true; + + auto &p = quic.tx.blocked; + + memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen); + + p.remote_addr.len = remote_addr.addrlen; + p.datalen = datalen; + p.max_udp_payload_size = max_udp_payload_size; +} + +int Client::send_blocked_packet() { + int rv; + + assert(quic.tx.send_blocked); + + auto &p = quic.tx.blocked; + + rv = write_udp(&p.remote_addr.su.sa, p.remote_addr.len, quic.tx.data.get(), + p.datalen, p.max_udp_payload_size); + if (rv == 1) { + signal_write(); + + return 0; + } + + quic.tx.send_blocked = false; + + return 0; +} + } // namespace h2load