From 1753bea6923aaf18276efb06e6bfaa59a03c3031 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Mon, 16 Nov 2015 00:12:54 +0900 Subject: [PATCH] nghttpx: Support server push from HTTP/2 backend This commits enables HTTP/2 server push from HTTP/2 backend to be relayed to HTTP/2 frontend. To use this feature, --http2-bridge or --client is required. Server push via Link header field contiues to work. --- doc/nghttpx.h2r | 12 +- src/shrpx.cc | 11 +- src/shrpx_downstream_queue.cc | 5 +- src/shrpx_downstream_queue.h | 10 +- src/shrpx_http2_session.cc | 351 +++++++++++++++++++++++++++------- src/shrpx_http2_session.h | 5 + src/shrpx_http2_upstream.cc | 75 +++++++- src/shrpx_http2_upstream.h | 8 + src/shrpx_https_upstream.cc | 16 ++ src/shrpx_https_upstream.h | 8 + src/shrpx_spdy_upstream.cc | 16 ++ src/shrpx_spdy_upstream.h | 8 + src/shrpx_upstream.h | 22 +++ 13 files changed, 458 insertions(+), 89 deletions(-) diff --git a/doc/nghttpx.h2r b/doc/nghttpx.h2r index 656baddb..8901ae7a 100644 --- a/doc/nghttpx.h2r +++ b/doc/nghttpx.h2r @@ -60,8 +60,8 @@ SIGUSR2 SERVER PUSH ----------- -nghttpx supports HTTP/2 server push in default mode. nghttpx looks -for Link header field (`RFC 5988 +nghttpx supports HTTP/2 server push in default mode with Link header +field. nghttpx looks for Link header field (`RFC 5988 `_) in response headers from backend server and extracts URI-reference with parameter ``rel=preload`` (see `preload @@ -81,6 +81,14 @@ Currently, the following restriction is applied for server push: This limitation may be loosened in the future release. +nghttpx also supports server push if both frontend and backend are +HTTP/2 (which implies :option:`--http2-bridge` or :option:`--client`). +In this case, in addition to server push via Link header field, server +push from backend is relayed to frontend HTTP/2 session. + +HTTP/2 server push will be disabled if :option:`--http2-proxy` or +:option:`--client-proxy` is used. + UNIX DOMAIN SOCKET ------------------ diff --git a/src/shrpx.cc b/src/shrpx.cc index 855df5f3..ab9e673a 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -1480,9 +1480,14 @@ HTTP/2 and SPDY: meant for debugging purpose and not intended to enhance protocol security. --no-server-push - Disable HTTP/2 server push. Server push is only - supported by default mode and HTTP/2 frontend. SPDY - frontend does not support server push. + Disable HTTP/2 server push. Server push is supported by + default mode and HTTP/2 frontend via Link header field. + It is also supported if both frontend and backend are + HTTP/2 (which implies --http2-bridge or --client mode). + In this case, server push from backend session is + relayed to frontend, and server push via Link header + field is also supported. HTTP SPDY frontend does not + support server push. Mode: (default mode) diff --git a/src/shrpx_downstream_queue.cc b/src/shrpx_downstream_queue.cc index d1430e61..02fd0356 100644 --- a/src/shrpx_downstream_queue.cc +++ b/src/shrpx_downstream_queue.cc @@ -120,7 +120,8 @@ bool remove_host_entry_if_empty(const DownstreamQueue::HostEntry &ent, } } // namespace -Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream) { +Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream, + bool next_blocked) { // Delete downstream when this function returns. auto delptr = std::unique_ptr(downstream); @@ -144,7 +145,7 @@ Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream) { return nullptr; } - if (ent.num_active >= conn_max_per_host_) { + if (!next_blocked || ent.num_active >= conn_max_per_host_) { return nullptr; } diff --git a/src/shrpx_downstream_queue.h b/src/shrpx_downstream_queue.h index de06d33e..755af10d 100644 --- a/src/shrpx_downstream_queue.h +++ b/src/shrpx_downstream_queue.h @@ -79,10 +79,12 @@ public: // |host|. bool can_activate(const std::string &host) const; // Removes and frees |downstream| object. If |downstream| is in - // Downstream::DISPATCH_ACTIVE, this function may return Downstream - // object with the same target host in Downstream::DISPATCH_BLOCKED - // if its connection is now not blocked by conn_max_per_host_ limit. - Downstream *remove_and_get_blocked(Downstream *downstream); + // Downstream::DISPATCH_ACTIVE, and |next_blocked| is true, this + // function may return Downstream object with the same target host + // in Downstream::DISPATCH_BLOCKED if its connection is now not + // blocked by conn_max_per_host_ limit. + Downstream *remove_and_get_blocked(Downstream *downstream, + bool next_blocked = true); Downstream *get_downstreams() const; HostEntry &find_host_entry(const std::string &host); const std::string &make_host_key(const std::string &host) const; diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index 2258903c..508ccd34 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -682,32 +682,43 @@ int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, if (dconn) { auto downstream = dconn->get_downstream(); if (downstream && downstream->get_downstream_stream_id() == stream_id) { + auto upstream = downstream->get_upstream(); - if (downstream->get_upgraded() && - downstream->get_response_state() == Downstream::HEADER_COMPLETE) { - // For tunneled connection, we have to submit RST_STREAM to - // upstream *after* whole response body is sent. We just set - // MSG_COMPLETE here. Upstream will take care of that. - downstream->get_upstream()->on_downstream_body_complete(downstream); - downstream->set_response_state(Downstream::MSG_COMPLETE); - } else if (error_code == NGHTTP2_NO_ERROR) { - switch (downstream->get_response_state()) { - case Downstream::MSG_COMPLETE: - case Downstream::MSG_BAD_HEADER: - break; - default: + if (downstream->get_downstream_stream_id() % 2 == 0 && + downstream->get_request_state() == Downstream::INITIAL) { + // Downstream is canceled in backend before it is submitted in + // frontend session. + + // This will avoid to send RST_STREAM to backend + downstream->set_response_state(Downstream::MSG_RESET); + upstream->cancel_premature_downstream(downstream); + } else { + if (downstream->get_upgraded() && + downstream->get_response_state() == Downstream::HEADER_COMPLETE) { + // For tunneled connection, we have to submit RST_STREAM to + // upstream *after* whole response body is sent. We just set + // MSG_COMPLETE here. Upstream will take care of that. + downstream->get_upstream()->on_downstream_body_complete(downstream); + downstream->set_response_state(Downstream::MSG_COMPLETE); + } else if (error_code == NGHTTP2_NO_ERROR) { + switch (downstream->get_response_state()) { + case Downstream::MSG_COMPLETE: + case Downstream::MSG_BAD_HEADER: + break; + default: + downstream->set_response_state(Downstream::MSG_RESET); + } + } else if (downstream->get_response_state() != + Downstream::MSG_BAD_HEADER) { downstream->set_response_state(Downstream::MSG_RESET); } - } else if (downstream->get_response_state() != - Downstream::MSG_BAD_HEADER) { - downstream->set_response_state(Downstream::MSG_RESET); + if (downstream->get_response_state() == Downstream::MSG_RESET && + downstream->get_response_rst_stream_error_code() == + NGHTTP2_NO_ERROR) { + downstream->set_response_rst_stream_error_code(error_code); + } + call_downstream_readcb(http2session, downstream); } - if (downstream->get_response_state() == Downstream::MSG_RESET && - downstream->get_response_rst_stream_error_code() == - NGHTTP2_NO_ERROR) { - downstream->set_response_rst_stream_error_code(error_code); - } - call_downstream_readcb(http2session, downstream); // dconn may be deleted } } @@ -730,6 +741,7 @@ int on_header_callback(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data) { + auto http2session = static_cast(user_data); auto sd = static_cast( nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); if (!sd || !sd->dconn) { @@ -740,44 +752,80 @@ int on_header_callback(nghttp2_session *session, const nghttp2_frame *frame, return 0; } - if (frame->hd.type != NGHTTP2_HEADERS) { - return 0; - } + switch (frame->hd.type) { + case NGHTTP2_HEADERS: { + auto trailer = frame->headers.cat == NGHTTP2_HCAT_HEADERS && + !downstream->get_expect_final_response(); - auto trailer = frame->headers.cat != NGHTTP2_HCAT_RESPONSE && - !downstream->get_expect_final_response(); + if (downstream->get_response_headers_sum() + namelen + valuelen > + get_config()->header_field_buffer || + downstream->get_response_headers().size() >= + get_config()->max_header_fields) { + if (LOG_ENABLED(INFO)) { + DLOG(INFO, downstream) + << "Too large or many header field size=" + << downstream->get_response_headers_sum() + namelen + valuelen + << ", num=" << downstream->get_response_headers().size() + 1; + } - if (downstream->get_response_headers_sum() + namelen + valuelen > - get_config()->header_field_buffer || - downstream->get_response_headers().size() >= - get_config()->max_header_fields) { - if (LOG_ENABLED(INFO)) { - DLOG(INFO, downstream) - << "Too large or many header field size=" - << downstream->get_response_headers_sum() + namelen + valuelen - << ", num=" << downstream->get_response_headers().size() + 1; + if (trailer) { + // we don't care trailer part exceeds header size limit; just + // discard it. + return 0; + } + + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; } if (trailer) { - // we don't care trailer part exceeds header size limit; just - // discard it. + // just store header fields for trailer part + downstream->add_response_trailer(name, namelen, value, valuelen, + flags & NGHTTP2_NV_FLAG_NO_INDEX, -1); return 0; } - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; - } + auto token = http2::lookup_token(name, namelen); - if (trailer) { - // just store header fields for trailer part - downstream->add_response_trailer(name, namelen, value, valuelen, - flags & NGHTTP2_NV_FLAG_NO_INDEX, -1); + downstream->add_response_header(name, namelen, value, valuelen, + flags & NGHTTP2_NV_FLAG_NO_INDEX, token); return 0; } + case NGHTTP2_PUSH_PROMISE: { + auto promised_stream_id = frame->push_promise.promised_stream_id; + auto promised_sd = static_cast( + nghttp2_session_get_stream_user_data(session, promised_stream_id)); + if (!promised_sd || !promised_sd->dconn) { + http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL); + return 0; + } - auto token = http2::lookup_token(name, namelen); + auto promised_downstream = promised_sd->dconn->get_downstream(); + + assert(promised_downstream); + + if (promised_downstream->get_request_headers_sum() + namelen + valuelen > + get_config()->header_field_buffer || + promised_downstream->get_request_headers().size() >= + get_config()->max_header_fields) { + if (LOG_ENABLED(INFO)) { + DLOG(INFO, promised_downstream) + << "Too large or many header field size=" + << promised_downstream->get_request_headers_sum() + namelen + + valuelen << ", num=" + << promised_downstream->get_request_headers().size() + 1; + } + + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; + } + + auto token = http2::lookup_token(name, namelen); + promised_downstream->add_request_header(name, namelen, value, valuelen, + flags & NGHTTP2_NV_FLAG_NO_INDEX, + token); + return 0; + } + } - downstream->add_response_header(name, namelen, value, valuelen, - flags & NGHTTP2_NV_FLAG_NO_INDEX, token); return 0; } } // namespace @@ -786,24 +834,52 @@ namespace { int on_begin_headers_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { auto http2session = static_cast(user_data); - if (frame->hd.type != NGHTTP2_HEADERS || - frame->headers.cat != NGHTTP2_HCAT_RESPONSE) { + + switch (frame->hd.type) { + case NGHTTP2_HEADERS: { + if (frame->headers.cat != NGHTTP2_HCAT_RESPONSE && + frame->headers.cat != NGHTTP2_HCAT_PUSH_RESPONSE) { + return 0; + } + auto sd = static_cast( + nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); + if (!sd || !sd->dconn) { + http2session->submit_rst_stream(frame->hd.stream_id, + NGHTTP2_INTERNAL_ERROR); + return 0; + } + auto downstream = sd->dconn->get_downstream(); + if (!downstream || + downstream->get_downstream_stream_id() != frame->hd.stream_id) { + http2session->submit_rst_stream(frame->hd.stream_id, + NGHTTP2_INTERNAL_ERROR); + return 0; + } return 0; } - auto sd = static_cast( - nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); - if (!sd || !sd->dconn) { - http2session->submit_rst_stream(frame->hd.stream_id, - NGHTTP2_INTERNAL_ERROR); + case NGHTTP2_PUSH_PROMISE: { + auto promised_stream_id = frame->push_promise.promised_stream_id; + auto sd = static_cast( + nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); + if (!sd || !sd->dconn) { + http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL); + return 0; + } + + auto downstream = sd->dconn->get_downstream(); + + assert(downstream); + assert(downstream->get_downstream_stream_id() == frame->hd.stream_id); + + if (http2session->handle_downstream_push_promise(downstream, + promised_stream_id) != 0) { + http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL); + } + return 0; } - auto downstream = sd->dconn->get_downstream(); - if (!downstream || - downstream->get_downstream_stream_id() != frame->hd.stream_id) { - http2session->submit_rst_stream(frame->hd.stream_id, - NGHTTP2_INTERNAL_ERROR); - return 0; } + return 0; } } // namespace @@ -976,7 +1052,8 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, return 0; } - if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) { + if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE || + frame->headers.cat == NGHTTP2_HCAT_PUSH_RESPONSE) { rv = on_response_headers(http2session, downstream, session, frame); if (rv != 0) { @@ -1044,17 +1121,47 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, http2session->connection_alive(); } return 0; - case NGHTTP2_PUSH_PROMISE: + case NGHTTP2_PUSH_PROMISE: { + auto promised_stream_id = frame->push_promise.promised_stream_id; + if (LOG_ENABLED(INFO)) { SSLOG(INFO, http2session) << "Received downstream PUSH_PROMISE stream_id=" << frame->hd.stream_id - << ", promised_stream_id=" << frame->push_promise.promised_stream_id; + << ", promised_stream_id=" << promised_stream_id; } - // We just respond with RST_STREAM. - http2session->submit_rst_stream(frame->push_promise.promised_stream_id, - NGHTTP2_REFUSED_STREAM); + + auto sd = static_cast( + nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); + if (!sd || !sd->dconn) { + http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL); + return 0; + } + + auto downstream = sd->dconn->get_downstream(); + + assert(downstream); + assert(downstream->get_downstream_stream_id() == frame->hd.stream_id); + + auto promised_sd = static_cast( + nghttp2_session_get_stream_user_data(session, promised_stream_id)); + if (!promised_sd || !promised_sd->dconn) { + http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL); + return 0; + } + + auto promised_downstream = promised_sd->dconn->get_downstream(); + + assert(promised_downstream); + + if (http2session->handle_downstream_push_promise_complete( + downstream, promised_downstream) != 0) { + http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL); + return 0; + } + return 0; + } default: return 0; } @@ -1279,16 +1386,22 @@ int Http2Session::connection_made() { flow_control_ = true; std::array entry; - entry[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH; - entry[0].value = 0; - entry[1].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS; - entry[1].value = get_config()->http2_max_concurrent_streams; + size_t nentry = 2; + entry[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS; + entry[0].value = get_config()->http2_max_concurrent_streams; - entry[2].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; - entry[2].value = (1 << get_config()->http2_downstream_window_bits) - 1; + entry[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; + entry[1].value = (1 << get_config()->http2_downstream_window_bits) - 1; + + if (get_config()->no_server_push || get_config()->http2_proxy || + get_config()->client_proxy) { + entry[nentry].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH; + entry[nentry].value = 0; + ++nentry; + } rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, entry.data(), - entry.size()); + nentry); if (rv != 0) { return -1; } @@ -1771,4 +1884,96 @@ size_t Http2Session::get_group() const { return group_; } size_t Http2Session::get_index() const { return index_; } +int Http2Session::handle_downstream_push_promise(Downstream *downstream, + int32_t promised_stream_id) { + auto upstream = downstream->get_upstream(); + if (!upstream->push_enabled()) { + return -1; + } + + auto promised_downstream = + upstream->on_downstream_push_promise(downstream, promised_stream_id); + if (!promised_downstream) { + return -1; + } + + // Now we have Downstream object for pushed stream. + // promised_downstream->get_stream() still returns 0. + + auto handler = upstream->get_client_handler(); + auto worker = handler->get_worker(); + + auto promised_dconn = + make_unique(worker->get_dconn_pool(), this); + promised_dconn->set_client_handler(handler); + + auto ptr = promised_dconn.get(); + + if (promised_downstream->attach_downstream_connection( + std::move(promised_dconn)) != 0) { + return -1; + } + + auto promised_sd = make_unique(); + + nghttp2_session_set_stream_user_data(session_, promised_stream_id, + promised_sd.get()); + + ptr->attach_stream_data(promised_sd.get()); + streams_.append(promised_sd.release()); + + return 0; +} + +int Http2Session::handle_downstream_push_promise_complete( + Downstream *downstream, Downstream *promised_downstream) { + auto authority = + promised_downstream->get_request_header(http2::HD__AUTHORITY); + auto path = promised_downstream->get_request_header(http2::HD__PATH); + auto method = promised_downstream->get_request_header(http2::HD__METHOD); + auto scheme = promised_downstream->get_request_header(http2::HD__SCHEME); + + if (!authority) { + authority = promised_downstream->get_request_header(http2::HD_HOST); + } + + auto method_token = http2::lookup_method_token(method->value); + if (method_token == -1) { + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Unrecognized method: " << method->value; + } + + return -1; + } + + // TODO Rewrite authority if we enabled rewrite host. But we + // really don't know how to rewrite host. Should we use the same + // host in associated stream? + promised_downstream->set_request_http2_authority( + http2::value_to_str(authority)); + promised_downstream->set_request_method(method_token); + // libnghttp2 ensures that we don't have CONNECT method in + // PUSH_PROMISE, and guarantees that :scheme exists. + promised_downstream->set_request_http2_scheme(http2::value_to_str(scheme)); + + // For server-wide OPTIONS request, path is empty. + if (method_token != HTTP_OPTIONS || path->value != "*") { + promised_downstream->set_request_path(http2::rewrite_clean_path( + std::begin(path->value), std::end(path->value))); + } + + promised_downstream->inspect_http2_request(); + + auto upstream = promised_downstream->get_upstream(); + + promised_downstream->set_request_state(Downstream::MSG_COMPLETE); + + if (upstream->on_downstream_push_promise_complete(downstream, + promised_downstream) != 0) { + return -1; + } + + return 0; +} + } // namespace shrpx diff --git a/src/shrpx_http2_session.h b/src/shrpx_http2_session.h index 3ae3e784..e321d59e 100644 --- a/src/shrpx_http2_session.h +++ b/src/shrpx_http2_session.h @@ -156,6 +156,11 @@ public: size_t get_index() const; + int handle_downstream_push_promise(Downstream *downstream, + int32_t promised_stream_id); + int handle_downstream_push_promise_complete(Downstream *downstream, + Downstream *promised_downstream); + enum { // Disconnected DISCONNECTED, diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index c7e93ed9..bc0de69c 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -543,6 +543,13 @@ int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame, return 0; case NGHTTP2_PUSH_PROMISE: { auto promised_stream_id = frame->push_promise.promised_stream_id; + + if (nghttp2_session_get_stream_user_data(session, promised_stream_id)) { + // In case of push from backend, downstream object was already + // created. + return 0; + } + auto downstream = make_unique(upstream, handler->get_mcpool(), promised_stream_id, 0); @@ -1712,6 +1719,7 @@ int Http2Upstream::on_downstream_reset(bool no_retry) { } if (!downstream->request_submission_ready()) { + // pushed stream is handled here rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); downstream->pop_downstream_connection(); continue; @@ -1853,15 +1861,18 @@ int Http2Upstream::submit_push_promise(const std::string &scheme, return 0; } +bool Http2Upstream::push_enabled() const { + return !(get_config()->no_server_push || + nghttp2_session_get_remote_settings( + session_, NGHTTP2_SETTINGS_ENABLE_PUSH) == 0 || + get_config()->http2_proxy || get_config()->client_proxy); +} + int Http2Upstream::initiate_push(Downstream *downstream, const char *uri, size_t len) { int rv; - if (len == 0 || get_config()->no_server_push || - nghttp2_session_get_remote_settings(session_, - NGHTTP2_SETTINGS_ENABLE_PUSH) == 0 || - get_config()->http2_proxy || get_config()->client_proxy || - (downstream->get_stream_id() % 2) == 0) { + if (len == 0 || !push_enabled() || (downstream->get_stream_id() % 2)) { return 0; } @@ -1916,4 +1927,58 @@ bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; } Http2Upstream::WriteBuffer *Http2Upstream::get_response_buf() { return &wb_; } +Downstream * +Http2Upstream::on_downstream_push_promise(Downstream *downstream, + int32_t promised_stream_id) { + // promised_stream_id is for backend HTTP/2 session, not for + // frontend. + auto promised_downstream = + make_unique(this, handler_->get_mcpool(), 0, 0); + promised_downstream->set_downstream_stream_id(promised_stream_id); + + promised_downstream->disable_upstream_rtimer(); + + promised_downstream->set_request_major(2); + promised_downstream->set_request_minor(0); + + auto ptr = promised_downstream.get(); + add_pending_downstream(std::move(promised_downstream)); + downstream_queue_.mark_active(ptr); + + return ptr; +} + +int Http2Upstream::on_downstream_push_promise_complete( + Downstream *downstream, Downstream *promised_downstream) { + std::vector nva; + + auto &headers = promised_downstream->get_request_headers(); + + nva.reserve(headers.size()); + + for (auto &kv : headers) { + nva.push_back(http2::make_nv_nocopy(kv.name, kv.value, kv.no_index)); + } + + auto promised_stream_id = nghttp2_submit_push_promise( + session_, NGHTTP2_FLAG_NONE, downstream->get_stream_id(), nva.data(), + nva.size(), promised_downstream); + if (promised_stream_id < 0) { + return -1; + } + + promised_downstream->set_stream_id(promised_stream_id); + + return 0; +} + +void Http2Upstream::cancel_premature_downstream( + Downstream *promised_downstream) { + if (LOG_ENABLED(INFO)) { + ULOG(INFO, this) << "Remove premature promised stream " + << promised_downstream; + } + downstream_queue_.remove_and_get_blocked(promised_downstream, false); +} + } // namespace shrpx diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index d49455cc..e2527626 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -87,6 +87,14 @@ public: virtual void response_drain(size_t n); virtual bool response_empty() const; + virtual Downstream *on_downstream_push_promise(Downstream *downstream, + int32_t promised_stream_id); + virtual int + on_downstream_push_promise_complete(Downstream *downstream, + Downstream *promised_downstream); + virtual bool push_enabled() const; + virtual void cancel_premature_downstream(Downstream *promised_downstream); + bool get_flow_control() const; // Perform HTTP/2 upgrade from |upstream|. On success, this object // takes ownership of the |upstream|. This function returns 0 if it diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index 25184427..2f9cd831 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -1191,4 +1191,20 @@ bool HttpsUpstream::response_empty() const { return buf->rleft() == 0; } +Downstream * +HttpsUpstream::on_downstream_push_promise(Downstream *downstream, + int32_t promised_stream_id) { + return nullptr; +} + +int HttpsUpstream::on_downstream_push_promise_complete( + Downstream *downstream, Downstream *promised_downstream) { + return -1; +} + +bool HttpsUpstream::push_enabled() const { return false; } + +void HttpsUpstream::cancel_premature_downstream( + Downstream *promised_downstream) {} + } // namespace shrpx diff --git a/src/shrpx_https_upstream.h b/src/shrpx_https_upstream.h index 6f3b5742..3ab698bc 100644 --- a/src/shrpx_https_upstream.h +++ b/src/shrpx_https_upstream.h @@ -82,6 +82,14 @@ public: virtual void response_drain(size_t n); virtual bool response_empty() const; + virtual Downstream *on_downstream_push_promise(Downstream *downstream, + int32_t promised_stream_id); + virtual int + on_downstream_push_promise_complete(Downstream *downstream, + Downstream *promised_downstream); + virtual bool push_enabled() const; + virtual void cancel_premature_downstream(Downstream *promised_downstream); + void reset_current_header_length(); void log_response_headers(DefaultMemchunks *buf) const; diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 2ce0491c..d7d2269a 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -1230,4 +1230,20 @@ bool SpdyUpstream::response_empty() const { return wb_.rleft() == 0; } SpdyUpstream::WriteBuffer *SpdyUpstream::get_response_buf() { return &wb_; } +Downstream * +SpdyUpstream::on_downstream_push_promise(Downstream *downstream, + int32_t promised_stream_id) { + return nullptr; +} + +int SpdyUpstream::on_downstream_push_promise_complete( + Downstream *downstream, Downstream *promised_downstream) { + return -1; +} + +bool SpdyUpstream::push_enabled() const { return false; } + +void SpdyUpstream::cancel_premature_downstream( + Downstream *promised_downstream) {} + } // namespace shrpx diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index 7d509340..9dfcc1b2 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -82,6 +82,14 @@ public: virtual void response_drain(size_t n); virtual bool response_empty() const; + virtual Downstream *on_downstream_push_promise(Downstream *downstream, + int32_t promised_stream_id); + virtual int + on_downstream_push_promise_complete(Downstream *downstream, + Downstream *promised_downstream); + virtual bool push_enabled() const; + virtual void cancel_premature_downstream(Downstream *promised_downstream); + bool get_flow_control() const; int consume(int32_t stream_id, size_t len); diff --git a/src/shrpx_upstream.h b/src/shrpx_upstream.h index 92fe5190..b9639e72 100644 --- a/src/shrpx_upstream.h +++ b/src/shrpx_upstream.h @@ -76,6 +76,28 @@ public: virtual int response_riovec(struct iovec *iov, int iovcnt) const = 0; virtual void response_drain(size_t n) = 0; virtual bool response_empty() const = 0; + + // Called when PUSH_PROMISE was started in downstream. The + // associated downstream is given as |downstream|. The promised + // stream ID is given as |promised_stream_id|. If upstream supports + // server push for the corresponding upstream connection, it should + // return Downstream object for pushed stream. Otherwise, returns + // nullptr. + virtual Downstream * + on_downstream_push_promise(Downstream *downstream, + int32_t promised_stream_id) = 0; + // Called when PUSH_PROMISE frame was completely received in + // downstream. The associated downstream is given as |downstream|. + // This function returns 0 if it succeeds, or -1. + virtual int + on_downstream_push_promise_complete(Downstream *downstream, + Downstream *promised_downstream) = 0; + // Returns true if server push is enabled in upstream connection. + virtual bool push_enabled() const = 0; + // Cancels promised downstream. This function is called when + // PUSH_PROMISE for |promised_downstream| is not submitted to + // upstream session. + virtual void cancel_premature_downstream(Downstream *promised_downstream) = 0; }; } // namespace shrpx