nghttpx: Enable host-path backend routing in HTTP/2 backend

To achieve host-path backend routing, we changed behaviour of
--backend-http2-connections-per-worker.  It now sets the number of
HTTP/2 physical connections per pattern group if pattern is used in -b
option.

Fixes GH-292
This commit is contained in:
Tatsuhiro Tsujikawa 2015-07-12 22:16:20 +09:00
parent 8a2543d7b7
commit 6307f96fb3
13 changed files with 106 additions and 70 deletions

View File

@ -1005,22 +1005,22 @@ Connections:
with "unix:" (e.g., unix:/var/run/backend.sock). with "unix:" (e.g., unix:/var/run/backend.sock).
Optionally, if <PATTERN>s are given, the backend address Optionally, if <PATTERN>s are given, the backend address
is only used if request matches the pattern. If -s, -p, is only used if request matches the pattern. If -s or
--client or --http2-bridge is used, <PATTERN>s are -p is used, <PATTERN>s are ignored. The pattern
ignored. The pattern matching is closely designed to matching is closely designed to ServeMux in net/http
ServeMux in net/http package of Go programming language. package of Go programming language. <PATTERN> consists
<PATTERN> consists of path, host + path or just host. of path, host + path or just host. The path must starts
The path must starts with "/". If it ends with "/", it with "/". If it ends with "/", it matches to the
matches to the request path whose prefix is the path. request path whose prefix is the path. To deal with the
To deal with the request to the directory without request to the directory without trailing slash, pattern
trailing slash, pattern which ends with "/" also matches which ends with "/" also matches the path if pattern ==
the path if pattern == path + "/" (e.g., pattern "/foo/" path + "/" (e.g., pattern "/foo/" matches path "/foo").
matches path "/foo"). If it does not end with "/", it If it does not end with "/", it performs exact match
performs exact match against the request path. If host against the request path. If host is given, it performs
is given, it performs exact match against the request exact match against the request host. If host alone is
host. If host alone is given, "/" is appended to it, so given, "/" is appended to it, so that it matches all
that it matches all paths under the host (e.g., paths under the host (e.g., specifying "nghttp2.org"
specifying "nghttp2.org" equals to "nghttp2.org/"). equals to "nghttp2.org/").
Patterns with host take precedence over path only Patterns with host take precedence over path only
patterns. Then, longer patterns take precedence over patterns. Then, longer patterns take precedence over
@ -1130,9 +1130,14 @@ Performance:
accepts. Setting 0 means unlimited. accepts. Setting 0 means unlimited.
Default: )" << get_config()->worker_frontend_connections << R"( Default: )" << get_config()->worker_frontend_connections << R"(
--backend-http2-connections-per-worker=<N> --backend-http2-connections-per-worker=<N>
Set maximum number of HTTP/2 connections per worker. Set maximum number of backend HTTP/2 physical
The default value is 0, which means the number of connections per worker. If pattern is used in -b
backend addresses specified by -b option. option, this limit is applied to each pattern group (in
other words, each pattern group can have maximum <N>
HTTP/2 connections). The default value is 0, which
means that the value is adjusted to the number of
backend addresses. If pattern is used, this adjustment
is done for each pattern group.
--backend-http1-connections-per-host=<N> --backend-http1-connections-per-host=<N>
Set maximum number of backend concurrent HTTP/1 Set maximum number of backend concurrent HTTP/1
connections per origin host. This option is meaningful connections per origin host. This option is meaningful
@ -2177,10 +2182,9 @@ int main(int argc, char **argv) {
DownstreamAddrGroup g("/"); DownstreamAddrGroup g("/");
g.addrs.push_back(std::move(addr)); g.addrs.push_back(std::move(addr));
mod_config()->downstream_addr_groups.push_back(std::move(g)); mod_config()->downstream_addr_groups.push_back(std::move(g));
} else if (get_config()->downstream_proto == PROTO_HTTP2 || } else if (get_config()->http2_proxy || get_config()->client_proxy) {
get_config()->http2_proxy || get_config()->client_proxy) {
// We don't support host mapping in these cases. Move all // We don't support host mapping in these cases. Move all
// non-catch-all patterns to catch-all pattern for HTTP/2 backend // non-catch-all patterns to catch-all pattern.
DownstreamAddrGroup catch_all("/"); DownstreamAddrGroup catch_all("/");
for (auto &g : mod_config()->downstream_addr_groups) { for (auto &g : mod_config()->downstream_addr_groups) {
std::move(std::begin(g.addrs), std::end(g.addrs), std::move(std::begin(g.addrs), std::end(g.addrs),
@ -2276,12 +2280,6 @@ int main(int argc, char **argv) {
} }
} }
if (get_config()->downstream_proto == PROTO_HTTP2 &&
get_config()->http2_downstream_connections_per_worker == 0) {
mod_config()->http2_downstream_connections_per_worker =
get_config()->downstream_addr_groups[0].addrs.size();
}
if (get_config()->rlimit_nofile) { if (get_config()->rlimit_nofile) {
struct rlimit lim = {static_cast<rlim_t>(get_config()->rlimit_nofile), struct rlimit lim = {static_cast<rlim_t>(get_config()->rlimit_nofile),
static_cast<rlim_t>(get_config()->rlimit_nofile)}; static_cast<rlim_t>(get_config()->rlimit_nofile)};

View File

@ -39,6 +39,7 @@
#include "shrpx_worker.h" #include "shrpx_worker.h"
#include "shrpx_downstream_connection_pool.h" #include "shrpx_downstream_connection_pool.h"
#include "shrpx_downstream.h" #include "shrpx_downstream.h"
#include "shrpx_http2_session.h"
#ifdef HAVE_SPDYLAY #ifdef HAVE_SPDYLAY
#include "shrpx_spdy_upstream.h" #include "shrpx_spdy_upstream.h"
#endif // HAVE_SPDYLAY #endif // HAVE_SPDYLAY
@ -361,7 +362,6 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
get_config()->write_burst, get_config()->read_rate, get_config()->write_burst, get_config()->read_rate,
get_config()->read_burst, writecb, readcb, timeoutcb, this), get_config()->read_burst, writecb, readcb, timeoutcb, this),
ipaddr_(ipaddr), port_(port), worker_(worker), ipaddr_(ipaddr), port_(port), worker_(worker),
http2session_(worker_->next_http2_session()),
left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN), left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN),
should_close_after_write_(false) { should_close_after_write_(false) {
@ -612,9 +612,8 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
auto catch_all = get_config()->downstream_addr_group_catch_all; auto catch_all = get_config()->downstream_addr_group_catch_all;
// Fast path. If we have one group, it must be catch-all group. // Fast path. If we have one group, it must be catch-all group.
// HTTP/2 and client proxy modes fall in this case. Currently, // HTTP/2 and client proxy modes fall in this case.
// HTTP/2 backend does not perform host-path mapping. if (groups.size() == 1) {
if (groups.size() == 1 || get_config()->downstream_proto == PROTO_HTTP2) {
group = 0; group = 0;
} else if (downstream->get_request_method() == HTTP_CONNECT) { } else if (downstream->get_request_method() == HTTP_CONNECT) {
// We don't know how to treat CONNECT request in host-path // We don't know how to treat CONNECT request in host-path
@ -653,8 +652,9 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
auto dconn_pool = worker_->get_dconn_pool(); auto dconn_pool = worker_->get_dconn_pool();
if (http2session_) { if (get_config()->downstream_proto == PROTO_HTTP2) {
dconn = make_unique<Http2DownstreamConnection>(dconn_pool, http2session_); auto http2session = worker_->next_http2_session(group);
dconn = make_unique<Http2DownstreamConnection>(dconn_pool, http2session);
} else { } else {
dconn = dconn =
make_unique<HttpDownstreamConnection>(dconn_pool, group, conn_.loop); make_unique<HttpDownstreamConnection>(dconn_pool, group, conn_.loop);

View File

@ -44,7 +44,6 @@ namespace shrpx {
class Upstream; class Upstream;
class DownstreamConnection; class DownstreamConnection;
class Http2Session;
class HttpsUpstream; class HttpsUpstream;
class ConnectBlocker; class ConnectBlocker;
class DownstreamConnectionPool; class DownstreamConnectionPool;
@ -142,7 +141,6 @@ private:
std::function<int(ClientHandler &)> read_, write_; std::function<int(ClientHandler &)> read_, write_;
std::function<int(ClientHandler &)> on_read_, on_write_; std::function<int(ClientHandler &)> on_read_, on_write_;
Worker *worker_; Worker *worker_;
Http2Session *http2session_;
// The number of bytes of HTTP/2 client connection header to read // The number of bytes of HTTP/2 client connection header to read
size_t left_connhd_len_; size_t left_connhd_len_;
bool should_close_after_write_; bool should_close_after_write_;

View File

@ -263,8 +263,11 @@ int Http2DownstreamConnection::push_request_headers() {
// http2session_ has already in CONNECTED state, so we can get // http2session_ has already in CONNECTED state, so we can get
// addr_idx here. // addr_idx here.
auto addr_idx = http2session_->get_addr_idx(); auto addr_idx = http2session_->get_addr_idx();
auto downstream_hostport = auto group = http2session_->get_group();
get_config()->downstream_addr_groups[0].addrs[addr_idx].hostport.get(); auto downstream_hostport = get_config()
->downstream_addr_groups[group]
.addrs[addr_idx]
.hostport.get();
const char *authority = nullptr, *host = nullptr; const char *authority = nullptr, *host = nullptr;
if (!no_host_rewrite) { if (!no_host_rewrite) {
@ -557,4 +560,10 @@ int Http2DownstreamConnection::on_timeout() {
return submit_rst_stream(downstream_, NGHTTP2_NO_ERROR); return submit_rst_stream(downstream_, NGHTTP2_NO_ERROR);
} }
size_t Http2DownstreamConnection::get_group() const {
// HTTP/2 backend connections are managed by Http2Session object,
// and it stores group index.
return http2session_->get_group();
}
} // namespace shrpx } // namespace shrpx

View File

@ -61,8 +61,7 @@ public:
virtual void on_upstream_change(Upstream *upstream) {} virtual void on_upstream_change(Upstream *upstream) {}
virtual int on_priority_change(int32_t pri); virtual int on_priority_change(int32_t pri);
// Currently, HTTP/2 backend does not perform host-path mapping. virtual size_t get_group() const;
virtual size_t get_group() const { return 0; }
// This object is not poolable because we dont' have facility to // This object is not poolable because we dont' have facility to
// migrate to another Http2Session object. // migrate to another Http2Session object.

View File

@ -142,13 +142,14 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
} // namespace } // namespace
Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
ConnectBlocker *connect_blocker, Worker *worker) ConnectBlocker *connect_blocker, Worker *worker,
size_t group)
: conn_(loop, -1, nullptr, get_config()->downstream_write_timeout, : conn_(loop, -1, nullptr, get_config()->downstream_write_timeout,
get_config()->downstream_read_timeout, 0, 0, 0, 0, writecb, readcb, get_config()->downstream_read_timeout, 0, 0, 0, 0, writecb, readcb,
timeoutcb, this), timeoutcb, this),
worker_(worker), connect_blocker_(connect_blocker), ssl_ctx_(ssl_ctx), worker_(worker), connect_blocker_(connect_blocker), ssl_ctx_(ssl_ctx),
session_(nullptr), data_pending_(nullptr), data_pendinglen_(0), session_(nullptr), data_pending_(nullptr), data_pendinglen_(0),
addr_idx_(0), state_(DISCONNECTED), addr_idx_(0), group_(group), state_(DISCONNECTED),
connection_check_state_(CONNECTION_CHECK_NONE), flow_control_(false) { connection_check_state_(CONNECTION_CHECK_NONE), flow_control_(false) {
read_ = write_ = &Http2Session::noop; read_ = write_ = &Http2Session::noop;
@ -235,13 +236,14 @@ int Http2Session::disconnect(bool hard) {
int Http2Session::check_cert() { int Http2Session::check_cert() {
return ssl::check_cert( return ssl::check_cert(
conn_.tls.ssl, &get_config()->downstream_addr_groups[0].addrs[addr_idx_]); conn_.tls.ssl,
&get_config()->downstream_addr_groups[group_].addrs[addr_idx_]);
} }
int Http2Session::initiate_connection() { int Http2Session::initiate_connection() {
int rv = 0; int rv = 0;
auto &addrs = get_config()->downstream_addr_groups[0].addrs; auto &addrs = get_config()->downstream_addr_groups[group_].addrs;
if (state_ == DISCONNECTED) { if (state_ == DISCONNECTED) {
if (connect_blocker_->blocked()) { if (connect_blocker_->blocked()) {
@ -252,8 +254,7 @@ int Http2Session::initiate_connection() {
return -1; return -1;
} }
auto worker_stat = worker_->get_worker_stat(); auto &next_downstream = worker_->get_dgrp(group_)->next;
auto &next_downstream = worker_stat->next_downstream[0];
addr_idx_ = next_downstream; addr_idx_ = next_downstream;
if (++next_downstream >= addrs.size()) { if (++next_downstream >= addrs.size()) {
next_downstream = 0; next_downstream = 0;
@ -511,7 +512,7 @@ int Http2Session::downstream_connect_proxy() {
SSLOG(INFO, this) << "Connected to the proxy"; SSLOG(INFO, this) << "Connected to the proxy";
} }
auto &downstream_addr = auto &downstream_addr =
get_config()->downstream_addr_groups[0].addrs[addr_idx_]; get_config()->downstream_addr_groups[group_].addrs[addr_idx_];
std::string req = "CONNECT "; std::string req = "CONNECT ";
req += downstream_addr.hostport.get(); req += downstream_addr.hostport.get();
@ -1752,4 +1753,6 @@ bool Http2Session::should_hard_fail() const {
size_t Http2Session::get_addr_idx() const { return addr_idx_; } size_t Http2Session::get_addr_idx() const { return addr_idx_; }
size_t Http2Session::get_group() const { return group_; }
} // namespace shrpx } // namespace shrpx

View File

@ -58,7 +58,7 @@ struct StreamData {
class Http2Session { class Http2Session {
public: public:
Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
ConnectBlocker *connect_blocker, Worker *worker); ConnectBlocker *connect_blocker, Worker *worker, size_t group);
~Http2Session(); ~Http2Session();
int check_cert(); int check_cert();
@ -151,6 +151,8 @@ public:
size_t get_addr_idx() const; size_t get_addr_idx() const;
size_t get_group() const;
enum { enum {
// Disconnected // Disconnected
DISCONNECTED, DISCONNECTED,
@ -203,6 +205,7 @@ private:
size_t data_pendinglen_; size_t data_pendinglen_;
// index of get_config()->downstream_addrs this object uses // index of get_config()->downstream_addrs this object uses
size_t addr_idx_; size_t addr_idx_;
size_t group_;
int state_; int state_;
int connection_check_state_; int connection_check_state_;
bool flow_control_; bool flow_control_;

View File

@ -36,6 +36,7 @@
#include "shrpx_config.h" #include "shrpx_config.h"
#include "shrpx_http.h" #include "shrpx_http.h"
#include "shrpx_worker.h" #include "shrpx_worker.h"
#include "shrpx_http2_session.h"
#include "http2.h" #include "http2.h"
#include "util.h" #include "util.h"
#include "base64.h" #include "base64.h"

View File

@ -34,6 +34,7 @@
#include "shrpx_connect_blocker.h" #include "shrpx_connect_blocker.h"
#include "shrpx_downstream_connection_pool.h" #include "shrpx_downstream_connection_pool.h"
#include "shrpx_worker.h" #include "shrpx_worker.h"
#include "shrpx_http2_session.h"
#include "http2.h" #include "http2.h"
#include "util.h" #include "util.h"
@ -142,8 +143,7 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
} }
auto worker = client_handler_->get_worker(); auto worker = client_handler_->get_worker();
auto worker_stat = worker->get_worker_stat(); auto &next_downstream = worker->get_dgrp(group_)->next;
auto &next_downstream = worker_stat->next_downstream[group_];
auto end = next_downstream; auto end = next_downstream;
auto &addrs = get_config()->downstream_addr_groups[group_].addrs; auto &addrs = get_config()->downstream_addr_groups[group_].addrs;
for (;;) { for (;;) {

View File

@ -36,6 +36,7 @@
#include "shrpx_error.h" #include "shrpx_error.h"
#include "shrpx_log_config.h" #include "shrpx_log_config.h"
#include "shrpx_worker.h" #include "shrpx_worker.h"
#include "shrpx_http2_session.h"
#include "http2.h" #include "http2.h"
#include "util.h" #include "util.h"
#include "template.h" #include "template.h"

View File

@ -53,6 +53,7 @@
#include "shrpx_config.h" #include "shrpx_config.h"
#include "shrpx_worker.h" #include "shrpx_worker.h"
#include "shrpx_downstream_connection_pool.h" #include "shrpx_downstream_connection_pool.h"
#include "shrpx_http2_session.h"
#include "util.h" #include "util.h"
#include "ssl.h" #include "ssl.h"
#include "template.h" #include "template.h"

View File

@ -61,9 +61,9 @@ void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) {
Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
ssl::CertLookupTree *cert_tree, ssl::CertLookupTree *cert_tree,
const std::shared_ptr<TicketKeys> &ticket_keys) const std::shared_ptr<TicketKeys> &ticket_keys)
: next_http2session_(0), : dconn_pool_(get_config()->downstream_addr_groups.size()),
dconn_pool_(get_config()->downstream_addr_groups.size()), worker_stat_(get_config()->downstream_addr_groups.size()),
worker_stat_(get_config()->downstream_addr_groups.size()), loop_(loop), dgrps_(get_config()->downstream_addr_groups.size()), loop_(loop),
sv_ssl_ctx_(sv_ssl_ctx), cl_ssl_ctx_(cl_ssl_ctx), cert_tree_(cert_tree), sv_ssl_ctx_(sv_ssl_ctx), cl_ssl_ctx_(cl_ssl_ctx), cert_tree_(cert_tree),
ticket_keys_(ticket_keys), ticket_keys_(ticket_keys),
connect_blocker_(make_unique<ConnectBlocker>(loop_)), connect_blocker_(make_unique<ConnectBlocker>(loop_)),
@ -77,9 +77,17 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
if (get_config()->downstream_proto == PROTO_HTTP2) { if (get_config()->downstream_proto == PROTO_HTTP2) {
auto n = get_config()->http2_downstream_connections_per_worker; auto n = get_config()->http2_downstream_connections_per_worker;
for (; n > 0; --n) { size_t group = 0;
http2sessions_.push_back(make_unique<Http2Session>( for (auto &dgrp : dgrps_) {
loop_, cl_ssl_ctx, connect_blocker_.get(), this)); auto m = n;
if (m == 0) {
m = get_config()->downstream_addr_groups[group].addrs.size();
}
for (; m; --m) {
dgrp.http2sessions.push_back(make_unique<Http2Session>(
loop_, cl_ssl_ctx, connect_blocker_.get(), this, group));
}
++group;
} }
} }
} }
@ -210,15 +218,17 @@ WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }
DownstreamConnectionPool *Worker::get_dconn_pool() { return &dconn_pool_; } DownstreamConnectionPool *Worker::get_dconn_pool() { return &dconn_pool_; }
Http2Session *Worker::next_http2_session() { Http2Session *Worker::next_http2_session(size_t group) {
if (http2sessions_.empty()) { auto &dgrp = dgrps_[group];
auto &http2sessions = dgrp.http2sessions;
if (http2sessions.empty()) {
return nullptr; return nullptr;
} }
auto res = http2sessions_[next_http2session_].get(); auto res = http2sessions[dgrp.next_http2session].get();
++next_http2session_; ++dgrp.next_http2session;
if (next_http2session_ >= http2sessions_.size()) { if (dgrp.next_http2session >= http2sessions.size()) {
next_http2session_ = 0; dgrp.next_http2session = 0;
} }
return res; return res;
@ -242,4 +252,9 @@ bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; }
MemchunkPool *Worker::get_mcpool() { return &mcpool_; } MemchunkPool *Worker::get_mcpool() { return &mcpool_; }
DownstreamGroup *Worker::get_dgrp(size_t group) {
assert(group < dgrps_.size());
return &dgrps_[group];
}
} // namespace shrpx } // namespace shrpx

View File

@ -54,14 +54,21 @@ namespace ssl {
class CertLookupTree; class CertLookupTree;
} // namespace ssl } // namespace ssl
struct DownstreamGroup {
DownstreamGroup() : next_http2session(0), next(0) {}
std::vector<std::unique_ptr<Http2Session>> http2sessions;
// Next index in http2sessions.
size_t next_http2session;
// Next downstream address index corresponding to
// Config::downstream_addr_groups[].
size_t next;
};
struct WorkerStat { struct WorkerStat {
WorkerStat(size_t num_groups) WorkerStat(size_t num_groups) : num_connections(0) {}
: num_connections(0), next_downstream(num_groups) {}
size_t num_connections; size_t num_connections;
// Next downstream index in Config::downstream_addr_groups. This is
// used as load balancing.
std::vector<size_t> next_downstream;
}; };
enum WorkerEventType { enum WorkerEventType {
@ -97,7 +104,7 @@ public:
void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys); void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);
WorkerStat *get_worker_stat(); WorkerStat *get_worker_stat();
DownstreamConnectionPool *get_dconn_pool(); DownstreamConnectionPool *get_dconn_pool();
Http2Session *next_http2_session(); Http2Session *next_http2_session(size_t group);
ConnectBlocker *get_connect_blocker() const; ConnectBlocker *get_connect_blocker() const;
struct ev_loop *get_loop() const; struct ev_loop *get_loop() const;
SSL_CTX *get_sv_ssl_ctx() const; SSL_CTX *get_sv_ssl_ctx() const;
@ -109,9 +116,9 @@ public:
MemchunkPool *get_mcpool(); MemchunkPool *get_mcpool();
void schedule_clear_mcpool(); void schedule_clear_mcpool();
DownstreamGroup *get_dgrp(size_t group);
private: private:
std::vector<std::unique_ptr<Http2Session>> http2sessions_;
size_t next_http2session_;
#ifndef NOTHREADS #ifndef NOTHREADS
std::future<void> fut_; std::future<void> fut_;
#endif // NOTHREADS #endif // NOTHREADS
@ -122,6 +129,7 @@ private:
MemchunkPool mcpool_; MemchunkPool mcpool_;
DownstreamConnectionPool dconn_pool_; DownstreamConnectionPool dconn_pool_;
WorkerStat worker_stat_; WorkerStat worker_stat_;
std::vector<DownstreamGroup> dgrps_;
struct ev_loop *loop_; struct ev_loop *loop_;
// Following fields are shared across threads if // Following fields are shared across threads if