h2load: Handle EAGAIN/EWOULDBLOCK from sendmsg

This commit is contained in:
Tatsuhiro Tsujikawa 2022-02-10 19:35:50 +09:00
parent bd3cc94a36
commit b70fdca9ac
3 changed files with 86 additions and 7 deletions

View File

@ -495,6 +495,10 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo)
#ifdef ENABLE_HTTP3
ev_timer_init(&quic.pkt_timer, quic_pkt_timeout_cb, 0., 0.);
quic.pkt_timer.data = this;
if (config.is_quic()) {
quic.tx.data = std::make_unique<uint8_t[]>(64_k);
}
#endif // ENABLE_HTTP3
}
@ -1419,6 +1423,7 @@ int Client::write_tls() {
}
#ifdef ENABLE_HTTP3
// Returns 1 if sendmsg is blocked.
int Client::write_udp(const sockaddr *addr, socklen_t addrlen,
const uint8_t *data, size_t datalen, size_t gso_size) {
iovec msg_iov;
@ -1447,6 +1452,10 @@ int Client::write_udp(const sockaddr *addr, socklen_t addrlen,
auto nwrite = sendmsg(fd, &msg, 0);
if (nwrite < 0) {
if (nwrite == EAGAIN || nwrite == EWOULDBLOCK) {
return 1;
}
std::cerr << "sendto: errno=" << errno << std::endl;
} else {
++worker->stats.udp_dgram_sent;

View File

@ -342,6 +342,16 @@ struct Client {
quic::Error last_error;
bool close_requested;
FILE *qlog_file;
struct {
bool send_blocked;
struct {
Address remote_addr;
size_t datalen;
size_t max_udp_payload_size;
} blocked;
std::unique_ptr<uint8_t[]> data;
} tx;
} quic;
#endif // ENABLE_HTTP3
ev_timer request_timeout_watcher;
@ -465,6 +475,9 @@ struct Client {
int write_quic();
int write_udp(const sockaddr *addr, socklen_t addrlen, const uint8_t *data,
size_t datalen, size_t gso_size);
void on_send_blocked(const ngtcp2_addr &remote_addr, size_t datalen,
size_t max_udp_payload_size);
int send_blocked_packet();
void quic_close_connection();
int quic_handshake_completed();

View File

@ -684,12 +684,25 @@ int Client::read_quic() {
}
int Client::write_quic() {
int rv;
ev_io_stop(worker->loop, &wev);
if (quic.close_requested) {
return -1;
}
if (quic.tx.send_blocked) {
rv = send_blocked_packet();
if (rv != 0) {
return -1;
}
if (quic.tx.send_blocked) {
return 0;
}
}
std::array<nghttp3_vec, 16> vec;
size_t pktcnt = 0;
auto max_udp_payload_size =
@ -703,8 +716,7 @@ int Client::write_quic() {
#else // !UDP_SEGMENT
1;
#endif // !UDP_SEGMENT
std::array<uint8_t, 64_k> buf;
uint8_t *bufpos = buf.data();
uint8_t *bufpos = quic.tx.data.get();
ngtcp2_path_storage ps;
ngtcp2_path_storage_zero(&ps);
@ -767,9 +779,15 @@ int Client::write_quic() {
quic_restart_pkt_timer();
if (nwrite == 0) {
if (bufpos - buf.data()) {
write_udp(ps.path.remote.addr, ps.path.remote.addrlen, buf.data(),
bufpos - buf.data(), max_udp_payload_size);
if (bufpos - quic.tx.data.get()) {
auto datalen = bufpos - quic.tx.data.get();
rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen,
quic.tx.data.get(), datalen, max_udp_payload_size);
if (rv == 1) {
on_send_blocked(ps.path.remote, datalen, max_udp_payload_size);
signal_write();
return 0;
}
}
return 0;
}
@ -779,12 +797,51 @@ int Client::write_quic() {
// Assume that the path does not change.
if (++pktcnt == max_pktcnt ||
static_cast<size_t>(nwrite) < max_udp_payload_size) {
write_udp(ps.path.remote.addr, ps.path.remote.addrlen, buf.data(),
bufpos - buf.data(), max_udp_payload_size);
auto datalen = bufpos - quic.tx.data.get();
rv = write_udp(ps.path.remote.addr, ps.path.remote.addrlen,
quic.tx.data.get(), datalen, max_udp_payload_size);
if (rv == 1) {
on_send_blocked(ps.path.remote, datalen, max_udp_payload_size);
}
signal_write();
return 0;
}
}
}
void Client::on_send_blocked(const ngtcp2_addr &remote_addr, size_t datalen,
size_t max_udp_payload_size) {
assert(!quic.tx.send_blocked);
quic.tx.send_blocked = true;
auto &p = quic.tx.blocked;
memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
p.remote_addr.len = remote_addr.addrlen;
p.datalen = datalen;
p.max_udp_payload_size = max_udp_payload_size;
}
int Client::send_blocked_packet() {
int rv;
assert(quic.tx.send_blocked);
auto &p = quic.tx.blocked;
rv = write_udp(&p.remote_addr.su.sa, p.remote_addr.len, quic.tx.data.get(),
p.datalen, p.max_udp_payload_size);
if (rv == 1) {
signal_write();
return 0;
}
quic.tx.send_blocked = false;
return 0;
}
} // namespace h2load