nghttpx: Handle EAGAIN/EWOULDBLOCK from sendmsg

This commit is contained in:
Tatsuhiro Tsujikawa 2022-02-03 19:51:32 +09:00
parent 7ca255ff54
commit 01bcc72f66
3 changed files with 199 additions and 32 deletions

View File

@ -39,6 +39,7 @@ enum ErrorCode {
SHRPX_ERR_DCONN_CANCELED = -103,
SHRPX_ERR_RETRY = -104,
SHRPX_ERR_TLS_REQUIRED = -105,
SHRPX_ERR_SEND_BLOCKED = -106,
};
} // namespace shrpx

View File

@ -127,7 +127,10 @@ Http3Upstream::Http3Upstream(ClientHandler *handler)
downstream_queue_{downstream_queue_size(handler->get_worker()),
!get_config()->http2_proxy},
idle_close_{false},
retry_close_{false} {
retry_close_{false},
tx_{
.data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]),
} {
ev_timer_init(&timer_, timeoutcb, 0., 0.);
timer_.data = this;
@ -700,6 +703,19 @@ int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr,
int Http3Upstream::on_read() { return 0; }
int Http3Upstream::on_write() {
int rv;
if (tx_.send_blocked) {
rv = send_blocked_packet();
if (rv != 0) {
return -1;
}
if (tx_.send_blocked) {
return 0;
}
}
if (write_streams() != 0) {
return -1;
}
@ -711,14 +727,13 @@ int Http3Upstream::on_write() {
int Http3Upstream::write_streams() {
std::array<nghttp3_vec, 16> vec;
std::array<uint8_t, 64_k> buf;
auto max_udp_payload_size = std::min(
max_udp_payload_size_, ngtcp2_conn_get_path_max_udp_payload_size(conn_));
size_t max_pktcnt =
std::min(static_cast<size_t>(64_k), ngtcp2_conn_get_send_quantum(conn_)) /
max_udp_payload_size;
ngtcp2_pkt_info pi, prev_pi;
uint8_t *bufpos = buf.data();
uint8_t *bufpos = tx_.data.get();
ngtcp2_path_storage ps, prev_ps;
size_t pktcnt = 0;
int rv;
@ -803,8 +818,6 @@ int Http3Upstream::write_streams() {
last_error_ = quic::err_transport(nwrite);
handler_->get_connection()->wlimit.stopw();
return handle_error();
} else if (ndatalen >= 0) {
rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
@ -817,12 +830,26 @@ int Http3Upstream::write_streams() {
}
if (nwrite == 0) {
if (bufpos - buf.data()) {
send_packet(static_cast<UpstreamAddr *>(prev_ps.path.user_data),
prev_ps.path.remote.addr, prev_ps.path.remote.addrlen,
prev_ps.path.local.addr, prev_ps.path.local.addrlen,
prev_pi, buf.data(), bufpos - buf.data(),
max_udp_payload_size);
if (bufpos - tx_.data.get()) {
auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
auto data = tx_.data.get();
auto datalen = bufpos - data;
rv = send_packet(faddr, prev_ps.path.remote.addr,
prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
prev_ps.path.local.addrlen, prev_pi, data, datalen,
max_udp_payload_size);
if (rv == SHRPX_ERR_SEND_BLOCKED) {
on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local,
prev_pi, data, datalen, max_udp_payload_size);
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
reset_idle_timer();
signal_write_upstream_addr(faddr);
return 0;
}
reset_idle_timer();
}
@ -842,54 +869,99 @@ int Http3Upstream::write_streams() {
prev_pi = pi;
} else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) ||
prev_pi.ecn != pi.ecn) {
send_packet(static_cast<UpstreamAddr *>(prev_ps.path.user_data),
prev_ps.path.remote.addr, prev_ps.path.remote.addrlen,
prev_ps.path.local.addr, prev_ps.path.local.addrlen, prev_pi,
buf.data(), bufpos - buf.data() - nwrite,
max_udp_payload_size);
auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
auto data = tx_.data.get();
auto datalen = bufpos - data - nwrite;
send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
ps.path.remote.addr, ps.path.remote.addrlen,
ps.path.local.addr, ps.path.local.addrlen, pi,
bufpos - nwrite, nwrite, max_udp_payload_size);
rv = send_packet(faddr, prev_ps.path.remote.addr,
prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
prev_ps.path.local.addrlen, prev_pi, data, datalen,
max_udp_payload_size);
switch (rv) {
case SHRPX_ERR_SEND_BLOCKED:
on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi,
data, datalen, max_udp_payload_size);
on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data),
ps.path.remote, ps.path.local, pi, bufpos - nwrite,
nwrite, max_udp_payload_size);
signal_write_upstream_addr(faddr);
break;
default: {
auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
auto data = bufpos - nwrite;
rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
ps.path.local.addr, ps.path.local.addrlen, pi, data,
nwrite, max_udp_payload_size);
if (rv == SHRPX_ERR_SEND_BLOCKED) {
on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data,
nwrite, max_udp_payload_size);
}
signal_write_upstream_addr(faddr);
}
}
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
reset_idle_timer();
handler_->signal_write();
return 0;
}
if (++pktcnt == max_pktcnt ||
static_cast<size_t>(nwrite) < max_udp_payload_size) {
send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
ps.path.remote.addr, ps.path.remote.addrlen,
ps.path.local.addr, ps.path.local.addrlen, pi, buf.data(),
bufpos - buf.data(), max_udp_payload_size);
auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
auto data = tx_.data.get();
auto datalen = bufpos - data;
rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
ps.path.local.addr, ps.path.local.addrlen, pi, data,
datalen, max_udp_payload_size);
if (rv == SHRPX_ERR_SEND_BLOCKED) {
on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
max_udp_payload_size);
}
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
reset_idle_timer();
handler_->signal_write();
signal_write_upstream_addr(faddr);
return 0;
}
#else // !UDP_SEGMENT
send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
ps.path.local.addrlen, pi, buf.data(), bufpos - buf.data(), 0);
auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
auto data = tx_.data.get();
auto datalen = bufpos - data;
rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
ps.path.local.addr, ps.path.local.addrlen, pi, data,
datalen, 0);
if (rv == SHRPX_ERR_SEND_BLOCKED) {
on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
0);
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
reset_idle_timer();
signal_write_upstream_addr(faddr);
return 0;
}
if (++pktcnt == max_pktcnt) {
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
reset_idle_timer();
handler_->signal_write();
signal_write_upstream_addr(faddr);
return 0;
}
bufpos = buf.data();
bufpos = tx_.data.get();
#endif // !UDP_SEGMENT
}
@ -1760,6 +1832,11 @@ int Http3Upstream::send_packet(const UpstreamAddr *faddr,
case -EMSGSIZE:
max_udp_payload_size_ = NGTCP2_MAX_UDP_PAYLOAD_SIZE;
break;
case -EAGAIN:
#if EAGAIN != EWOULDBLOCK
case -EWOULDBLOCK:
#endif // EAGAIN != EWOULDBLOCK
return SHRPX_ERR_SEND_BLOCKED;
default:
break;
}
@ -1767,6 +1844,70 @@ int Http3Upstream::send_packet(const UpstreamAddr *faddr,
return -1;
}
void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
const ngtcp2_addr &remote_addr,
const ngtcp2_addr &local_addr,
const ngtcp2_pkt_info &pi,
const uint8_t *data, size_t datalen,
size_t max_udp_payload_size) {
assert(tx_.num_blocked || !tx_.send_blocked);
assert(tx_.num_blocked < 2);
tx_.send_blocked = true;
auto &p = tx_.blocked[tx_.num_blocked++];
memcpy(&p.local_addr.su, local_addr.addr, local_addr.addrlen);
memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
p.local_addr.len = local_addr.addrlen;
p.remote_addr.len = remote_addr.addrlen;
p.faddr = faddr;
p.pi = pi;
p.data = data;
p.datalen = datalen;
p.max_udp_payload_size = max_udp_payload_size;
}
int Http3Upstream::send_blocked_packet() {
int rv;
assert(tx_.send_blocked);
for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) {
auto &p = tx_.blocked[tx_.num_blocked_sent];
rv = send_packet(p.faddr, &p.remote_addr.su.sa, p.remote_addr.len,
&p.local_addr.su.sa, p.local_addr.len, p.pi, p.data,
p.datalen, p.max_udp_payload_size);
if (rv == SHRPX_ERR_SEND_BLOCKED) {
signal_write_upstream_addr(p.faddr);
return 0;
}
}
tx_.send_blocked = false;
tx_.num_blocked = 0;
tx_.num_blocked_sent = 0;
return 0;
}
void Http3Upstream::signal_write_upstream_addr(const UpstreamAddr *faddr) {
auto conn = handler_->get_connection();
if (faddr->fd != conn->wev.fd) {
if (ev_is_active(&conn->wev)) {
ev_io_stop(handler_->get_loop(), &conn->wev);
}
ev_io_set(&conn->wev, faddr->fd, EV_WRITE);
}
conn->wlimit.startw();
}
int Http3Upstream::handle_error() {
if (ngtcp2_conn_is_in_closing_period(conn_)) {
return -1;

View File

@ -157,6 +157,14 @@ public:
void qlog_write(const void *data, size_t datalen, bool fin);
int open_qlog_file(const StringRef &dir, const ngtcp2_cid &scid) const;
void on_send_blocked(const UpstreamAddr *faddr,
const ngtcp2_addr &remote_addr,
const ngtcp2_addr &local_addr, const ngtcp2_pkt_info &pi,
const uint8_t *data, size_t datalen,
size_t max_udp_payload_size);
int send_blocked_packet();
void signal_write_upstream_addr(const UpstreamAddr *faddr);
private:
ClientHandler *handler_;
ev_timer timer_;
@ -174,6 +182,23 @@ private:
bool idle_close_;
bool retry_close_;
std::vector<uint8_t> conn_close_;
struct {
bool send_blocked;
size_t num_blocked;
size_t num_blocked_sent;
// blocked field is effective only when send_blocked is true.
struct {
const UpstreamAddr *faddr;
Address local_addr;
Address remote_addr;
ngtcp2_pkt_info pi;
const uint8_t *data;
size_t datalen;
size_t max_udp_payload_size;
} blocked[2];
std::unique_ptr<uint8_t[]> data;
} tx_;
};
} // namespace shrpx