diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index e03e7696..c86624d5 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -790,17 +790,25 @@ Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) { } namespace { -bool pri_less(const WeightedPri &lhs, const WeightedPri &rhs, uint32_t max) { +// The chosen value is small enough for uint32_t, and large enough for +// the number of backend. +constexpr uint32_t WEIGHT_MAX = 65536; +} // namespace + +namespace { +bool pri_less(const WeightedPri &lhs, const WeightedPri &rhs) { if (lhs.cycle < rhs.cycle) { - return rhs.cycle - lhs.cycle <= max; + return rhs.cycle - lhs.cycle <= WEIGHT_MAX; } - return lhs.cycle - rhs.cycle > max; + return lhs.cycle - rhs.cycle > WEIGHT_MAX; } } // namespace namespace { -uint32_t next_cycle(const WeightedPri &pri) { return pri.cycle + pri.iweight; } +uint32_t next_cycle(const WeightedPri &pri) { + return pri.cycle + WEIGHT_MAX / std::min(WEIGHT_MAX, pri.weight); +} } // namespace std::unique_ptr @@ -850,17 +858,31 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { auto proto = PROTO_NONE; - if (shared_addr->proto == PROTO_NONE) { - if (pri_less(shared_addr->http1_pri, shared_addr->http2_pri, - shared_addr->max_pri_dist)) { - shared_addr->http1_pri.cycle = next_cycle(shared_addr->http1_pri); + auto http1_weight = shared_addr->http1_pri.weight; + auto http2_weight = shared_addr->http2_pri.weight; + + if (http1_weight > 0 && http2_weight > 0) { + // We only advance cycle if both weight has nonzero to keep its + // distance under WEIGHT_MAX. + if (pri_less(shared_addr->http1_pri, shared_addr->http2_pri)) { proto = PROTO_HTTP1; + shared_addr->http1_pri.cycle = next_cycle(shared_addr->http1_pri); } else { - shared_addr->http2_pri.cycle = next_cycle(shared_addr->http2_pri); proto = PROTO_HTTP2; + shared_addr->http2_pri.cycle = next_cycle(shared_addr->http2_pri); } + } else if (http1_weight > 0) { + proto = PROTO_HTTP1; } else { - proto = shared_addr->proto; + proto = PROTO_HTTP2; + } + + if (proto == PROTO_NONE) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "No working downstream address found"; + } + + return nullptr; } if (proto == PROTO_HTTP2) { diff --git a/src/shrpx_connect_blocker.cc b/src/shrpx_connect_blocker.cc index d70fe5ca..df6026d2 100644 --- a/src/shrpx_connect_blocker.cc +++ b/src/shrpx_connect_blocker.cc @@ -28,15 +28,26 @@ namespace shrpx { namespace { void connect_blocker_cb(struct ev_loop *loop, ev_timer *w, int revents) { + auto connect_blocker = static_cast(w->data); if (LOG_ENABLED(INFO)) { LOG(INFO) << "Unblock"; } + + connect_blocker->call_unblock_func(); } } // namespace -ConnectBlocker::ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop) - : gen_(gen), loop_(loop), fail_count_(0), offline_(false) { +ConnectBlocker::ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop, + std::function block_func, + std::function unblock_func) + : gen_(gen), + block_func_(block_func), + unblock_func_(unblock_func), + loop_(loop), + fail_count_(0), + offline_(false) { ev_timer_init(&timer_, connect_blocker_cb, 0., 0.); + timer_.data = this; } ConnectBlocker::~ConnectBlocker() { ev_timer_stop(loop_, &timer_); } @@ -64,6 +75,8 @@ void ConnectBlocker::on_failure() { return; } + call_block_func(); + ++fail_count_; auto base_backoff = pow(MULTIPLIER, std::min(MAX_BACKOFF_EXP, fail_count_)); @@ -85,6 +98,10 @@ void ConnectBlocker::offline() { return; } + if (!ev_is_active(&timer_)) { + call_block_func(); + } + offline_ = true; ev_timer_stop(loop_, &timer_); @@ -102,4 +119,8 @@ void ConnectBlocker::online() { bool ConnectBlocker::in_offline() const { return offline_; } +void ConnectBlocker::call_block_func() { block_func_(); } + +void ConnectBlocker::call_unblock_func() { unblock_func_(); } + } // namespace shrpx diff --git a/src/shrpx_connect_blocker.h b/src/shrpx_connect_blocker.h index 441ac277..625422f1 100644 --- a/src/shrpx_connect_blocker.h +++ b/src/shrpx_connect_blocker.h @@ -35,7 +35,9 @@ namespace shrpx { class ConnectBlocker { public: - ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop); + ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop, + std::function block_func, + std::function unblock_func); ~ConnectBlocker(); // Returns true if making connection is not allowed. @@ -60,8 +62,15 @@ public: // Returns true if peer is considered offline. bool in_offline() const; + void call_block_func(); + void call_unblock_func(); + private: std::mt19937 gen_; + // Called when blocking is started + std::function block_func_; + // Called when unblocked + std::function unblock_func_; ev_timer timer_; struct ev_loop *loop_; // The number of consecutive connection failure. Reset to 0 on diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 327f7cd2..b6337c7b 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -197,6 +197,10 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { } if (addr.proto != PROTO_HTTP1) { + if (end == next_downstream) { + return SHRPX_ERR_NETWORK; + } + continue; } diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index f8f184e7..d75da0b5 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -67,7 +67,7 @@ namespace { bool match_shared_downstream_addr( const std::shared_ptr &lhs, const std::shared_ptr &rhs) { - if (lhs->addrs.size() != rhs->addrs.size() || lhs->proto != rhs->proto) { + if (lhs->addrs.size() != rhs->addrs.size()) { return false; } @@ -115,7 +115,8 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, cert_tree_(cert_tree), ticket_keys_(ticket_keys), downstream_addr_groups_(get_config()->conn.downstream.addr_groups.size()), - connect_blocker_(make_unique(randgen_, loop_)), + connect_blocker_( + make_unique(randgen_, loop_, []() {}, []() {})), graceful_shutdown_(false) { ev_async_init(&w_, eventcb); w_.data = this; @@ -147,12 +148,9 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, // does not value initialize SharedDownstreamAddr above. shared_addr->next = 0; shared_addr->addrs.resize(src.addrs.size()); - shared_addr->proto = PROTO_NONE; shared_addr->http1_pri = {}; shared_addr->http2_pri = {}; - auto mixed_proto = false; - size_t num_http1 = 0; size_t num_http2 = 0; @@ -171,19 +169,36 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, dst_addr.fall = src_addr.fall; dst_addr.rise = src_addr.rise; - dst_addr.connect_blocker = make_unique(randgen_, loop_); + dst_addr.connect_blocker = + make_unique(randgen_, loop_, + [shared_addr, &dst_addr]() { + switch (dst_addr.proto) { + case PROTO_HTTP1: + --shared_addr->http1_pri.weight; + break; + case PROTO_HTTP2: + --shared_addr->http2_pri.weight; + break; + default: + assert(0); + } + }, + [shared_addr, &dst_addr]() { + switch (dst_addr.proto) { + case PROTO_HTTP1: + ++shared_addr->http1_pri.weight; + break; + case PROTO_HTTP2: + ++shared_addr->http2_pri.weight; + break; + default: + assert(0); + } + }); + dst_addr.live_check = make_unique(loop_, cl_ssl_ctx_, this, &dst_addr, randgen_); - if (!mixed_proto) { - if (shared_addr->proto == PROTO_NONE) { - shared_addr->proto = dst_addr.proto; - } else if (shared_addr->proto != dst_addr.proto) { - shared_addr->proto = PROTO_NONE; - mixed_proto = true; - } - } - if (dst_addr.proto == PROTO_HTTP2) { ++num_http2; } else { @@ -202,14 +217,14 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, }); if (it == end) { - if (shared_addr->proto == PROTO_NONE) { - auto max = std::max(static_cast(65536), - std::max(num_http1, num_http2)); + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "number of http/1.1 backend: " << num_http1 + << ", number of h2 backend: " << num_http2; + } - shared_addr->http1_pri.iweight = max / num_http1; - shared_addr->http2_pri.iweight = max / num_http2; - shared_addr->max_pri_dist = std::max(shared_addr->http1_pri.iweight, - shared_addr->http2_pri.iweight); + if (num_http2 > 0 && num_http1 > 0) { + shared_addr->http1_pri.weight = num_http1; + shared_addr->http2_pri.weight = num_http2; } dst.shared_addr = shared_addr; diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index fae684d8..c5b23658 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -114,17 +114,12 @@ struct WeightedPri { // But with the same theory described in stream priority, it is no // problem. uint32_t cycle; - // inverted weight, this is a penalty added to cycle when this item - // is selected. - uint32_t iweight; + // weight, larger weight means more frequent use. + uint32_t weight; }; struct SharedDownstreamAddr { std::vector addrs; - // Application protocol used in this backend addresses. If all - // addresses use a single protocol, this field has that value. - // Otherwise, this value contains PROTO_NONE. - shrpx_proto proto; // List of Http2Session which is not fully utilized (i.e., the // server advertized maximum concurrency is not reached). We will // coalesce as much stream as possible in one Http2Session to fully @@ -143,8 +138,6 @@ struct SharedDownstreamAddr { // HTTP/1.1. Otherwise, choose HTTP/2. WeightedPri http1_pri; WeightedPri http2_pri; - // The maximum penalty added to http2_pri or http1_pri - uint32_t max_pri_dist; }; struct DownstreamAddrGroup {