diff --git a/src/shrpx.cc b/src/shrpx.cc index 331b27f4..aa0d34a8 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -1335,13 +1335,12 @@ Connections: The backend application protocol can be specified using optional "proto" parameter, and in the form of - "proto=". All that share the same must - have the same value if it is given. - should be one of the following list without quotes: - "h2", "http/1.1". The default value of is - "http/1.1". Note that usually "h2" refers to HTTP/2 - over TLS. But in this option, it may mean HTTP/2 over - cleartext TCP unless "tls" keyword is used (see below). + "proto=". should be one of the following + list without quotes: "h2", "http/1.1". The default + value of is "http/1.1". Note that usually "h2" + refers to HTTP/2 over TLS. But in this option, it may + mean HTTP/2 over cleartext TCP unless "tls" keyword is + used (see below). TLS can be enabled by specifying optional "tls" parameter. TLS is not enabled by default. @@ -2187,9 +2186,9 @@ void process_options(int argc, char **argv, DownstreamAddrConfig addr{}; addr.host = ImmutableString::from_lit(DEFAULT_DOWNSTREAM_HOST); addr.port = DEFAULT_DOWNSTREAM_PORT; + addr.proto = PROTO_HTTP1; DownstreamAddrGroupConfig g(StringRef::from_lit("/")); - g.proto = PROTO_HTTP1; g.addrs.push_back(std::move(addr)); mod_config()->router.add_route(StringRef{g.pattern}, addr_groups.size()); addr_groups.push_back(std::move(g)); @@ -2197,34 +2196,10 @@ void process_options(int argc, char **argv, // We don't support host mapping in these cases. Move all // non-catch-all patterns to catch-all pattern. DownstreamAddrGroupConfig catch_all(StringRef::from_lit("/")); - auto proto = PROTO_NONE; - auto tls = false; - auto tls_seen = false; for (auto &g : addr_groups) { - if (proto == PROTO_NONE) { - proto = g.proto; - } else if (proto != g.proto) { - LOG(ERROR) << SHRPX_OPT_BACKEND << ": was ignored with " - "--http2-proxy, and protocol must " - "be the same for all backends."; - exit(EXIT_FAILURE); - } - - if (!tls_seen) { - tls = g.tls; - tls_seen = true; - } else if (tls != g.tls) { - LOG(ERROR) << SHRPX_OPT_BACKEND - << ": was ignored with --http2-proxy, and tls " - "must be enabled or disabled for all backends."; - exit(EXIT_FAILURE); - } - std::move(std::begin(g.addrs), std::end(g.addrs), std::back_inserter(catch_all.addrs)); } - catch_all.proto = proto; - catch_all.tls = tls; std::vector().swap(addr_groups); std::vector().swap(mod_config()->wildcard_patterns); // maybe not necessary? @@ -2272,10 +2247,12 @@ void process_options(int argc, char **argv, } if (LOG_ENABLED(INFO)) { LOG(INFO) << "Host-path pattern: group " << i << ": '" << g.pattern - << "', proto=" << strproto(g.proto) << (g.tls ? ", tls" : ""); + << "'"; for (auto &addr : g.addrs) { LOG(INFO) << "group " << i << " -> " << addr.host.c_str() - << (addr.host_unix ? "" : ":" + util::utos(addr.port)); + << (addr.host_unix ? "" : ":" + util::utos(addr.port)) + << ", proto=" << strproto(addr.proto) + << (addr.tls ? ", tls" : ""); } } } diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 7a4d5a1d..e03e7696 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -695,7 +695,7 @@ Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) { // First count the working backend addresses. size_t min = 0; for (const auto &addr : shared_addr->addrs) { - if (addr.connect_blocker->blocked()) { + if (addr.proto != PROTO_HTTP2 || addr.connect_blocker->blocked()) { continue; } @@ -735,8 +735,8 @@ Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) { DownstreamAddr *selected_addr = nullptr; for (auto &addr : shared_addr->addrs) { - if (addr.http2_extra_freelist.size() == 0 && - addr.connect_blocker->blocked()) { + if (addr.proto != PROTO_HTTP2 || (addr.http2_extra_freelist.size() == 0 && + addr.connect_blocker->blocked())) { continue; } @@ -777,9 +777,8 @@ Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) { return session; } - auto session = new Http2Session( - conn_.loop, shared_addr->tls ? worker_->get_cl_ssl_ctx() : nullptr, - worker_, &group, selected_addr); + auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(), + worker_, &group, selected_addr); if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Create new Http2Session " << session; @@ -790,6 +789,20 @@ Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) { return session; } +namespace { +bool pri_less(const WeightedPri &lhs, const WeightedPri &rhs, uint32_t max) { + if (lhs.cycle < rhs.cycle) { + return rhs.cycle - lhs.cycle <= max; + } + + return lhs.cycle - rhs.cycle > max; +} +} // namespace + +namespace { +uint32_t next_cycle(const WeightedPri &pri) { return pri.cycle + pri.iweight; } +} // namespace + std::unique_ptr ClientHandler::get_downstream_connection(Downstream *downstream) { size_t group_idx; @@ -834,39 +847,62 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { auto &group = worker_->get_downstream_addr_groups()[group_idx]; auto &shared_addr = group.shared_addr; - auto &dconn_pool = shared_addr->dconn_pool; - auto dconn = dconn_pool.pop_downstream_connection(); + auto proto = PROTO_NONE; - if (!dconn) { + if (shared_addr->proto == PROTO_NONE) { + if (pri_less(shared_addr->http1_pri, shared_addr->http2_pri, + shared_addr->max_pri_dist)) { + shared_addr->http1_pri.cycle = next_cycle(shared_addr->http1_pri); + proto = PROTO_HTTP1; + } else { + shared_addr->http2_pri.cycle = next_cycle(shared_addr->http2_pri); + proto = PROTO_HTTP2; + } + } else { + proto = shared_addr->proto; + } + + if (proto == PROTO_HTTP2) { if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Downstream connection pool is empty." << " Create new one"; } - if (shared_addr->proto == PROTO_HTTP2) { - auto http2session = select_http2_session(group); + auto http2session = select_http2_session(group); - if (http2session == nullptr) { - return nullptr; - } - - dconn = make_unique(http2session); - } else { - dconn = - make_unique(&group, conn_.loop, worker_); + if (http2session == nullptr) { + return nullptr; } + + auto dconn = make_unique(http2session); + dconn->set_client_handler(this); - return dconn; + + return std::move(dconn); + } + + auto &dconn_pool = shared_addr->dconn_pool; + + // pool connection must be HTTP/1.1 connection + auto dconn = dconn_pool.pop_downstream_connection(); + + if (dconn) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Reuse downstream connection DCONN:" << dconn.get() + << " from pool"; + } + } else { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Downstream connection pool is empty." + << " Create new one"; + } + + dconn = make_unique(&group, conn_.loop, worker_); } dconn->set_client_handler(this); - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Reuse downstream connection DCONN:" << dconn.get() - << " from pool"; - } - return dconn; } diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index f8759918..b934466b 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -753,6 +753,8 @@ int parse_mapping(DownstreamAddrConfig addr, const StringRef &src_pattern, addr.fall = params.fall; addr.rise = params.rise; + addr.proto = params.proto; + addr.tls = params.tls; addr.sni = ImmutableString{std::begin(params.sni), std::end(params.sni)}; for (const auto &raw_pattern : mapping) { @@ -772,21 +774,6 @@ int parse_mapping(DownstreamAddrConfig addr, const StringRef &src_pattern, } for (auto &g : addr_groups) { if (g.pattern == pattern) { - if (g.proto != params.proto) { - LOG(ERROR) << "backend: protocol mismatch. We saw protocol " - << strproto(g.proto) << " for pattern " << g.pattern - << ", but another protocol " << strproto(params.proto); - return -1; - } - - if (g.tls != params.tls) { - LOG(ERROR) << "backend: TLS mismatch. We saw TLS was " - << (g.tls ? "enabled" : "disabled") << " for pattern " - << g.pattern << ", but we now got TLS was " - << (params.tls ? "enabled" : "disabled"); - return -1; - } - g.addrs.push_back(addr); done = true; break; @@ -797,8 +784,6 @@ int parse_mapping(DownstreamAddrConfig addr, const StringRef &src_pattern, } DownstreamAddrGroupConfig g(StringRef{pattern}); g.addrs.push_back(addr); - g.proto = params.proto; - g.tls = params.tls; if (pattern[0] == '*') { // wildcard pattern diff --git a/src/shrpx_config.h b/src/shrpx_config.h index b16286f1..ad66fd1f 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -336,23 +336,21 @@ struct DownstreamAddrConfig { ImmutableString sni; size_t fall; size_t rise; + // Application protocol used in this group + shrpx_proto proto; // backend port. 0 if |host_unix| is true. uint16_t port; // true if |host| contains UNIX domain socket path. bool host_unix; + bool tls; }; struct DownstreamAddrGroupConfig { DownstreamAddrGroupConfig(const StringRef &pattern) - : pattern(pattern.c_str(), pattern.size()), - proto(PROTO_HTTP1), - tls(false) {} + : pattern(pattern.c_str(), pattern.size()) {} ImmutableString pattern; std::vector addrs; - // Application protocol used in this group - shrpx_proto proto; - bool tls; }; struct TicketKey { diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index 797ca000..a0361be9 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -353,7 +353,9 @@ int Http2Session::initiate_connection() { if (LOG_ENABLED(INFO)) { SSLOG(INFO, this) << "Connecting to downstream server"; } - if (ssl_ctx_) { + if (addr_->tls) { + assert(ssl_ctx_); + auto ssl = ssl::create_ssl(ssl_ctx_); if (!ssl) { return -1; @@ -1476,7 +1478,7 @@ int Http2Session::connection_made() { state_ = Http2Session::CONNECTED; - if (ssl_ctx_) { + if (addr_->tls) { const unsigned char *next_proto = nullptr; unsigned int next_proto_len = 0; @@ -1541,9 +1543,8 @@ int Http2Session::connection_made() { } } - auto &shared_addr = group_->shared_addr; auto must_terminate = - shared_addr->tls && !nghttp2::ssl::check_http2_requirement(conn_.tls.ssl); + addr_->tls && !nghttp2::ssl::check_http2_requirement(conn_.tls.ssl); reset_connection_check_timer(CONNCHK_TIMEOUT); diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 3192e8ee..327f7cd2 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -160,7 +160,7 @@ HttpDownstreamConnection::HttpDownstreamConnection(DownstreamAddrGroup *group, do_write_(&HttpDownstreamConnection::noop), do_signal_write_(&HttpDownstreamConnection::noop), worker_(worker), - ssl_ctx_(group->shared_addr->tls ? worker->get_cl_ssl_ctx() : nullptr), + ssl_ctx_(worker->get_cl_ssl_ctx()), group_(group), addr_(nullptr), ioctrl_(&conn_.rlimit), @@ -185,17 +185,6 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { auto &downstreamconf = get_config()->conn.downstream; if (conn_.fd == -1) { - if (ssl_ctx_) { - auto ssl = ssl::create_ssl(ssl_ctx_); - if (!ssl) { - return -1; - } - - ssl::setup_downstream_http1_alpn(ssl); - - conn_.set_ssl(ssl); - } - auto &shared_addr = group_->shared_addr; auto &addrs = shared_addr->addrs; auto &next_downstream = shared_addr->next; @@ -207,6 +196,10 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { next_downstream = 0; } + if (addr.proto != PROTO_HTTP1) { + continue; + } + auto &connect_blocker = addr.connect_blocker; if (connect_blocker->blocked()) { @@ -265,7 +258,18 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { addr_ = &addr; - if (ssl_ctx_) { + if (addr_->tls) { + assert(ssl_ctx_); + + auto ssl = ssl::create_ssl(ssl_ctx_); + if (!ssl) { + return -1; + } + + ssl::setup_downstream_http1_alpn(ssl); + + conn_.set_ssl(ssl); + auto sni_name = addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni}; if (!util::numeric_host(sni_name.c_str())) { diff --git a/src/shrpx_live_check.cc b/src/shrpx_live_check.cc index fbfd7001..e0a9b723 100644 --- a/src/shrpx_live_check.cc +++ b/src/shrpx_live_check.cc @@ -96,8 +96,7 @@ void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) { } // namespace LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, - DownstreamAddrGroup *group, DownstreamAddr *addr, - std::mt19937 &gen) + DownstreamAddr *addr, std::mt19937 &gen) : conn_(loop, -1, nullptr, worker->get_mcpool(), get_config()->conn.downstream.timeout.write, get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb, @@ -109,7 +108,6 @@ LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, write_(&LiveCheck::noop), worker_(worker), ssl_ctx_(ssl_ctx), - group_(group), addr_(addr), session_(nullptr), success_count_(0), @@ -175,8 +173,6 @@ int LiveCheck::do_write() { return write_(*this); } int LiveCheck::initiate_connection() { int rv; - const auto &shared_addr = group_->shared_addr; - auto worker_blocker = worker_->get_connect_blocker(); if (worker_blocker->blocked()) { if (LOG_ENABLED(INFO)) { @@ -185,13 +181,15 @@ int LiveCheck::initiate_connection() { return -1; } - if (ssl_ctx_) { + if (addr_->tls) { + assert(ssl_ctx_); + auto ssl = ssl::create_ssl(ssl_ctx_); if (!ssl) { return -1; } - switch (shared_addr->proto) { + switch (addr_->proto) { case PROTO_HTTP1: ssl::setup_downstream_http1_alpn(ssl); break; @@ -226,7 +224,7 @@ int LiveCheck::initiate_connection() { return -1; } - if (ssl_ctx_) { + if (addr_->tls) { auto sni_name = addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni}; if (!util::numeric_host(sni_name.c_str())) { @@ -278,8 +276,7 @@ int LiveCheck::connected() { return do_write(); } - const auto &shared_addr = group_->shared_addr; - if (shared_addr->proto == PROTO_HTTP2) { + if (addr_->proto == PROTO_HTTP2) { // For HTTP/2, we try to read SETTINGS ACK from server to make // sure it is really alive, and serving HTTP/2. read_ = &LiveCheck::read_clear; @@ -343,9 +340,7 @@ int LiveCheck::tls_handshake() { auto proto = StringRef{next_proto, next_proto_len}; - const auto &shared_addr = group_->shared_addr; - - switch (shared_addr->proto) { + switch (addr_->proto) { case PROTO_HTTP1: if (proto.empty() || proto == StringRef::from_lit("http/1.1")) { break; @@ -697,9 +692,8 @@ int LiveCheck::connection_made() { return -1; } - auto &shared_addr = group_->shared_addr; auto must_terminate = - shared_addr->tls && !nghttp2::ssl::check_http2_requirement(conn_.tls.ssl); + addr_->tls && !nghttp2::ssl::check_http2_requirement(conn_.tls.ssl); if (must_terminate) { if (LOG_ENABLED(INFO)) { diff --git a/src/shrpx_live_check.h b/src/shrpx_live_check.h index fa85141d..3cd83a55 100644 --- a/src/shrpx_live_check.h +++ b/src/shrpx_live_check.h @@ -41,14 +41,12 @@ namespace shrpx { class Worker; -struct DownstreamAddrGroup; struct DownstreamAddr; class LiveCheck { public: LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, - DownstreamAddrGroup *group, DownstreamAddr *addr, - std::mt19937 &gen); + DownstreamAddr *addr, std::mt19937 &gen); ~LiveCheck(); void disconnect(); @@ -101,7 +99,6 @@ private: Worker *worker_; // nullptr if no TLS is configured SSL_CTX *ssl_ctx_; - DownstreamAddrGroup *group_; // Address of remote endpoint DownstreamAddr *addr_; nghttp2_session *session_; diff --git a/src/shrpx_ssl.cc b/src/shrpx_ssl.cc index f500d446..9b9270e4 100644 --- a/src/shrpx_ssl.cc +++ b/src/shrpx_ssl.cc @@ -1394,7 +1394,11 @@ bool downstream_tls_enabled() { const auto &groups = get_config()->conn.downstream.addr_groups; return std::any_of(std::begin(groups), std::end(groups), - [](const DownstreamAddrGroupConfig &g) { return g.tls; }); + [](const DownstreamAddrGroupConfig &g) { + return std::any_of( + std::begin(g.addrs), std::end(g.addrs), + [](const DownstreamAddrConfig &a) { return a.tls; }); + }); } SSL_CTX *setup_downstream_client_ssl_context( diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index d88f48ea..f8f184e7 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -67,8 +67,7 @@ namespace { bool match_shared_downstream_addr( const std::shared_ptr &lhs, const std::shared_ptr &rhs) { - if (lhs->addrs.size() != rhs->addrs.size() || lhs->proto != rhs->proto || - lhs->tls != rhs->tls) { + if (lhs->addrs.size() != rhs->addrs.size() || lhs->proto != rhs->proto) { return false; } @@ -83,7 +82,8 @@ bool match_shared_downstream_addr( auto &b = rhs->addrs[i]; if (a.host == b.host && a.port == b.port && a.host_unix == b.host_unix && - a.sni == b.sni && a.fall == b.fall && a.rise == b.rise) { + a.proto == b.proto && a.tls == b.tls && a.sni == b.sni && + a.fall == b.fall && a.rise == b.rise) { break; } } @@ -147,8 +147,14 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, // does not value initialize SharedDownstreamAddr above. shared_addr->next = 0; shared_addr->addrs.resize(src.addrs.size()); - shared_addr->proto = src.proto; - shared_addr->tls = src.tls; + shared_addr->proto = PROTO_NONE; + shared_addr->http1_pri = {}; + shared_addr->http2_pri = {}; + + auto mixed_proto = false; + + size_t num_http1 = 0; + size_t num_http2 = 0; for (size_t j = 0; j < src.addrs.size(); ++j) { auto &src_addr = src.addrs[j]; @@ -159,14 +165,31 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, dst_addr.hostport = src_addr.hostport; dst_addr.port = src_addr.port; dst_addr.host_unix = src_addr.host_unix; + dst_addr.proto = src_addr.proto; + dst_addr.tls = src_addr.tls; dst_addr.sni = src_addr.sni; dst_addr.fall = src_addr.fall; dst_addr.rise = src_addr.rise; dst_addr.connect_blocker = make_unique(randgen_, loop_); - dst_addr.live_check = make_unique( - loop_, shared_addr->tls ? cl_ssl_ctx_ : nullptr, this, &dst, - &dst_addr, randgen_); + dst_addr.live_check = + make_unique(loop_, cl_ssl_ctx_, this, &dst_addr, randgen_); + + if (!mixed_proto) { + if (shared_addr->proto == PROTO_NONE) { + shared_addr->proto = dst_addr.proto; + } else if (shared_addr->proto != dst_addr.proto) { + shared_addr->proto = PROTO_NONE; + mixed_proto = true; + } + } + + if (dst_addr.proto == PROTO_HTTP2) { + ++num_http2; + } else { + assert(dst_addr.proto == PROTO_HTTP1); + ++num_http1; + } } // share the connection if patterns have the same set of backend @@ -179,6 +202,16 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, }); if (it == end) { + if (shared_addr->proto == PROTO_NONE) { + auto max = std::max(static_cast(65536), + std::max(num_http1, num_http2)); + + shared_addr->http1_pri.iweight = max / num_http1; + shared_addr->http2_pri.iweight = max / num_http2; + shared_addr->max_pri_dist = std::max(shared_addr->http1_pri.iweight, + shared_addr->http2_pri.iweight); + } + dst.shared_addr = shared_addr; } else { if (LOG_ENABLED(INFO)) { diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index 4f33bc60..fae684d8 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -99,11 +99,31 @@ struct DownstreamAddr { // total number of streams created in HTTP/2 connections for this // address. size_t num_dconn; + // Application protocol used in this backend + shrpx_proto proto; + // true if TLS is used in this backend + bool tls; +}; + +// Simplified weighted fair queuing. Actually we don't use queue here +// since we have just 2 items. This is the same algorithm used in +// stream priority, but ignores remainder. +struct WeightedPri { + // current cycle of this item. The lesser cycle has higher + // priority. This is unsigned 32 bit integer, so it may overflow. + // But with the same theory described in stream priority, it is no + // problem. + uint32_t cycle; + // inverted weight, this is a penalty added to cycle when this item + // is selected. + uint32_t iweight; }; struct SharedDownstreamAddr { std::vector addrs; - // Application protocol used in this group + // Application protocol used in this backend addresses. If all + // addresses use a single protocol, this field has that value. + // Otherwise, this value contains PROTO_NONE. shrpx_proto proto; // List of Http2Session which is not fully utilized (i.e., the // server advertized maximum concurrency is not reached). We will @@ -114,9 +134,17 @@ struct SharedDownstreamAddr { // wise. DList http2_avail_freelist; DownstreamConnectionPool dconn_pool; - // Next downstream address index in addrs. + // Next http/1.1 downstream address index in addrs. size_t next; - bool tls; + // http1_pri and http2_pri are used to which protocols are used + // between HTTP/1.1 or HTTP/2 if they both are available in + // backends. They are choosed proportional to the number available + // backend. Usually, if http1_pri.cycle < http2_pri.cycle, choose + // HTTP/1.1. Otherwise, choose HTTP/2. + WeightedPri http1_pri; + WeightedPri http2_pri; + // The maximum penalty added to http2_pri or http1_pri + uint32_t max_pri_dist; }; struct DownstreamAddrGroup {