nghttp2/src/shrpx_http3_upstream.cc

2860 lines
81 KiB
C++

/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2021 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_http3_upstream.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <netinet/udp.h>
#include <cstdio>
#include <ngtcp2/ngtcp2_crypto.h>
#include "shrpx_client_handler.h"
#include "shrpx_downstream.h"
#include "shrpx_downstream_connection.h"
#include "shrpx_log.h"
#include "shrpx_quic.h"
#include "shrpx_worker.h"
#include "shrpx_http.h"
#include "shrpx_connection_handler.h"
#ifdef HAVE_MRUBY
# include "shrpx_mruby.h"
#endif // HAVE_MRUBY
#include "http3.h"
#include "util.h"
namespace shrpx {
namespace {
// libev timer callback for Http3Upstream::timer_.
//
// Runs handle_expiry() and, only if that succeeded, on_write().  When
// either of them reports a failure the owning ClientHandler is
// deleted.
void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
  auto *self = static_cast<Http3Upstream *>(w->data);

  // on_write() is attempted only after handle_expiry() succeeded.
  const bool ok = self->handle_expiry() == 0 && self->on_write() == 0;
  if (ok) {
    return;
  }

  delete self->get_client_handler();
}
} // namespace
namespace {
// libev timer callback for Http3Upstream::shutdown_timer_.
//
// Calls submit_goaway(); if that fails, the owning ClientHandler is
// deleted.
void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
  auto *self = static_cast<Http3Upstream *>(w->data);
  // Fetch the handler before calling into the upstream, as the
  // original code does.
  auto *client = self->get_client_handler();

  if (self->submit_goaway() == 0) {
    return;
  }

  delete client;
}
} // namespace
namespace {
// libev prepare-watcher callback for Http3Upstream::prep_.
//
// Calls check_shutdown(); if that fails, the owning ClientHandler is
// deleted.
void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revent) {
  auto *self = static_cast<Http3Upstream *>(w->data);
  // Fetch the handler before calling into the upstream, as the
  // original code does.
  auto *client = self->get_client_handler();

  if (self->check_shutdown() == 0) {
    return;
  }

  delete client;
}
} // namespace
namespace {
// Picks the downstream queue limit for |worker|: connections_per_host
// when the http2_proxy option is set, connections_per_frontend
// otherwise.
size_t downstream_queue_size(Worker *worker) {
  auto &conf = *worker->get_downstream_config();

  return get_config()->http2_proxy ? conf.connections_per_host
                                   : conf.connections_per_frontend;
}
} // namespace
namespace {
// ngtcp2_crypto_conn_ref::get_conn hook.
//
// |conn_ref|->user_data is a Connection whose data member points at the
// ClientHandler; the ngtcp2_conn is obtained from that handler's
// upstream, which is cast to Http3Upstream.
ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
  auto *connection = static_cast<Connection *>(conn_ref->user_data);
  auto *client = static_cast<ClientHandler *>(connection->data);

  return static_cast<Http3Upstream *>(client->get_upstream())->get_conn();
}
} // namespace
// Constructs an upstream bound to |handler|.
//
// No QUIC or HTTP/3 connection object is created here (conn_ and
// httpconn_ start out null).  The constructor allocates the 64_k
// transmit buffer, installs the get_conn hook on the handler's
// Connection, initializes both timers and the prepare watcher with
// this object as their data, and starts the prepare watcher.
Http3Upstream::Http3Upstream(ClientHandler *handler)
  : handler_{handler},
    qlog_fd_{-1},
    hashed_scid_{},
    conn_{nullptr},
    httpconn_{nullptr},
    downstream_queue_{downstream_queue_size(handler->get_worker()),
                      !get_config()->http2_proxy},
    retry_close_{false},
    tx_{
      .data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]),
    } {
  handler_->get_connection()->conn_ref.get_conn = shrpx::get_conn;

  ngtcp2_connection_close_error_default(&last_error_);

  // Timers are only initialized here, not started.
  ev_timer_init(&timer_, timeoutcb, 0., 0.);
  timer_.data = this;

  ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 0., 0.);
  shutdown_timer_.data = this;

  // The prepare watcher is the only watcher started by the constructor.
  ev_prepare_init(&prep_, prepare_cb);
  prep_.data = this;
  ev_prepare_start(handler_->get_loop(), &prep_);
}
// Stops every libev watcher owned by this object, frees the nghttp3 and
// ngtcp2 connection objects, and closes the qlog file descriptor if
// one is still open.
Http3Upstream::~Http3Upstream() {
  auto *loop = handler_->get_loop();

  ev_prepare_stop(loop, &prep_);
  ev_timer_stop(loop, &shutdown_timer_);
  ev_timer_stop(loop, &timer_);

  nghttp3_conn_del(httpconn_);
  ngtcp2_conn_del(conn_);

  if (qlog_fd_ == -1) {
    return;
  }

  close(qlog_fd_);
}
namespace {
// ngtcp2 debug log sink: formats |fmt| with the variadic arguments
// into a fixed 4096-byte stack buffer, appends a newline, and writes
// the result to stderr with write(2), retrying on EINTR.
//
// |user_data| is unused.  Output longer than the buffer is truncated
// so that the newline still fits.  If vsnprintf() reports an encoding
// error (negative return value) nothing is written; previously the
// negative value was cast to size_t, mistaken for truncation, and the
// uninitialized buffer contents were emitted.
void log_printf(void *user_data, const char *fmt, ...) {
  std::array<char, 4096> buf;

  va_list ap;
  va_start(ap, fmt);
  auto nwrite = vsnprintf(buf.data(), buf.size(), fmt, ap);
  va_end(ap);

  if (nwrite < 0) {
    // Formatting failed; the buffer contents are indeterminate.
    return;
  }

  if (static_cast<size_t>(nwrite) >= buf.size()) {
    // Truncated: keep the last slot for the trailing newline (this
    // overwrites the terminating NUL written by vsnprintf).
    nwrite = static_cast<int>(buf.size() - 1);
  }

  buf[nwrite++] = '\n';

  while (write(fileno(stderr), buf.data(), nwrite) == -1 && errno == EINTR)
    ;
}
} // namespace
namespace {
void qlog_write(void *user_data, uint32_t flags, const void *data,
size_t datalen) {
auto upstream = static_cast<Http3Upstream *>(user_data);
upstream->qlog_write(data, datalen, flags & NGTCP2_QLOG_WRITE_FLAG_FIN);
}
} // namespace
// Writes |datalen| bytes at |data| to the qlog file descriptor,
// retrying only when write(2) is interrupted (EINTR).  Other write
// errors and short writes are not handled.  When |fin| is true the
// descriptor is closed and qlog_fd_ is reset to -1.
void Http3Upstream::qlog_write(const void *data, size_t datalen, bool fin) {
  assert(qlog_fd_ != -1);

  for (;;) {
    if (write(qlog_fd_, data, datalen) != -1 || errno != EINTR) {
      break;
    }
  }

  if (!fin) {
    return;
  }

  close(qlog_fd_);
  qlog_fd_ = -1;
}
namespace {
// ngtcp2 rand callback: fills |dest| .. |dest| + |destlen| using
// util::random_bytes with the std::mt19937 stored in
// |rand_ctx|->native_handle.
void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
  auto &gen = *static_cast<std::mt19937 *>(rand_ctx->native_handle);

  util::random_bytes(dest, dest + destlen, gen);
}
} // namespace
namespace {
// ngtcp2 get_new_connection_id callback.
//
// Generates a connection ID of |cidlen| bytes into |cid| and the
// matching stateless reset token into |token|, both derived from the
// first entry of the QUIC keying materials, then registers the new
// connection ID for this client handler.  Returns 0 on success or
// NGTCP2_ERR_CALLBACK_FAILURE if either generation step fails.
int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
                          size_t cidlen, void *user_data) {
  auto *self = static_cast<Http3Upstream *>(user_data);
  auto *client = self->get_client_handler();
  auto *worker = client->get_worker();

  auto &materials = worker->get_connection_handler()->get_quic_keying_materials();
  auto &qkm = materials->keying_materials.front();

  // The token is generated only after the connection ID was produced.
  const bool failed =
    generate_quic_connection_id(*cid, cidlen, worker->get_cid_prefix(),
                                qkm.id, qkm.cid_encryption_key.data()) != 0 ||
    generate_quic_stateless_reset_token(token, *cid, qkm.secret.data(),
                                        qkm.secret.size()) != 0;
  if (failed) {
    return NGTCP2_ERR_CALLBACK_FAILURE;
  }

  worker->get_quic_connection_handler()->add_connection_id(*cid, client);

  return 0;
}
} // namespace
namespace {
// ngtcp2 remove_connection_id callback: removes |cid| from the
// worker's QUIC connection handler.  Always returns 0.
int remove_connection_id(ngtcp2_conn *conn, const ngtcp2_cid *cid,
                         void *user_data) {
  auto *self = static_cast<Http3Upstream *>(user_data);
  auto *worker = self->get_client_handler()->get_worker();

  worker->get_quic_connection_handler()->remove_connection_id(*cid);

  return 0;
}
} // namespace
void Http3Upstream::http_begin_request_headers(int64_t stream_id) {
auto downstream =
std::make_unique<Downstream>(this, handler_->get_mcpool(), stream_id);
nghttp3_conn_set_stream_user_data(httpconn_, stream_id, downstream.get());
downstream->reset_upstream_rtimer();
handler_->repeat_read_timer();
auto &req = downstream->request();
req.http_major = 3;
req.http_minor = 0;
add_pending_downstream(std::move(downstream));
}
// Takes ownership of |downstream| and hands it to downstream_queue_ as
// a pending entry.
void Http3Upstream::add_pending_downstream(
  std::unique_ptr<Downstream> downstream) {
  auto &queue = downstream_queue_;

  queue.add_pending(std::move(downstream));
}
namespace {
// ngtcp2 recv_stream_data callback: forwards the received bytes to
// Http3Upstream::recv_stream_data() and maps any failure to
// NGTCP2_ERR_CALLBACK_FAILURE.  |offset| and |stream_user_data| are
// not used.
int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
                     uint64_t offset, const uint8_t *data, size_t datalen,
                     void *user_data, void *stream_user_data) {
  auto *self = static_cast<Http3Upstream *>(user_data);
  const auto rv = self->recv_stream_data(flags, stream_id, data, datalen);

  return rv == 0 ? 0 : NGTCP2_ERR_CALLBACK_FAILURE;
}
} // namespace
// Feeds stream data received on |stream_id| into nghttp3.
//
// On success the number of bytes nghttp3 reports as consumed is
// credited back to both the stream-level and the connection-level
// flow control windows, and 0 is returned.  On failure the error is
// logged, last_error_ is set to the application error code inferred
// from the nghttp3 error, and -1 is returned.
int Http3Upstream::recv_stream_data(uint32_t flags, int64_t stream_id,
                                    const uint8_t *data, size_t datalen) {
  assert(httpconn_);

  const auto fin = flags & NGTCP2_STREAM_DATA_FLAG_FIN;
  auto nconsumed =
    nghttp3_conn_read_stream(httpconn_, stream_id, data, datalen, fin);

  if (nconsumed >= 0) {
    ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
    ngtcp2_conn_extend_max_offset(conn_, nconsumed);

    return 0;
  }

  ULOG(ERROR, this) << "nghttp3_conn_read_stream: "
                    << nghttp3_strerror(nconsumed);
  ngtcp2_connection_close_error_set_application_error(
    &last_error_, nghttp3_err_infer_quic_app_error_code(nconsumed), nullptr,
    0);

  return -1;
}
namespace {
// ngtcp2 stream_close callback.
//
// When |flags| does not carry
// NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET, NGHTTP3_H3_NO_ERROR is
// used in place of |app_error_code|.  The close is then forwarded to
// Http3Upstream::stream_close(); a failure there is reported as
// NGTCP2_ERR_CALLBACK_FAILURE.
int stream_close(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
                 uint64_t app_error_code, void *user_data,
                 void *stream_user_data) {
  const bool code_set =
    (flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET) != 0;
  const uint64_t code = code_set ? app_error_code : NGHTTP3_H3_NO_ERROR;

  auto *self = static_cast<Http3Upstream *>(user_data);

  return self->stream_close(stream_id, code) == 0
           ? 0
           : NGTCP2_ERR_CALLBACK_FAILURE;
}
} // namespace
// Tells nghttp3 that |stream_id| was closed with |app_error_code|.
//
// Nothing is done if the HTTP/3 connection has not been created yet.
// If nghttp3 does not know the stream
// (NGHTTP3_ERR_STREAM_NOT_FOUND) and it is a bidirectional stream,
// the bidirectional stream limit is extended by one here.  Any other
// nghttp3 error is logged, recorded in last_error_, and reported as
// -1.  Returns 0 otherwise.
int Http3Upstream::stream_close(int64_t stream_id, uint64_t app_error_code) {
  if (!httpconn_) {
    return 0;
  }

  const auto rv =
    nghttp3_conn_close_stream(httpconn_, stream_id, app_error_code);
  if (rv == 0) {
    return 0;
  }

  if (rv == NGHTTP3_ERR_STREAM_NOT_FOUND) {
    if (ngtcp2_is_bidi_stream(stream_id)) {
      ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
    }

    return 0;
  }

  ULOG(ERROR, this) << "nghttp3_conn_close_stream: " << nghttp3_strerror(rv);
  ngtcp2_connection_close_error_set_application_error(
    &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);

  return -1;
}
namespace {
// ngtcp2 acked_stream_data_offset callback: passes |stream_id| and
// |datalen| on to Http3Upstream::acked_stream_data_offset() and maps a
// failure to NGTCP2_ERR_CALLBACK_FAILURE.  |offset| is not used.
int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id,
                             uint64_t offset, uint64_t datalen, void *user_data,
                             void *stream_user_data) {
  auto *self = static_cast<Http3Upstream *>(user_data);
  const auto rv = self->acked_stream_data_offset(stream_id, datalen);

  return rv == 0 ? 0 : NGTCP2_ERR_CALLBACK_FAILURE;
}
} // namespace
// Reports to nghttp3 that |datalen| more bytes on |stream_id| were
// acknowledged.  Does nothing when the HTTP/3 connection does not
// exist yet.  Returns 0 on success; logs and returns -1 if
// nghttp3_conn_add_ack_offset() fails.
int Http3Upstream::acked_stream_data_offset(int64_t stream_id,
                                            uint64_t datalen) {
  if (!httpconn_) {
    return 0;
  }

  const auto rv = nghttp3_conn_add_ack_offset(httpconn_, stream_id, datalen);
  if (rv == 0) {
    return 0;
  }

  ULOG(ERROR, this) << "nghttp3_conn_add_ack_offset: "
                    << nghttp3_strerror(rv);

  return -1;
}
namespace {
// ngtcp2 extend_max_stream_data callback: forwards |stream_id| to
// Http3Upstream::extend_max_stream_data() and maps a failure to
// NGTCP2_ERR_CALLBACK_FAILURE.  |max_data| is not used.
int extend_max_stream_data(ngtcp2_conn *conn, int64_t stream_id,
                           uint64_t max_data, void *user_data,
                           void *stream_user_data) {
  auto *self = static_cast<Http3Upstream *>(user_data);
  const auto rv = self->extend_max_stream_data(stream_id);

  return rv == 0 ? 0 : NGTCP2_ERR_CALLBACK_FAILURE;
}
} // namespace
// Unblocks |stream_id| in nghttp3.  Does nothing when the HTTP/3
// connection does not exist yet.  Returns 0 on success; logs and
// returns -1 if nghttp3_conn_unblock_stream() fails.
int Http3Upstream::extend_max_stream_data(int64_t stream_id) {
  if (!httpconn_) {
    return 0;
  }

  const auto rv = nghttp3_conn_unblock_stream(httpconn_, stream_id);
  if (rv == 0) {
    return 0;
  }

  ULOG(ERROR, this) << "nghttp3_conn_unblock_stream: "
                    << nghttp3_strerror(rv);

  return -1;
}
namespace {
// ngtcp2 extend_max_remote_streams_bidi callback: forwards
// |max_streams| to Http3Upstream::extend_max_remote_streams_bidi().
// Always returns 0.
int extend_max_remote_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams,
                                   void *user_data) {
  static_cast<Http3Upstream *>(user_data)->extend_max_remote_streams_bidi(
    max_streams);

  return 0;
}
} // namespace
// Propagates the new limit of client-initiated bidirectional streams,
// |max_streams|, to nghttp3.
//
// Does nothing while the HTTP/3 connection has not been created, in
// line with the other nghttp3 forwarding methods of this class
// (stream_close, acked_stream_data_offset, extend_max_stream_data,
// http_shutdown_stream_read), instead of handing a null connection to
// nghttp3.
void Http3Upstream::extend_max_remote_streams_bidi(uint64_t max_streams) {
  if (!httpconn_) {
    return;
  }

  nghttp3_conn_set_max_client_streams_bidi(httpconn_, max_streams);
}
namespace {
// ngtcp2 stream_reset callback: asks the upstream to shut down the
// read side of |stream_id| in nghttp3 and maps a failure to
// NGTCP2_ERR_CALLBACK_FAILURE.  |final_size| and |app_error_code| are
// not used.
int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size,
                 uint64_t app_error_code, void *user_data,
                 void *stream_user_data) {
  auto *self = static_cast<Http3Upstream *>(user_data);
  const auto rv = self->http_shutdown_stream_read(stream_id);

  return rv == 0 ? 0 : NGTCP2_ERR_CALLBACK_FAILURE;
}
} // namespace
// Shuts down the read side of |stream_id| in nghttp3.  Does nothing
// when the HTTP/3 connection does not exist yet.  Returns 0 on
// success; logs and returns -1 if nghttp3_conn_shutdown_stream_read()
// fails.
int Http3Upstream::http_shutdown_stream_read(int64_t stream_id) {
  if (!httpconn_) {
    return 0;
  }

  const auto rv = nghttp3_conn_shutdown_stream_read(httpconn_, stream_id);
  if (rv == 0) {
    return 0;
  }

  ULOG(ERROR, this) << "nghttp3_conn_shutdown_stream_read: "
                    << nghttp3_strerror(rv);

  return -1;
}
namespace {
// ngtcp2 stream_stop_sending callback: asks the upstream to shut down
// the read side of |stream_id| in nghttp3 and maps a failure to
// NGTCP2_ERR_CALLBACK_FAILURE.  |app_error_code| is not used.
int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id,
                        uint64_t app_error_code, void *user_data,
                        void *stream_user_data) {
  auto *self = static_cast<Http3Upstream *>(user_data);
  const auto rv = self->http_shutdown_stream_read(stream_id);

  return rv == 0 ? 0 : NGTCP2_ERR_CALLBACK_FAILURE;
}
} // namespace
namespace {
// ngtcp2 handshake_completed callback: delegates to
// Http3Upstream::handshake_completed() and maps a failure to
// NGTCP2_ERR_CALLBACK_FAILURE.
int handshake_completed(ngtcp2_conn *conn, void *user_data) {
  auto *self = static_cast<Http3Upstream *>(user_data);

  return self->handshake_completed() == 0 ? 0 : NGTCP2_ERR_CALLBACK_FAILURE;
}
} // namespace
// Post-handshake processing.
//
// Records the negotiated ALPN on the handler and fails with -1 if none
// was negotiated.  Then generates a token bound to the remote address
// of the current path, appends the keying material id as one extra
// trailing byte, and submits it to the peer with
// ngtcp2_conn_submit_new_token().  A failure to generate the token is
// not treated as an error (0 is returned without submitting one); a
// failure to submit it is logged and returns -1.
int Http3Upstream::handshake_completed() {
  handler_->set_alpn_from_conn();

  if (handler_->get_alpn().empty()) {
    ULOG(ERROR, this) << "NO ALPN was negotiated";
    return -1;
  }

  // One extra byte is reserved for the keying material id.
  std::array<uint8_t, NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1> token;
  size_t tokenlen;

  auto path = ngtcp2_conn_get_path(conn_);
  auto &materials =
    handler_->get_worker()->get_connection_handler()->get_quic_keying_materials();
  auto &qkm = materials->keying_materials.front();

  const auto gen_rv =
    generate_token(token.data(), tokenlen, path->remote.addr,
                   path->remote.addrlen, qkm.secret.data(), qkm.secret.size());
  if (gen_rv != 0) {
    return 0;
  }

  assert(tokenlen == NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN);

  token[tokenlen++] = qkm.id;

  if (auto rv = ngtcp2_conn_submit_new_token(conn_, token.data(), tokenlen);
      rv != 0) {
    ULOG(ERROR, this) << "ngtcp2_conn_submit_new_token: "
                      << ngtcp2_strerror(rv);
    return -1;
  }

  return 0;
}
namespace {
// ngtcp2 recv_tx_key callback.  Only the application encryption level
// is of interest: when it arrives, the HTTP/3 connection is set up via
// Http3Upstream::setup_httpconn().  Returns
// NGTCP2_ERR_CALLBACK_FAILURE if that setup fails, 0 otherwise.
int recv_tx_key(ngtcp2_conn *conn, ngtcp2_crypto_level level, void *user_data) {
  if (level == NGTCP2_CRYPTO_LEVEL_APPLICATION &&
      static_cast<Http3Upstream *>(user_data)->setup_httpconn() != 0) {
    return NGTCP2_ERR_CALLBACK_FAILURE;
  }

  return 0;
}
} // namespace
// Initializes the server-side QUIC connection for this upstream.
//
// Builds the ngtcp2 callback table, generates a fresh source
// connection ID, fills in the ngtcp2 settings and transport parameters
// from the configuration, creates conn_ with ngtcp2_conn_server_new(),
// and registers both the hashed connection ID and the new SCID with
// the worker's QUIC connection handler.
//
// |faddr| is stored as the user_data of the initial ngtcp2_path.
// |remote_addr| and |local_addr| form that path and are also inputs to
// the hashed connection ID.  |initial_hd| supplies the DCID, SCID and
// version taken from the packet header passed by the caller.  If
// |odcid| is non-null it becomes original_dcid and initial_hd.dcid is
// advertised as retry_scid; otherwise initial_hd.dcid is the
// original_dcid.  |token| and |tokenlen| are handed to ngtcp2 through
// settings.token.
//
// Returns 0 on success, or -1 on failure.
int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr,
const Address &local_addr,
const ngtcp2_pkt_hd &initial_hd,
const ngtcp2_cid *odcid, const uint8_t *token,
size_t tokenlen) {
int rv;
auto worker = handler_->get_worker();
auto conn_handler = worker->get_connection_handler();
// The initializer below is positional (no designators), so the order
// of the entries must match the field order of ngtcp2_callbacks.
auto callbacks = ngtcp2_callbacks{
nullptr, // client_initial
ngtcp2_crypto_recv_client_initial_cb,
ngtcp2_crypto_recv_crypto_data_cb,
shrpx::handshake_completed,
nullptr, // recv_version_negotiation
ngtcp2_crypto_encrypt_cb,
ngtcp2_crypto_decrypt_cb,
ngtcp2_crypto_hp_mask_cb,
shrpx::recv_stream_data,
shrpx::acked_stream_data_offset,
nullptr, // stream_open
shrpx::stream_close,
nullptr, // recv_stateless_reset
nullptr, // recv_retry
nullptr, // extend_max_local_streams_bidi
nullptr, // extend_max_local_streams_uni
rand,
get_new_connection_id,
remove_connection_id,
ngtcp2_crypto_update_key_cb,
nullptr, // path_validation
nullptr, // select_preferred_addr
shrpx::stream_reset,
shrpx::extend_max_remote_streams_bidi,
nullptr, // extend_max_remote_streams_uni
shrpx::extend_max_stream_data,
nullptr, // dcid_status
nullptr, // handshake_confirmed
nullptr, // recv_new_token
ngtcp2_crypto_delete_crypto_aead_ctx_cb,
ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
nullptr, // recv_datagram
nullptr, // ack_datagram
nullptr, // lost_datagram
ngtcp2_crypto_get_path_challenge_data_cb,
shrpx::stream_stop_sending,
nullptr, // version_negotiation
nullptr, // recv_rx_key
shrpx::recv_tx_key,
};
auto config = get_config();
auto &quicconf = config->quic;
auto &http3conf = config->http3;
// The first keying material entry is used for everything generated
// in this function.
auto &qkms = conn_handler->get_quic_keying_materials();
auto &qkm = qkms->keying_materials.front();
// Generate this endpoint's source connection ID.
ngtcp2_cid scid;
if (generate_quic_connection_id(scid, SHRPX_QUIC_SCIDLEN,
worker->get_cid_prefix(), qkm.id,
qkm.cid_encryption_key.data()) != 0) {
return -1;
}
ngtcp2_settings settings;
ngtcp2_settings_default(&settings);
if (quicconf.upstream.debug.log) {
settings.log_printf = log_printf;
}
// qlog is enabled only if a directory is configured and the file
// could be opened; a failure to open it is not an error.
if (!quicconf.upstream.qlog.dir.empty()) {
auto fd = open_qlog_file(quicconf.upstream.qlog.dir, scid);
if (fd != -1) {
qlog_fd_ = fd;
settings.qlog.odcid = initial_hd.dcid;
settings.qlog.write = shrpx::qlog_write;
}
}
settings.initial_ts = quic_timestamp();
// The configured values are multiplied by NGTCP2_SECONDS to convert
// them to ngtcp2_tstamp.
settings.initial_rtt = static_cast<ngtcp2_tstamp>(
quicconf.upstream.initial_rtt * NGTCP2_SECONDS);
settings.cc_algo = quicconf.upstream.congestion_controller;
settings.max_window = http3conf.upstream.max_connection_window_size;
settings.max_stream_window = http3conf.upstream.max_window_size;
settings.max_tx_udp_payload_size = SHRPX_QUIC_MAX_UDP_PAYLOAD_SIZE;
settings.rand_ctx.native_handle = &worker->get_randgen();
settings.token = ngtcp2_vec{const_cast<uint8_t *>(token), tokenlen};
ngtcp2_transport_params params;
ngtcp2_transport_params_default(&params);
params.initial_max_streams_bidi = http3conf.upstream.max_concurrent_streams;
// The minimum number of unidirectional streams required for HTTP/3.
params.initial_max_streams_uni = 3;
params.initial_max_data = http3conf.upstream.connection_window_size;
params.initial_max_stream_data_bidi_remote = http3conf.upstream.window_size;
params.initial_max_stream_data_uni = http3conf.upstream.window_size;
params.max_idle_timeout = static_cast<ngtcp2_tstamp>(
quicconf.upstream.timeout.idle * NGTCP2_SECONDS);
#ifdef OPENSSL_IS_BORINGSSL
// With BoringSSL and early data enabled, the flow control and stream
// limit parameters are encoded and installed as the QUIC early data
// context of the TLS session.
if (quicconf.upstream.early_data) {
ngtcp2_transport_params early_data_params{
.initial_max_stream_data_bidi_local =
params.initial_max_stream_data_bidi_local,
.initial_max_stream_data_bidi_remote =
params.initial_max_stream_data_bidi_remote,
.initial_max_stream_data_uni = params.initial_max_stream_data_uni,
.initial_max_data = params.initial_max_data,
.initial_max_streams_bidi = params.initial_max_streams_bidi,
.initial_max_streams_uni = params.initial_max_streams_uni,
};
// TODO include HTTP/3 SETTINGS
std::array<uint8_t, 128> quic_early_data_ctx;
auto quic_early_data_ctxlen = ngtcp2_encode_transport_params(
quic_early_data_ctx.data(), quic_early_data_ctx.size(),
NGTCP2_TRANSPORT_PARAMS_TYPE_ENCRYPTED_EXTENSIONS, &early_data_params);
assert(quic_early_data_ctxlen > 0);
assert(static_cast<size_t>(quic_early_data_ctxlen) <=
quic_early_data_ctx.size());
if (SSL_set_quic_early_data_context(handler_->get_ssl(),
quic_early_data_ctx.data(),
quic_early_data_ctxlen) != 1) {
ULOG(ERROR, this) << "SSL_set_quic_early_data_context failed";
return -1;
}
}
#endif // OPENSSL_IS_BORINGSSL
// A non-null |odcid| selects the branch that also advertises
// retry_scid (presumably the connection went through a Retry --
// confirm against the caller).
if (odcid) {
params.original_dcid = *odcid;
params.retry_scid = initial_hd.dcid;
params.retry_scid_present = 1;
} else {
params.original_dcid = initial_hd.dcid;
}
rv = generate_quic_stateless_reset_token(
params.stateless_reset_token, scid, qkm.secret.data(), qkm.secret.size());
if (rv != 0) {
ULOG(ERROR, this) << "generate_quic_stateless_reset_token failed";
return -1;
}
params.stateless_reset_token_present = 1;
// Initial network path: {local address, remote address, user_data}.
// The const_casts only adapt to the non-const pointers in
// ngtcp2_path.
auto path = ngtcp2_path{
{
const_cast<sockaddr *>(&local_addr.su.sa),
static_cast<socklen_t>(local_addr.len),
},
{
const_cast<sockaddr *>(&remote_addr.su.sa),
static_cast<socklen_t>(remote_addr.len),
},
const_cast<UpstreamAddr *>(faddr),
};
// initial_hd.scid is passed as the first connection ID argument and
// our generated scid as the second; `this` becomes the user_data
// handed to every callback above.
rv = ngtcp2_conn_server_new(&conn_, &initial_hd.scid, &scid, &path,
initial_hd.version, &callbacks, &settings,
&params, nullptr, this);
if (rv != 0) {
ULOG(ERROR, this) << "ngtcp2_conn_server_new: " << ngtcp2_strerror(rv);
return -1;
}
ngtcp2_conn_set_tls_native_handle(conn_, handler_->get_ssl());
auto quic_connection_handler = worker->get_quic_connection_handler();
// Besides the new SCID, a connection ID hashed from the addresses
// and initial_hd.dcid is registered for this handler.
if (generate_quic_hashed_connection_id(hashed_scid_, remote_addr, local_addr,
initial_hd.dcid) != 0) {
return -1;
}
quic_connection_handler->add_connection_id(hashed_scid_, handler_);
quic_connection_handler->add_connection_id(scid, handler_);
return 0;
}
// Read event hook; this upstream does nothing here and always reports
// success.
int Http3Upstream::on_read() {
  return 0;
}
// Write event hook.
//
// If a previous send was blocked, the blocked packet is retried first;
// when the send is still blocked afterwards nothing else is attempted
// and 0 is returned.  Otherwise new packets are produced with
// write_streams() and the timer is re-armed via reset_timer().
// Returns 0 on success, or -1 if either step fails.
int Http3Upstream::on_write() {
  if (tx_.send_blocked) {
    if (send_blocked_packet() != 0) {
      return -1;
    }

    // Still blocked; wait for the next write event.
    if (tx_.send_blocked) {
      return 0;
    }
  }

  if (write_streams() != 0) {
    return -1;
  }

  reset_timer();

  return 0;
}
// Produces outgoing QUIC packets and sends them.
//
// In each loop iteration, stream data is pulled from nghttp3 (only
// when httpconn_ exists and ngtcp2_conn_get_max_data_left() is
// non-zero), ngtcp2 packetizes it into the tx_.data buffer, and the
// resulting packet(s) are handed to send_packet().  With UDP_SEGMENT
// defined, consecutive packets are accumulated in the buffer and sent
// with a single send_packet() call using gso_size; without it every
// packet is sent on its own.
//
// The function returns 0 when ngtcp2 produces no more packets, when
// max_pktcnt packets have been written, or when a send was blocked (in
// which case the pending data is recorded with on_send_blocked()).  On
// an nghttp3 or ngtcp2 error, last_error_ is set and the result of
// handle_error() is returned.
int Http3Upstream::write_streams() {
std::array<nghttp3_vec, 16> vec;
auto max_udp_payload_size = ngtcp2_conn_get_max_tx_udp_payload_size(conn_);
#ifdef UDP_SEGMENT
auto path_max_udp_payload_size =
ngtcp2_conn_get_path_max_tx_udp_payload_size(conn_);
#endif // UDP_SEGMENT
// NOTE(review): if the send quantum were smaller than
// max_udp_payload_size this would be 0 and the `++pktcnt ==
// max_pktcnt` checks below would never fire -- confirm ngtcp2 rules
// that out.
auto max_pktcnt = ngtcp2_conn_get_send_quantum(conn_) / max_udp_payload_size;
// pi/ps describe the packet written in the current iteration;
// prev_pi/prev_ps describe the first packet of the batch accumulated
// so far (they are only assigned in the UDP_SEGMENT code path).
ngtcp2_pkt_info pi, prev_pi;
uint8_t *bufpos = tx_.data.get();
ngtcp2_path_storage ps, prev_ps;
size_t pktcnt = 0;
int rv;
size_t gso_size = 0;
auto ts = quic_timestamp();
ngtcp2_path_storage_zero(&ps);
ngtcp2_path_storage_zero(&prev_ps);
for (;;) {
// stream_id stays -1 and sveccnt stays 0 when nghttp3 is not asked
// for data in this iteration.
int64_t stream_id = -1;
int fin = 0;
nghttp3_ssize sveccnt = 0;
if (httpconn_ && ngtcp2_conn_get_max_data_left(conn_)) {
sveccnt = nghttp3_conn_writev_stream(httpconn_, &stream_id, &fin,
vec.data(), vec.size());
if (sveccnt < 0) {
ULOG(ERROR, this) << "nghttp3_conn_writev_stream: "
<< nghttp3_strerror(sveccnt);
ngtcp2_connection_close_error_set_application_error(
&last_error_, nghttp3_err_infer_quic_app_error_code(sveccnt),
nullptr, 0);
return handle_error();
}
}
ngtcp2_ssize ndatalen;
auto v = vec.data();
auto vcnt = static_cast<size_t>(sveccnt);
// NGTCP2_WRITE_STREAM_FLAG_MORE is always set; the
// NGTCP2_ERR_WRITE_MORE case below handles the resulting "packet not
// finished yet" return.
uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
if (fin) {
flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
}
// The nghttp3_vec array is reinterpreted as ngtcp2_vec; this assumes
// both structs share the same layout.
auto nwrite = ngtcp2_conn_writev_stream(
conn_, &ps.path, &pi, bufpos, max_udp_payload_size, &ndatalen, flags,
stream_id, reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
if (nwrite < 0) {
switch (nwrite) {
case NGTCP2_ERR_STREAM_DATA_BLOCKED:
// Tell nghttp3 the stream is blocked and try again.
assert(ndatalen == -1);
nghttp3_conn_block_stream(httpconn_, stream_id);
continue;
case NGTCP2_ERR_STREAM_SHUT_WR:
// Tell nghttp3 the write side is shut down and try again.
assert(ndatalen == -1);
nghttp3_conn_shutdown_stream_write(httpconn_, stream_id);
continue;
case NGTCP2_ERR_WRITE_MORE:
// ndatalen bytes were accepted; record them in nghttp3 and go
// around again.
assert(ndatalen >= 0);
rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
if (rv != 0) {
ULOG(ERROR, this)
<< "nghttp3_conn_add_write_offset: " << nghttp3_strerror(rv);
ngtcp2_connection_close_error_set_application_error(
&last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
0);
return handle_error();
}
continue;
}
// Any other negative value is treated as a fatal transport error.
assert(ndatalen == -1);
ULOG(ERROR, this) << "ngtcp2_conn_writev_stream: "
<< ngtcp2_strerror(nwrite);
ngtcp2_connection_close_error_set_transport_error_liberr(
&last_error_, nwrite, nullptr, 0);
return handle_error();
} else if (ndatalen >= 0) {
// Stream data was included; record it in nghttp3.
rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
if (rv != 0) {
ULOG(ERROR, this) << "nghttp3_conn_add_write_offset: "
<< nghttp3_strerror(rv);
ngtcp2_connection_close_error_set_application_error(
&last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
0);
return handle_error();
}
}
if (nwrite == 0) {
// ngtcp2 produced no packet.  Flush whatever has been accumulated
// in the buffer so far, then stop.
if (bufpos - tx_.data.get()) {
auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
auto data = tx_.data.get();
auto datalen = bufpos - data;
rv = send_packet(faddr, prev_ps.path.remote.addr,
prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
prev_ps.path.local.addrlen, prev_pi, data, datalen,
gso_size);
if (rv == SHRPX_ERR_SEND_BLOCKED) {
on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local,
prev_pi, data, datalen, gso_size);
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
signal_write_upstream_addr(faddr);
return 0;
}
}
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
handler_->get_connection()->wlimit.stopw();
return 0;
}
bufpos += nwrite;
#ifdef UDP_SEGMENT
if (pktcnt == 0) {
// First packet of the batch: remember its path, packet info and
// size as the GSO segment size.
ngtcp2_path_copy(&prev_ps.path, &ps.path);
prev_pi = pi;
gso_size = nwrite;
} else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) ||
prev_pi.ecn != pi.ecn ||
static_cast<size_t>(nwrite) > gso_size ||
(gso_size > path_max_udp_payload_size &&
static_cast<size_t>(nwrite) != gso_size)) {
// The new packet cannot join the current batch (different path or
// ECN, or an incompatible size).  Send the batch without the new
// packet, then send the new packet separately.
auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
auto data = tx_.data.get();
auto datalen = bufpos - data - nwrite;
rv = send_packet(faddr, prev_ps.path.remote.addr,
prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
prev_ps.path.local.addrlen, prev_pi, data, datalen,
gso_size);
switch (rv) {
case SHRPX_ERR_SEND_BLOCKED:
// Both the batch and the new packet are recorded as blocked.
on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi,
data, datalen, gso_size);
on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data),
ps.path.remote, ps.path.local, pi, bufpos - nwrite,
nwrite, 0);
signal_write_upstream_addr(faddr);
break;
default: {
auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
auto data = bufpos - nwrite;
rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
ps.path.local.addr, ps.path.local.addrlen, pi, data,
nwrite, 0);
if (rv == SHRPX_ERR_SEND_BLOCKED) {
on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data,
nwrite, 0);
}
signal_write_upstream_addr(faddr);
}
}
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
return 0;
}
// Send the batch once the packet budget is used up or a packet
// shorter than gso_size was written.
if (++pktcnt == max_pktcnt || static_cast<size_t>(nwrite) < gso_size) {
auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
auto data = tx_.data.get();
auto datalen = bufpos - data;
rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
ps.path.local.addr, ps.path.local.addrlen, pi, data,
datalen, gso_size);
if (rv == SHRPX_ERR_SEND_BLOCKED) {
on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
gso_size);
}
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
signal_write_upstream_addr(faddr);
return 0;
}
#else // !UDP_SEGMENT
// Without UDP_SEGMENT every packet is sent on its own and the buffer
// is rewound afterwards.
auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
auto data = tx_.data.get();
auto datalen = bufpos - data;
rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
ps.path.local.addr, ps.path.local.addrlen, pi, data,
datalen, 0);
if (rv == SHRPX_ERR_SEND_BLOCKED) {
on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
0);
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
signal_write_upstream_addr(faddr);
return 0;
}
if (++pktcnt == max_pktcnt) {
ngtcp2_conn_update_pkt_tx_time(conn_, ts);
signal_write_upstream_addr(faddr);
return 0;
}
bufpos = tx_.data.get();
#endif // !UDP_SEGMENT
}
return 0;
}
// Timeout hook for |downstream|; this upstream does nothing here and
// always reports success.
int Http3Upstream::on_timeout(Downstream *downstream) {
  return 0;
}
// Aborts the request of |downstream| by sending an error reply with
// |status_code|, then signals the handler that there is data to
// write.  Returns 0 on success, or -1 if the error reply could not be
// produced.
int Http3Upstream::on_downstream_abort_request(Downstream *downstream,
                                               unsigned int status_code) {
  if (error_reply(downstream, status_code) != 0) {
    return -1;
  }

  handler_->signal_write();

  return 0;
}
// Not expected to be called on this upstream: the function asserts in
// debug builds and aborts the process unconditionally.
int Http3Upstream::on_downstream_abort_request_with_https_redirect(
  Downstream *downstream) {
  // Reaching this point is a programming error.
  assert(0);
  abort();
}
namespace {
// Maps an HTTP/2 error code received from the downstream to the
// HTTP/3 error code used to shut down the upstream stream.
//
// NGHTTP2_NO_ERROR maps to NGHTTP3_H3_NO_ERROR and
// NGHTTP2_REFUSED_STREAM maps to NGHTTP3_H3_REQUEST_REJECTED; the
// latter is important because it tells the upstream client to retry.
// Everything else becomes NGHTTP3_H3_INTERNAL_ERROR.
uint64_t
infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code) {
  if (downstream_error_code == NGHTTP2_NO_ERROR) {
    return NGHTTP3_H3_NO_ERROR;
  }

  if (downstream_error_code == NGHTTP2_REFUSED_STREAM) {
    return NGHTTP3_H3_REQUEST_REJECTED;
  }

  return NGHTTP3_H3_INTERNAL_ERROR;
}
} // namespace
// Handles a read event on the downstream connection |dconn|.
//
// Depending on the response state of the attached Downstream this
// either shuts down the upstream stream (downstream was reset),
// sends a 502 error reply (bad response header), or lets the
// Downstream process the incoming data.  Returns 0 on success,
// SHRPX_ERR_RETRY when EOF was seen before the request header was
// sent, or the result of downstream_eof() / downstream_error() when
// those paths are taken; -1 if an error reply could not be produced.
int Http3Upstream::downstream_read(DownstreamConnection *dconn) {
  auto downstream = dconn->get_downstream();

  switch (downstream->get_response_state()) {
  case DownstreamState::MSG_RESET:
    // The downstream stream was reset (canceled).  Shut down the
    // upstream stream and delete the downstream connection here.
    // Deleting downstream itself takes place at
    // on_stream_close_callback.
    shutdown_stream(downstream,
                    infer_upstream_shutdown_stream_error_code(
                      downstream->get_response_rst_stream_error_code()));
    downstream->pop_downstream_connection();
    // dconn was deleted
    dconn = nullptr;
    break;
  case DownstreamState::MSG_BAD_HEADER:
    if (error_reply(downstream, 502) != 0) {
      return -1;
    }
    downstream->pop_downstream_connection();
    // dconn was deleted
    dconn = nullptr;
    break;
  default: {
    const auto rv = downstream->on_read();

    if (rv == SHRPX_ERR_EOF) {
      if (!downstream->get_request_header_sent()) {
        return SHRPX_ERR_RETRY;
      }
      return downstream_eof(dconn);
    }

    if (rv == SHRPX_ERR_DCONN_CANCELED) {
      downstream->pop_downstream_connection();
      handler_->signal_write();
      return 0;
    }

    if (rv != 0) {
      if (rv != SHRPX_ERR_NETWORK) {
        if (LOG_ENABLED(INFO)) {
          DCLOG(INFO, dconn) << "HTTP parser failure";
        }
      }
      return downstream_error(dconn, Downstream::EVENT_ERROR);
    }

    if (downstream->can_detach_downstream_connection()) {
      // Keep-alive
      downstream->detach_downstream_connection();
    }
    break;
  }
  }

  handler_->signal_write();

  // At this point, downstream may be deleted.
  return 0;
}
// Handles a write event on the downstream connection |dconn|.  A
// network error is routed through downstream_error(); every other
// result of dconn->on_write() (including 0) is returned unchanged.
int Http3Upstream::downstream_write(DownstreamConnection *dconn) {
  const int rv = dconn->on_write();

  if (rv == SHRPX_ERR_NETWORK) {
    return downstream_error(dconn, Downstream::EVENT_ERROR);
  }

  return rv;
}
// Handles EOF on the downstream connection |dconn|.
//
// The downstream connection is removed from its Downstream first.  If
// the response header was already complete, EOF is taken as the end
// of the response body; if the response was not complete at all, a
// 502 error reply is produced.  Returns 0 on success, -1 on failure.
int Http3Upstream::downstream_eof(DownstreamConnection *dconn) {
  auto downstream = dconn->get_downstream();

  if (LOG_ENABLED(INFO)) {
    DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
  }

  // Delete downstream connection.  If we don't delete it here, it will
  // be pooled in on_stream_close_callback.
  downstream->pop_downstream_connection();
  // dconn was deleted
  dconn = nullptr;

  // downstream will be deleted in on_stream_close_callback.
  const auto state = downstream->get_response_state();

  if (state == DownstreamState::HEADER_COMPLETE) {
    // Server may indicate the end of the request by EOF
    if (LOG_ENABLED(INFO)) {
      ULOG(INFO, this) << "Downstream body was ended by EOF";
    }
    downstream->set_response_state(DownstreamState::MSG_COMPLETE);

    // For tunneled connection, MSG_COMPLETE signals
    // downstream_read_data_callback to send RST_STREAM after pending
    // response body is sent.  This is needed to ensure that RST_STREAM
    // is sent after all pending data are sent.
    if (on_downstream_body_complete(downstream) != 0) {
      return -1;
    }
  } else if (state != DownstreamState::MSG_COMPLETE) {
    // If stream was not closed, then we set MSG_COMPLETE and let
    // on_stream_close_callback delete downstream.
    if (error_reply(downstream, 502) != 0) {
      return -1;
    }
  }

  handler_->signal_write();

  // At this point, downstream may be deleted.
  return 0;
}
// Handles an error or timeout (|events|) on the downstream connection
// |dconn|.
//
// The downstream connection is removed from its Downstream first.
// What happens next depends on how far the response got:
//  - response complete: only an upgraded (tunnel) stream is shut down,
//    with NGHTTP3_H3_NO_ERROR;
//  - response header complete: an upgraded stream has its body
//    completed, any other stream is shut down with
//    NGHTTP3_H3_INTERNAL_ERROR;
//  - otherwise an error reply is sent: 504 or 408 for a timeout
//    (depending on whether the request header was sent), 502 for
//    other errors.
// In the latter two cases the response state becomes MSG_COMPLETE.
// Returns 0 on success, -1 on failure.
int Http3Upstream::downstream_error(DownstreamConnection *dconn, int events) {
  auto downstream = dconn->get_downstream();

  if (LOG_ENABLED(INFO)) {
    if (events & Downstream::EVENT_ERROR) {
      DCLOG(INFO, dconn) << "Downstream network/general error";
    } else {
      DCLOG(INFO, dconn) << "Timeout";
    }
    if (downstream->get_upgraded()) {
      DCLOG(INFO, dconn) << "Note: this is tunnel connection";
    }
  }

  // Delete downstream connection.  If we don't delete it here, it will
  // be pooled in on_stream_close_callback.
  downstream->pop_downstream_connection();
  // dconn was deleted
  dconn = nullptr;

  const auto state = downstream->get_response_state();

  if (state == DownstreamState::MSG_COMPLETE) {
    // For SSL tunneling, we issue RST_STREAM.  For other types of
    // stream, we don't have to do anything since response was
    // complete.
    if (downstream->get_upgraded()) {
      shutdown_stream(downstream, NGHTTP3_H3_NO_ERROR);
    }
  } else {
    if (state == DownstreamState::HEADER_COMPLETE) {
      if (downstream->get_upgraded()) {
        if (on_downstream_body_complete(downstream) != 0) {
          return -1;
        }
      } else {
        shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
      }
    } else {
      unsigned int status = 502;
      if (events & Downstream::EVENT_TIMEOUT) {
        status = downstream->get_request_header_sent() ? 504 : 408;
      }

      if (error_reply(downstream, status) != 0) {
        return -1;
      }
    }

    downstream->set_response_state(DownstreamState::MSG_COMPLETE);
  }

  handler_->signal_write();

  // At this point, downstream may be deleted.
  return 0;
}
// Returns the ClientHandler this upstream was constructed with.
ClientHandler *Http3Upstream::get_client_handler() const {
  return handler_;
}
namespace {
// nghttp3 read_data callback supplying response body data for
// |stream_id|.
//
// |stream_user_data| is the Downstream of the stream.  If the
// response is not complete and no body data is buffered, the upstream
// write timer is disabled and NGHTTP3_ERR_WOULDBLOCK is returned.
// Otherwise up to |veccnt| buffers are filled into |vec|; when the
// response is complete and nothing is left afterwards,
// NGHTTP3_DATA_FLAG_EOF is set in |*pflags| and the read side of the
// stream is shut down with NGHTTP3_H3_NO_ERROR.  Returns the number
// of buffers filled, or NGHTTP3_ERR_CALLBACK_FAILURE if that shutdown
// fails.
nghttp3_ssize downstream_read_data_callback(nghttp3_conn *conn,
                                            int64_t stream_id, nghttp3_vec *vec,
                                            size_t veccnt, uint32_t *pflags,
                                            void *conn_user_data,
                                            void *stream_user_data) {
  auto *self = static_cast<Http3Upstream *>(conn_user_data);
  auto *stream = static_cast<Downstream *>(stream_user_data);

  assert(stream);

  auto body = stream->get_response_buf();

  assert(body);

  const bool msg_complete =
    stream->get_response_state() == DownstreamState::MSG_COMPLETE;

  if (!msg_complete && body->rleft_mark() == 0) {
    // Nothing to send yet; more data is expected later.
    stream->disable_upstream_wtimer();
    return NGHTTP3_ERR_WOULDBLOCK;
  }

  stream->reset_upstream_wtimer();

  veccnt = body->riovec_mark(reinterpret_cast<struct iovec *>(vec), veccnt);

  // rleft_mark() is queried again, after riovec_mark() has run.
  if (msg_complete && body->rleft_mark() == 0) {
    *pflags |= NGHTTP3_DATA_FLAG_EOF;
  }

  const bool eof = (*pflags & NGHTTP3_DATA_FLAG_EOF) != 0;

  assert(eof || veccnt);

  stream->response_sent_body_length += nghttp3_vec_len(vec, veccnt);

  if (eof && self->shutdown_stream_read(stream_id, NGHTTP3_H3_NO_ERROR) != 0) {
    return NGHTTP3_ERR_CALLBACK_FAILURE;
  }

  return veccnt;
}
} // namespace
// Builds the HTTP/3 response header fields for |downstream| and
// submits them to nghttp3.  A non-final response is submitted with
// nghttp3_conn_submit_info(); otherwise the response is submitted
// with nghttp3_conn_submit_response(), with a data reader attached
// when a response body or trailer is expected.
//
// Returns 0 on success, or -1 on failure (including the cases where
// an mruby response hook failed or already completed the response).
int Http3Upstream::on_downstream_header_complete(Downstream *downstream) {
int rv;
const auto &req = downstream->request();
auto &resp = downstream->response();
auto &balloc = downstream->get_block_allocator();
if (LOG_ENABLED(INFO)) {
if (downstream->get_non_final_response()) {
DLOG(INFO, downstream) << "HTTP non-final response header";
} else {
DLOG(INFO, downstream) << "HTTP response header completed";
}
}
auto config = get_config();
auto &httpconf = config->http;
// Location rewriting is skipped in http2_proxy mode or when it is
// disabled by configuration.
if (!config->http2_proxy && !httpconf.no_location_rewrite) {
downstream->rewrite_location_response_header(req.scheme);
}
#ifdef HAVE_MRUBY
// mruby response hooks run only for the final response: first the
// hook of the backend address group (if any), then the worker-wide
// one.
if (!downstream->get_non_final_response()) {
auto dconn = downstream->get_downstream_connection();
const auto &group = dconn->get_downstream_addr_group();
if (group) {
const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
if (error_reply(downstream, 500) != 0) {
return -1;
}
// Returning -1 will signal deletion of dconn.
return -1;
}
// The hook itself completed the response.
if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
return -1;
}
}
auto worker = handler_->get_worker();
auto mruby_ctx = worker->get_mruby_context();
if (mruby_ctx->run_on_response_proc(downstream) != 0) {
if (error_reply(downstream, 500) != 0) {
return -1;
}
// Returning -1 will signal deletion of dconn.
return -1;
}
// The hook itself completed the response.
if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
return -1;
}
}
#endif // HAVE_MRUBY
auto nva = std::vector<nghttp3_nv>();
// 4 means :status and possible server, via, and set-cookie (for
// affinity cookie) header field.
nva.reserve(resp.fs.headers().size() + 4 +
httpconf.add_response_headers.size());
// Non-final response: forward :status plus the backend header fields
// and clear them afterwards so that they do not leak into the next
// response header block.
if (downstream->get_non_final_response()) {
auto response_status = http2::stringify_status(balloc, resp.http_status);
nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
http2::HDOP_STRIP_ALL);
if (LOG_ENABLED(INFO)) {
log_response_headers(downstream, nva);
}
rv = nghttp3_conn_submit_info(httpconn_, downstream->get_stream_id(),
nva.data(), nva.size());
resp.fs.clear_headers();
if (rv != 0) {
ULOG(FATAL, this) << "nghttp3_conn_submit_info() failed";
return -1;
}
return 0;
}
// via is not stripped by the bulk copy below; it is handled
// explicitly further down.
auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
StringRef response_status;
// A 101 response to a WebSocket CONNECT request is reported as 200
// and sec-websocket-accept is stripped.
if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
response_status = http2::stringify_status(balloc, 200);
striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
} else {
response_status = http2::stringify_status(balloc, resp.http_status);
}
nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
// Either send the configured server name, or forward the backend's
// server header field if it has one.
if (!config->http2_proxy && !httpconf.no_server_rewrite) {
nva.push_back(http3::make_nv_ls_nocopy("server", httpconf.server_name));
} else {
auto server = resp.fs.header(http2::HD_SERVER);
if (server) {
nva.push_back(http3::make_nv_ls_nocopy("server", (*server).value));
}
}
// Add the session affinity cookie, if one has to be sent.
if (!req.regular_connect_method() || !downstream->get_upgraded()) {
auto affinity_cookie = downstream->get_affinity_cookie_to_send();
if (affinity_cookie) {
auto dconn = downstream->get_downstream_connection();
assert(dconn);
auto &group = dconn->get_downstream_addr_group();
auto &shared_addr = group->shared_addr;
auto &cookieconf = shared_addr->affinity.cookie;
auto secure =
http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
auto cookie_str = http::create_affinity_cookie(
balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
nva.push_back(http3::make_nv_ls_nocopy("set-cookie", cookie_str));
}
}
// With no_via the backend's via value is forwarded untouched;
// otherwise our own entry is appended to it.
auto via = resp.fs.header(http2::HD_VIA);
if (httpconf.no_via) {
if (via) {
nva.push_back(http3::make_nv_ls_nocopy("via", (*via).value));
}
} else {
// we don't create more than 16 bytes in
// http::create_via_header_value.
size_t len = 16;
if (via) {
// 2 for the ", " separator.
len += via->value.size() + 2;
}
// 1 for the terminating NUL.
auto iov = make_byte_ref(balloc, len + 1);
auto p = iov.base;
if (via) {
p = std::copy(std::begin(via->value), std::end(via->value), p);
p = util::copy_lit(p, ", ");
}
p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
*p = '\0';
nva.push_back(http3::make_nv_ls_nocopy("via", StringRef{iov.base, p}));
}
for (auto &p : httpconf.add_response_headers) {
nva.push_back(http3::make_nv_nocopy(p.name, p.value));
}
if (LOG_ENABLED(INFO)) {
log_response_headers(downstream, nva);
}
// Attach the body reader only if a body or trailer is expected.
nghttp3_data_reader data_read;
data_read.read_data = downstream_read_data_callback;
nghttp3_data_reader *data_readptr;
if (downstream->expect_response_body() ||
downstream->expect_response_trailer()) {
data_readptr = &data_read;
} else {
data_readptr = nullptr;
}
rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
nva.data(), nva.size(), data_readptr);
if (rv != 0) {
ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed";
return -1;
}
// With a body the write timer is (re)started; without one the read
// side of the stream is shut down right away.
if (data_readptr) {
downstream->reset_upstream_wtimer();
} else if (shutdown_stream_read(downstream->get_stream_id(),
NGHTTP3_H3_NO_ERROR) != 0) {
return -1;
}
return 0;
}
// Appends |len| bytes of response body at |data| to the response
// buffer of |downstream|.  When |flush| is true, the stream is resumed
// in nghttp3 and the upstream write timer is ensured to be running.
// Always returns 0.
int Http3Upstream::on_downstream_body(Downstream *downstream,
                                      const uint8_t *data, size_t len,
                                      bool flush) {
  downstream->get_response_buf()->append(data, len);

  if (!flush) {
    return 0;
  }

  nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
  downstream->ensure_upstream_wtimer();

  return 0;
}
// Called when the whole response body of |downstream| has been
// received.  If the received body length fails validation, the stream
// is shut down with NGHTTP3_H3_GENERAL_PROTOCOL_ERROR and the response
// is marked connection_close.  Otherwise response trailers (if any,
// and only for non-upgraded streams) are submitted and the stream is
// resumed so that the remaining data gets written.
//
// Returns 0 on success, or -1 if submitting trailers fails.
int Http3Upstream::on_downstream_body_complete(Downstream *downstream) {
  if (LOG_ENABLED(INFO)) {
    DLOG(INFO, downstream) << "HTTP response completed";
  }

  auto &resp = downstream->response();

  if (!downstream->validate_response_recv_body_length()) {
    shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
    resp.connection_close = true;

    return 0;
  }

  if (!downstream->get_upgraded()) {
    const auto &trailer_fields = resp.fs.trailers();
    std::vector<nghttp3_nv> trailer_nva;

    if (!trailer_fields.empty()) {
      trailer_nva.reserve(trailer_fields.size());
      http3::copy_headers_to_nva_nocopy(trailer_nva, trailer_fields,
                                        http2::HDOP_STRIP_ALL);
    }

    // Everything may have been stripped; submit only what is left.
    if (!trailer_nva.empty()) {
      const auto rv = nghttp3_conn_submit_trailers(
          httpconn_, downstream->get_stream_id(), trailer_nva.data(),
          trailer_nva.size());
      if (rv != 0) {
        ULOG(FATAL, this) << "nghttp3_conn_submit_trailers() failed: "
                          << nghttp3_strerror(rv);
        return -1;
      }
    }
  }

  nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
  downstream->ensure_upstream_wtimer();

  return 0;
}
// Called when the owning handler is being deleted.  Writes access log
// entries for active downstreams whose access log data is ready,
// removes all connection IDs of this connection (including
// hashed_scid_) from the QUIC connection handler and, unless the
// connection ends because of Retry or idle close, registers a
// CloseWait object that carries the CONNECTION_CLOSE packet.
void Http3Upstream::on_handler_delete() {
for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
if (d->get_dispatch_state() == DispatchState::ACTIVE &&
d->accesslog_ready()) {
handler_->write_accesslog(d);
}
}
auto worker = handler_->get_worker();
auto quic_conn_handler = worker->get_quic_connection_handler();
// One extra slot at the end for hashed_scid_.
std::vector<ngtcp2_cid> scids(ngtcp2_conn_get_num_scid(conn_) + 1);
ngtcp2_conn_get_scid(conn_, scids.data());
scids.back() = hashed_scid_;
for (auto &cid : scids) {
quic_conn_handler->remove_connection_id(cid);
}
// No close-wait period after Retry or idle close.
if (retry_close_ ||
last_error_.type ==
NGTCP2_CONNECTION_CLOSE_ERROR_CODE_TYPE_TRANSPORT_IDLE_CLOSE) {
return;
}
// If this is not idle close, send CONNECTION_CLOSE.
if (!ngtcp2_conn_is_in_closing_period(conn_) &&
!ngtcp2_conn_is_in_draining_period(conn_)) {
ngtcp2_path_storage ps;
ngtcp2_pkt_info pi;
conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
ngtcp2_path_storage_zero(&ps);
ngtcp2_connection_close_error ccerr;
ngtcp2_connection_close_error_default(&ccerr);
auto nwrite = ngtcp2_conn_write_connection_close(
conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), &ccerr,
quic_timestamp());
if (nwrite < 0) {
// NGTCP2_ERR_INVALID_STATE is not logged.
if (nwrite != NGTCP2_ERR_INVALID_STATE) {
ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
<< ngtcp2_strerror(nwrite);
}
return;
}
// Shrink the buffer to the packet actually written.
conn_close_.resize(nwrite);
send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
ps.path.local.addrlen, pi, conn_close_.data(), nwrite, 0);
}
// The close-wait period lasts 3 * PTO, converted to seconds.
auto d =
static_cast<ev_tstamp>(ngtcp2_conn_get_pto(conn_) * 3) / NGTCP2_SECONDS;
if (LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Enter close-wait period " << d << "s with "
<< conn_close_.size() << " bytes sentinel packet";
}
auto cw = std::make_unique<CloseWait>(worker, std::move(scids),
std::move(conn_close_), d);
// NOTE(review): the raw pointer is handed to add_close_wait() and the
// unique_ptr is released right after, so ownership presumably moves
// to the QUIC connection handler -- confirm.
quic_conn_handler->add_close_wait(cw.get());
cw.release();
}
// Handles a reset of the backend connection attached to |downstream|.
//
// If the request can still be resubmitted and retrying is allowed
// (|no_retry| is false and the retry budget is not exhausted), a new
// backend connection is attached and the request headers are pushed
// again.  Otherwise the request is aborted with 502, falling back to
// shutting down the stream with NGHTTP3_H3_INTERNAL_ERROR if the abort
// itself fails.
//
// Always returns 0.
int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
  int rv;

  if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
    // This is error condition when we failed push_request_headers()
    // in initiate_downstream().  Otherwise, we have
    // DispatchState::ACTIVE state, or we did not set
    // DownstreamConnection.
    downstream->pop_downstream_connection();
    handler_->signal_write();

    return 0;
  }

  if (!downstream->request_submission_ready()) {
    if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
      // We have got all response body already.  Send it off.
      downstream->pop_downstream_connection();
      return 0;
    }
    // pushed stream is handled here
    shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
    downstream->pop_downstream_connection();

    handler_->signal_write();

    return 0;
  }

  downstream->pop_downstream_connection();

  downstream->add_retry();

  // rv is inspected at the fail label, so give it a defined value
  // before the first jump.
  rv = 0;

  if (no_retry || downstream->no_more_retry()) {
    goto fail;
  }

  // downstream connection is clean; we can retry with new
  // downstream connection.

  for (;;) {
    auto dconn = handler_->get_downstream_connection(rv, downstream);
    if (!dconn) {
      goto fail;
    }

    rv = downstream->attach_downstream_connection(std::move(dconn));
    if (rv == 0) {
      break;
    }
  }

  rv = downstream->push_request_headers();
  if (rv != 0) {
    goto fail;
  }

  return 0;

fail:
  if (rv == SHRPX_ERR_TLS_REQUIRED) {
    assert(0);
    abort();
  }

  rv = on_downstream_abort_request(downstream, 502);
  if (rv != 0) {
    shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
  }
  downstream->pop_downstream_connection();

  handler_->signal_write();

  return 0;
}
// No-op for HTTP/3 upstream.
void Http3Upstream::pause_read(IOCtrlReason reason) {
}
// Reports |consumed| bytes of request body as consumed, both to the
// stream of |downstream| via consume() and to its request object, then
// signals the handler that there may be something to write.
// Always returns 0.
int Http3Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
                               size_t consumed) {
  consume(downstream->get_stream_id(), consumed);

  downstream->request().consume(consumed);

  handler_->signal_write();

  return 0;
}
// Submits the response stored in downstream->response() together with
// |bodylen| bytes of body at |body|.  Pseudo header fields and
// connection-specific header fields are dropped, a server header field
// is added if the response has none, and the configured additional
// response header fields are appended.  The response is marked
// complete and the read side of the stream is shut down.
//
// Returns 0 on success, or -1 on failure.
int Http3Upstream::send_reply(Downstream *downstream, const uint8_t *body,
                              size_t bodylen) {
  // A data reader is attached only when there is a body to send.
  nghttp3_data_reader body_reader;
  nghttp3_data_reader *body_reader_ptr = nullptr;

  if (bodylen != 0) {
    body_reader.read_data = downstream_read_data_callback;
    body_reader_ptr = &body_reader;
  }

  auto &httpconf = get_config()->http;
  const auto &resp = downstream->response();
  const auto &resp_headers = resp.fs.headers();
  const auto &extra_headers = httpconf.add_response_headers;

  std::vector<nghttp3_nv> nva;
  // 2 for :status and server
  nva.reserve(2 + resp_headers.size() + extra_headers.size());

  auto status_str = http2::stringify_status(downstream->get_block_allocator(),
                                            resp.http_status);
  nva.push_back(http3::make_nv_ls_nocopy(":status", status_str));

  for (auto &field : resp_headers) {
    if (field.name.empty() || field.name[0] == ':') {
      continue;
    }

    switch (field.token) {
    case http2::HD_CONNECTION:
    case http2::HD_KEEP_ALIVE:
    case http2::HD_PROXY_CONNECTION:
    case http2::HD_TE:
    case http2::HD_TRANSFER_ENCODING:
    case http2::HD_UPGRADE:
      // Connection-specific header fields are not forwarded.
      break;
    default:
      nva.push_back(
          http3::make_nv_nocopy(field.name, field.value, field.no_index));
      break;
    }
  }

  if (!resp.fs.header(http2::HD_SERVER)) {
    nva.push_back(http3::make_nv_ls_nocopy("server", httpconf.server_name));
  }

  for (auto &extra : extra_headers) {
    nva.push_back(http3::make_nv_nocopy(extra.name, extra.value));
  }

  const auto rv =
      nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
                                   nva.data(), nva.size(), body_reader_ptr);
  if (nghttp3_err_is_fatal(rv)) {
    ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
                      << nghttp3_strerror(rv);
    return -1;
  }

  downstream->get_response_buf()->append(body, bodylen);

  downstream->set_response_state(DownstreamState::MSG_COMPLETE);

  if (body_reader_ptr) {
    downstream->reset_upstream_wtimer();
  }

  return shutdown_stream_read(downstream->get_stream_id(),
                              NGHTTP3_H3_NO_ERROR) != 0
             ? -1
             : 0;
}
// No-op for HTTP/3 upstream; nothing is pushed.  Always returns 0.
int Http3Upstream::initiate_push(Downstream *downstream,
                                 const StringRef &uri) {
  return 0;
}
// No-op for HTTP/3 upstream; |iov| is left untouched.  Always returns
// 0.
int Http3Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
  return 0;
}
// No-op for HTTP/3 upstream.
void Http3Upstream::response_drain(size_t n) {
}
// Always returns false for HTTP/3 upstream.
bool Http3Upstream::response_empty() const {
  return false;
}
// No-op for HTTP/3 upstream; no promised Downstream is created.
// Always returns nullptr.
Downstream *
Http3Upstream::on_downstream_push_promise(Downstream *downstream,
                                          int32_t promised_stream_id) {
  return nullptr;
}
// No-op for HTTP/3 upstream.  Always returns 0.
int Http3Upstream::on_downstream_push_promise_complete(
    Downstream *downstream, Downstream *promised_downstream) {
  return 0;
}
// Always returns false: push is reported as disabled for HTTP/3
// upstream.
bool Http3Upstream::push_enabled() const {
  return false;
}
// No-op for HTTP/3 upstream.
void Http3Upstream::cancel_premature_downstream(
    Downstream *promised_downstream) {
}
// Feeds a received QUIC packet of |datalen| bytes at |data| to ngtcp2.
// |faddr| is the frontend address the packet arrived on, and
// |remote_addr| / |local_addr| form the network path.
//
// Returns 0 if the packet was processed successfully.  Returns -1 when
// ngtcp2 reports draining, Retry or drop-connection.  For any other
// error the error is recorded in last_error_ (unless one is already
// set) and the result of handle_error() is returned.
int Http3Upstream::on_read(const UpstreamAddr *faddr,
const Address &remote_addr,
const Address &local_addr, const ngtcp2_pkt_info &pi,
const uint8_t *data, size_t datalen) {
int rv;
// Path = {local address, remote address, user_data}; faddr travels
// as user_data.
auto path = ngtcp2_path{
{
const_cast<sockaddr *>(&local_addr.su.sa),
static_cast<socklen_t>(local_addr.len),
},
{
const_cast<sockaddr *>(&remote_addr.su.sa),
static_cast<socklen_t>(remote_addr.len),
},
const_cast<UpstreamAddr *>(faddr),
};
rv = ngtcp2_conn_read_pkt(conn_, &path, &pi, data, datalen, quic_timestamp());
if (rv != 0) {
switch (rv) {
case NGTCP2_ERR_DRAINING:
return -1;
case NGTCP2_ERR_RETRY: {
// ngtcp2 asks for a Retry packet.  Every path through this case
// returns -1.
auto worker = handler_->get_worker();
auto quic_conn_handler = worker->get_quic_connection_handler();
ngtcp2_version_cid vc;
rv =
ngtcp2_pkt_decode_version_cid(&vc, data, datalen, SHRPX_QUIC_SCIDLEN);
if (rv != 0) {
return -1;
}
// During graceful shutdown the connection is refused with
// CONNECTION_CLOSE instead of sending Retry.
if (worker->get_graceful_shutdown()) {
ngtcp2_cid ini_dcid, ini_scid;
ngtcp2_cid_init(&ini_dcid, vc.dcid, vc.dcidlen);
ngtcp2_cid_init(&ini_scid, vc.scid, vc.scidlen);
quic_conn_handler->send_connection_close(
faddr, vc.version, ini_dcid, ini_scid, remote_addr, local_addr,
NGTCP2_CONNECTION_REFUSED, datalen * 3);
return -1;
}
// Remember that this connection is closed because of Retry.
retry_close_ = true;
quic_conn_handler->send_retry(handler_->get_upstream_addr(), vc.version,
vc.dcid, vc.dcidlen, vc.scid, vc.scidlen,
remote_addr, local_addr, datalen * 3);
return -1;
}
case NGTCP2_ERR_CRYPTO:
// Report the TLS alert unless an error has been recorded already.
if (!last_error_.error_code) {
ngtcp2_connection_close_error_set_transport_error_tls_alert(
&last_error_, ngtcp2_conn_get_tls_alert(conn_), nullptr, 0);
}
break;
case NGTCP2_ERR_DROP_CONN:
return -1;
default:
// Report the library error unless an error has been recorded
// already.
if (!last_error_.error_code) {
ngtcp2_connection_close_error_set_transport_error_liberr(
&last_error_, rv, nullptr, 0);
}
}
ULOG(ERROR, this) << "ngtcp2_conn_read_pkt: " << ngtcp2_strerror(rv);
return handle_error();
}
return 0;
}
// Sends a UDP datagram through quic_send_packet().
//
// Returns 0 on success, SHRPX_ERR_SEND_BLOCKED if the send would
// block, or -1 for every other error.
int Http3Upstream::send_packet(const UpstreamAddr *faddr,
                               const sockaddr *remote_sa, size_t remote_salen,
                               const sockaddr *local_sa, size_t local_salen,
                               const ngtcp2_pkt_info &pi, const uint8_t *data,
                               size_t datalen, size_t gso_size) {
  const auto rv = quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
                                   local_salen, pi, data, datalen, gso_size);
  if (rv == 0) {
    return 0;
  }

  switch (rv) {
  case -EAGAIN:
#if EAGAIN != EWOULDBLOCK
  case -EWOULDBLOCK:
#endif // EAGAIN != EWOULDBLOCK
    return SHRPX_ERR_SEND_BLOCKED;
  default:
    // This includes -EINVAL and -EMSGSIZE.  With GSO, sendmsg may fail
    // with EINVAL if UDP payload is too large.  Let the packet lost.
    return -1;
  }
}
// Records a packet that could not be sent so that it can be sent later
// by send_blocked_packet().  The addresses are copied, but only the
// pointer |data| is stored, not the bytes it points to.  At most 2
// packets may be pending at a time (enforced by assert).
void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
                                    const ngtcp2_addr &remote_addr,
                                    const ngtcp2_addr &local_addr,
                                    const ngtcp2_pkt_info &pi,
                                    const uint8_t *data, size_t datalen,
                                    size_t gso_size) {
  assert(tx_.num_blocked || !tx_.send_blocked);
  assert(tx_.num_blocked < 2);

  tx_.send_blocked = true;

  auto &slot = tx_.blocked[tx_.num_blocked++];

  slot.faddr = faddr;
  slot.pi = pi;
  slot.data = data;
  slot.datalen = datalen;
  slot.gso_size = gso_size;

  slot.local_addr.len = local_addr.addrlen;
  memcpy(&slot.local_addr.su, local_addr.addr, local_addr.addrlen);

  slot.remote_addr.len = remote_addr.addrlen;
  memcpy(&slot.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
}
// Tries to send the packets recorded by on_send_blocked(), in order.
// If sending blocks again, a write event is requested for the blocked
// frontend address and the remaining packets stay queued.  Once all
// packets are processed, the blocked state is cleared.  Always returns
// 0.
int Http3Upstream::send_blocked_packet() {
  assert(tx_.send_blocked);

  while (tx_.num_blocked_sent < tx_.num_blocked) {
    auto &pkt = tx_.blocked[tx_.num_blocked_sent];

    const int rv =
        send_packet(pkt.faddr, &pkt.remote_addr.su.sa, pkt.remote_addr.len,
                    &pkt.local_addr.su.sa, pkt.local_addr.len, pkt.pi, pkt.data,
                    pkt.datalen, pkt.gso_size);
    if (rv == SHRPX_ERR_SEND_BLOCKED) {
      signal_write_upstream_addr(pkt.faddr);

      return 0;
    }

    // Any other result, success or error, moves on to the next packet.
    ++tx_.num_blocked_sent;
  }

  tx_.num_blocked_sent = 0;
  tx_.num_blocked = 0;
  tx_.send_blocked = false;

  return 0;
}
// Starts watching for writability on the socket of |faddr|.  If the
// connection's write watcher currently refers to a different file
// descriptor, it is stopped (when active) and re-targeted first.
void Http3Upstream::signal_write_upstream_addr(const UpstreamAddr *faddr) {
  auto conn = handler_->get_connection();
  auto &wev = conn->wev;

  if (wev.fd != faddr->fd) {
    if (ev_is_active(&wev)) {
      ev_io_stop(handler_->get_loop(), &wev);
    }

    ev_io_set(&wev, faddr->fd, EV_WRITE);
  }

  conn->wlimit.startw();
}
// Writes a CONNECTION_CLOSE packet that carries last_error_ into
// conn_close_ and sends it, unless the connection is already in the
// closing or draining period.  Always returns -1.
int Http3Upstream::handle_error() {
  const bool already_closing = ngtcp2_conn_is_in_closing_period(conn_) ||
                               ngtcp2_conn_is_in_draining_period(conn_);
  if (already_closing) {
    return -1;
  }

  ngtcp2_path_storage ps;
  ngtcp2_pkt_info pi;

  ngtcp2_path_storage_zero(&ps);

  auto ts = quic_timestamp();

  conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);

  auto nwrite = ngtcp2_conn_write_connection_close(
      conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(),
      &last_error_, ts);
  if (nwrite < 0) {
    ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
                      << ngtcp2_strerror(nwrite);
    return -1;
  }

  // Keep only the bytes which were actually written.
  conn_close_.resize(nwrite);

  if (nwrite > 0) {
    send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
                ps.path.remote.addr, ps.path.remote.addrlen,
                ps.path.local.addr, ps.path.local.addrlen, pi,
                conn_close_.data(), nwrite, 0);
  }

  return -1;
}
// Lets ngtcp2 process expired timers.  Returns 0 on success.  On
// failure the library error is stored in last_error_ and the result of
// handle_error() is returned.
int Http3Upstream::handle_expiry() {
  const int rv = ngtcp2_conn_handle_expiry(conn_, quic_timestamp());
  if (rv == 0) {
    return 0;
  }

  if (rv != NGTCP2_ERR_IDLE_CLOSE) {
    ULOG(ERROR, this) << "ngtcp2_conn_handle_expiry: " << ngtcp2_strerror(rv);
  } else {
    ULOG(INFO, this) << "Idle connection timeout";
  }

  ngtcp2_connection_close_error_set_transport_error_liberr(&last_error_, rv,
                                                           nullptr, 0);

  return handle_error();
}
// Re-arms timer_ according to the next expiry reported by ngtcp2.  If
// the expiry is not in the future, a timer event is fed to the loop
// instead of arming the timer.
void Http3Upstream::reset_timer() {
  const auto now = quic_timestamp();
  const auto expiry = ngtcp2_conn_get_expiry(conn_);
  auto loop = handler_->get_loop();

  if (expiry > now) {
    // Convert the remaining time to seconds for libev.
    timer_.repeat = static_cast<ev_tstamp>(expiry - now) / NGTCP2_SECONDS;
    ev_timer_again(loop, &timer_);
  } else {
    ev_feed_event(loop, &timer_, EV_TIMER);
  }
}
namespace {
// nghttp3 deferred_consume callback: forwards |nconsumed| bytes for
// |stream_id| to Http3Upstream::consume().  Always returns 0.
int http_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
                          size_t nconsumed, void *user_data,
                          void *stream_user_data) {
  static_cast<Http3Upstream *>(user_data)->consume(stream_id, nconsumed);

  return 0;
}
} // namespace
namespace {
// nghttp3 acked_stream_data callback: forwards the acknowledgement of
// |datalen| bytes to Http3Upstream::http_acked_stream_data().  Returns
// 0 on success, or NGHTTP3_ERR_CALLBACK_FAILURE if that fails.
int http_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
                           uint64_t datalen, void *user_data,
                           void *stream_user_data) {
  auto downstream = static_cast<Downstream *>(stream_user_data);
  assert(downstream);

  auto upstream = static_cast<Http3Upstream *>(user_data);

  return upstream->http_acked_stream_data(downstream, datalen) == 0
             ? 0
             : NGHTTP3_ERR_CALLBACK_FAILURE;
}
} // namespace
// Drops |datalen| acknowledged bytes from the response buffer of
// |downstream| and resumes reading from the backend.  Returns 0 on
// success, or -1 if resuming the read fails.
int Http3Upstream::http_acked_stream_data(Downstream *downstream,
                                          uint64_t datalen) {
  if (LOG_ENABLED(INFO)) {
    ULOG(INFO, this) << "Stream " << downstream->get_stream_id() << " "
                     << datalen << " bytes acknowledged";
  }

  auto resp_buf = downstream->get_response_buf();

  // Acknowledged data must have been fully buffered.
  auto ndrained = resp_buf->drain_mark(datalen);
  (void)ndrained;
  assert(datalen == ndrained);

  return downstream->resume_read(SHRPX_NO_BUFFER, datalen) != 0 ? -1 : 0;
}
namespace {
// nghttp3 begin_headers callback: for bidirectional streams, forwards
// to Http3Upstream::http_begin_request_headers(); other streams are
// ignored.  Always returns 0.
int http_begin_request_headers(nghttp3_conn *conn, int64_t stream_id,
                               void *user_data, void *stream_user_data) {
  if (ngtcp2_is_bidi_stream(stream_id)) {
    static_cast<Http3Upstream *>(user_data)->http_begin_request_headers(
        stream_id);
  }

  return 0;
}
} // namespace
namespace {
// nghttp3 recv_header callback for a request header field.  The field
// is ignored when the stream has no Downstream attached or reading has
// been stopped.  Returns 0 on success, or NGHTTP3_ERR_CALLBACK_FAILURE
// if Http3Upstream::http_recv_request_header() fails.
int http_recv_request_header(nghttp3_conn *conn, int64_t stream_id,
                             int32_t token, nghttp3_rcbuf *name,
                             nghttp3_rcbuf *value, uint8_t flags,
                             void *user_data, void *stream_user_data) {
  auto downstream = static_cast<Downstream *>(stream_user_data);
  if (downstream == nullptr || downstream->get_stop_reading()) {
    return 0;
  }

  auto upstream = static_cast<Http3Upstream *>(user_data);

  const auto rv = upstream->http_recv_request_header(
      downstream, token, name, value, flags, /* trailer = */ false);

  return rv == 0 ? 0 : NGHTTP3_ERR_CALLBACK_FAILURE;
}
} // namespace
namespace {
// nghttp3 recv_trailer callback for a request trailer field.  The
// field is ignored when the stream has no Downstream attached or
// reading has been stopped.  Returns 0 on success, or
// NGHTTP3_ERR_CALLBACK_FAILURE if
// Http3Upstream::http_recv_request_header() fails.
int http_recv_request_trailer(nghttp3_conn *conn, int64_t stream_id,
                              int32_t token, nghttp3_rcbuf *name,
                              nghttp3_rcbuf *value, uint8_t flags,
                              void *user_data, void *stream_user_data) {
  auto downstream = static_cast<Downstream *>(stream_user_data);
  if (downstream == nullptr || downstream->get_stop_reading()) {
    return 0;
  }

  auto upstream = static_cast<Http3Upstream *>(user_data);

  const auto rv = upstream->http_recv_request_header(
      downstream, token, name, value, flags, /* trailer = */ true);

  return rv == 0 ? 0 : NGHTTP3_ERR_CALLBACK_FAILURE;
}
} // namespace
// Stores one received request header field (or trailer field when
// |trailer| is true) in the request of |downstream|.
//
// If adding the field would exceed the configured header buffer size
// or field count, reading is stopped for the stream; for a header
// field (not a trailer) a 431 error reply is sent unless the response
// is already complete.
//
// Returns 0 on success, or -1 if sending the error reply fails.
int Http3Upstream::http_recv_request_header(Downstream *downstream,
                                            int32_t h3token,
                                            nghttp3_rcbuf *name,
                                            nghttp3_rcbuf *value, uint8_t flags,
                                            bool trailer) {
  auto namebuf = nghttp3_rcbuf_get_buf(name);
  auto valuebuf = nghttp3_rcbuf_get_buf(value);
  auto &req = downstream->request();
  auto &httpconf = get_config()->http;

  const bool too_large = req.fs.buffer_size() + namebuf.len + valuebuf.len >
                         httpconf.request_header_field_buffer;

  if (too_large || req.fs.num_fields() >= httpconf.max_request_header_fields) {
    downstream->set_stop_reading(true);

    if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
      return 0;
    }

    if (LOG_ENABLED(INFO)) {
      ULOG(INFO, this) << "Too large or many header field size="
                       << req.fs.buffer_size() + namebuf.len + valuebuf.len
                       << ", num=" << req.fs.num_fields() + 1;
    }

    // just ignore if this is a trailer part.
    if (!trailer && error_reply(downstream, 431) != 0) {
      return -1;
    }

    return 0;
  }

  auto token = http2::lookup_token(namebuf.base, namebuf.len);
  auto no_index = flags & NGHTTP3_NV_FLAG_NEVER_INDEX;

  // Register both buffers with the downstream; the StringRefs stored
  // below point into them.
  downstream->add_rcbuf(name);
  downstream->add_rcbuf(value);

  const StringRef field_name{namebuf.base, namebuf.len};
  const StringRef field_value{valuebuf.base, valuebuf.len};

  if (trailer) {
    req.fs.add_trailer_token(field_name, field_value, no_index, token);
  } else {
    req.fs.add_header_token(field_name, field_value, no_index, token);
  }

  return 0;
}
namespace {
// nghttp3 end_headers callback.  Ignored when the stream has no
// Downstream attached or reading has been stopped.  Otherwise the
// request headers are processed by
// Http3Upstream::http_end_request_headers(); on success the upstream
// read timer of the stream is reset and the handler's read timer is
// stopped.  Returns 0 on success, or NGHTTP3_ERR_CALLBACK_FAILURE.
int http_end_request_headers(nghttp3_conn *conn, int64_t stream_id, int fin,
                             void *user_data, void *stream_user_data) {
  auto downstream = static_cast<Downstream *>(stream_user_data);
  if (downstream == nullptr || downstream->get_stop_reading()) {
    return 0;
  }

  auto upstream = static_cast<Http3Upstream *>(user_data);
  auto handler = upstream->get_client_handler();

  if (upstream->http_end_request_headers(downstream, fin) != 0) {
    return NGHTTP3_ERR_CALLBACK_FAILURE;
  }

  downstream->reset_upstream_rtimer();
  handler->stop_read_timer();

  return 0;
}
} // namespace
// Called when all request header fields of |downstream| have been
// received.  |fin| is nonzero if the request has no body.
//
// Fills in the request (method, scheme, authority, path, content
// length, extended CONNECT protocol), validates it and, if nothing
// answered the request locally, hands it to start_downstream().
// Invalid requests are answered with an error reply (501 for an
// unknown method, 400 for an unsupported :protocol or a bad scheme,
// 500 if the mruby request hook fails); in HTTP/2 proxy mode a
// non-CONNECT request without :authority shuts the stream down.
//
// Returns 0 on success, or -1 if sending an error reply fails.
int Http3Upstream::http_end_request_headers(Downstream *downstream, int fin) {
  auto lgconf = log_config();
  lgconf->update_tstamp(std::chrono::system_clock::now());
  auto &req = downstream->request();
  req.tstamp = lgconf->tstamp;

  // A response may have been completed already, e.g. by an error reply
  // sent while receiving header fields.
  if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
    return 0;
  }

  auto &nva = req.fs.headers();

  if (LOG_ENABLED(INFO)) {
    std::stringstream ss;
    for (auto &nv : nva) {
      // Never write credentials to the log.
      if (nv.name == "authorization") {
        ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
        continue;
      }
      ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
    }
    ULOG(INFO, this) << "HTTP request headers. stream_id="
                     << downstream->get_stream_id() << "\n"
                     << ss.str();
  }

  auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
  if (content_length) {
    // libnghttp3 guarantees this can be parsed
    req.fs.content_length = util::parse_uint(content_length->value);
  }

  // presence of mandatory header fields are guaranteed by libnghttp3.
  auto authority = req.fs.header(http2::HD__AUTHORITY);
  auto path = req.fs.header(http2::HD__PATH);
  auto method = req.fs.header(http2::HD__METHOD);
  auto scheme = req.fs.header(http2::HD__SCHEME);

  auto method_token = http2::lookup_method_token(method->value);
  if (method_token == -1) {
    if (error_reply(downstream, 501) != 0) {
      return -1;
    }
    return 0;
  }

  auto faddr = handler_->get_upstream_addr();

  auto config = get_config();

  // For HTTP/2 proxy, we require :authority.
  if (method_token != HTTP_CONNECT && config->http2_proxy &&
      faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
    shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
    return 0;
  }

  req.method = method_token;
  if (scheme) {
    req.scheme = scheme->value;
  }

  // Fall back to the host header field when :authority is absent.
  if (!authority) {
    req.no_authority = true;
    authority = req.fs.header(http2::HD_HOST);
  }

  if (authority) {
    req.authority = authority->value;
  }

  if (path) {
    if (method_token == HTTP_OPTIONS &&
        path->value == StringRef::from_lit("*")) {
      // Server-wide OPTIONS request.  Path is empty.
    } else if (config->http2_proxy &&
               faddr->alt_mode == UpstreamAltMode::NONE) {
      req.path = path->value;
    } else {
      req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
                                           path->value);
    }
  }

  // Only "websocket" is accepted as extended CONNECT protocol.
  auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
  if (connect_proto) {
    if (connect_proto->value != "websocket") {
      if (error_reply(downstream, 400) != 0) {
        return -1;
      }
      return 0;
    }
    req.connect_proto = ConnectProto::WEBSOCKET;
  }

  if (!fin) {
    req.http2_expect_body = true;
  } else if (req.fs.content_length == -1) {
    // No body will follow; an unspecified content length becomes 0.
    req.fs.content_length = 0;
  }

  downstream->inspect_http2_request();

  downstream->set_request_state(DownstreamState::HEADER_COMPLETE);

  if (config->http.require_http_scheme &&
      !http::check_http_scheme(req.scheme, /* encrypted = */ true)) {
    if (error_reply(downstream, 400) != 0) {
      return -1;
    }

    // The request has been answered; do not run the mruby request hook
    // on it, which could otherwise attempt a second error reply.
    return 0;
  }

#ifdef HAVE_MRUBY
  auto worker = handler_->get_worker();
  auto mruby_ctx = worker->get_mruby_context();

  if (mruby_ctx->run_on_request_proc(downstream) != 0) {
    if (error_reply(downstream, 500) != 0) {
      return -1;
    }
    return 0;
  }
#endif // HAVE_MRUBY

  // The mruby hook may have produced the response by itself.
  if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
    return 0;
  }

  start_downstream(downstream);

  return 0;
}
// Starts processing |downstream| right away if the downstream queue
// allows another active request for its authority; otherwise marks it
// as blocked in the queue.
void Http3Upstream::start_downstream(Downstream *downstream) {
  if (!downstream_queue_.can_activate(downstream->request().authority)) {
    downstream_queue_.mark_blocked(downstream);
    return;
  }

  initiate_downstream(downstream);
}
// Attaches a backend connection to |downstream| and sends the request
// headers to it.  On failure an error reply is sent (502, or 500 when
// the per-backend mruby request hook fails), falling back to shutting
// down the stream with NGHTTP3_H3_INTERNAL_ERROR if the reply cannot
// be sent, and the downstream is marked as failed in the queue.  On
// success the downstream is marked active and, if no request body is
// expected, the upload is ended immediately.
void Http3Upstream::initiate_downstream(Downstream *downstream) {
int rv;
#ifdef HAVE_MRUBY
// Raw pointer kept because ownership of dconn is moved into
// |downstream| below.
DownstreamConnection *dconn_ptr;
#endif // HAVE_MRUBY
// Keep asking for a backend connection until one can be attached or
// none is available.
for (;;) {
auto dconn = handler_->get_downstream_connection(rv, downstream);
if (!dconn) {
if (rv == SHRPX_ERR_TLS_REQUIRED) {
assert(0);
abort();
}
rv = error_reply(downstream, 502);
if (rv != 0) {
shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
}
downstream->set_request_state(DownstreamState::CONNECT_FAIL);
downstream_queue_.mark_failure(downstream);
return;
}
#ifdef HAVE_MRUBY
dconn_ptr = dconn.get();
#endif // HAVE_MRUBY
rv = downstream->attach_downstream_connection(std::move(dconn));
if (rv == 0) {
break;
}
}
#ifdef HAVE_MRUBY
// Run the mruby request hook of the backend address group, if any.
const auto &group = dconn_ptr->get_downstream_addr_group();
if (group) {
const auto &mruby_ctx = group->shared_addr->mruby_ctx;
if (mruby_ctx->run_on_request_proc(downstream) != 0) {
if (error_reply(downstream, 500) != 0) {
shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
}
downstream_queue_.mark_failure(downstream);
return;
}
// The hook produced the response by itself.
if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
return;
}
}
#endif // HAVE_MRUBY
rv = downstream->push_request_headers();
if (rv != 0) {
if (error_reply(downstream, 502) != 0) {
shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
}
downstream_queue_.mark_failure(downstream);
return;
}
downstream_queue_.mark_active(downstream);
auto &req = downstream->request();
// No request body is expected; finish the upload now.
if (!req.http2_expect_body) {
rv = downstream->end_upload_data();
if (rv != 0) {
shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
}
}
}
namespace {
// nghttp3 recv_data callback: forwards request body data to
// Http3Upstream::http_recv_data().  Returns 0 on success, or
// NGHTTP3_ERR_CALLBACK_FAILURE if that fails.
int http_recv_data(nghttp3_conn *conn, int64_t stream_id, const uint8_t *data,
                   size_t datalen, void *user_data, void *stream_user_data) {
  auto downstream = static_cast<Downstream *>(stream_user_data);
  auto upstream = static_cast<Http3Upstream *>(user_data);

  return upstream->http_recv_data(downstream, data, datalen) == 0
             ? 0
             : NGHTTP3_ERR_CALLBACK_FAILURE;
}
} // namespace
// Passes |datalen| bytes of request body at |data| on to the backend
// of |downstream|.  If that fails, the stream is shut down with
// NGHTTP3_H3_INTERNAL_ERROR (unless the response is already complete)
// and the data is reported as consumed.  Always returns 0.
int Http3Upstream::http_recv_data(Downstream *downstream, const uint8_t *data,
                                  size_t datalen) {
  downstream->reset_upstream_rtimer();

  if (downstream->push_upload_data_chunk(data, datalen) == 0) {
    return 0;
  }

  if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
    shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
  }

  // The data was not handed on, so report it as consumed here.
  consume(downstream->get_stream_id(), datalen);

  return 0;
}
namespace {
// nghttp3 end_stream callback.  Ignored when the stream has no
// Downstream attached or reading has been stopped.  Returns 0 on
// success, or NGHTTP3_ERR_CALLBACK_FAILURE if
// Http3Upstream::http_end_stream() fails.
int http_end_stream(nghttp3_conn *conn, int64_t stream_id, void *user_data,
                    void *stream_user_data) {
  auto downstream = static_cast<Downstream *>(stream_user_data);
  if (downstream == nullptr || downstream->get_stop_reading()) {
    return 0;
  }

  auto upstream = static_cast<Http3Upstream *>(user_data);

  return upstream->http_end_stream(downstream) == 0
             ? 0
             : NGHTTP3_ERR_CALLBACK_FAILURE;
}
} // namespace
// Called when the request on |downstream| has been received
// completely.  Disables the upstream read timer, ends the upload
// towards the backend and marks the request complete.  If ending the
// upload fails while the response is not complete yet, the stream is
// shut down with NGHTTP3_H3_INTERNAL_ERROR.  Always returns 0.
int Http3Upstream::http_end_stream(Downstream *downstream) {
  downstream->disable_upstream_rtimer();

  if (downstream->end_upload_data() != 0 &&
      downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
    shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
  }

  downstream->set_request_state(DownstreamState::MSG_COMPLETE);

  return 0;
}
namespace {
// nghttp3 stream_close callback.  Streams without an attached
// Downstream are ignored.  Returns 0 on success, or
// NGHTTP3_ERR_CALLBACK_FAILURE if Http3Upstream::http_stream_close()
// fails.
int http_stream_close(nghttp3_conn *conn, int64_t stream_id,
                      uint64_t app_error_code, void *conn_user_data,
                      void *stream_user_data) {
  auto downstream = static_cast<Downstream *>(stream_user_data);
  if (downstream == nullptr) {
    return 0;
  }

  auto upstream = static_cast<Http3Upstream *>(conn_user_data);

  return upstream->http_stream_close(downstream, app_error_code) == 0
             ? 0
             : NGHTTP3_ERR_CALLBACK_FAILURE;
}
} // namespace
// Called when the stream of |downstream| is closed.  Reports the
// unconsumed request body as consumed, extends the bidirectional
// stream limit by one, detaches the backend connection when it can be
// kept alive, and removes |downstream|.  Always returns 0.
int Http3Upstream::http_stream_close(Downstream *downstream,
                                     uint64_t app_error_code) {
  const auto stream_id = downstream->get_stream_id();

  if (LOG_ENABLED(INFO)) {
    ULOG(INFO, this) << "Stream stream_id=" << stream_id
                     << " is being closed with app_error_code="
                     << app_error_code;

    auto resp_buf = downstream->get_response_buf();

    ULOG(INFO, this) << "response unacked_left=" << resp_buf->rleft()
                     << " not_sent=" << resp_buf->rleft_mark();
  }

  auto &req = downstream->request();

  consume(stream_id, req.unconsumed_body_length);
  req.unconsumed_body_length = 0;

  ngtcp2_conn_extend_max_streams_bidi(conn_, 1);

  if (downstream->get_request_state() != DownstreamState::CONNECT_FAIL) {
    if (downstream->can_detach_downstream_connection()) {
      // Keep-alive
      downstream->detach_downstream_connection();
    }

    downstream->set_request_state(DownstreamState::STREAM_CLOSED);

    // At this point, downstream read may be paused.

    // If shrpx_downstream::push_request_headers() failed, the
    // error is handled here.
  }

  remove_downstream(downstream);
  // downstream was deleted

  return 0;
}
namespace {
// nghttp3 stop_sending callback: forwards to
// Http3Upstream::http_stop_sending().  Returns 0 on success, or
// NGHTTP3_ERR_CALLBACK_FAILURE if that fails.
int http_stop_sending(nghttp3_conn *conn, int64_t stream_id,
                      uint64_t app_error_code, void *user_data,
                      void *stream_user_data) {
  auto upstream = static_cast<Http3Upstream *>(user_data);

  return upstream->http_stop_sending(stream_id, app_error_code) == 0
             ? 0
             : NGHTTP3_ERR_CALLBACK_FAILURE;
}
} // namespace
// Shuts down the read side of |stream_id| with |app_error_code|.
// Returns 0 unless ngtcp2 reports a fatal error, in which case -1 is
// returned.
int Http3Upstream::http_stop_sending(int64_t stream_id,
                                     uint64_t app_error_code) {
  const auto rv =
      ngtcp2_conn_shutdown_stream_read(conn_, stream_id, app_error_code);
  if (!ngtcp2_err_is_fatal(rv)) {
    return 0;
  }

  ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_read: "
                    << ngtcp2_strerror(rv);

  return -1;
}
namespace {
// nghttp3 reset_stream callback: forwards to
// Http3Upstream::http_reset_stream().  Returns 0 on success, or
// NGHTTP3_ERR_CALLBACK_FAILURE if that fails.
int http_reset_stream(nghttp3_conn *conn, int64_t stream_id,
                      uint64_t app_error_code, void *user_data,
                      void *stream_user_data) {
  auto upstream = static_cast<Http3Upstream *>(user_data);

  return upstream->http_reset_stream(stream_id, app_error_code) == 0
             ? 0
             : NGHTTP3_ERR_CALLBACK_FAILURE;
}
} // namespace
// Shuts down the write side of |stream_id| with |app_error_code|.
// Returns 0 unless ngtcp2 reports a fatal error, in which case -1 is
// returned.
int Http3Upstream::http_reset_stream(int64_t stream_id,
                                     uint64_t app_error_code) {
  const auto rv =
      ngtcp2_conn_shutdown_stream_write(conn_, stream_id, app_error_code);
  if (!ngtcp2_err_is_fatal(rv)) {
    return 0;
  }

  ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_write: "
                    << ngtcp2_strerror(rv);

  return -1;
}
// Creates the server-side nghttp3 connection (httpconn_) and binds its
// control stream and QPACK encoder/decoder streams to newly opened
// local unidirectional QUIC streams.
//
// Returns 0 on success, or -1 if
// ngtcp2_conn_get_max_local_streams_uni() reports fewer than 3 streams
// or any library call fails.
int Http3Upstream::setup_httpconn() {
int rv;
// 3 unidirectional streams are opened below: control, QPACK encoder
// and QPACK decoder.
if (ngtcp2_conn_get_max_local_streams_uni(conn_) < 3) {
return -1;
}
// NOTE: positional aggregate initialization; the order must match the
// member order of nghttp3_callbacks.
nghttp3_callbacks callbacks{
shrpx::http_acked_stream_data,
shrpx::http_stream_close,
shrpx::http_recv_data,
http_deferred_consume,
shrpx::http_begin_request_headers,
shrpx::http_recv_request_header,
shrpx::http_end_request_headers,
nullptr, // begin_trailers
shrpx::http_recv_request_trailer,
nullptr, // end_trailers
shrpx::http_stop_sending,
shrpx::http_end_stream,
shrpx::http_reset_stream,
};
auto config = get_config();
nghttp3_settings settings;
nghttp3_settings_default(&settings);
settings.qpack_max_dtable_capacity = 4_k;
// enable_connect_protocol is turned on unless http2_proxy is
// configured.
if (!config->http2_proxy) {
settings.enable_connect_protocol = 1;
}
auto mem = nghttp3_mem_default();
rv = nghttp3_conn_server_new(&httpconn_, &callbacks, &settings, mem, this);
if (rv != 0) {
ULOG(ERROR, this) << "nghttp3_conn_server_new: " << nghttp3_strerror(rv);
return -1;
}
// Tell nghttp3 the bidirectional stream limit taken from our local
// transport parameters.
auto params = ngtcp2_conn_get_local_transport_params(conn_);
nghttp3_conn_set_max_client_streams_bidi(httpconn_,
params->initial_max_streams_bidi);
// Control stream.
int64_t ctrl_stream_id;
rv = ngtcp2_conn_open_uni_stream(conn_, &ctrl_stream_id, nullptr);
if (rv != 0) {
ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
return -1;
}
rv = nghttp3_conn_bind_control_stream(httpconn_, ctrl_stream_id);
if (rv != 0) {
ULOG(ERROR, this) << "nghttp3_conn_bind_control_stream: "
<< nghttp3_strerror(rv);
return -1;
}
// QPACK encoder and decoder streams.
int64_t qpack_enc_stream_id, qpack_dec_stream_id;
rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_enc_stream_id, nullptr);
if (rv != 0) {
ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
return -1;
}
rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_dec_stream_id, nullptr);
if (rv != 0) {
ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
return -1;
}
rv = nghttp3_conn_bind_qpack_streams(httpconn_, qpack_enc_stream_id,
qpack_dec_stream_id);
if (rv != 0) {
ULOG(ERROR, this) << "nghttp3_conn_bind_qpack_streams: "
<< nghttp3_strerror(rv);
return -1;
}
return 0;
}
// Sends a locally generated HTML error page with |status_code| on the
// stream belonging to |downstream|.  The page is appended to the
// downstream response buffer, the response is marked complete, and the
// headers are submitted to nghttp3.  Afterwards the read side of the
// stream is shut down with NGHTTP3_H3_NO_ERROR.  Returns 0 on success,
// or -1 if submitting the response fails fatally or the read shutdown
// fails.
int Http3Upstream::error_reply(Downstream *downstream,
                               unsigned int status_code) {
  auto &resp = downstream->response();
  auto &balloc = downstream->get_block_allocator();

  auto html = http::create_error_html(balloc, status_code);
  resp.http_status = status_code;

  auto response_buf = downstream->get_response_buf();
  response_buf->append(html);
  downstream->set_response_state(DownstreamState::MSG_COMPLETE);

  nghttp3_data_reader data_read;
  data_read.read_data = downstream_read_data_callback;

  // Refresh the cached timestamp before it is used for the date field.
  auto lgconf = log_config();
  lgconf->update_tstamp(std::chrono::system_clock::now());

  auto response_status = http2::stringify_status(balloc, status_code);
  auto content_length = util::make_string_ref_uint(balloc, html.size());
  auto date = make_string_ref(balloc, lgconf->tstamp->time_http);

  auto nva = std::array<nghttp3_nv, 5>{
    {http3::make_nv_ls_nocopy(":status", response_status),
     http3::make_nv_ll("content-type", "text/html; charset=UTF-8"),
     http3::make_nv_ls_nocopy("server", get_config()->http.server_name),
     http3::make_nv_ls_nocopy("content-length", content_length),
     http3::make_nv_ls_nocopy("date", date)}};

  const auto stream_id = downstream->get_stream_id();

  const auto rv = nghttp3_conn_submit_response(httpconn_, stream_id,
                                               nva.data(), nva.size(),
                                               &data_read);
  // Only fatal errors abort; other results fall through.
  if (nghttp3_err_is_fatal(rv)) {
    ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
                      << nghttp3_strerror(rv);
    return -1;
  }

  downstream->reset_upstream_wtimer();

  return shutdown_stream_read(downstream->get_stream_id(),
                              NGHTTP3_H3_NO_ERROR) != 0
           ? -1
           : 0;
}
// Shuts down the stream belonging to |downstream| on the QUIC
// connection with |app_error_code|.  Any non-zero result from ngtcp2
// is logged and reported as -1; returns 0 on success.
int Http3Upstream::shutdown_stream(Downstream *downstream,
                                   uint64_t app_error_code) {
  const auto stream_id = downstream->get_stream_id();

  if (LOG_ENABLED(INFO)) {
    ULOG(INFO, this) << "Shutdown stream_id=" << stream_id
                     << " with app_error_code=" << app_error_code;
  }

  const auto res = ngtcp2_conn_shutdown_stream(conn_, stream_id, app_error_code);
  if (res == 0) {
    return 0;
  }

  ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream() failed: "
                    << ngtcp2_strerror(res);
  return -1;
}
// Shuts down the read side of |stream_id| on the QUIC connection,
// signalling |app_error_code| to the peer.
//
// The caller-supplied |app_error_code| is forwarded to ngtcp2.
// Previously it was ignored and NGHTTP3_H3_NO_ERROR was always sent,
// so a caller could not signal any other error code.
//
// Only errors that ngtcp2 classifies as fatal are reported: they are
// logged and -1 is returned.  Any other result yields 0.
int Http3Upstream::shutdown_stream_read(int64_t stream_id,
                                        uint64_t app_error_code) {
  auto rv = ngtcp2_conn_shutdown_stream_read(conn_, stream_id, app_error_code);
  if (ngtcp2_err_is_fatal(rv)) {
    ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream_read: "
                      << ngtcp2_strerror(rv);
    return -1;
  }

  return 0;
}
// Extends the maximum offset by |nconsumed| bytes at both levels:
// first for |stream_id|, then for the connection as a whole.  Return
// values of the ngtcp2 calls are not inspected.
void Http3Upstream::consume(int64_t stream_id, size_t nconsumed) {
  // Per-stream offset.
  ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);

  // Connection-wide offset.
  ngtcp2_conn_extend_max_offset(conn_, nconsumed);
}
// Detaches |downstream| from this upstream: writes its access log
// entry if one is ready, clears the nghttp3 stream user data for its
// stream, and removes it from the downstream queue.  If the removal
// unblocks another downstream, that one is initiated.  When the queue
// becomes empty the handler's read timer is restarted.
void Http3Upstream::remove_downstream(Downstream *downstream) {
  if (downstream->accesslog_ready()) {
    handler_->write_accesslog(downstream);
  }

  // Drop nghttp3's pointer to |downstream| before it leaves the queue.
  const auto stream_id = downstream->get_stream_id();
  nghttp3_conn_set_stream_user_data(httpconn_, stream_id, nullptr);

  // |downstream| must not be touched after this call.
  // NOTE(review): callers treat it as deleted -- confirm against
  // DownstreamQueue::remove_and_get_blocked.
  if (auto unblocked = downstream_queue_.remove_and_get_blocked(downstream)) {
    initiate_downstream(unblocked);
  }

  if (downstream_queue_.get_downstreams() != nullptr) {
    return;
  }

  // There is no downstream at the moment.  Start idle timer now.
  handler_->repeat_read_timer();
}
// Writes every name/value pair in |nva| to the INFO log as a single
// entry, one "name: value" line per header field, prefixed with the
// stream ID of |downstream|.
void Http3Upstream::log_response_headers(
  Downstream *downstream, const std::vector<nghttp3_nv> &nva) const {
  std::stringstream dump;

  for (const auto &field : nva) {
    dump << TTY_HTTP_HD << StringRef{field.name, field.namelen} << TTY_RST
         << ": " << StringRef{field.value, field.valuelen} << "\n";
  }

  ULOG(INFO, this) << "HTTP response headers. stream_id="
                   << downstream->get_stream_id() << "\n"
                   << dump.str();
}
// If the worker has entered graceful shutdown, stops the prepare
// watcher prep_ and starts the graceful shutdown of this connection,
// returning the result of start_graceful_shutdown().  Otherwise does
// nothing and returns 0.
int Http3Upstream::check_shutdown() {
  if (handler_->get_worker()->get_graceful_shutdown()) {
    ev_prepare_stop(handler_->get_loop(), &prep_);

    return start_graceful_shutdown();
  }

  return 0;
}
// Begins graceful shutdown of the HTTP/3 connection: submits a
// shutdown notice through nghttp3, signals the handler to write, and
// arms shutdown_timer_ as a one-shot timer of three times the current
// PTO.  A no-op returning 0 if shutdown_timer_ is already active.
// Returns -1 if the shutdown notice cannot be submitted.
int Http3Upstream::start_graceful_shutdown() {
  // An active timer means graceful shutdown is already in progress.
  if (ev_is_active(&shutdown_timer_)) {
    return 0;
  }

  const auto rv = nghttp3_conn_submit_shutdown_notice(httpconn_);
  if (rv != 0) {
    ULOG(FATAL, this) << "nghttp3_conn_submit_shutdown_notice: "
                      << nghttp3_strerror(rv);
    return -1;
  }

  handler_->signal_write();

  // Three PTOs, scaled by NGTCP2_SECONDS into libev's timestamp unit.
  const auto pto = ngtcp2_conn_get_pto(conn_);
  const auto delay = static_cast<ev_tstamp>(pto * 3) / NGTCP2_SECONDS;

  ev_timer_set(&shutdown_timer_, delay, 0.);
  ev_timer_start(handler_->get_loop(), &shutdown_timer_);

  return 0;
}
// Calls nghttp3_conn_shutdown() on the HTTP/3 connection and, on
// success, signals the handler to write.  Returns 0 on success; on
// failure the error is logged and -1 is returned.
int Http3Upstream::submit_goaway() {
  const auto rv = nghttp3_conn_shutdown(httpconn_);
  if (rv == 0) {
    handler_->signal_write();
    return 0;
  }

  ULOG(FATAL, this) << "nghttp3_conn_shutdown: " << nghttp3_strerror(rv);
  return -1;
}
// Opens (creating or truncating) a qlog file inside |dir| for writing.
// The file name has the form "<timestamp>-<hex scid>.sqlog".  The file
// is created with user read/write and group read permission, and the
// descriptor is made close-on-exec.  Returns the open file descriptor,
// or -1 on failure (the errno value is logged).
int Http3Upstream::open_qlog_file(const StringRef &dir,
                                  const ngtcp2_cid &scid) const {
  // Scratch buffer for the timestamp; sized from a sample timestamp
  // literal, including its terminating NUL.
  std::array<char, sizeof("20141115T125824.741+0900")> tsbuf;

  auto path = dir.str();
  path += '/';
  path +=
    util::format_iso8601_basic(tsbuf.data(), std::chrono::system_clock::now());
  path += '-';
  path += util::format_hex(scid.data, scid.datalen);
  path += ".sqlog";

#ifdef O_CLOEXEC
  const auto oflags = O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC;
#else  // !O_CLOEXEC
  const auto oflags = O_WRONLY | O_CREAT | O_TRUNC;
#endif // !O_CLOEXEC
  const auto perms = S_IRUSR | S_IWUSR | S_IRGRP;

  // Retry while open() is interrupted by a signal.
  int fd;
  do {
    fd = open(path.c_str(), oflags, perms);
  } while (fd == -1 && errno == EINTR);

#ifndef O_CLOEXEC
  // Without O_CLOEXEC the flag has to be set after the fact.
  if (fd != -1) {
    util::make_socket_closeonexec(fd);
  }
#endif // !O_CLOEXEC

  if (fd != -1) {
    return fd;
  }

  // Capture errno before logging can overwrite it.
  auto error = errno;
  ULOG(ERROR, this) << "Failed to open qlog file " << path
                    << ": errno=" << error;
  return -1;
}
// Returns the ngtcp2 connection object held by this upstream.
ngtcp2_conn *Http3Upstream::get_conn() const {
  return conn_;
}
} // namespace shrpx