Merge branch 'http2-backend-push'

This commit is contained in:
Tatsuhiro Tsujikawa 2015-11-16 23:13:35 +09:00
commit dcf060198b
13 changed files with 458 additions and 89 deletions

View File

@ -60,8 +60,8 @@ SIGUSR2
SERVER PUSH SERVER PUSH
----------- -----------
nghttpx supports HTTP/2 server push in default mode. nghttpx looks nghttpx supports HTTP/2 server push in default mode with Link header
for Link header field (`RFC 5988 field. nghttpx looks for Link header field (`RFC 5988
<http://tools.ietf.org/html/rfc5988>`_) in response headers from <http://tools.ietf.org/html/rfc5988>`_) in response headers from
backend server and extracts URI-reference with parameter backend server and extracts URI-reference with parameter
``rel=preload`` (see `preload ``rel=preload`` (see `preload
@ -81,6 +81,14 @@ Currently, the following restriction is applied for server push:
This limitation may be loosened in the future release. This limitation may be loosened in the future release.
nghttpx also supports server push if both frontend and backend are
HTTP/2 (which implies :option:`--http2-bridge` or :option:`--client`).
In this case, in addition to server push via Link header field, server
push from backend is relayed to frontend HTTP/2 session.
HTTP/2 server push will be disabled if :option:`--http2-proxy` or
:option:`--client-proxy` is used.
UNIX DOMAIN SOCKET UNIX DOMAIN SOCKET
------------------ ------------------

View File

@ -1480,9 +1480,14 @@ HTTP/2 and SPDY:
meant for debugging purpose and not intended to enhance meant for debugging purpose and not intended to enhance
protocol security. protocol security.
--no-server-push --no-server-push
Disable HTTP/2 server push. Server push is only Disable HTTP/2 server push. Server push is supported by
supported by default mode and HTTP/2 frontend. SPDY default mode and HTTP/2 frontend via Link header field.
frontend does not support server push. It is also supported if both frontend and backend are
HTTP/2 (which implies --http2-bridge or --client mode).
In this case, server push from backend session is
relayed to frontend, and server push via Link header
field is also supported. HTTP SPDY frontend does not
support server push.
Mode: Mode:
(default mode) (default mode)

View File

@ -120,7 +120,8 @@ bool remove_host_entry_if_empty(const DownstreamQueue::HostEntry &ent,
} }
} // namespace } // namespace
Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream) { Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream,
bool next_blocked) {
// Delete downstream when this function returns. // Delete downstream when this function returns.
auto delptr = std::unique_ptr<Downstream>(downstream); auto delptr = std::unique_ptr<Downstream>(downstream);
@ -144,7 +145,7 @@ Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream) {
return nullptr; return nullptr;
} }
if (ent.num_active >= conn_max_per_host_) { if (!next_blocked || ent.num_active >= conn_max_per_host_) {
return nullptr; return nullptr;
} }

View File

@ -79,10 +79,12 @@ public:
// |host|. // |host|.
bool can_activate(const std::string &host) const; bool can_activate(const std::string &host) const;
// Removes and frees |downstream| object. If |downstream| is in // Removes and frees |downstream| object. If |downstream| is in
// Downstream::DISPATCH_ACTIVE, this function may return Downstream // Downstream::DISPATCH_ACTIVE, and |next_blocked| is true, this
// object with the same target host in Downstream::DISPATCH_BLOCKED // function may return Downstream object with the same target host
// if its connection is now not blocked by conn_max_per_host_ limit. // in Downstream::DISPATCH_BLOCKED if its connection is now not
Downstream *remove_and_get_blocked(Downstream *downstream); // blocked by conn_max_per_host_ limit.
Downstream *remove_and_get_blocked(Downstream *downstream,
bool next_blocked = true);
Downstream *get_downstreams() const; Downstream *get_downstreams() const;
HostEntry &find_host_entry(const std::string &host); HostEntry &find_host_entry(const std::string &host);
const std::string &make_host_key(const std::string &host) const; const std::string &make_host_key(const std::string &host) const;

View File

@ -682,32 +682,43 @@ int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
if (dconn) { if (dconn) {
auto downstream = dconn->get_downstream(); auto downstream = dconn->get_downstream();
if (downstream && downstream->get_downstream_stream_id() == stream_id) { if (downstream && downstream->get_downstream_stream_id() == stream_id) {
auto upstream = downstream->get_upstream();
if (downstream->get_upgraded() && if (downstream->get_downstream_stream_id() % 2 == 0 &&
downstream->get_response_state() == Downstream::HEADER_COMPLETE) { downstream->get_request_state() == Downstream::INITIAL) {
// For tunneled connection, we have to submit RST_STREAM to // Downstream is canceled in backend before it is submitted in
// upstream *after* whole response body is sent. We just set // frontend session.
// MSG_COMPLETE here. Upstream will take care of that.
downstream->get_upstream()->on_downstream_body_complete(downstream); // This will avoid to send RST_STREAM to backend
downstream->set_response_state(Downstream::MSG_COMPLETE); downstream->set_response_state(Downstream::MSG_RESET);
} else if (error_code == NGHTTP2_NO_ERROR) { upstream->cancel_premature_downstream(downstream);
switch (downstream->get_response_state()) { } else {
case Downstream::MSG_COMPLETE: if (downstream->get_upgraded() &&
case Downstream::MSG_BAD_HEADER: downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
break; // For tunneled connection, we have to submit RST_STREAM to
default: // upstream *after* whole response body is sent. We just set
// MSG_COMPLETE here. Upstream will take care of that.
downstream->get_upstream()->on_downstream_body_complete(downstream);
downstream->set_response_state(Downstream::MSG_COMPLETE);
} else if (error_code == NGHTTP2_NO_ERROR) {
switch (downstream->get_response_state()) {
case Downstream::MSG_COMPLETE:
case Downstream::MSG_BAD_HEADER:
break;
default:
downstream->set_response_state(Downstream::MSG_RESET);
}
} else if (downstream->get_response_state() !=
Downstream::MSG_BAD_HEADER) {
downstream->set_response_state(Downstream::MSG_RESET); downstream->set_response_state(Downstream::MSG_RESET);
} }
} else if (downstream->get_response_state() != if (downstream->get_response_state() == Downstream::MSG_RESET &&
Downstream::MSG_BAD_HEADER) { downstream->get_response_rst_stream_error_code() ==
downstream->set_response_state(Downstream::MSG_RESET); NGHTTP2_NO_ERROR) {
downstream->set_response_rst_stream_error_code(error_code);
}
call_downstream_readcb(http2session, downstream);
} }
if (downstream->get_response_state() == Downstream::MSG_RESET &&
downstream->get_response_rst_stream_error_code() ==
NGHTTP2_NO_ERROR) {
downstream->set_response_rst_stream_error_code(error_code);
}
call_downstream_readcb(http2session, downstream);
// dconn may be deleted // dconn may be deleted
} }
} }
@ -730,6 +741,7 @@ int on_header_callback(nghttp2_session *session, const nghttp2_frame *frame,
const uint8_t *name, size_t namelen, const uint8_t *name, size_t namelen,
const uint8_t *value, size_t valuelen, uint8_t flags, const uint8_t *value, size_t valuelen, uint8_t flags,
void *user_data) { void *user_data) {
auto http2session = static_cast<Http2Session *>(user_data);
auto sd = static_cast<StreamData *>( auto sd = static_cast<StreamData *>(
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if (!sd || !sd->dconn) { if (!sd || !sd->dconn) {
@ -740,44 +752,80 @@ int on_header_callback(nghttp2_session *session, const nghttp2_frame *frame,
return 0; return 0;
} }
if (frame->hd.type != NGHTTP2_HEADERS) { switch (frame->hd.type) {
return 0; case NGHTTP2_HEADERS: {
} auto trailer = frame->headers.cat == NGHTTP2_HCAT_HEADERS &&
!downstream->get_expect_final_response();
auto trailer = frame->headers.cat != NGHTTP2_HCAT_RESPONSE && if (downstream->get_response_headers_sum() + namelen + valuelen >
!downstream->get_expect_final_response(); get_config()->header_field_buffer ||
downstream->get_response_headers().size() >=
get_config()->max_header_fields) {
if (LOG_ENABLED(INFO)) {
DLOG(INFO, downstream)
<< "Too large or many header field size="
<< downstream->get_response_headers_sum() + namelen + valuelen
<< ", num=" << downstream->get_response_headers().size() + 1;
}
if (downstream->get_response_headers_sum() + namelen + valuelen > if (trailer) {
get_config()->header_field_buffer || // we don't care trailer part exceeds header size limit; just
downstream->get_response_headers().size() >= // discard it.
get_config()->max_header_fields) { return 0;
if (LOG_ENABLED(INFO)) { }
DLOG(INFO, downstream)
<< "Too large or many header field size=" return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
<< downstream->get_response_headers_sum() + namelen + valuelen
<< ", num=" << downstream->get_response_headers().size() + 1;
} }
if (trailer) { if (trailer) {
// we don't care trailer part exceeds header size limit; just // just store header fields for trailer part
// discard it. downstream->add_response_trailer(name, namelen, value, valuelen,
flags & NGHTTP2_NV_FLAG_NO_INDEX, -1);
return 0; return 0;
} }
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; auto token = http2::lookup_token(name, namelen);
}
if (trailer) { downstream->add_response_header(name, namelen, value, valuelen,
// just store header fields for trailer part flags & NGHTTP2_NV_FLAG_NO_INDEX, token);
downstream->add_response_trailer(name, namelen, value, valuelen,
flags & NGHTTP2_NV_FLAG_NO_INDEX, -1);
return 0; return 0;
} }
case NGHTTP2_PUSH_PROMISE: {
auto promised_stream_id = frame->push_promise.promised_stream_id;
auto promised_sd = static_cast<StreamData *>(
nghttp2_session_get_stream_user_data(session, promised_stream_id));
if (!promised_sd || !promised_sd->dconn) {
http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
return 0;
}
auto token = http2::lookup_token(name, namelen); auto promised_downstream = promised_sd->dconn->get_downstream();
assert(promised_downstream);
if (promised_downstream->get_request_headers_sum() + namelen + valuelen >
get_config()->header_field_buffer ||
promised_downstream->get_request_headers().size() >=
get_config()->max_header_fields) {
if (LOG_ENABLED(INFO)) {
DLOG(INFO, promised_downstream)
<< "Too large or many header field size="
<< promised_downstream->get_request_headers_sum() + namelen +
valuelen << ", num="
<< promised_downstream->get_request_headers().size() + 1;
}
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
auto token = http2::lookup_token(name, namelen);
promised_downstream->add_request_header(name, namelen, value, valuelen,
flags & NGHTTP2_NV_FLAG_NO_INDEX,
token);
return 0;
}
}
downstream->add_response_header(name, namelen, value, valuelen,
flags & NGHTTP2_NV_FLAG_NO_INDEX, token);
return 0; return 0;
} }
} // namespace } // namespace
@ -786,24 +834,52 @@ namespace {
int on_begin_headers_callback(nghttp2_session *session, int on_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data) { const nghttp2_frame *frame, void *user_data) {
auto http2session = static_cast<Http2Session *>(user_data); auto http2session = static_cast<Http2Session *>(user_data);
if (frame->hd.type != NGHTTP2_HEADERS ||
frame->headers.cat != NGHTTP2_HCAT_RESPONSE) { switch (frame->hd.type) {
case NGHTTP2_HEADERS: {
if (frame->headers.cat != NGHTTP2_HCAT_RESPONSE &&
frame->headers.cat != NGHTTP2_HCAT_PUSH_RESPONSE) {
return 0;
}
auto sd = static_cast<StreamData *>(
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if (!sd || !sd->dconn) {
http2session->submit_rst_stream(frame->hd.stream_id,
NGHTTP2_INTERNAL_ERROR);
return 0;
}
auto downstream = sd->dconn->get_downstream();
if (!downstream ||
downstream->get_downstream_stream_id() != frame->hd.stream_id) {
http2session->submit_rst_stream(frame->hd.stream_id,
NGHTTP2_INTERNAL_ERROR);
return 0;
}
return 0; return 0;
} }
auto sd = static_cast<StreamData *>( case NGHTTP2_PUSH_PROMISE: {
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); auto promised_stream_id = frame->push_promise.promised_stream_id;
if (!sd || !sd->dconn) { auto sd = static_cast<StreamData *>(
http2session->submit_rst_stream(frame->hd.stream_id, nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
NGHTTP2_INTERNAL_ERROR); if (!sd || !sd->dconn) {
http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
return 0;
}
auto downstream = sd->dconn->get_downstream();
assert(downstream);
assert(downstream->get_downstream_stream_id() == frame->hd.stream_id);
if (http2session->handle_downstream_push_promise(downstream,
promised_stream_id) != 0) {
http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
}
return 0; return 0;
} }
auto downstream = sd->dconn->get_downstream();
if (!downstream ||
downstream->get_downstream_stream_id() != frame->hd.stream_id) {
http2session->submit_rst_stream(frame->hd.stream_id,
NGHTTP2_INTERNAL_ERROR);
return 0;
} }
return 0; return 0;
} }
} // namespace } // namespace
@ -976,7 +1052,8 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
return 0; return 0;
} }
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) { if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE ||
frame->headers.cat == NGHTTP2_HCAT_PUSH_RESPONSE) {
rv = on_response_headers(http2session, downstream, session, frame); rv = on_response_headers(http2session, downstream, session, frame);
if (rv != 0) { if (rv != 0) {
@ -1044,17 +1121,47 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
http2session->connection_alive(); http2session->connection_alive();
} }
return 0; return 0;
case NGHTTP2_PUSH_PROMISE: case NGHTTP2_PUSH_PROMISE: {
auto promised_stream_id = frame->push_promise.promised_stream_id;
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
SSLOG(INFO, http2session) SSLOG(INFO, http2session)
<< "Received downstream PUSH_PROMISE stream_id=" << "Received downstream PUSH_PROMISE stream_id="
<< frame->hd.stream_id << frame->hd.stream_id
<< ", promised_stream_id=" << frame->push_promise.promised_stream_id; << ", promised_stream_id=" << promised_stream_id;
} }
// We just respond with RST_STREAM.
http2session->submit_rst_stream(frame->push_promise.promised_stream_id, auto sd = static_cast<StreamData *>(
NGHTTP2_REFUSED_STREAM); nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if (!sd || !sd->dconn) {
http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
return 0;
}
auto downstream = sd->dconn->get_downstream();
assert(downstream);
assert(downstream->get_downstream_stream_id() == frame->hd.stream_id);
auto promised_sd = static_cast<StreamData *>(
nghttp2_session_get_stream_user_data(session, promised_stream_id));
if (!promised_sd || !promised_sd->dconn) {
http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
return 0;
}
auto promised_downstream = promised_sd->dconn->get_downstream();
assert(promised_downstream);
if (http2session->handle_downstream_push_promise_complete(
downstream, promised_downstream) != 0) {
http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
return 0;
}
return 0; return 0;
}
default: default:
return 0; return 0;
} }
@ -1279,16 +1386,22 @@ int Http2Session::connection_made() {
flow_control_ = true; flow_control_ = true;
std::array<nghttp2_settings_entry, 3> entry; std::array<nghttp2_settings_entry, 3> entry;
entry[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH; size_t nentry = 2;
entry[0].value = 0; entry[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
entry[1].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS; entry[0].value = get_config()->http2_max_concurrent_streams;
entry[1].value = get_config()->http2_max_concurrent_streams;
entry[2].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; entry[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
entry[2].value = (1 << get_config()->http2_downstream_window_bits) - 1; entry[1].value = (1 << get_config()->http2_downstream_window_bits) - 1;
if (get_config()->no_server_push || get_config()->http2_proxy ||
get_config()->client_proxy) {
entry[nentry].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
entry[nentry].value = 0;
++nentry;
}
rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, entry.data(), rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, entry.data(),
entry.size()); nentry);
if (rv != 0) { if (rv != 0) {
return -1; return -1;
} }
@ -1771,4 +1884,96 @@ size_t Http2Session::get_group() const { return group_; }
size_t Http2Session::get_index() const { return index_; } size_t Http2Session::get_index() const { return index_; }
int Http2Session::handle_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id) {
auto upstream = downstream->get_upstream();
if (!upstream->push_enabled()) {
return -1;
}
auto promised_downstream =
upstream->on_downstream_push_promise(downstream, promised_stream_id);
if (!promised_downstream) {
return -1;
}
// Now we have Downstream object for pushed stream.
// promised_downstream->get_stream() still returns 0.
auto handler = upstream->get_client_handler();
auto worker = handler->get_worker();
auto promised_dconn =
make_unique<Http2DownstreamConnection>(worker->get_dconn_pool(), this);
promised_dconn->set_client_handler(handler);
auto ptr = promised_dconn.get();
if (promised_downstream->attach_downstream_connection(
std::move(promised_dconn)) != 0) {
return -1;
}
auto promised_sd = make_unique<StreamData>();
nghttp2_session_set_stream_user_data(session_, promised_stream_id,
promised_sd.get());
ptr->attach_stream_data(promised_sd.get());
streams_.append(promised_sd.release());
return 0;
}
int Http2Session::handle_downstream_push_promise_complete(
Downstream *downstream, Downstream *promised_downstream) {
auto authority =
promised_downstream->get_request_header(http2::HD__AUTHORITY);
auto path = promised_downstream->get_request_header(http2::HD__PATH);
auto method = promised_downstream->get_request_header(http2::HD__METHOD);
auto scheme = promised_downstream->get_request_header(http2::HD__SCHEME);
if (!authority) {
authority = promised_downstream->get_request_header(http2::HD_HOST);
}
auto method_token = http2::lookup_method_token(method->value);
if (method_token == -1) {
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Unrecognized method: " << method->value;
}
return -1;
}
// TODO Rewrite authority if we enabled rewrite host. But we
// really don't know how to rewrite host. Should we use the same
// host in associated stream?
promised_downstream->set_request_http2_authority(
http2::value_to_str(authority));
promised_downstream->set_request_method(method_token);
// libnghttp2 ensures that we don't have CONNECT method in
// PUSH_PROMISE, and guarantees that :scheme exists.
promised_downstream->set_request_http2_scheme(http2::value_to_str(scheme));
// For server-wide OPTIONS request, path is empty.
if (method_token != HTTP_OPTIONS || path->value != "*") {
promised_downstream->set_request_path(http2::rewrite_clean_path(
std::begin(path->value), std::end(path->value)));
}
promised_downstream->inspect_http2_request();
auto upstream = promised_downstream->get_upstream();
promised_downstream->set_request_state(Downstream::MSG_COMPLETE);
if (upstream->on_downstream_push_promise_complete(downstream,
promised_downstream) != 0) {
return -1;
}
return 0;
}
} // namespace shrpx } // namespace shrpx

View File

@ -156,6 +156,11 @@ public:
size_t get_index() const; size_t get_index() const;
int handle_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id);
int handle_downstream_push_promise_complete(Downstream *downstream,
Downstream *promised_downstream);
enum { enum {
// Disconnected // Disconnected
DISCONNECTED, DISCONNECTED,

View File

@ -543,6 +543,13 @@ int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
return 0; return 0;
case NGHTTP2_PUSH_PROMISE: { case NGHTTP2_PUSH_PROMISE: {
auto promised_stream_id = frame->push_promise.promised_stream_id; auto promised_stream_id = frame->push_promise.promised_stream_id;
if (nghttp2_session_get_stream_user_data(session, promised_stream_id)) {
// In case of push from backend, downstream object was already
// created.
return 0;
}
auto downstream = make_unique<Downstream>(upstream, handler->get_mcpool(), auto downstream = make_unique<Downstream>(upstream, handler->get_mcpool(),
promised_stream_id, 0); promised_stream_id, 0);
@ -1712,6 +1719,7 @@ int Http2Upstream::on_downstream_reset(bool no_retry) {
} }
if (!downstream->request_submission_ready()) { if (!downstream->request_submission_ready()) {
// pushed stream is handled here
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
downstream->pop_downstream_connection(); downstream->pop_downstream_connection();
continue; continue;
@ -1853,15 +1861,18 @@ int Http2Upstream::submit_push_promise(const std::string &scheme,
return 0; return 0;
} }
bool Http2Upstream::push_enabled() const {
return !(get_config()->no_server_push ||
nghttp2_session_get_remote_settings(
session_, NGHTTP2_SETTINGS_ENABLE_PUSH) == 0 ||
get_config()->http2_proxy || get_config()->client_proxy);
}
int Http2Upstream::initiate_push(Downstream *downstream, const char *uri, int Http2Upstream::initiate_push(Downstream *downstream, const char *uri,
size_t len) { size_t len) {
int rv; int rv;
if (len == 0 || get_config()->no_server_push || if (len == 0 || !push_enabled() || (downstream->get_stream_id() % 2)) {
nghttp2_session_get_remote_settings(session_,
NGHTTP2_SETTINGS_ENABLE_PUSH) == 0 ||
get_config()->http2_proxy || get_config()->client_proxy ||
(downstream->get_stream_id() % 2) == 0) {
return 0; return 0;
} }
@ -1916,4 +1927,58 @@ bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; }
Http2Upstream::WriteBuffer *Http2Upstream::get_response_buf() { return &wb_; } Http2Upstream::WriteBuffer *Http2Upstream::get_response_buf() { return &wb_; }
Downstream *
Http2Upstream::on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id) {
// promised_stream_id is for backend HTTP/2 session, not for
// frontend.
auto promised_downstream =
make_unique<Downstream>(this, handler_->get_mcpool(), 0, 0);
promised_downstream->set_downstream_stream_id(promised_stream_id);
promised_downstream->disable_upstream_rtimer();
promised_downstream->set_request_major(2);
promised_downstream->set_request_minor(0);
auto ptr = promised_downstream.get();
add_pending_downstream(std::move(promised_downstream));
downstream_queue_.mark_active(ptr);
return ptr;
}
int Http2Upstream::on_downstream_push_promise_complete(
Downstream *downstream, Downstream *promised_downstream) {
std::vector<nghttp2_nv> nva;
auto &headers = promised_downstream->get_request_headers();
nva.reserve(headers.size());
for (auto &kv : headers) {
nva.push_back(http2::make_nv_nocopy(kv.name, kv.value, kv.no_index));
}
auto promised_stream_id = nghttp2_submit_push_promise(
session_, NGHTTP2_FLAG_NONE, downstream->get_stream_id(), nva.data(),
nva.size(), promised_downstream);
if (promised_stream_id < 0) {
return -1;
}
promised_downstream->set_stream_id(promised_stream_id);
return 0;
}
void Http2Upstream::cancel_premature_downstream(
Downstream *promised_downstream) {
if (LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Remove premature promised stream "
<< promised_downstream;
}
downstream_queue_.remove_and_get_blocked(promised_downstream, false);
}
} // namespace shrpx } // namespace shrpx

View File

@ -87,6 +87,14 @@ public:
virtual void response_drain(size_t n); virtual void response_drain(size_t n);
virtual bool response_empty() const; virtual bool response_empty() const;
virtual Downstream *on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id);
virtual int
on_downstream_push_promise_complete(Downstream *downstream,
Downstream *promised_downstream);
virtual bool push_enabled() const;
virtual void cancel_premature_downstream(Downstream *promised_downstream);
bool get_flow_control() const; bool get_flow_control() const;
// Perform HTTP/2 upgrade from |upstream|. On success, this object // Perform HTTP/2 upgrade from |upstream|. On success, this object
// takes ownership of the |upstream|. This function returns 0 if it // takes ownership of the |upstream|. This function returns 0 if it

View File

@ -1191,4 +1191,20 @@ bool HttpsUpstream::response_empty() const {
return buf->rleft() == 0; return buf->rleft() == 0;
} }
Downstream *
HttpsUpstream::on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id) {
return nullptr;
}
int HttpsUpstream::on_downstream_push_promise_complete(
Downstream *downstream, Downstream *promised_downstream) {
return -1;
}
bool HttpsUpstream::push_enabled() const { return false; }
void HttpsUpstream::cancel_premature_downstream(
Downstream *promised_downstream) {}
} // namespace shrpx } // namespace shrpx

View File

@ -82,6 +82,14 @@ public:
virtual void response_drain(size_t n); virtual void response_drain(size_t n);
virtual bool response_empty() const; virtual bool response_empty() const;
virtual Downstream *on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id);
virtual int
on_downstream_push_promise_complete(Downstream *downstream,
Downstream *promised_downstream);
virtual bool push_enabled() const;
virtual void cancel_premature_downstream(Downstream *promised_downstream);
void reset_current_header_length(); void reset_current_header_length();
void log_response_headers(DefaultMemchunks *buf) const; void log_response_headers(DefaultMemchunks *buf) const;

View File

@ -1230,4 +1230,20 @@ bool SpdyUpstream::response_empty() const { return wb_.rleft() == 0; }
SpdyUpstream::WriteBuffer *SpdyUpstream::get_response_buf() { return &wb_; } SpdyUpstream::WriteBuffer *SpdyUpstream::get_response_buf() { return &wb_; }
Downstream *
SpdyUpstream::on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id) {
return nullptr;
}
int SpdyUpstream::on_downstream_push_promise_complete(
Downstream *downstream, Downstream *promised_downstream) {
return -1;
}
bool SpdyUpstream::push_enabled() const { return false; }
void SpdyUpstream::cancel_premature_downstream(
Downstream *promised_downstream) {}
} // namespace shrpx } // namespace shrpx

View File

@ -82,6 +82,14 @@ public:
virtual void response_drain(size_t n); virtual void response_drain(size_t n);
virtual bool response_empty() const; virtual bool response_empty() const;
virtual Downstream *on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id);
virtual int
on_downstream_push_promise_complete(Downstream *downstream,
Downstream *promised_downstream);
virtual bool push_enabled() const;
virtual void cancel_premature_downstream(Downstream *promised_downstream);
bool get_flow_control() const; bool get_flow_control() const;
int consume(int32_t stream_id, size_t len); int consume(int32_t stream_id, size_t len);

View File

@ -76,6 +76,28 @@ public:
virtual int response_riovec(struct iovec *iov, int iovcnt) const = 0; virtual int response_riovec(struct iovec *iov, int iovcnt) const = 0;
virtual void response_drain(size_t n) = 0; virtual void response_drain(size_t n) = 0;
virtual bool response_empty() const = 0; virtual bool response_empty() const = 0;
// Called when PUSH_PROMISE was started in downstream. The
// associated downstream is given as |downstream|. The promised
// stream ID is given as |promised_stream_id|. If upstream supports
// server push for the corresponding upstream connection, it should
// return Downstream object for pushed stream. Otherwise, returns
// nullptr.
virtual Downstream *
on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id) = 0;
// Called when PUSH_PROMISE frame was completely received in
// downstream. The associated downstream is given as |downstream|.
// This function returns 0 if it succeeds, or -1.
virtual int
on_downstream_push_promise_complete(Downstream *downstream,
Downstream *promised_downstream) = 0;
// Returns true if server push is enabled in upstream connection.
virtual bool push_enabled() const = 0;
// Cancels promised downstream. This function is called when
// PUSH_PROMISE for |promised_downstream| is not submitted to
// upstream session.
virtual void cancel_premature_downstream(Downstream *promised_downstream) = 0;
}; };
} // namespace shrpx } // namespace shrpx