diff --git a/src/Makefile.am b/src/Makefile.am index 344cc7cf..9226ed31 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -157,6 +157,7 @@ NGHTTPX_SRCS = \ shrpx_quic_listener.cc shrpx_quic_listener.h \ shrpx_quic_connection_handler.cc shrpx_quic_connection_handler.h \ shrpx_http3_upstream.cc shrpx_http3_upstream.h \ + http3.cc http3.h \ quic.cc quic.h \ buffer.h memchunk.h template.h allocator.h \ xsi_strerror.c xsi_strerror.h diff --git a/src/http3.cc b/src/http3.cc new file mode 100644 index 00000000..9d9185ea --- /dev/null +++ b/src/http3.cc @@ -0,0 +1,215 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2021 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "http3.h" + +namespace nghttp2 { + +namespace http3 { + +namespace { +nghttp3_nv make_nv_internal(const std::string &name, const std::string &value, + bool never_index, uint8_t nv_flags) { + uint8_t flags; + + flags = nv_flags | + (never_index ? NGHTTP3_NV_FLAG_NEVER_INDEX : NGHTTP3_NV_FLAG_NONE); + + return {(uint8_t *)name.c_str(), (uint8_t *)value.c_str(), name.size(), + value.size(), flags}; +} +} // namespace + +namespace { +nghttp3_nv make_nv_internal(const StringRef &name, const StringRef &value, + bool never_index, uint8_t nv_flags) { + uint8_t flags; + + flags = nv_flags | + (never_index ? NGHTTP3_NV_FLAG_NEVER_INDEX : NGHTTP3_NV_FLAG_NONE); + + return {(uint8_t *)name.c_str(), (uint8_t *)value.c_str(), name.size(), + value.size(), flags}; +} +} // namespace + +nghttp3_nv make_nv(const std::string &name, const std::string &value, + bool never_index) { + return make_nv_internal(name, value, never_index, NGHTTP3_NV_FLAG_NONE); +} + +nghttp3_nv make_nv(const StringRef &name, const StringRef &value, + bool never_index) { + return make_nv_internal(name, value, never_index, NGHTTP3_NV_FLAG_NONE); +} + +nghttp3_nv make_nv_nocopy(const std::string &name, const std::string &value, + bool never_index) { + return make_nv_internal(name, value, never_index, + NGHTTP3_NV_FLAG_NO_COPY_NAME | + NGHTTP3_NV_FLAG_NO_COPY_VALUE); +} + +nghttp3_nv make_nv_nocopy(const StringRef &name, const StringRef &value, + bool never_index) { + return make_nv_internal(name, value, never_index, + NGHTTP3_NV_FLAG_NO_COPY_NAME | + NGHTTP3_NV_FLAG_NO_COPY_VALUE); +} + +namespace { +void copy_headers_to_nva_internal(std::vector &nva, + const HeaderRefs &headers, uint8_t nv_flags, + uint32_t flags) { + auto it_forwarded = std::end(headers); + auto it_xff = std::end(headers); + auto it_xfp = std::end(headers); + auto it_via = std::end(headers); + + for (auto it = std::begin(headers); it != std::end(headers); ++it) { + auto kv = &(*it); + if (kv->name.empty() || kv->name[0] == ':') { + continue; + } + switch (kv->token) { + case http2::HD_COOKIE: + case http2::HD_CONNECTION: + case http2::HD_HOST: + case http2::HD_HTTP2_SETTINGS: + case http2::HD_KEEP_ALIVE: + case http2::HD_PROXY_CONNECTION: + case http2::HD_SERVER: + case http2::HD_TE: + case http2::HD_TRANSFER_ENCODING: + case http2::HD_UPGRADE: + continue; + case http2::HD_EARLY_DATA: + if (flags & http2::HDOP_STRIP_EARLY_DATA) { + continue; + } + break; + case http2::HD_SEC_WEBSOCKET_ACCEPT: + if (flags & http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT) { + continue; + } + break; + case http2::HD_SEC_WEBSOCKET_KEY: + if (flags & http2::HDOP_STRIP_SEC_WEBSOCKET_KEY) { + continue; + } + break; + case http2::HD_FORWARDED: + if (flags & http2::HDOP_STRIP_FORWARDED) { + continue; + } + + if (it_forwarded == std::end(headers)) { + it_forwarded = it; + continue; + } + + kv = &(*it_forwarded); + it_forwarded = it; + break; + case http2::HD_X_FORWARDED_FOR: + if (flags & http2::HDOP_STRIP_X_FORWARDED_FOR) { + continue; + } + + if (it_xff == std::end(headers)) { + it_xff = it; + continue; + } + + kv = &(*it_xff); + it_xff = it; + break; + case http2::HD_X_FORWARDED_PROTO: + if (flags & http2::HDOP_STRIP_X_FORWARDED_PROTO) { + continue; + } + + if (it_xfp == std::end(headers)) { + it_xfp = it; + continue; + } + + kv = &(*it_xfp); + it_xfp = it; + break; + case http2::HD_VIA: + if (flags & http2::HDOP_STRIP_VIA) { + continue; + } + + if (it_via == std::end(headers)) { + it_via = it; + continue; + } + + kv = &(*it_via); + it_via = it; + break; + } + nva.push_back( + make_nv_internal(kv->name, kv->value, kv->no_index, nv_flags)); + } +} +} // namespace + +void copy_headers_to_nva(std::vector &nva, + const HeaderRefs &headers, uint32_t flags) { + copy_headers_to_nva_internal(nva, headers, NGHTTP3_NV_FLAG_NONE, flags); +} + +void copy_headers_to_nva_nocopy(std::vector &nva, + const HeaderRefs &headers, uint32_t flags) { + copy_headers_to_nva_internal( + nva, headers, + NGHTTP3_NV_FLAG_NO_COPY_NAME | NGHTTP3_NV_FLAG_NO_COPY_VALUE, flags); +} + +int check_nv(const uint8_t *name, size_t namelen, const uint8_t *value, + size_t valuelen) { + if (!nghttp3_check_header_name(name, namelen)) { + return 0; + } + if (!nghttp3_check_header_value(value, valuelen)) { + return 0; + } + return 1; +} + +void dump_nv(FILE *out, const nghttp3_nv *nva, size_t nvlen) { + auto end = nva + nvlen; + for (; nva != end; ++nva) { + fprintf(out, "%s: %s\n", nva->name, nva->value); + } + fputc('\n', out); + fflush(out); +} + +} // namespace http3 + +} // namespace nghttp2 diff --git a/src/http3.h b/src/http3.h new file mode 100644 index 00000000..dfabd784 --- /dev/null +++ b/src/http3.h @@ -0,0 +1,129 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2021 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef HTTP3_H +#define HTTP3_H + +#include "nghttp2_config.h" + +#include +#include +#include + +#include + +#include "http2.h" +#include "template.h" + +namespace nghttp2 { + +namespace http3 { + +// Creates nghttp3_nv using |name| and |value| and returns it. The +// returned value only references the data pointer to name.c_str() and +// value.c_str(). If |no_index| is true, nghttp3_nv flags member has +// NGHTTP3_NV_FLAG_NEVER_INDEX flag set. +nghttp3_nv make_nv(const std::string &name, const std::string &value, + bool never_index = false); + +nghttp3_nv make_nv(const StringRef &name, const StringRef &value, + bool never_index = false); + +nghttp3_nv make_nv_nocopy(const std::string &name, const std::string &value, + bool never_index = false); + +nghttp3_nv make_nv_nocopy(const StringRef &name, const StringRef &value, + bool never_index = false); + +// Create nghttp3_nv from string literal |name| and |value|. +template +constexpr nghttp3_nv make_nv_ll(const char (&name)[N], const char (&value)[M]) { + return {(uint8_t *)name, (uint8_t *)value, N - 1, M - 1, + NGHTTP3_NV_FLAG_NO_COPY_NAME | NGHTTP3_NV_FLAG_NO_COPY_VALUE}; +} + +// Create nghttp3_nv from string literal |name| and c-string |value|. +template +nghttp3_nv make_nv_lc(const char (&name)[N], const char *value) { + return {(uint8_t *)name, (uint8_t *)value, N - 1, strlen(value), + NGHTTP3_NV_FLAG_NO_COPY_NAME}; +} + +template +nghttp3_nv make_nv_lc_nocopy(const char (&name)[N], const char *value) { + return {(uint8_t *)name, (uint8_t *)value, N - 1, strlen(value), + NGHTTP3_NV_FLAG_NO_COPY_NAME | NGHTTP3_NV_FLAG_NO_COPY_VALUE}; +} + +// Create nghttp3_nv from string literal |name| and std::string +// |value|. +template +nghttp3_nv make_nv_ls(const char (&name)[N], const std::string &value) { + return {(uint8_t *)name, (uint8_t *)value.c_str(), N - 1, value.size(), + NGHTTP3_NV_FLAG_NO_COPY_NAME}; +} + +template +nghttp3_nv make_nv_ls_nocopy(const char (&name)[N], const std::string &value) { + return {(uint8_t *)name, (uint8_t *)value.c_str(), N - 1, value.size(), + NGHTTP3_NV_FLAG_NO_COPY_NAME | NGHTTP3_NV_FLAG_NO_COPY_VALUE}; +} + +template +nghttp3_nv make_nv_ls_nocopy(const char (&name)[N], const StringRef &value) { + return {(uint8_t *)name, (uint8_t *)value.c_str(), N - 1, value.size(), + NGHTTP3_NV_FLAG_NO_COPY_NAME | NGHTTP3_NV_FLAG_NO_COPY_VALUE}; +} + +// Appends headers in |headers| to |nv|. |headers| must be indexed +// before this call (its element's token field is assigned). Certain +// headers, including disallowed headers in HTTP/3 spec and headers +// which require special handling (i.e. via), are not copied. |flags| +// is one or more of HeaderBuildOp flags. They tell function that +// certain header fields should not be added. +void copy_headers_to_nva(std::vector &nva, + const HeaderRefs &headers, uint32_t flags); + +// Just like copy_headers_to_nva(), but this adds +// NGHTTP3_NV_FLAG_NO_COPY_NAME and NGHTTP3_NV_FLAG_NO_COPY_VALUE. +void copy_headers_to_nva_nocopy(std::vector &nva, + const HeaderRefs &headers, uint32_t flags); + +// Dumps name/value pairs in |nva| to |out|. +void dump_nv(FILE *out, const nghttp3_nv *nva, size_t nvlen); + +// Checks the header name/value pair using nghttp3_check_header_name() +// and nghttp3_check_header_value(). If both function returns nonzero, +// this function returns nonzero. +int check_nv(const uint8_t *name, size_t namelen, const uint8_t *value, + size_t valuelen); + +// Dumps name/value pairs in |nva| to |out|. +void dump_nv(FILE *out, const nghttp3_nv *nva, size_t nvlen); + +} // namespace http3 + +} // namespace nghttp2 + +#endif // HTTP3_H diff --git a/src/memchunk.h b/src/memchunk.h index 46116c19..7a7f2e9b 100644 --- a/src/memchunk.h +++ b/src/memchunk.h @@ -113,13 +113,22 @@ template struct Pool { template struct Memchunks { Memchunks(Pool *pool) - : pool(pool), head(nullptr), tail(nullptr), len(0) {} + : pool(pool), + head(nullptr), + tail(nullptr), + len(0), + mark(nullptr), + mark_pos(nullptr), + mark_offset(0) {} Memchunks(const Memchunks &) = delete; Memchunks(Memchunks &&other) noexcept : pool{other.pool}, // keep other.pool head{std::exchange(other.head, nullptr)}, tail{std::exchange(other.tail, nullptr)}, - len{std::exchange(other.len, 0)} {} + len{std::exchange(other.len, 0)}, + mark{std::exchange(other.mark, nullptr)}, + mark_pos{std::exchange(other.mark_pos, nullptr)}, + mark_offset{std::exchange(other.mark_offset, 0)} {} Memchunks &operator=(const Memchunks &) = delete; Memchunks &operator=(Memchunks &&other) noexcept { if (this == &other) { @@ -132,6 +141,9 @@ template struct Memchunks { head = std::exchange(other.head, nullptr); tail = std::exchange(other.tail, nullptr); len = std::exchange(other.len, 0); + mark = std::exchange(other.mark, nullptr); + mark_pos = std::exchange(other.mark_pos, nullptr); + mark_offset = std::exchange(other.mark_offset, 0); return *this; } @@ -200,6 +212,8 @@ template struct Memchunks { return len; } size_t remove(void *dest, size_t count) { + assert(mark == nullptr); + if (!tail || count == 0) { return 0; } @@ -231,6 +245,8 @@ template struct Memchunks { return first - static_cast(dest); } size_t remove(Memchunks &dest, size_t count) { + assert(mark == nullptr); + if (!tail || count == 0) { return 0; } @@ -262,6 +278,7 @@ template struct Memchunks { } size_t remove(Memchunks &dest) { assert(pool == dest.pool); + assert(mark == nullptr); if (head == nullptr) { return 0; @@ -284,6 +301,8 @@ template struct Memchunks { return n; } size_t drain(size_t count) { + assert(mark == nullptr); + auto ndata = count; auto m = head; while (m) { @@ -305,6 +324,38 @@ template struct Memchunks { } return ndata - count; } + size_t drain_mark(size_t count) { + auto ndata = count; + auto m = head; + while (m) { + auto next = m->next; + auto n = std::min(count, m->len()); + m->pos += n; + count -= n; + len -= n; + mark_offset -= n; + + if (m->len() > 0) { + assert(mark != m || m->pos <= mark_pos); + break; + } + if (mark == m) { + assert(m->pos <= mark_pos); + + mark = nullptr; + mark_pos = nullptr; + mark_offset = 0; + } + + pool->recycle(m); + m = next; + } + head = m; + if (head == nullptr) { + tail = nullptr; + } + return ndata - count; + } int riovec(struct iovec *iov, int iovcnt) const { if (!head) { return 0; @@ -317,7 +368,41 @@ template struct Memchunks { } return i; } + int riovec_mark(struct iovec *iov, int iovcnt) { + if (!head || iovcnt == 0) { + return 0; + } + + int i = 0; + Memchunk *m; + if (mark) { + if (mark_pos != mark->last) { + iov[0].iov_base = mark_pos; + iov[0].iov_len = mark->len() - (mark_pos - mark->pos); + + mark_pos = mark->last; + mark_offset += iov[0].iov_len; + i = 1; + } + m = mark->next; + } else { + i = 0; + m = head; + } + + for (; i < iovcnt && m; ++i, m = m->next) { + iov[i].iov_base = m->pos; + iov[i].iov_len = m->len(); + + mark = m; + mark_pos = m->last; + mark_offset += m->len(); + } + + return i; + } size_t rleft() const { return len; } + size_t rleft_mark() const { return len - mark_offset; } void reset() { for (auto m = head; m;) { auto next = m->next; @@ -325,12 +410,17 @@ template struct Memchunks { m = next; } len = 0; - head = tail = nullptr; + head = tail = mark = nullptr; + mark_pos = nullptr; + mark_offset = 0; } Pool *pool; Memchunk *head, *tail; size_t len; + Memchunk *mark; + uint8_t *mark_pos; + size_t mark_offset; }; // Wrapper around Memchunks to offer "peeking" functionality. diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index d3d40e51..ca07f2f1 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -145,7 +145,8 @@ Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool, accesslog_written_(false), new_affinity_cookie_(false), blocked_request_data_eof_(false), - expect_100_continue_(false) { + expect_100_continue_(false), + stop_reading_(false) { auto &timeoutconf = get_config()->http2.timeout; @@ -164,6 +165,7 @@ Downstream::Downstream(Upstream *upstream, MemchunkPool *mcpool, downstream_wtimer_.data = this; rcbufs_.reserve(32); + rcbufs3_.reserve(32); } Downstream::~Downstream() { @@ -203,6 +205,10 @@ Downstream::~Downstream() { // explicitly. dconn_.reset(); + for (auto rcbuf : rcbufs3_) { + nghttp3_rcbuf_decref(rcbuf); + } + for (auto rcbuf : rcbufs_) { nghttp2_rcbuf_decref(rcbuf); } @@ -1128,6 +1134,11 @@ void Downstream::add_rcbuf(nghttp2_rcbuf *rcbuf) { rcbufs_.push_back(rcbuf); } +void Downstream::add_rcbuf(nghttp3_rcbuf *rcbuf) { + nghttp3_rcbuf_incref(rcbuf); + rcbufs3_.push_back(rcbuf); +} + void Downstream::set_downstream_addr_group( const std::shared_ptr &group) { group_ = group; @@ -1169,4 +1180,8 @@ bool Downstream::get_expect_100_continue() const { return expect_100_continue_; } +bool Downstream::get_stop_reading() const { return stop_reading_; } + +void Downstream::set_stop_reading(bool f) { stop_reading_ = f; } + } // namespace shrpx diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index 10063b50..20ec54c4 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -38,6 +38,8 @@ #include +#include + #include "llhttp.h" #include "shrpx_io_control.h" @@ -488,6 +490,7 @@ public: BlockAllocator &get_block_allocator(); void add_rcbuf(nghttp2_rcbuf *rcbuf); + void add_rcbuf(nghttp3_rcbuf *rcbuf); void set_downstream_addr_group(const std::shared_ptr &group); @@ -513,6 +516,9 @@ public: bool get_expect_100_continue() const; + bool get_stop_reading() const; + void set_stop_reading(bool f); + enum { EVENT_ERROR = 0x1, EVENT_TIMEOUT = 0x2, @@ -527,6 +533,7 @@ private: BlockAllocator balloc_; std::vector rcbufs_; + std::vector rcbufs3_; Request req_; Response resp_; @@ -606,6 +613,7 @@ private: bool blocked_request_data_eof_; // true if request contains "expect: 100-continue" header field. bool expect_100_continue_; + bool stop_reading_; }; } // namespace shrpx diff --git a/src/shrpx_http3_upstream.cc b/src/shrpx_http3_upstream.cc index 33e62a3e..7f43ff95 100644 --- a/src/shrpx_http3_upstream.cc +++ b/src/shrpx_http3_upstream.cc @@ -34,6 +34,8 @@ #include "shrpx_log.h" #include "shrpx_quic.h" #include "shrpx_worker.h" +#include "shrpx_http.h" +#include "http3.h" #include "util.h" namespace shrpx { @@ -71,8 +73,25 @@ fail: } } // namespace +namespace { +size_t downstream_queue_size(Worker *worker) { + auto &downstreamconf = *worker->get_downstream_config(); + + if (get_config()->http2_proxy) { + return downstreamconf.connections_per_host; + } + + return downstreamconf.connections_per_frontend; +} +} // namespace + Http3Upstream::Http3Upstream(ClientHandler *handler) - : handler_{handler}, conn_{nullptr}, tls_alert_{0}, httpconn_{nullptr} { + : handler_{handler}, + conn_{nullptr}, + tls_alert_{0}, + httpconn_{nullptr}, + downstream_queue_{downstream_queue_size(handler->get_worker()), + !get_config()->http2_proxy} { ev_timer_init(&timer_, timeoutcb, 0., 0.); timer_.data = this; @@ -150,6 +169,13 @@ int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token, return NGTCP2_ERR_CALLBACK_FAILURE; } + auto upstream = static_cast(user_data); + auto handler = upstream->get_client_handler(); + auto worker = handler->get_worker(); + auto quic_connection_handler = worker->get_quic_connection_handler(); + + quic_connection_handler->add_connection_id(cid, handler); + return 0; } } // namespace @@ -168,6 +194,209 @@ int remove_connection_id(ngtcp2_conn *conn, const ngtcp2_cid *cid, } } // namespace +void Http3Upstream::http_begin_request_headers(int64_t stream_id) { + auto downstream = + std::make_unique(this, handler_->get_mcpool(), stream_id); + nghttp3_conn_set_stream_user_data(httpconn_, stream_id, downstream.get()); + + downstream->reset_upstream_rtimer(); + + handler_->repeat_read_timer(); + + auto &req = downstream->request(); + req.http_major = 3; + req.http_minor = 0; + + add_pending_downstream(std::move(downstream)); +} + +void Http3Upstream::add_pending_downstream( + std::unique_ptr downstream) { + downstream_queue_.add_pending(std::move(downstream)); +} + +namespace { +int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id, + uint64_t offset, const uint8_t *data, size_t datalen, + void *user_data, void *stream_user_data) { + auto upstream = static_cast(user_data); + + if (upstream->recv_stream_data(flags, stream_id, data, datalen) != 0) { + return NGTCP2_ERR_CALLBACK_FAILURE; + } + + return 0; +} +} // namespace + +int Http3Upstream::recv_stream_data(uint32_t flags, int64_t stream_id, + const uint8_t *data, size_t datalen) { + assert(httpconn_); + + auto nconsumed = nghttp3_conn_read_stream( + httpconn_, stream_id, data, datalen, flags & NGTCP2_STREAM_DATA_FLAG_FIN); + if (nconsumed < 0) { + LOG(ERROR) << "nghttp3_conn_read_stream: " << nghttp3_strerror(nconsumed); + last_error_ = quic::err_application(nconsumed); + return -1; + } + + ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed); + ngtcp2_conn_extend_max_offset(conn_, nconsumed); + + return 0; +} + +namespace { +int stream_close(ngtcp2_conn *conn, int64_t stream_id, uint64_t app_error_code, + void *user_data, void *stream_user_data) { + auto upstream = static_cast(user_data); + + if (upstream->stream_close(stream_id, app_error_code) != 0) { + return NGTCP2_ERR_CALLBACK_FAILURE; + } + + return 0; +} +} // namespace + +int Http3Upstream::stream_close(int64_t stream_id, uint64_t app_error_code) { + if (!httpconn_) { + return 0; + } + + auto rv = nghttp3_conn_close_stream(httpconn_, stream_id, app_error_code); + switch (rv) { + case 0: + break; + case NGHTTP3_ERR_STREAM_NOT_FOUND: + if (ngtcp2_is_bidi_stream(stream_id)) { + ngtcp2_conn_extend_max_streams_bidi(conn_, 1); + } + break; + default: + LOG(ERROR) << "nghttp3_conn_close_stream: " << nghttp3_strerror(rv); + last_error_ = quic::err_application(rv); + return -1; + } + + return 0; +} + +namespace { +int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id, + uint64_t offset, uint64_t datalen, void *user_data, + void *stream_user_data) { + auto upstream = static_cast(user_data); + + if (upstream->acked_stream_data_offset(stream_id, datalen) != 0) { + return NGTCP2_ERR_CALLBACK_FAILURE; + } + + return 0; +} +} // namespace + +int Http3Upstream::acked_stream_data_offset(int64_t stream_id, + uint64_t datalen) { + if (!httpconn_) { + return 0; + } + + auto rv = nghttp3_conn_add_ack_offset(httpconn_, stream_id, datalen); + if (rv != 0) { + LOG(ERROR) << "nghttp3_conn_add_ack_offset: " << nghttp3_strerror(rv); + return -1; + } + + return 0; +} + +namespace { +int extend_max_stream_data(ngtcp2_conn *conn, int64_t stream_id, + uint64_t max_data, void *user_data, + void *stream_user_data) { + auto upstream = static_cast(user_data); + + if (upstream->extend_max_stream_data(stream_id) != 0) { + return NGTCP2_ERR_CALLBACK_FAILURE; + } + + return 0; +} +} // namespace + +int Http3Upstream::extend_max_stream_data(int64_t stream_id) { + if (!httpconn_) { + return 0; + } + + auto rv = nghttp3_conn_unblock_stream(httpconn_, stream_id); + if (rv != 0) { + LOG(ERROR) << "nghttp3_conn_unblock_stream: " << nghttp3_strerror(rv); + return -1; + } + + return 0; +} + +namespace { +int extend_max_remote_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams, + void *user_data) { + auto upstream = static_cast(user_data); + + upstream->extend_max_remote_streams_bidi(max_streams); + + return 0; +} +} // namespace + +void Http3Upstream::extend_max_remote_streams_bidi(uint64_t max_streams) { + nghttp3_conn_set_max_client_streams_bidi(httpconn_, max_streams); +} + +namespace { +int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size, + uint64_t app_error_code, void *user_data, + void *stream_user_data) { + auto upstream = static_cast(user_data); + + if (upstream->http_shutdown_stream_read(stream_id) != 0) { + return NGTCP2_ERR_CALLBACK_FAILURE; + } + + return 0; +} +} // namespace + +int Http3Upstream::http_shutdown_stream_read(int64_t stream_id) { + if (!httpconn_) { + return 0; + } + + auto rv = nghttp3_conn_shutdown_stream_read(httpconn_, stream_id); + if (rv != 0) { + LOG(ERROR) << "nghttp3_conn_shutdown_stream_read: " << nghttp3_strerror(rv); + return -1; + } + + return 0; +} + +namespace { +int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id, + uint64_t app_error_code, void *user_data, + void *stream_user_data) { + auto upstream = static_cast(user_data); + + if (upstream->http_shutdown_stream_read(stream_id) != 0) { + return NGTCP2_ERR_CALLBACK_FAILURE; + } + + return 0; +} +} // namespace + int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr, const Address &local_addr, const ngtcp2_pkt_hd &initial_hd) { @@ -184,10 +413,10 @@ int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr, ngtcp2_crypto_encrypt_cb, ngtcp2_crypto_decrypt_cb, ngtcp2_crypto_hp_mask_cb, - nullptr, // recv_stream_data - nullptr, // acked_stream_data_offset + shrpx::recv_stream_data, + shrpx::acked_stream_data_offset, nullptr, // stream_open - nullptr, // stream_close + shrpx::stream_close, nullptr, // recv_stateless_reset nullptr, // recv_retry nullptr, // extend_max_local_streams_bidi @@ -198,10 +427,10 @@ int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr, ngtcp2_crypto_update_key_cb, nullptr, // path_validation nullptr, // select_preferred_addr - nullptr, // stream_reset - nullptr, // extend_max_remote_streams_bidi + shrpx::stream_reset, + shrpx::extend_max_remote_streams_bidi, nullptr, // extend_max_remote_streams_uni - nullptr, // extend_max_stream_data + shrpx::extend_max_stream_data, nullptr, // dcid_status nullptr, // handshake_confirmed nullptr, // recv_new_token @@ -211,7 +440,7 @@ int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr, nullptr, // ack_datagram nullptr, // lost_datagram ngtcp2_crypto_get_path_challenge_data_cb, - nullptr, // stream_stop_sending + shrpx::stream_stop_sending, }; initial_client_dcid_ = initial_hd.dcid; @@ -230,13 +459,14 @@ int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr, settings.max_window = 6_m; settings.max_stream_window = 6_m; settings.max_udp_payload_size = SHRPX_MAX_UDP_PAYLOAD_SIZE; - settings.rand_ctx = {&worker->get_randgen()}; + settings.rand_ctx.native_handle = &worker->get_randgen(); auto config = get_config(); auto &quicconf = config->quic; ngtcp2_transport_params params; ngtcp2_transport_params_default(¶ms); + params.initial_max_streams_bidi = 100; params.initial_max_streams_uni = 3; params.initial_max_data = 1_m; params.initial_max_stream_data_bidi_remote = 256_k; @@ -282,6 +512,7 @@ int Http3Upstream::on_write() { } int Http3Upstream::write_streams() { + std::array vec; std::array buf; size_t max_pktcnt = std::min(static_cast(64_k), ngtcp2_conn_get_send_quantum(conn_)) / @@ -290,6 +521,7 @@ int Http3Upstream::write_streams() { uint8_t *bufpos = buf.data(); ngtcp2_path_storage ps, prev_ps; size_t pktcnt = 0; + int rv; auto ts = quic_timestamp(); ngtcp2_path_storage_zero(&ps); @@ -298,8 +530,22 @@ int Http3Upstream::write_streams() { for (;;) { int64_t stream_id = -1; int fin = 0; + nghttp3_ssize sveccnt = 0; + + if (httpconn_ && ngtcp2_conn_get_max_data_left(conn_)) { + sveccnt = nghttp3_conn_writev_stream(httpconn_, &stream_id, &fin, + vec.data(), vec.size()); + if (sveccnt < 0) { + LOG(ERROR) << "nghttp3_conn_writev_stream: " + << nghttp3_strerror(sveccnt); + last_error_ = quic::err_application(sveccnt); + return handle_error(); + } + } ngtcp2_ssize ndatalen; + auto v = vec.data(); + auto vcnt = static_cast(sveccnt); uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE; if (fin) { @@ -308,17 +554,37 @@ int Http3Upstream::write_streams() { auto nwrite = ngtcp2_conn_writev_stream( conn_, &ps.path, &pi, bufpos, SHRPX_MAX_UDP_PAYLOAD_SIZE, &ndatalen, - flags, stream_id, nullptr, 0, ts); + flags, stream_id, reinterpret_cast(v), vcnt, ts); if (nwrite < 0) { switch (nwrite) { case NGTCP2_ERR_STREAM_DATA_BLOCKED: assert(ndatalen == -1); + rv = nghttp3_conn_block_stream(httpconn_, stream_id); + if (rv != 0) { + LOG(ERROR) << "nghttp3_conn_block_stream: " << nghttp3_strerror(rv); + last_error_ = quic::err_application(rv); + return handle_error(); + } continue; case NGTCP2_ERR_STREAM_SHUT_WR: assert(ndatalen == -1); + rv = nghttp3_conn_shutdown_stream_write(httpconn_, stream_id); + if (rv != 0) { + LOG(ERROR) << "nghttp3_conn_shutdown_stream_write: " + << nghttp3_strerror(rv); + last_error_ = quic::err_application(rv); + return handle_error(); + } continue; case NGTCP2_ERR_WRITE_MORE: assert(ndatalen >= 0); + rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen); + if (rv != 0) { + LOG(ERROR) << "nghttp3_conn_add_write_offset: " + << nghttp3_strerror(rv); + last_error_ = quic::err_application(rv); + return handle_error(); + } continue; } @@ -332,7 +598,12 @@ int Http3Upstream::write_streams() { return handle_error(); } else if (ndatalen >= 0) { - // TODO do something + rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen); + if (rv != 0) { + LOG(ERROR) << "nghttp3_conn_add_write_offset: " << nghttp3_strerror(rv); + last_error_ = quic::err_application(rv); + return handle_error(); + } } if (nwrite == 0) { @@ -399,6 +670,16 @@ int Http3Upstream::on_timeout(Downstream *downstream) { return 0; } int Http3Upstream::on_downstream_abort_request(Downstream *downstream, unsigned int status_code) { + int rv; + + rv = error_reply(downstream, status_code); + + if (rv != 0) { + return -1; + } + + handler_->signal_write(); + return 0; } @@ -407,33 +688,458 @@ int Http3Upstream::on_downstream_abort_request_with_https_redirect( return 0; } -int Http3Upstream::downstream_read(DownstreamConnection *dconn) { return 0; } +namespace { +uint64_t +infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code) { + // NGHTTP2_REFUSED_STREAM is important because it tells upstream + // client to retry. + switch (downstream_error_code) { + case NGHTTP2_NO_ERROR: + return NGHTTP3_H3_NO_ERROR; + case NGHTTP2_REFUSED_STREAM: + return NGHTTP3_H3_REQUEST_REJECTED; + default: + return NGHTTP3_H3_INTERNAL_ERROR; + } +} +} // namespace -int Http3Upstream::downstream_write(DownstreamConnection *dconn) { return 0; } +int Http3Upstream::downstream_read(DownstreamConnection *dconn) { + auto downstream = dconn->get_downstream(); -int Http3Upstream::downstream_eof(DownstreamConnection *dconn) { return 0; } + if (downstream->get_response_state() == DownstreamState::MSG_RESET) { + // The downstream stream was reset (canceled). In this case, + // RST_STREAM to the upstream and delete downstream connection + // here. Deleting downstream will be taken place at + // on_stream_close_callback. + shutdown_stream(downstream, + infer_upstream_shutdown_stream_error_code( + downstream->get_response_rst_stream_error_code())); + downstream->pop_downstream_connection(); + // dconn was deleted + dconn = nullptr; + } else if (downstream->get_response_state() == + DownstreamState::MSG_BAD_HEADER) { + if (error_reply(downstream, 502) != 0) { + return -1; + } + downstream->pop_downstream_connection(); + // dconn was deleted + dconn = nullptr; + } else { + auto rv = downstream->on_read(); + if (rv == SHRPX_ERR_EOF) { + if (downstream->get_request_header_sent()) { + return downstream_eof(dconn); + } + return SHRPX_ERR_RETRY; + } + if (rv == SHRPX_ERR_DCONN_CANCELED) { + downstream->pop_downstream_connection(); + handler_->signal_write(); + return 0; + } + if (rv != 0) { + if (rv != SHRPX_ERR_NETWORK) { + if (LOG_ENABLED(INFO)) { + DCLOG(INFO, dconn) << "HTTP parser failure"; + } + } + return downstream_error(dconn, Downstream::EVENT_ERROR); + } + + if (downstream->can_detach_downstream_connection()) { + // Keep-alive + downstream->detach_downstream_connection(); + } + } + + handler_->signal_write(); + + // At this point, downstream may be deleted. + + return 0; +} + +int Http3Upstream::downstream_write(DownstreamConnection *dconn) { + int rv; + rv = dconn->on_write(); + if (rv == SHRPX_ERR_NETWORK) { + return downstream_error(dconn, Downstream::EVENT_ERROR); + } + if (rv != 0) { + return rv; + } + return 0; +} + +int Http3Upstream::downstream_eof(DownstreamConnection *dconn) { + auto downstream = dconn->get_downstream(); + + if (LOG_ENABLED(INFO)) { + DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id(); + } + + // Delete downstream connection. If we don't delete it here, it will + // be pooled in on_stream_close_callback. + downstream->pop_downstream_connection(); + // dconn was deleted + dconn = nullptr; + // downstream wil be deleted in on_stream_close_callback. + if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) { + // Server may indicate the end of the request by EOF + if (LOG_ENABLED(INFO)) { + ULOG(INFO, this) << "Downstream body was ended by EOF"; + } + downstream->set_response_state(DownstreamState::MSG_COMPLETE); + + // For tunneled connection, MSG_COMPLETE signals + // downstream_read_data_callback to send RST_STREAM after pending + // response body is sent. This is needed to ensure that RST_STREAM + // is sent after all pending data are sent. + on_downstream_body_complete(downstream); + } else if (downstream->get_response_state() != + DownstreamState::MSG_COMPLETE) { + // If stream was not closed, then we set MSG_COMPLETE and let + // on_stream_close_callback delete downstream. + if (error_reply(downstream, 502) != 0) { + return -1; + } + } + handler_->signal_write(); + // At this point, downstream may be deleted. + return 0; +} int Http3Upstream::downstream_error(DownstreamConnection *dconn, int events) { + auto downstream = dconn->get_downstream(); + + if (LOG_ENABLED(INFO)) { + if (events & Downstream::EVENT_ERROR) { + DCLOG(INFO, dconn) << "Downstream network/general error"; + } else { + DCLOG(INFO, dconn) << "Timeout"; + } + if (downstream->get_upgraded()) { + DCLOG(INFO, dconn) << "Note: this is tunnel connection"; + } + } + + // Delete downstream connection. If we don't delete it here, it will + // be pooled in on_stream_close_callback. + downstream->pop_downstream_connection(); + // dconn was deleted + dconn = nullptr; + + if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) { + // For SSL tunneling, we issue RST_STREAM. For other types of + // stream, we don't have to do anything since response was + // complete. + if (downstream->get_upgraded()) { + shutdown_stream(downstream, NGHTTP3_H3_NO_ERROR); + } + } else { + if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) { + if (downstream->get_upgraded()) { + on_downstream_body_complete(downstream); + } else { + shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR); + } + } else { + unsigned int status; + if (events & Downstream::EVENT_TIMEOUT) { + if (downstream->get_request_header_sent()) { + status = 504; + } else { + status = 408; + } + } else { + status = 502; + } + if (error_reply(downstream, status) != 0) { + return -1; + } + } + downstream->set_response_state(DownstreamState::MSG_COMPLETE); + } + handler_->signal_write(); + // At this point, downstream may be deleted. return 0; } ClientHandler *Http3Upstream::get_client_handler() const { return handler_; } +namespace { +nghttp3_ssize downstream_read_data_callback(nghttp3_conn *conn, + int64_t stream_id, nghttp3_vec *vec, + size_t veccnt, uint32_t *pflags, + void *conn_user_data, + void *stream_user_data) { + auto downstream = static_cast(stream_user_data); + + assert(downstream); + + auto body = downstream->get_response_buf(); + + assert(body); + + if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) { + *pflags |= NGHTTP3_DATA_FLAG_EOF; + } else if (body->rleft_mark() == 0) { + downstream->disable_upstream_wtimer(); + return NGHTTP3_ERR_WOULDBLOCK; + } + + downstream->reset_upstream_wtimer(); + + veccnt = body->riovec_mark(reinterpret_cast(vec), veccnt); + + assert(veccnt); + + downstream->response_sent_body_length += nghttp3_vec_len(vec, veccnt); + + return veccnt; +} +} // namespace + int Http3Upstream::on_downstream_header_complete(Downstream *downstream) { + int rv; + + const auto &req = downstream->request(); + auto &resp = downstream->response(); + + auto &balloc = downstream->get_block_allocator(); + + if (LOG_ENABLED(INFO)) { + if (downstream->get_non_final_response()) { + DLOG(INFO, downstream) << "HTTP non-final response header"; + } else { + DLOG(INFO, downstream) << "HTTP response header completed"; + } + } + + auto config = get_config(); + auto &httpconf = config->http; + + if (!config->http2_proxy && !httpconf.no_location_rewrite) { + downstream->rewrite_location_response_header(req.scheme); + } + +#ifdef HAVE_MRUBY + if (!downstream->get_non_final_response()) { + auto dconn = downstream->get_downstream_connection(); + const auto &group = dconn->get_downstream_addr_group(); + if (group) { + const auto &dmruby_ctx = group->shared_addr->mruby_ctx; + + if (dmruby_ctx->run_on_response_proc(downstream) != 0) { + if (error_reply(downstream, 500) != 0) { + return -1; + } + // Returning -1 will signal deletion of dconn. + return -1; + } + + if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) { + return -1; + } + } + + auto worker = handler_->get_worker(); + auto mruby_ctx = worker->get_mruby_context(); + + if (mruby_ctx->run_on_response_proc(downstream) != 0) { + if (error_reply(downstream, 500) != 0) { + return -1; + } + // Returning -1 will signal deletion of dconn. + return -1; + } + + if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) { + return -1; + } + } +#endif // HAVE_MRUBY + + auto &http2conf = config->http2; + + auto nva = std::vector(); + // 4 means :status and possible server, via, and set-cookie (for + // affinity cookie) header field. + nva.reserve(resp.fs.headers().size() + 4 + + httpconf.add_response_headers.size()); + + if (downstream->get_non_final_response()) { + auto response_status = http2::stringify_status(balloc, resp.http_status); + + nva.push_back(http3::make_nv_ls_nocopy(":status", response_status)); + + http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), + http2::HDOP_STRIP_ALL); + + if (LOG_ENABLED(INFO)) { + log_response_headers(downstream, nva); + } + + rv = nghttp3_conn_submit_info(httpconn_, downstream->get_stream_id(), + nva.data(), nva.size()); + + resp.fs.clear_headers(); + + if (rv != 0) { + ULOG(FATAL, this) << "nghttp3_conn_submit_info() failed"; + return -1; + } + + return 0; + } + + auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA; + StringRef response_status; + + if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) { + response_status = http2::stringify_status(balloc, 200); + striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT; + } else { + response_status = http2::stringify_status(balloc, resp.http_status); + } + + nva.push_back(http3::make_nv_ls_nocopy(":status", response_status)); + + http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags); + + if (!config->http2_proxy && !httpconf.no_server_rewrite) { + nva.push_back(http3::make_nv_ls_nocopy("server", httpconf.server_name)); + } else { + auto server = resp.fs.header(http2::HD_SERVER); + if (server) { + nva.push_back(http3::make_nv_ls_nocopy("server", (*server).value)); + } + } + + if (!req.regular_connect_method() || !downstream->get_upgraded()) { + auto affinity_cookie = downstream->get_affinity_cookie_to_send(); + if (affinity_cookie) { + auto dconn = downstream->get_downstream_connection(); + assert(dconn); + auto &group = dconn->get_downstream_addr_group(); + auto &shared_addr = group->shared_addr; + auto &cookieconf = shared_addr->affinity.cookie; + auto secure = + http::require_cookie_secure_attribute(cookieconf.secure, req.scheme); + auto cookie_str = http::create_affinity_cookie( + balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure); + nva.push_back(http3::make_nv_ls_nocopy("set-cookie", cookie_str)); + } + } + + auto via = resp.fs.header(http2::HD_VIA); + if (httpconf.no_via) { + if (via) { + nva.push_back(http3::make_nv_ls_nocopy("via", (*via).value)); + } + } else { + // we don't create more than 16 bytes in + // http::create_via_header_value. + size_t len = 16; + if (via) { + len += via->value.size() + 2; + } + + auto iov = make_byte_ref(balloc, len + 1); + auto p = iov.base; + if (via) { + p = std::copy(std::begin(via->value), std::end(via->value), p); + p = util::copy_lit(p, ", "); + } + p = http::create_via_header_value(p, resp.http_major, resp.http_minor); + *p = '\0'; + + nva.push_back(http3::make_nv_ls_nocopy("via", StringRef{iov.base, p})); + } + + for (auto &p : httpconf.add_response_headers) { + nva.push_back(http3::make_nv_nocopy(p.name, p.value)); + } + + if (LOG_ENABLED(INFO)) { + log_response_headers(downstream, nva); + } + + if (http2conf.upstream.debug.dump.response_header) { + http3::dump_nv(http2conf.upstream.debug.dump.response_header, nva.data(), + nva.size()); + } + + nghttp3_data_reader data_read; + data_read.read_data = downstream_read_data_callback; + + nghttp3_data_reader *data_readptr; + + if (downstream->expect_response_body() || + downstream->expect_response_trailer()) { + data_readptr = &data_read; + } else { + data_readptr = nullptr; + } + + rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(), + nva.data(), nva.size(), data_readptr); + if (rv != 0) { + ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed"; + return -1; + } + + if (data_readptr) { + downstream->reset_upstream_wtimer(); + } + return 0; } int Http3Upstream::on_downstream_body(Downstream *downstream, const uint8_t *data, size_t len, bool flush) { + auto body = downstream->get_response_buf(); + body->append(data, len); + + if (flush) { + nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id()); + + downstream->ensure_upstream_wtimer(); + } + return 0; } int Http3Upstream::on_downstream_body_complete(Downstream *downstream) { + if (LOG_ENABLED(INFO)) { + DLOG(INFO, downstream) << "HTTP response completed"; + } + + auto &resp = downstream->response(); + + if (!downstream->validate_response_recv_body_length()) { + shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR); + resp.connection_close = true; + return 0; + } + + nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id()); + downstream->ensure_upstream_wtimer(); + return 0; } -void Http3Upstream::on_handler_delete() {} +void Http3Upstream::on_handler_delete() { + for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) { + if (d->get_dispatch_state() == DispatchState::ACTIVE && + d->accesslog_ready()) { + handler_->write_accesslog(d); + } + } +} int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) { return 0; @@ -443,11 +1149,85 @@ void Http3Upstream::pause_read(IOCtrlReason reason) {} int Http3Upstream::resume_read(IOCtrlReason reason, Downstream *downstream, size_t consumed) { + consume(downstream->get_stream_id(), consumed); + + auto &req = downstream->request(); + + req.consume(consumed); + + handler_->signal_write(); + return 0; } int Http3Upstream::send_reply(Downstream *downstream, const uint8_t *body, size_t bodylen) { + int rv; + + nghttp3_data_reader data_read, *data_read_ptr = nullptr; + + if (bodylen) { + data_read.read_data = downstream_read_data_callback; + data_read_ptr = &data_read; + } + + const auto &resp = downstream->response(); + auto config = get_config(); + auto &httpconf = config->http; + + auto &balloc = downstream->get_block_allocator(); + + const auto &headers = resp.fs.headers(); + auto nva = std::vector(); + // 2 for :status and server + nva.reserve(2 + headers.size() + httpconf.add_response_headers.size()); + + auto response_status = http2::stringify_status(balloc, resp.http_status); + + nva.push_back(http3::make_nv_ls_nocopy(":status", response_status)); + + for (auto &kv : headers) { + if (kv.name.empty() || kv.name[0] == ':') { + continue; + } + switch (kv.token) { + case http2::HD_CONNECTION: + case http2::HD_KEEP_ALIVE: + case http2::HD_PROXY_CONNECTION: + case http2::HD_TE: + case http2::HD_TRANSFER_ENCODING: + case http2::HD_UPGRADE: + continue; + } + nva.push_back(http3::make_nv_nocopy(kv.name, kv.value, kv.no_index)); + } + + if (!resp.fs.header(http2::HD_SERVER)) { + nva.push_back(http3::make_nv_ls_nocopy("server", config->http.server_name)); + } + + for (auto &p : httpconf.add_response_headers) { + nva.push_back(http3::make_nv_nocopy(p.name, p.value)); + } + + rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(), + nva.data(), nva.size(), data_read_ptr); + if (nghttp3_err_is_fatal(rv)) { + ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: " + << nghttp3_strerror(rv); + return -1; + } + + auto buf = downstream->get_response_buf(); + + buf->append(body, bodylen); + + downstream->set_response_state(DownstreamState::MSG_COMPLETE); + + if (data_read_ptr) { + downstream->reset_upstream_wtimer(); + } + return 0; } @@ -656,8 +1436,12 @@ void Http3Upstream::reset_timer() { namespace { int http_deferred_consume(nghttp3_conn *conn, int64_t stream_id, - size_t nsoncumed, void *user_data, + size_t nconsumed, void *user_data, void *stream_user_data) { + auto upstream = static_cast(user_data); + + upstream->consume(stream_id, nconsumed); + return 0; } } // namespace @@ -666,13 +1450,48 @@ namespace { int http_acked_stream_data(nghttp3_conn *conn, int64_t stream_id, size_t datalen, void *user_data, void *stream_user_data) { + auto upstream = static_cast(user_data); + auto downstream = static_cast(stream_user_data); + + assert(downstream); + + if (upstream->http_acked_stream_data(downstream, datalen) != 0) { + return NGHTTP3_ERR_CALLBACK_FAILURE; + } + return 0; } } // namespace +int Http3Upstream::http_acked_stream_data(Downstream *downstream, + size_t datalen) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Stream " << downstream->get_stream_id() << " " << datalen + << " bytes acknowledged"; + } + + auto body = downstream->get_response_buf(); + auto drained = body->drain_mark(datalen); + + assert(datalen == drained); + + if (downstream->resume_read(SHRPX_NO_BUFFER, datalen) != 0) { + return -1; + } + + return 0; +} + namespace { int http_begin_request_headers(nghttp3_conn *conn, int64_t stream_id, void *user_data, void *stream_user_data) { + if (!ngtcp2_is_bidi_stream(stream_id)) { + return 0; + } + + auto upstream = static_cast(user_data); + upstream->http_begin_request_headers(stream_id); + return 0; } } // namespace @@ -682,17 +1501,300 @@ int http_recv_request_header(nghttp3_conn *conn, int64_t stream_id, int32_t token, nghttp3_rcbuf *name, nghttp3_rcbuf *value, uint8_t flags, void *user_data, void *stream_user_data) { + auto upstream = static_cast(user_data); + auto downstream = static_cast(stream_user_data); + + if (!downstream || downstream->get_stop_reading()) { + return 0; + } + + if (upstream->http_recv_request_header(downstream, token, name, value, + flags) != 0) { + return NGHTTP3_ERR_CALLBACK_FAILURE; + } + return 0; } } // namespace +int Http3Upstream::http_recv_request_header(Downstream *downstream, + int32_t h3token, + nghttp3_rcbuf *name, + nghttp3_rcbuf *value, + uint8_t flags) { + auto namebuf = nghttp3_rcbuf_get_buf(name); + auto valuebuf = nghttp3_rcbuf_get_buf(value); + auto &req = downstream->request(); + auto config = get_config(); + auto &httpconf = config->http; + + if (req.fs.buffer_size() + namebuf.len + valuebuf.len > + httpconf.request_header_field_buffer || + req.fs.num_fields() >= httpconf.max_request_header_fields) { + downstream->set_stop_reading(true); + + if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) { + return 0; + } + + if (LOG_ENABLED(INFO)) { + ULOG(INFO, this) << "Too large or many header field size=" + << req.fs.buffer_size() + namebuf.len + valuebuf.len + << ", num=" << req.fs.num_fields() + 1; + } + + if (error_reply(downstream, 431) != 0) { + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; + } + + return 0; + } + + auto token = http2::lookup_token(namebuf.base, namebuf.len); + auto no_index = flags & NGHTTP3_NV_FLAG_NEVER_INDEX; + + downstream->add_rcbuf(name); + downstream->add_rcbuf(value); + + req.fs.add_header_token(StringRef{namebuf.base, namebuf.len}, + StringRef{valuebuf.base, valuebuf.len}, no_index, + token); + return 0; +} + namespace { int http_end_request_headers(nghttp3_conn *conn, int64_t stream_id, void *user_data, void *stream_user_data) { + auto upstream = static_cast(user_data); + auto downstream = static_cast(stream_user_data); + + if (!downstream || downstream->get_stop_reading()) { + return 0; + } + + if (upstream->http_end_request_headers(downstream) != 0) { + return NGHTTP3_ERR_CALLBACK_FAILURE; + } + return 0; } } // namespace +int Http3Upstream::http_end_request_headers(Downstream *downstream) { + auto lgconf = log_config(); + lgconf->update_tstamp(std::chrono::system_clock::now()); + auto &req = downstream->request(); + req.tstamp = lgconf->tstamp; + + if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) { + return 0; + } + + auto &nva = req.fs.headers(); + + if (LOG_ENABLED(INFO)) { + std::stringstream ss; + for (auto &nv : nva) { + if (nv.name == "authorization") { + ss << TTY_HTTP_HD << nv.name << TTY_RST << ": \n"; + continue; + } + ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n"; + } + ULOG(INFO, this) << "HTTP request headers. stream_id=" + << downstream->get_stream_id() << "\n" + << ss.str(); + } + + auto config = get_config(); + auto &dump = config->http2.upstream.debug.dump; + + if (dump.request_header) { + http2::dump_nv(dump.request_header, nva); + } + + auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH); + if (content_length) { + // libnghttp2 guarantees this can be parsed + req.fs.content_length = util::parse_uint(content_length->value); + } + + // presence of mandatory header fields are guaranteed by libnghttp2. + auto authority = req.fs.header(http2::HD__AUTHORITY); + auto path = req.fs.header(http2::HD__PATH); + auto method = req.fs.header(http2::HD__METHOD); + auto scheme = req.fs.header(http2::HD__SCHEME); + + auto method_token = http2::lookup_method_token(method->value); + if (method_token == -1) { + if (error_reply(downstream, 501) != 0) { + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; + } + return 0; + } + + auto faddr = handler_->get_upstream_addr(); + + // For HTTP/2 proxy, we require :authority. + if (method_token != HTTP_CONNECT && config->http2_proxy && + faddr->alt_mode == UpstreamAltMode::NONE && !authority) { + shutdown_stream(downstream, NGHTTP2_PROTOCOL_ERROR); + return 0; + } + + req.method = method_token; + if (scheme) { + req.scheme = scheme->value; + } + + // nghttp2 library guarantees either :authority or host exist + if (!authority) { + req.no_authority = true; + authority = req.fs.header(http2::HD_HOST); + } + + if (authority) { + req.authority = authority->value; + } + + if (path) { + if (method_token == HTTP_OPTIONS && + path->value == StringRef::from_lit("*")) { + // Server-wide OPTIONS request. Path is empty. + } else if (config->http2_proxy && + faddr->alt_mode == UpstreamAltMode::NONE) { + req.path = path->value; + } else { + req.path = http2::rewrite_clean_path(downstream->get_block_allocator(), + path->value); + } + } + + auto connect_proto = req.fs.header(http2::HD__PROTOCOL); + if (connect_proto) { + if (connect_proto->value != "websocket") { + if (error_reply(downstream, 400) != 0) { + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; + } + return 0; + } + req.connect_proto = ConnectProto::WEBSOCKET; + } + + // We are not sure that request has body or not at the moment. + req.http2_expect_body = true; + + downstream->inspect_http2_request(); + + downstream->set_request_state(DownstreamState::HEADER_COMPLETE); + +#ifdef HAVE_MRUBY + auto upstream = downstream->get_upstream(); + auto handler = upstream->get_client_handler(); + auto worker = handler->get_worker(); + auto mruby_ctx = worker->get_mruby_context(); + + if (mruby_ctx->run_on_request_proc(downstream) != 0) { + if (error_reply(downstream, 500) != 0) { + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; + } + return 0; + } +#endif // HAVE_MRUBY + + if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) { + return 0; + } + + start_downstream(downstream); + + return 0; +} + +void Http3Upstream::start_downstream(Downstream *downstream) { + if (downstream_queue_.can_activate(downstream->request().authority)) { + initiate_downstream(downstream); + return; + } + + downstream_queue_.mark_blocked(downstream); +} + +void Http3Upstream::initiate_downstream(Downstream *downstream) { + int rv; + + DownstreamConnection *dconn_ptr; + + for (;;) { + auto dconn = handler_->get_downstream_connection(rv, downstream); + if (!dconn) { + if (rv == SHRPX_ERR_TLS_REQUIRED) { + rv = redirect_to_https(downstream); + } else { + rv = error_reply(downstream, 502); + } + if (rv != 0) { + shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR); + } + + downstream->set_request_state(DownstreamState::CONNECT_FAIL); + downstream_queue_.mark_failure(downstream); + + return; + } + +#ifdef HAVE_MRUBY + dconn_ptr = dconn.get(); +#endif // HAVE_MRUBY + rv = downstream->attach_downstream_connection(std::move(dconn)); + if (rv == 0) { + break; + } + } + +#ifdef HAVE_MRUBY + const auto &group = dconn_ptr->get_downstream_addr_group(); + if (group) { + const auto &mruby_ctx = group->shared_addr->mruby_ctx; + if (mruby_ctx->run_on_request_proc(downstream) != 0) { + if (error_reply(downstream, 500) != 0) { + shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR); + } + + downstream_queue_.mark_failure(downstream); + + return; + } + + if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) { + return; + } + } +#endif // HAVE_MRUBY + + rv = downstream->push_request_headers(); + if (rv != 0) { + + if (error_reply(downstream, 502) != 0) { + shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR); + } + + downstream_queue_.mark_failure(downstream); + + return; + } + + downstream_queue_.mark_active(downstream); + + auto &req = downstream->request(); + if (!req.http2_expect_body) { + rv = downstream->end_upload_data(); + if (rv != 0) { + shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR); + } + } +} + namespace { int http_recv_data(nghttp3_conn *conn, int64_t stream_id, const uint8_t *data, size_t datalen, void *user_data, void *stream_user_data) { @@ -703,18 +1805,101 @@ int http_recv_data(nghttp3_conn *conn, int64_t stream_id, const uint8_t *data, namespace { int http_end_stream(nghttp3_conn *conn, int64_t stream_id, void *user_data, void *stream_user_data) { + auto upstream = static_cast(user_data); + auto downstream = static_cast(stream_user_data); + + if (!downstream || downstream->get_stop_reading()) { + return 0; + } + + if (upstream->http_end_stream(downstream) != 0) { + return NGHTTP3_ERR_CALLBACK_FAILURE; + } + return 0; } } // namespace +int Http3Upstream::http_end_stream(Downstream *downstream) { + downstream->disable_upstream_rtimer(); + + if (downstream->end_upload_data() != 0) { + if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) { + shutdown_stream(downstream, NGHTTP2_INTERNAL_ERROR); + } + } + + downstream->set_request_state(DownstreamState::MSG_COMPLETE); + + return 0; +} + namespace { int http_stream_close(nghttp3_conn *conn, int64_t stream_id, uint64_t app_error_code, void *conn_user_data, void *stream_user_data) { + auto upstream = static_cast(conn_user_data); + auto downstream = static_cast(stream_user_data); + + if (!downstream) { + return 0; + } + + if (upstream->http_stream_close(downstream, app_error_code) != 0) { + return NGHTTP3_ERR_CALLBACK_FAILURE; + } + return 0; } } // namespace +int Http3Upstream::http_stream_close(Downstream *downstream, + uint64_t app_error_code) { + auto stream_id = downstream->get_stream_id(); + + if (LOG_ENABLED(INFO)) { + ULOG(INFO, this) << "Stream stream_id=" << stream_id + << " is being closed with app_error_code=" + << app_error_code; + + auto body = downstream->get_response_buf(); + + LOG(INFO) << "response unacked_left=" << body->rleft() + << " not_sent=" << body->rleft_mark(); + } + + auto &req = downstream->request(); + + consume(stream_id, req.unconsumed_body_length); + + req.unconsumed_body_length = 0; + + ngtcp2_conn_extend_max_streams_bidi(conn_, 1); + + if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) { + remove_downstream(downstream); + // downstream was deleted + + return 0; + } + + if (downstream->can_detach_downstream_connection()) { + // Keep-alive + downstream->detach_downstream_connection(); + } + + downstream->set_request_state(DownstreamState::STREAM_CLOSED); + + // At this point, downstream read may be paused. + + // If shrpx_downstream::push_request_headers() failed, the + // error is handled here. + remove_downstream(downstream); + // downstream was deleted + + return 0; +} + namespace { int http_send_stop_sending(nghttp3_conn *conn, int64_t stream_id, uint64_t app_error_code, void *user_data, @@ -739,18 +1924,18 @@ int Http3Upstream::setup_httpconn() { } nghttp3_callbacks callbacks{ - http_acked_stream_data, - http_stream_close, + shrpx::http_acked_stream_data, + shrpx::http_stream_close, http_recv_data, http_deferred_consume, - http_begin_request_headers, - http_recv_request_header, - http_end_request_headers, + shrpx::http_begin_request_headers, + shrpx::http_recv_request_header, + shrpx::http_end_request_headers, nullptr, // begin_trailers nullptr, // recv_trailer nullptr, // end_trailers http_send_stop_sending, - http_end_stream, + shrpx::http_end_stream, http_reset_stream, }; @@ -810,4 +1995,138 @@ int Http3Upstream::setup_httpconn() { return 0; } +int Http3Upstream::error_reply(Downstream *downstream, + unsigned int status_code) { + int rv; + auto &resp = downstream->response(); + + auto &balloc = downstream->get_block_allocator(); + + auto html = http::create_error_html(balloc, status_code); + resp.http_status = status_code; + auto body = downstream->get_response_buf(); + body->append(html); + downstream->set_response_state(DownstreamState::MSG_COMPLETE); + + nghttp3_data_reader data_read; + data_read.read_data = downstream_read_data_callback; + + auto lgconf = log_config(); + lgconf->update_tstamp(std::chrono::system_clock::now()); + + auto response_status = http2::stringify_status(balloc, status_code); + auto content_length = util::make_string_ref_uint(balloc, html.size()); + auto date = make_string_ref(balloc, lgconf->tstamp->time_http); + + auto nva = std::array{ + {http3::make_nv_ls_nocopy(":status", response_status), + http3::make_nv_ll("content-type", "text/html; charset=UTF-8"), + http3::make_nv_ls_nocopy("server", get_config()->http.server_name), + http3::make_nv_ls_nocopy("content-length", content_length), + http3::make_nv_ls_nocopy("date", date)}}; + + rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(), + nva.data(), nva.size(), &data_read); + if (nghttp3_err_is_fatal(rv)) { + ULOG(FATAL, this) << "nghttp3_submit_response() failed: " + << nghttp3_strerror(rv); + return -1; + } + + downstream->reset_upstream_wtimer(); + + // TODO Should we shutdown read here? + + return 0; +} + +int Http3Upstream::shutdown_stream(Downstream *downstream, + uint64_t app_error_code) { + auto stream_id = downstream->get_stream_id(); + + if (LOG_ENABLED(INFO)) { + ULOG(INFO, this) << "Shutdown stream_id=" << stream_id + << " with app_error_code=" << app_error_code; + } + + auto rv = ngtcp2_conn_shutdown_stream(conn_, stream_id, app_error_code); + if (rv != 0) { + ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream() failed: " + << ngtcp2_strerror(rv); + return -1; + } + + return 0; +} + +int Http3Upstream::redirect_to_https(Downstream *downstream) { + auto &req = downstream->request(); + if (req.regular_connect_method() || req.scheme != "http") { + return error_reply(downstream, 400); + } + + auto authority = util::extract_host(req.authority); + if (authority.empty()) { + return error_reply(downstream, 400); + } + + auto &balloc = downstream->get_block_allocator(); + auto config = get_config(); + auto &httpconf = config->http; + + StringRef loc; + if (httpconf.redirect_https_port == StringRef::from_lit("443")) { + loc = concat_string_ref(balloc, StringRef::from_lit("https://"), authority, + req.path); + } else { + loc = concat_string_ref(balloc, StringRef::from_lit("https://"), authority, + StringRef::from_lit(":"), + httpconf.redirect_https_port, req.path); + } + + auto &resp = downstream->response(); + resp.http_status = 308; + resp.fs.add_header_token(StringRef::from_lit("location"), loc, false, + http2::HD_LOCATION); + + return send_reply(downstream, nullptr, 0); +} + +void Http3Upstream::consume(int64_t stream_id, size_t nconsumed) { + ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed); + ngtcp2_conn_extend_max_offset(conn_, nconsumed); +} + +void Http3Upstream::remove_downstream(Downstream *downstream) { + if (downstream->accesslog_ready()) { + handler_->write_accesslog(downstream); + } + + nghttp3_conn_set_stream_user_data(httpconn_, downstream->get_stream_id(), + nullptr); + + auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream); + + if (next_downstream) { + initiate_downstream(next_downstream); + } + + if (downstream_queue_.get_downstreams() == nullptr) { + // There is no downstream at the moment. Start idle timer now. + handler_->repeat_read_timer(); + } +} + +void Http3Upstream::log_response_headers( + Downstream *downstream, const std::vector &nva) const { + std::stringstream ss; + for (auto &nv : nva) { + ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": " + << StringRef{nv.value, nv.valuelen} << "\n"; + } + ULOG(INFO, this) << "HTTP response headers. stream_id=" + << downstream->get_stream_id() << "\n" + << ss.str(); +} + } // namespace shrpx diff --git a/src/shrpx_http3_upstream.h b/src/shrpx_http3_upstream.h index cf8fa9b4..fd052ec6 100644 --- a/src/shrpx_http3_upstream.h +++ b/src/shrpx_http3_upstream.h @@ -31,6 +31,8 @@ #include #include "shrpx_upstream.h" +#include "shrpx_downstream_queue.h" +#include "shrpx_mruby.h" #include "quic.h" #include "network.h" @@ -111,6 +113,31 @@ public: void reset_timer(); int setup_httpconn(); + void add_pending_downstream(std::unique_ptr downstream); + int recv_stream_data(uint32_t flags, int64_t stream_id, const uint8_t *data, + size_t datalen); + int acked_stream_data_offset(int64_t stream_id, uint64_t datalen); + int extend_max_stream_data(int64_t stream_id); + void extend_max_remote_streams_bidi(uint64_t max_streams); + int error_reply(Downstream *downstream, unsigned int status_code); + void http_begin_request_headers(int64_t stream_id); + int http_recv_request_header(Downstream *downstream, int32_t token, + nghttp3_rcbuf *name, nghttp3_rcbuf *value, + uint8_t flags); + int http_end_request_headers(Downstream *downstream); + int http_end_stream(Downstream *downstream); + void start_downstream(Downstream *downstream); + void initiate_downstream(Downstream *downstream); + int shutdown_stream(Downstream *downstream, uint64_t app_error_code); + int redirect_to_https(Downstream *downstream); + int http_stream_close(Downstream *downstream, uint64_t app_error_code); + void consume(int64_t stream_id, size_t nconsumed); + void remove_downstream(Downstream *downstream); + int stream_close(int64_t stream_id, uint64_t app_error_code); + void log_response_headers(Downstream *downstream, + const std::vector &nva) const; + int http_acked_stream_data(Downstream *downstream, size_t datalen); + int http_shutdown_stream_read(int64_t stream_id); private: ClientHandler *handler_; @@ -121,6 +148,7 @@ private: quic::Error last_error_; uint8_t tls_alert_; nghttp3_conn *httpconn_; + DownstreamQueue downstream_queue_; }; } // namespace shrpx