Merge branch 'mix-backend-proto-tls'

This commit is contained in:
Tatsuhiro Tsujikawa 2016-05-24 23:49:21 +09:00
commit fce7908fe6
11 changed files with 187 additions and 130 deletions

View File

@ -1335,13 +1335,12 @@ Connections:
The backend application protocol can be specified using The backend application protocol can be specified using
optional "proto" parameter, and in the form of optional "proto" parameter, and in the form of
"proto=<PROTO>". All that share the same <PATTERN> must "proto=<PROTO>". <PROTO> should be one of the following
have the same <PROTO> value if it is given. <PROTO> list without quotes: "h2", "http/1.1". The default
should be one of the following list without quotes: value of <PROTO> is "http/1.1". Note that usually "h2"
"h2", "http/1.1". The default value of <PROTO> is refers to HTTP/2 over TLS. But in this option, it may
"http/1.1". Note that usually "h2" refers to HTTP/2 mean HTTP/2 over cleartext TCP unless "tls" keyword is
over TLS. But in this option, it may mean HTTP/2 over used (see below).
cleartext TCP unless "tls" keyword is used (see below).
TLS can be enabled by specifying optional "tls" TLS can be enabled by specifying optional "tls"
parameter. TLS is not enabled by default. parameter. TLS is not enabled by default.
@ -2187,9 +2186,9 @@ void process_options(int argc, char **argv,
DownstreamAddrConfig addr{}; DownstreamAddrConfig addr{};
addr.host = ImmutableString::from_lit(DEFAULT_DOWNSTREAM_HOST); addr.host = ImmutableString::from_lit(DEFAULT_DOWNSTREAM_HOST);
addr.port = DEFAULT_DOWNSTREAM_PORT; addr.port = DEFAULT_DOWNSTREAM_PORT;
addr.proto = PROTO_HTTP1;
DownstreamAddrGroupConfig g(StringRef::from_lit("/")); DownstreamAddrGroupConfig g(StringRef::from_lit("/"));
g.proto = PROTO_HTTP1;
g.addrs.push_back(std::move(addr)); g.addrs.push_back(std::move(addr));
mod_config()->router.add_route(StringRef{g.pattern}, addr_groups.size()); mod_config()->router.add_route(StringRef{g.pattern}, addr_groups.size());
addr_groups.push_back(std::move(g)); addr_groups.push_back(std::move(g));
@ -2197,34 +2196,10 @@ void process_options(int argc, char **argv,
// We don't support host mapping in these cases. Move all // We don't support host mapping in these cases. Move all
// non-catch-all patterns to catch-all pattern. // non-catch-all patterns to catch-all pattern.
DownstreamAddrGroupConfig catch_all(StringRef::from_lit("/")); DownstreamAddrGroupConfig catch_all(StringRef::from_lit("/"));
auto proto = PROTO_NONE;
auto tls = false;
auto tls_seen = false;
for (auto &g : addr_groups) { for (auto &g : addr_groups) {
if (proto == PROTO_NONE) {
proto = g.proto;
} else if (proto != g.proto) {
LOG(ERROR) << SHRPX_OPT_BACKEND << ": <PATTERN> was ignored with "
"--http2-proxy, and protocol must "
"be the same for all backends.";
exit(EXIT_FAILURE);
}
if (!tls_seen) {
tls = g.tls;
tls_seen = true;
} else if (tls != g.tls) {
LOG(ERROR) << SHRPX_OPT_BACKEND
<< ": <PATTERN> was ignored with --http2-proxy, and tls "
"must be enabled or disabled for all backends.";
exit(EXIT_FAILURE);
}
std::move(std::begin(g.addrs), std::end(g.addrs), std::move(std::begin(g.addrs), std::end(g.addrs),
std::back_inserter(catch_all.addrs)); std::back_inserter(catch_all.addrs));
} }
catch_all.proto = proto;
catch_all.tls = tls;
std::vector<DownstreamAddrGroupConfig>().swap(addr_groups); std::vector<DownstreamAddrGroupConfig>().swap(addr_groups);
std::vector<WildcardPattern>().swap(mod_config()->wildcard_patterns); std::vector<WildcardPattern>().swap(mod_config()->wildcard_patterns);
// maybe not necessary? // maybe not necessary?
@ -2272,10 +2247,12 @@ void process_options(int argc, char **argv,
} }
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Host-path pattern: group " << i << ": '" << g.pattern LOG(INFO) << "Host-path pattern: group " << i << ": '" << g.pattern
<< "', proto=" << strproto(g.proto) << (g.tls ? ", tls" : ""); << "'";
for (auto &addr : g.addrs) { for (auto &addr : g.addrs) {
LOG(INFO) << "group " << i << " -> " << addr.host.c_str() LOG(INFO) << "group " << i << " -> " << addr.host.c_str()
<< (addr.host_unix ? "" : ":" + util::utos(addr.port)); << (addr.host_unix ? "" : ":" + util::utos(addr.port))
<< ", proto=" << strproto(addr.proto)
<< (addr.tls ? ", tls" : "");
} }
} }
} }

View File

@ -695,7 +695,7 @@ Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) {
// First count the working backend addresses. // First count the working backend addresses.
size_t min = 0; size_t min = 0;
for (const auto &addr : shared_addr->addrs) { for (const auto &addr : shared_addr->addrs) {
if (addr.connect_blocker->blocked()) { if (addr.proto != PROTO_HTTP2 || addr.connect_blocker->blocked()) {
continue; continue;
} }
@ -735,8 +735,8 @@ Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) {
DownstreamAddr *selected_addr = nullptr; DownstreamAddr *selected_addr = nullptr;
for (auto &addr : shared_addr->addrs) { for (auto &addr : shared_addr->addrs) {
if (addr.http2_extra_freelist.size() == 0 && if (addr.proto != PROTO_HTTP2 || (addr.http2_extra_freelist.size() == 0 &&
addr.connect_blocker->blocked()) { addr.connect_blocker->blocked())) {
continue; continue;
} }
@ -777,9 +777,8 @@ Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) {
return session; return session;
} }
auto session = new Http2Session( auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(),
conn_.loop, shared_addr->tls ? worker_->get_cl_ssl_ctx() : nullptr, worker_, &group, selected_addr);
worker_, &group, selected_addr);
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Create new Http2Session " << session; CLOG(INFO, this) << "Create new Http2Session " << session;
@ -790,6 +789,20 @@ Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) {
return session; return session;
} }
namespace {
bool pri_less(const WeightedPri &lhs, const WeightedPri &rhs, uint32_t max) {
if (lhs.cycle < rhs.cycle) {
return rhs.cycle - lhs.cycle <= max;
}
return lhs.cycle - rhs.cycle > max;
}
} // namespace
namespace {
uint32_t next_cycle(const WeightedPri &pri) { return pri.cycle + pri.iweight; }
} // namespace
std::unique_ptr<DownstreamConnection> std::unique_ptr<DownstreamConnection>
ClientHandler::get_downstream_connection(Downstream *downstream) { ClientHandler::get_downstream_connection(Downstream *downstream) {
size_t group_idx; size_t group_idx;
@ -834,39 +847,62 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
auto &group = worker_->get_downstream_addr_groups()[group_idx]; auto &group = worker_->get_downstream_addr_groups()[group_idx];
auto &shared_addr = group.shared_addr; auto &shared_addr = group.shared_addr;
auto &dconn_pool = shared_addr->dconn_pool;
auto dconn = dconn_pool.pop_downstream_connection(); auto proto = PROTO_NONE;
if (!dconn) { if (shared_addr->proto == PROTO_NONE) {
if (pri_less(shared_addr->http1_pri, shared_addr->http2_pri,
shared_addr->max_pri_dist)) {
shared_addr->http1_pri.cycle = next_cycle(shared_addr->http1_pri);
proto = PROTO_HTTP1;
} else {
shared_addr->http2_pri.cycle = next_cycle(shared_addr->http2_pri);
proto = PROTO_HTTP2;
}
} else {
proto = shared_addr->proto;
}
if (proto == PROTO_HTTP2) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Downstream connection pool is empty." CLOG(INFO, this) << "Downstream connection pool is empty."
<< " Create new one"; << " Create new one";
} }
if (shared_addr->proto == PROTO_HTTP2) { auto http2session = select_http2_session(group);
auto http2session = select_http2_session(group);
if (http2session == nullptr) { if (http2session == nullptr) {
return nullptr; return nullptr;
}
dconn = make_unique<Http2DownstreamConnection>(http2session);
} else {
dconn =
make_unique<HttpDownstreamConnection>(&group, conn_.loop, worker_);
} }
auto dconn = make_unique<Http2DownstreamConnection>(http2session);
dconn->set_client_handler(this); dconn->set_client_handler(this);
return dconn;
return std::move(dconn);
}
auto &dconn_pool = shared_addr->dconn_pool;
// pool connection must be HTTP/1.1 connection
auto dconn = dconn_pool.pop_downstream_connection();
if (dconn) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Reuse downstream connection DCONN:" << dconn.get()
<< " from pool";
}
} else {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Downstream connection pool is empty."
<< " Create new one";
}
dconn = make_unique<HttpDownstreamConnection>(&group, conn_.loop, worker_);
} }
dconn->set_client_handler(this); dconn->set_client_handler(this);
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Reuse downstream connection DCONN:" << dconn.get()
<< " from pool";
}
return dconn; return dconn;
} }

View File

@ -753,6 +753,8 @@ int parse_mapping(DownstreamAddrConfig addr, const StringRef &src_pattern,
addr.fall = params.fall; addr.fall = params.fall;
addr.rise = params.rise; addr.rise = params.rise;
addr.proto = params.proto;
addr.tls = params.tls;
addr.sni = ImmutableString{std::begin(params.sni), std::end(params.sni)}; addr.sni = ImmutableString{std::begin(params.sni), std::end(params.sni)};
for (const auto &raw_pattern : mapping) { for (const auto &raw_pattern : mapping) {
@ -772,21 +774,6 @@ int parse_mapping(DownstreamAddrConfig addr, const StringRef &src_pattern,
} }
for (auto &g : addr_groups) { for (auto &g : addr_groups) {
if (g.pattern == pattern) { if (g.pattern == pattern) {
if (g.proto != params.proto) {
LOG(ERROR) << "backend: protocol mismatch. We saw protocol "
<< strproto(g.proto) << " for pattern " << g.pattern
<< ", but another protocol " << strproto(params.proto);
return -1;
}
if (g.tls != params.tls) {
LOG(ERROR) << "backend: TLS mismatch. We saw TLS was "
<< (g.tls ? "enabled" : "disabled") << " for pattern "
<< g.pattern << ", but we now got TLS was "
<< (params.tls ? "enabled" : "disabled");
return -1;
}
g.addrs.push_back(addr); g.addrs.push_back(addr);
done = true; done = true;
break; break;
@ -797,8 +784,6 @@ int parse_mapping(DownstreamAddrConfig addr, const StringRef &src_pattern,
} }
DownstreamAddrGroupConfig g(StringRef{pattern}); DownstreamAddrGroupConfig g(StringRef{pattern});
g.addrs.push_back(addr); g.addrs.push_back(addr);
g.proto = params.proto;
g.tls = params.tls;
if (pattern[0] == '*') { if (pattern[0] == '*') {
// wildcard pattern // wildcard pattern

View File

@ -336,23 +336,21 @@ struct DownstreamAddrConfig {
ImmutableString sni; ImmutableString sni;
size_t fall; size_t fall;
size_t rise; size_t rise;
// Application protocol used in this group
shrpx_proto proto;
// backend port. 0 if |host_unix| is true. // backend port. 0 if |host_unix| is true.
uint16_t port; uint16_t port;
// true if |host| contains UNIX domain socket path. // true if |host| contains UNIX domain socket path.
bool host_unix; bool host_unix;
bool tls;
}; };
struct DownstreamAddrGroupConfig { struct DownstreamAddrGroupConfig {
DownstreamAddrGroupConfig(const StringRef &pattern) DownstreamAddrGroupConfig(const StringRef &pattern)
: pattern(pattern.c_str(), pattern.size()), : pattern(pattern.c_str(), pattern.size()) {}
proto(PROTO_HTTP1),
tls(false) {}
ImmutableString pattern; ImmutableString pattern;
std::vector<DownstreamAddrConfig> addrs; std::vector<DownstreamAddrConfig> addrs;
// Application protocol used in this group
shrpx_proto proto;
bool tls;
}; };
struct TicketKey { struct TicketKey {

View File

@ -353,7 +353,9 @@ int Http2Session::initiate_connection() {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Connecting to downstream server"; SSLOG(INFO, this) << "Connecting to downstream server";
} }
if (ssl_ctx_) { if (addr_->tls) {
assert(ssl_ctx_);
auto ssl = ssl::create_ssl(ssl_ctx_); auto ssl = ssl::create_ssl(ssl_ctx_);
if (!ssl) { if (!ssl) {
return -1; return -1;
@ -1476,7 +1478,7 @@ int Http2Session::connection_made() {
state_ = Http2Session::CONNECTED; state_ = Http2Session::CONNECTED;
if (ssl_ctx_) { if (addr_->tls) {
const unsigned char *next_proto = nullptr; const unsigned char *next_proto = nullptr;
unsigned int next_proto_len = 0; unsigned int next_proto_len = 0;
@ -1541,9 +1543,8 @@ int Http2Session::connection_made() {
} }
} }
auto &shared_addr = group_->shared_addr;
auto must_terminate = auto must_terminate =
shared_addr->tls && !nghttp2::ssl::check_http2_requirement(conn_.tls.ssl); addr_->tls && !nghttp2::ssl::check_http2_requirement(conn_.tls.ssl);
reset_connection_check_timer(CONNCHK_TIMEOUT); reset_connection_check_timer(CONNCHK_TIMEOUT);

View File

@ -160,7 +160,7 @@ HttpDownstreamConnection::HttpDownstreamConnection(DownstreamAddrGroup *group,
do_write_(&HttpDownstreamConnection::noop), do_write_(&HttpDownstreamConnection::noop),
do_signal_write_(&HttpDownstreamConnection::noop), do_signal_write_(&HttpDownstreamConnection::noop),
worker_(worker), worker_(worker),
ssl_ctx_(group->shared_addr->tls ? worker->get_cl_ssl_ctx() : nullptr), ssl_ctx_(worker->get_cl_ssl_ctx()),
group_(group), group_(group),
addr_(nullptr), addr_(nullptr),
ioctrl_(&conn_.rlimit), ioctrl_(&conn_.rlimit),
@ -185,17 +185,6 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
auto &downstreamconf = get_config()->conn.downstream; auto &downstreamconf = get_config()->conn.downstream;
if (conn_.fd == -1) { if (conn_.fd == -1) {
if (ssl_ctx_) {
auto ssl = ssl::create_ssl(ssl_ctx_);
if (!ssl) {
return -1;
}
ssl::setup_downstream_http1_alpn(ssl);
conn_.set_ssl(ssl);
}
auto &shared_addr = group_->shared_addr; auto &shared_addr = group_->shared_addr;
auto &addrs = shared_addr->addrs; auto &addrs = shared_addr->addrs;
auto &next_downstream = shared_addr->next; auto &next_downstream = shared_addr->next;
@ -207,6 +196,10 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
next_downstream = 0; next_downstream = 0;
} }
if (addr.proto != PROTO_HTTP1) {
continue;
}
auto &connect_blocker = addr.connect_blocker; auto &connect_blocker = addr.connect_blocker;
if (connect_blocker->blocked()) { if (connect_blocker->blocked()) {
@ -265,7 +258,18 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
addr_ = &addr; addr_ = &addr;
if (ssl_ctx_) { if (addr_->tls) {
assert(ssl_ctx_);
auto ssl = ssl::create_ssl(ssl_ctx_);
if (!ssl) {
return -1;
}
ssl::setup_downstream_http1_alpn(ssl);
conn_.set_ssl(ssl);
auto sni_name = auto sni_name =
addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni}; addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni};
if (!util::numeric_host(sni_name.c_str())) { if (!util::numeric_host(sni_name.c_str())) {

View File

@ -96,8 +96,7 @@ void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
} // namespace } // namespace
LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
DownstreamAddrGroup *group, DownstreamAddr *addr, DownstreamAddr *addr, std::mt19937 &gen)
std::mt19937 &gen)
: conn_(loop, -1, nullptr, worker->get_mcpool(), : conn_(loop, -1, nullptr, worker->get_mcpool(),
get_config()->conn.downstream.timeout.write, get_config()->conn.downstream.timeout.write,
get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb, get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb,
@ -109,7 +108,6 @@ LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
write_(&LiveCheck::noop), write_(&LiveCheck::noop),
worker_(worker), worker_(worker),
ssl_ctx_(ssl_ctx), ssl_ctx_(ssl_ctx),
group_(group),
addr_(addr), addr_(addr),
session_(nullptr), session_(nullptr),
success_count_(0), success_count_(0),
@ -175,8 +173,6 @@ int LiveCheck::do_write() { return write_(*this); }
int LiveCheck::initiate_connection() { int LiveCheck::initiate_connection() {
int rv; int rv;
const auto &shared_addr = group_->shared_addr;
auto worker_blocker = worker_->get_connect_blocker(); auto worker_blocker = worker_->get_connect_blocker();
if (worker_blocker->blocked()) { if (worker_blocker->blocked()) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
@ -185,13 +181,15 @@ int LiveCheck::initiate_connection() {
return -1; return -1;
} }
if (ssl_ctx_) { if (addr_->tls) {
assert(ssl_ctx_);
auto ssl = ssl::create_ssl(ssl_ctx_); auto ssl = ssl::create_ssl(ssl_ctx_);
if (!ssl) { if (!ssl) {
return -1; return -1;
} }
switch (shared_addr->proto) { switch (addr_->proto) {
case PROTO_HTTP1: case PROTO_HTTP1:
ssl::setup_downstream_http1_alpn(ssl); ssl::setup_downstream_http1_alpn(ssl);
break; break;
@ -226,7 +224,7 @@ int LiveCheck::initiate_connection() {
return -1; return -1;
} }
if (ssl_ctx_) { if (addr_->tls) {
auto sni_name = auto sni_name =
addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni}; addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni};
if (!util::numeric_host(sni_name.c_str())) { if (!util::numeric_host(sni_name.c_str())) {
@ -278,8 +276,7 @@ int LiveCheck::connected() {
return do_write(); return do_write();
} }
const auto &shared_addr = group_->shared_addr; if (addr_->proto == PROTO_HTTP2) {
if (shared_addr->proto == PROTO_HTTP2) {
// For HTTP/2, we try to read SETTINGS ACK from server to make // For HTTP/2, we try to read SETTINGS ACK from server to make
// sure it is really alive, and serving HTTP/2. // sure it is really alive, and serving HTTP/2.
read_ = &LiveCheck::read_clear; read_ = &LiveCheck::read_clear;
@ -343,9 +340,7 @@ int LiveCheck::tls_handshake() {
auto proto = StringRef{next_proto, next_proto_len}; auto proto = StringRef{next_proto, next_proto_len};
const auto &shared_addr = group_->shared_addr; switch (addr_->proto) {
switch (shared_addr->proto) {
case PROTO_HTTP1: case PROTO_HTTP1:
if (proto.empty() || proto == StringRef::from_lit("http/1.1")) { if (proto.empty() || proto == StringRef::from_lit("http/1.1")) {
break; break;
@ -697,9 +692,8 @@ int LiveCheck::connection_made() {
return -1; return -1;
} }
auto &shared_addr = group_->shared_addr;
auto must_terminate = auto must_terminate =
shared_addr->tls && !nghttp2::ssl::check_http2_requirement(conn_.tls.ssl); addr_->tls && !nghttp2::ssl::check_http2_requirement(conn_.tls.ssl);
if (must_terminate) { if (must_terminate) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {

View File

@ -41,14 +41,12 @@
namespace shrpx { namespace shrpx {
class Worker; class Worker;
struct DownstreamAddrGroup;
struct DownstreamAddr; struct DownstreamAddr;
class LiveCheck { class LiveCheck {
public: public:
LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
DownstreamAddrGroup *group, DownstreamAddr *addr, DownstreamAddr *addr, std::mt19937 &gen);
std::mt19937 &gen);
~LiveCheck(); ~LiveCheck();
void disconnect(); void disconnect();
@ -101,7 +99,6 @@ private:
Worker *worker_; Worker *worker_;
// nullptr if no TLS is configured // nullptr if no TLS is configured
SSL_CTX *ssl_ctx_; SSL_CTX *ssl_ctx_;
DownstreamAddrGroup *group_;
// Address of remote endpoint // Address of remote endpoint
DownstreamAddr *addr_; DownstreamAddr *addr_;
nghttp2_session *session_; nghttp2_session *session_;

View File

@ -1394,7 +1394,11 @@ bool downstream_tls_enabled() {
const auto &groups = get_config()->conn.downstream.addr_groups; const auto &groups = get_config()->conn.downstream.addr_groups;
return std::any_of(std::begin(groups), std::end(groups), return std::any_of(std::begin(groups), std::end(groups),
[](const DownstreamAddrGroupConfig &g) { return g.tls; }); [](const DownstreamAddrGroupConfig &g) {
return std::any_of(
std::begin(g.addrs), std::end(g.addrs),
[](const DownstreamAddrConfig &a) { return a.tls; });
});
} }
SSL_CTX *setup_downstream_client_ssl_context( SSL_CTX *setup_downstream_client_ssl_context(

View File

@ -67,8 +67,7 @@ namespace {
bool match_shared_downstream_addr( bool match_shared_downstream_addr(
const std::shared_ptr<SharedDownstreamAddr> &lhs, const std::shared_ptr<SharedDownstreamAddr> &lhs,
const std::shared_ptr<SharedDownstreamAddr> &rhs) { const std::shared_ptr<SharedDownstreamAddr> &rhs) {
if (lhs->addrs.size() != rhs->addrs.size() || lhs->proto != rhs->proto || if (lhs->addrs.size() != rhs->addrs.size() || lhs->proto != rhs->proto) {
lhs->tls != rhs->tls) {
return false; return false;
} }
@ -83,7 +82,8 @@ bool match_shared_downstream_addr(
auto &b = rhs->addrs[i]; auto &b = rhs->addrs[i];
if (a.host == b.host && a.port == b.port && a.host_unix == b.host_unix && if (a.host == b.host && a.port == b.port && a.host_unix == b.host_unix &&
a.sni == b.sni && a.fall == b.fall && a.rise == b.rise) { a.proto == b.proto && a.tls == b.tls && a.sni == b.sni &&
a.fall == b.fall && a.rise == b.rise) {
break; break;
} }
} }
@ -147,8 +147,14 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
// does not value initialize SharedDownstreamAddr above. // does not value initialize SharedDownstreamAddr above.
shared_addr->next = 0; shared_addr->next = 0;
shared_addr->addrs.resize(src.addrs.size()); shared_addr->addrs.resize(src.addrs.size());
shared_addr->proto = src.proto; shared_addr->proto = PROTO_NONE;
shared_addr->tls = src.tls; shared_addr->http1_pri = {};
shared_addr->http2_pri = {};
auto mixed_proto = false;
size_t num_http1 = 0;
size_t num_http2 = 0;
for (size_t j = 0; j < src.addrs.size(); ++j) { for (size_t j = 0; j < src.addrs.size(); ++j) {
auto &src_addr = src.addrs[j]; auto &src_addr = src.addrs[j];
@ -159,14 +165,31 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
dst_addr.hostport = src_addr.hostport; dst_addr.hostport = src_addr.hostport;
dst_addr.port = src_addr.port; dst_addr.port = src_addr.port;
dst_addr.host_unix = src_addr.host_unix; dst_addr.host_unix = src_addr.host_unix;
dst_addr.proto = src_addr.proto;
dst_addr.tls = src_addr.tls;
dst_addr.sni = src_addr.sni; dst_addr.sni = src_addr.sni;
dst_addr.fall = src_addr.fall; dst_addr.fall = src_addr.fall;
dst_addr.rise = src_addr.rise; dst_addr.rise = src_addr.rise;
dst_addr.connect_blocker = make_unique<ConnectBlocker>(randgen_, loop_); dst_addr.connect_blocker = make_unique<ConnectBlocker>(randgen_, loop_);
dst_addr.live_check = make_unique<LiveCheck>( dst_addr.live_check =
loop_, shared_addr->tls ? cl_ssl_ctx_ : nullptr, this, &dst, make_unique<LiveCheck>(loop_, cl_ssl_ctx_, this, &dst_addr, randgen_);
&dst_addr, randgen_);
if (!mixed_proto) {
if (shared_addr->proto == PROTO_NONE) {
shared_addr->proto = dst_addr.proto;
} else if (shared_addr->proto != dst_addr.proto) {
shared_addr->proto = PROTO_NONE;
mixed_proto = true;
}
}
if (dst_addr.proto == PROTO_HTTP2) {
++num_http2;
} else {
assert(dst_addr.proto == PROTO_HTTP1);
++num_http1;
}
} }
// share the connection if patterns have the same set of backend // share the connection if patterns have the same set of backend
@ -179,6 +202,16 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
}); });
if (it == end) { if (it == end) {
if (shared_addr->proto == PROTO_NONE) {
auto max = std::max(static_cast<size_t>(65536),
std::max(num_http1, num_http2));
shared_addr->http1_pri.iweight = max / num_http1;
shared_addr->http2_pri.iweight = max / num_http2;
shared_addr->max_pri_dist = std::max(shared_addr->http1_pri.iweight,
shared_addr->http2_pri.iweight);
}
dst.shared_addr = shared_addr; dst.shared_addr = shared_addr;
} else { } else {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {

View File

@ -99,11 +99,31 @@ struct DownstreamAddr {
// total number of streams created in HTTP/2 connections for this // total number of streams created in HTTP/2 connections for this
// address. // address.
size_t num_dconn; size_t num_dconn;
// Application protocol used in this backend
shrpx_proto proto;
// true if TLS is used in this backend
bool tls;
};
// Simplified weighted fair queuing. Actually we don't use queue here
// since we have just 2 items. This is the same algorithm used in
// stream priority, but ignores remainder.
struct WeightedPri {
// current cycle of this item. The lesser cycle has higher
// priority. This is unsigned 32 bit integer, so it may overflow.
// But with the same theory described in stream priority, it is no
// problem.
uint32_t cycle;
// inverted weight, this is a penalty added to cycle when this item
// is selected.
uint32_t iweight;
}; };
struct SharedDownstreamAddr { struct SharedDownstreamAddr {
std::vector<DownstreamAddr> addrs; std::vector<DownstreamAddr> addrs;
// Application protocol used in this group // Application protocol used in this backend addresses. If all
// addresses use a single protocol, this field has that value.
// Otherwise, this value contains PROTO_NONE.
shrpx_proto proto; shrpx_proto proto;
// List of Http2Session which is not fully utilized (i.e., the // List of Http2Session which is not fully utilized (i.e., the
// server advertized maximum concurrency is not reached). We will // server advertized maximum concurrency is not reached). We will
@ -114,9 +134,17 @@ struct SharedDownstreamAddr {
// wise. // wise.
DList<Http2Session> http2_avail_freelist; DList<Http2Session> http2_avail_freelist;
DownstreamConnectionPool dconn_pool; DownstreamConnectionPool dconn_pool;
// Next downstream address index in addrs. // Next http/1.1 downstream address index in addrs.
size_t next; size_t next;
bool tls; // http1_pri and http2_pri are used to which protocols are used
// between HTTP/1.1 or HTTP/2 if they both are available in
// backends. They are choosed proportional to the number available
// backend. Usually, if http1_pri.cycle < http2_pri.cycle, choose
// HTTP/1.1. Otherwise, choose HTTP/2.
WeightedPri http1_pri;
WeightedPri http2_pri;
// The maximum penalty added to http2_pri or http1_pri
uint32_t max_pri_dist;
}; };
struct DownstreamAddrGroup { struct DownstreamAddrGroup {