nghttpx: Support server push from HTTP/2 backend

This commits enables HTTP/2 server push from HTTP/2 backend to be
relayed to HTTP/2 frontend.  To use this feature, --http2-bridge or
--client is required.  Server push via Link header field contiues to
work.
This commit is contained in:
Tatsuhiro Tsujikawa 2015-11-16 00:12:54 +09:00
parent b95a3c4b28
commit 1753bea692
13 changed files with 458 additions and 89 deletions

View File

@ -60,8 +60,8 @@ SIGUSR2
SERVER PUSH
-----------
nghttpx supports HTTP/2 server push in default mode. nghttpx looks
for Link header field (`RFC 5988
nghttpx supports HTTP/2 server push in default mode with Link header
field. nghttpx looks for Link header field (`RFC 5988
<http://tools.ietf.org/html/rfc5988>`_) in response headers from
backend server and extracts URI-reference with parameter
``rel=preload`` (see `preload
@ -81,6 +81,14 @@ Currently, the following restriction is applied for server push:
This limitation may be loosened in the future release.
nghttpx also supports server push if both frontend and backend are
HTTP/2 (which implies :option:`--http2-bridge` or :option:`--client`).
In this case, in addition to server push via Link header field, server
push from backend is relayed to frontend HTTP/2 session.
HTTP/2 server push will be disabled if :option:`--http2-proxy` or
:option:`--client-proxy` is used.
UNIX DOMAIN SOCKET
------------------

View File

@ -1480,9 +1480,14 @@ HTTP/2 and SPDY:
meant for debugging purpose and not intended to enhance
protocol security.
--no-server-push
Disable HTTP/2 server push. Server push is only
supported by default mode and HTTP/2 frontend. SPDY
frontend does not support server push.
Disable HTTP/2 server push. Server push is supported by
default mode and HTTP/2 frontend via Link header field.
It is also supported if both frontend and backend are
HTTP/2 (which implies --http2-bridge or --client mode).
In this case, server push from backend session is
relayed to frontend, and server push via Link header
field is also supported. HTTP SPDY frontend does not
support server push.
Mode:
(default mode)

View File

@ -120,7 +120,8 @@ bool remove_host_entry_if_empty(const DownstreamQueue::HostEntry &ent,
}
} // namespace
Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream) {
Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream,
bool next_blocked) {
// Delete downstream when this function returns.
auto delptr = std::unique_ptr<Downstream>(downstream);
@ -144,7 +145,7 @@ Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream) {
return nullptr;
}
if (ent.num_active >= conn_max_per_host_) {
if (!next_blocked || ent.num_active >= conn_max_per_host_) {
return nullptr;
}

View File

@ -79,10 +79,12 @@ public:
// |host|.
bool can_activate(const std::string &host) const;
// Removes and frees |downstream| object. If |downstream| is in
// Downstream::DISPATCH_ACTIVE, this function may return Downstream
// object with the same target host in Downstream::DISPATCH_BLOCKED
// if its connection is now not blocked by conn_max_per_host_ limit.
Downstream *remove_and_get_blocked(Downstream *downstream);
// Downstream::DISPATCH_ACTIVE, and |next_blocked| is true, this
// function may return Downstream object with the same target host
// in Downstream::DISPATCH_BLOCKED if its connection is now not
// blocked by conn_max_per_host_ limit.
Downstream *remove_and_get_blocked(Downstream *downstream,
bool next_blocked = true);
Downstream *get_downstreams() const;
HostEntry &find_host_entry(const std::string &host);
const std::string &make_host_key(const std::string &host) const;

View File

@ -682,7 +682,17 @@ int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
if (dconn) {
auto downstream = dconn->get_downstream();
if (downstream && downstream->get_downstream_stream_id() == stream_id) {
auto upstream = downstream->get_upstream();
if (downstream->get_downstream_stream_id() % 2 == 0 &&
downstream->get_request_state() == Downstream::INITIAL) {
// Downstream is canceled in backend before it is submitted in
// frontend session.
// This will avoid to send RST_STREAM to backend
downstream->set_response_state(Downstream::MSG_RESET);
upstream->cancel_premature_downstream(downstream);
} else {
if (downstream->get_upgraded() &&
downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
// For tunneled connection, we have to submit RST_STREAM to
@ -708,6 +718,7 @@ int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
downstream->set_response_rst_stream_error_code(error_code);
}
call_downstream_readcb(http2session, downstream);
}
// dconn may be deleted
}
}
@ -730,6 +741,7 @@ int on_header_callback(nghttp2_session *session, const nghttp2_frame *frame,
const uint8_t *name, size_t namelen,
const uint8_t *value, size_t valuelen, uint8_t flags,
void *user_data) {
auto http2session = static_cast<Http2Session *>(user_data);
auto sd = static_cast<StreamData *>(
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if (!sd || !sd->dconn) {
@ -740,11 +752,9 @@ int on_header_callback(nghttp2_session *session, const nghttp2_frame *frame,
return 0;
}
if (frame->hd.type != NGHTTP2_HEADERS) {
return 0;
}
auto trailer = frame->headers.cat != NGHTTP2_HCAT_RESPONSE &&
switch (frame->hd.type) {
case NGHTTP2_HEADERS: {
auto trailer = frame->headers.cat == NGHTTP2_HCAT_HEADERS &&
!downstream->get_expect_final_response();
if (downstream->get_response_headers_sum() + namelen + valuelen >
@ -779,6 +789,44 @@ int on_header_callback(nghttp2_session *session, const nghttp2_frame *frame,
downstream->add_response_header(name, namelen, value, valuelen,
flags & NGHTTP2_NV_FLAG_NO_INDEX, token);
return 0;
}
case NGHTTP2_PUSH_PROMISE: {
auto promised_stream_id = frame->push_promise.promised_stream_id;
auto promised_sd = static_cast<StreamData *>(
nghttp2_session_get_stream_user_data(session, promised_stream_id));
if (!promised_sd || !promised_sd->dconn) {
http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
return 0;
}
auto promised_downstream = promised_sd->dconn->get_downstream();
assert(promised_downstream);
if (promised_downstream->get_request_headers_sum() + namelen + valuelen >
get_config()->header_field_buffer ||
promised_downstream->get_request_headers().size() >=
get_config()->max_header_fields) {
if (LOG_ENABLED(INFO)) {
DLOG(INFO, promised_downstream)
<< "Too large or many header field size="
<< promised_downstream->get_request_headers_sum() + namelen +
valuelen << ", num="
<< promised_downstream->get_request_headers().size() + 1;
}
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
auto token = http2::lookup_token(name, namelen);
promised_downstream->add_request_header(name, namelen, value, valuelen,
flags & NGHTTP2_NV_FLAG_NO_INDEX,
token);
return 0;
}
}
return 0;
}
} // namespace
@ -786,8 +834,11 @@ namespace {
int on_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data) {
auto http2session = static_cast<Http2Session *>(user_data);
if (frame->hd.type != NGHTTP2_HEADERS ||
frame->headers.cat != NGHTTP2_HCAT_RESPONSE) {
switch (frame->hd.type) {
case NGHTTP2_HEADERS: {
if (frame->headers.cat != NGHTTP2_HCAT_RESPONSE &&
frame->headers.cat != NGHTTP2_HCAT_PUSH_RESPONSE) {
return 0;
}
auto sd = static_cast<StreamData *>(
@ -804,6 +855,31 @@ int on_begin_headers_callback(nghttp2_session *session,
NGHTTP2_INTERNAL_ERROR);
return 0;
}
return 0;
}
case NGHTTP2_PUSH_PROMISE: {
auto promised_stream_id = frame->push_promise.promised_stream_id;
auto sd = static_cast<StreamData *>(
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if (!sd || !sd->dconn) {
http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
return 0;
}
auto downstream = sd->dconn->get_downstream();
assert(downstream);
assert(downstream->get_downstream_stream_id() == frame->hd.stream_id);
if (http2session->handle_downstream_push_promise(downstream,
promised_stream_id) != 0) {
http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
}
return 0;
}
}
return 0;
}
} // namespace
@ -976,7 +1052,8 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
return 0;
}
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE ||
frame->headers.cat == NGHTTP2_HCAT_PUSH_RESPONSE) {
rv = on_response_headers(http2session, downstream, session, frame);
if (rv != 0) {
@ -1044,17 +1121,47 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
http2session->connection_alive();
}
return 0;
case NGHTTP2_PUSH_PROMISE:
case NGHTTP2_PUSH_PROMISE: {
auto promised_stream_id = frame->push_promise.promised_stream_id;
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, http2session)
<< "Received downstream PUSH_PROMISE stream_id="
<< frame->hd.stream_id
<< ", promised_stream_id=" << frame->push_promise.promised_stream_id;
<< ", promised_stream_id=" << promised_stream_id;
}
// We just respond with RST_STREAM.
http2session->submit_rst_stream(frame->push_promise.promised_stream_id,
NGHTTP2_REFUSED_STREAM);
auto sd = static_cast<StreamData *>(
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if (!sd || !sd->dconn) {
http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
return 0;
}
auto downstream = sd->dconn->get_downstream();
assert(downstream);
assert(downstream->get_downstream_stream_id() == frame->hd.stream_id);
auto promised_sd = static_cast<StreamData *>(
nghttp2_session_get_stream_user_data(session, promised_stream_id));
if (!promised_sd || !promised_sd->dconn) {
http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
return 0;
}
auto promised_downstream = promised_sd->dconn->get_downstream();
assert(promised_downstream);
if (http2session->handle_downstream_push_promise_complete(
downstream, promised_downstream) != 0) {
http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
return 0;
}
return 0;
}
default:
return 0;
}
@ -1279,16 +1386,22 @@ int Http2Session::connection_made() {
flow_control_ = true;
std::array<nghttp2_settings_entry, 3> entry;
entry[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
entry[0].value = 0;
entry[1].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
entry[1].value = get_config()->http2_max_concurrent_streams;
size_t nentry = 2;
entry[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
entry[0].value = get_config()->http2_max_concurrent_streams;
entry[2].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
entry[2].value = (1 << get_config()->http2_downstream_window_bits) - 1;
entry[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
entry[1].value = (1 << get_config()->http2_downstream_window_bits) - 1;
if (get_config()->no_server_push || get_config()->http2_proxy ||
get_config()->client_proxy) {
entry[nentry].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
entry[nentry].value = 0;
++nentry;
}
rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, entry.data(),
entry.size());
nentry);
if (rv != 0) {
return -1;
}
@ -1771,4 +1884,96 @@ size_t Http2Session::get_group() const { return group_; }
size_t Http2Session::get_index() const { return index_; }
int Http2Session::handle_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id) {
auto upstream = downstream->get_upstream();
if (!upstream->push_enabled()) {
return -1;
}
auto promised_downstream =
upstream->on_downstream_push_promise(downstream, promised_stream_id);
if (!promised_downstream) {
return -1;
}
// Now we have Downstream object for pushed stream.
// promised_downstream->get_stream() still returns 0.
auto handler = upstream->get_client_handler();
auto worker = handler->get_worker();
auto promised_dconn =
make_unique<Http2DownstreamConnection>(worker->get_dconn_pool(), this);
promised_dconn->set_client_handler(handler);
auto ptr = promised_dconn.get();
if (promised_downstream->attach_downstream_connection(
std::move(promised_dconn)) != 0) {
return -1;
}
auto promised_sd = make_unique<StreamData>();
nghttp2_session_set_stream_user_data(session_, promised_stream_id,
promised_sd.get());
ptr->attach_stream_data(promised_sd.get());
streams_.append(promised_sd.release());
return 0;
}
int Http2Session::handle_downstream_push_promise_complete(
Downstream *downstream, Downstream *promised_downstream) {
auto authority =
promised_downstream->get_request_header(http2::HD__AUTHORITY);
auto path = promised_downstream->get_request_header(http2::HD__PATH);
auto method = promised_downstream->get_request_header(http2::HD__METHOD);
auto scheme = promised_downstream->get_request_header(http2::HD__SCHEME);
if (!authority) {
authority = promised_downstream->get_request_header(http2::HD_HOST);
}
auto method_token = http2::lookup_method_token(method->value);
if (method_token == -1) {
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Unrecognized method: " << method->value;
}
return -1;
}
// TODO Rewrite authority if we enabled rewrite host. But we
// really don't know how to rewrite host. Should we use the same
// host in associated stream?
promised_downstream->set_request_http2_authority(
http2::value_to_str(authority));
promised_downstream->set_request_method(method_token);
// libnghttp2 ensures that we don't have CONNECT method in
// PUSH_PROMISE, and guarantees that :scheme exists.
promised_downstream->set_request_http2_scheme(http2::value_to_str(scheme));
// For server-wide OPTIONS request, path is empty.
if (method_token != HTTP_OPTIONS || path->value != "*") {
promised_downstream->set_request_path(http2::rewrite_clean_path(
std::begin(path->value), std::end(path->value)));
}
promised_downstream->inspect_http2_request();
auto upstream = promised_downstream->get_upstream();
promised_downstream->set_request_state(Downstream::MSG_COMPLETE);
if (upstream->on_downstream_push_promise_complete(downstream,
promised_downstream) != 0) {
return -1;
}
return 0;
}
} // namespace shrpx

View File

@ -156,6 +156,11 @@ public:
size_t get_index() const;
int handle_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id);
int handle_downstream_push_promise_complete(Downstream *downstream,
Downstream *promised_downstream);
enum {
// Disconnected
DISCONNECTED,

View File

@ -543,6 +543,13 @@ int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
return 0;
case NGHTTP2_PUSH_PROMISE: {
auto promised_stream_id = frame->push_promise.promised_stream_id;
if (nghttp2_session_get_stream_user_data(session, promised_stream_id)) {
// In case of push from backend, downstream object was already
// created.
return 0;
}
auto downstream = make_unique<Downstream>(upstream, handler->get_mcpool(),
promised_stream_id, 0);
@ -1712,6 +1719,7 @@ int Http2Upstream::on_downstream_reset(bool no_retry) {
}
if (!downstream->request_submission_ready()) {
// pushed stream is handled here
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
downstream->pop_downstream_connection();
continue;
@ -1853,15 +1861,18 @@ int Http2Upstream::submit_push_promise(const std::string &scheme,
return 0;
}
bool Http2Upstream::push_enabled() const {
return !(get_config()->no_server_push ||
nghttp2_session_get_remote_settings(
session_, NGHTTP2_SETTINGS_ENABLE_PUSH) == 0 ||
get_config()->http2_proxy || get_config()->client_proxy);
}
int Http2Upstream::initiate_push(Downstream *downstream, const char *uri,
size_t len) {
int rv;
if (len == 0 || get_config()->no_server_push ||
nghttp2_session_get_remote_settings(session_,
NGHTTP2_SETTINGS_ENABLE_PUSH) == 0 ||
get_config()->http2_proxy || get_config()->client_proxy ||
(downstream->get_stream_id() % 2) == 0) {
if (len == 0 || !push_enabled() || (downstream->get_stream_id() % 2)) {
return 0;
}
@ -1916,4 +1927,58 @@ bool Http2Upstream::response_empty() const { return wb_.rleft() == 0; }
Http2Upstream::WriteBuffer *Http2Upstream::get_response_buf() { return &wb_; }
Downstream *
Http2Upstream::on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id) {
// promised_stream_id is for backend HTTP/2 session, not for
// frontend.
auto promised_downstream =
make_unique<Downstream>(this, handler_->get_mcpool(), 0, 0);
promised_downstream->set_downstream_stream_id(promised_stream_id);
promised_downstream->disable_upstream_rtimer();
promised_downstream->set_request_major(2);
promised_downstream->set_request_minor(0);
auto ptr = promised_downstream.get();
add_pending_downstream(std::move(promised_downstream));
downstream_queue_.mark_active(ptr);
return ptr;
}
int Http2Upstream::on_downstream_push_promise_complete(
Downstream *downstream, Downstream *promised_downstream) {
std::vector<nghttp2_nv> nva;
auto &headers = promised_downstream->get_request_headers();
nva.reserve(headers.size());
for (auto &kv : headers) {
nva.push_back(http2::make_nv_nocopy(kv.name, kv.value, kv.no_index));
}
auto promised_stream_id = nghttp2_submit_push_promise(
session_, NGHTTP2_FLAG_NONE, downstream->get_stream_id(), nva.data(),
nva.size(), promised_downstream);
if (promised_stream_id < 0) {
return -1;
}
promised_downstream->set_stream_id(promised_stream_id);
return 0;
}
void Http2Upstream::cancel_premature_downstream(
Downstream *promised_downstream) {
if (LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Remove premature promised stream "
<< promised_downstream;
}
downstream_queue_.remove_and_get_blocked(promised_downstream, false);
}
} // namespace shrpx

View File

@ -87,6 +87,14 @@ public:
virtual void response_drain(size_t n);
virtual bool response_empty() const;
virtual Downstream *on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id);
virtual int
on_downstream_push_promise_complete(Downstream *downstream,
Downstream *promised_downstream);
virtual bool push_enabled() const;
virtual void cancel_premature_downstream(Downstream *promised_downstream);
bool get_flow_control() const;
// Perform HTTP/2 upgrade from |upstream|. On success, this object
// takes ownership of the |upstream|. This function returns 0 if it

View File

@ -1191,4 +1191,20 @@ bool HttpsUpstream::response_empty() const {
return buf->rleft() == 0;
}
Downstream *
HttpsUpstream::on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id) {
return nullptr;
}
int HttpsUpstream::on_downstream_push_promise_complete(
Downstream *downstream, Downstream *promised_downstream) {
return -1;
}
bool HttpsUpstream::push_enabled() const { return false; }
void HttpsUpstream::cancel_premature_downstream(
Downstream *promised_downstream) {}
} // namespace shrpx

View File

@ -82,6 +82,14 @@ public:
virtual void response_drain(size_t n);
virtual bool response_empty() const;
virtual Downstream *on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id);
virtual int
on_downstream_push_promise_complete(Downstream *downstream,
Downstream *promised_downstream);
virtual bool push_enabled() const;
virtual void cancel_premature_downstream(Downstream *promised_downstream);
void reset_current_header_length();
void log_response_headers(DefaultMemchunks *buf) const;

View File

@ -1230,4 +1230,20 @@ bool SpdyUpstream::response_empty() const { return wb_.rleft() == 0; }
SpdyUpstream::WriteBuffer *SpdyUpstream::get_response_buf() { return &wb_; }
Downstream *
SpdyUpstream::on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id) {
return nullptr;
}
int SpdyUpstream::on_downstream_push_promise_complete(
Downstream *downstream, Downstream *promised_downstream) {
return -1;
}
bool SpdyUpstream::push_enabled() const { return false; }
void SpdyUpstream::cancel_premature_downstream(
Downstream *promised_downstream) {}
} // namespace shrpx

View File

@ -82,6 +82,14 @@ public:
virtual void response_drain(size_t n);
virtual bool response_empty() const;
virtual Downstream *on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id);
virtual int
on_downstream_push_promise_complete(Downstream *downstream,
Downstream *promised_downstream);
virtual bool push_enabled() const;
virtual void cancel_premature_downstream(Downstream *promised_downstream);
bool get_flow_control() const;
int consume(int32_t stream_id, size_t len);

View File

@ -76,6 +76,28 @@ public:
virtual int response_riovec(struct iovec *iov, int iovcnt) const = 0;
virtual void response_drain(size_t n) = 0;
virtual bool response_empty() const = 0;
// Called when PUSH_PROMISE was started in downstream. The
// associated downstream is given as |downstream|. The promised
// stream ID is given as |promised_stream_id|. If upstream supports
// server push for the corresponding upstream connection, it should
// return Downstream object for pushed stream. Otherwise, returns
// nullptr.
virtual Downstream *
on_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id) = 0;
// Called when PUSH_PROMISE frame was completely received in
// downstream. The associated downstream is given as |downstream|.
// This function returns 0 if it succeeds, or -1.
virtual int
on_downstream_push_promise_complete(Downstream *downstream,
Downstream *promised_downstream) = 0;
// Returns true if server push is enabled in upstream connection.
virtual bool push_enabled() const = 0;
// Cancels promised downstream. This function is called when
// PUSH_PROMISE for |promised_downstream| is not submitted to
// upstream session.
virtual void cancel_premature_downstream(Downstream *promised_downstream) = 0;
};
} // namespace shrpx