nghttpx: Refactor protocol selection in backend

This commit is contained in:
Tatsuhiro Tsujikawa 2016-05-25 23:07:04 +09:00
parent fce7908fe6
commit e0491c2ee8
6 changed files with 108 additions and 44 deletions

View File

@ -790,17 +790,25 @@ Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) {
}
namespace {
bool pri_less(const WeightedPri &lhs, const WeightedPri &rhs, uint32_t max) {
// The chosen value is small enough for uint32_t, and large enough for
// the number of backend.
constexpr uint32_t WEIGHT_MAX = 65536;
} // namespace
namespace {
bool pri_less(const WeightedPri &lhs, const WeightedPri &rhs) {
if (lhs.cycle < rhs.cycle) {
return rhs.cycle - lhs.cycle <= max;
return rhs.cycle - lhs.cycle <= WEIGHT_MAX;
}
return lhs.cycle - rhs.cycle > max;
return lhs.cycle - rhs.cycle > WEIGHT_MAX;
}
} // namespace
namespace {
uint32_t next_cycle(const WeightedPri &pri) { return pri.cycle + pri.iweight; }
uint32_t next_cycle(const WeightedPri &pri) {
return pri.cycle + WEIGHT_MAX / std::min(WEIGHT_MAX, pri.weight);
}
} // namespace
std::unique_ptr<DownstreamConnection>
@ -850,17 +858,31 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
auto proto = PROTO_NONE;
if (shared_addr->proto == PROTO_NONE) {
if (pri_less(shared_addr->http1_pri, shared_addr->http2_pri,
shared_addr->max_pri_dist)) {
shared_addr->http1_pri.cycle = next_cycle(shared_addr->http1_pri);
auto http1_weight = shared_addr->http1_pri.weight;
auto http2_weight = shared_addr->http2_pri.weight;
if (http1_weight > 0 && http2_weight > 0) {
// We only advance cycle if both weight has nonzero to keep its
// distance under WEIGHT_MAX.
if (pri_less(shared_addr->http1_pri, shared_addr->http2_pri)) {
proto = PROTO_HTTP1;
shared_addr->http1_pri.cycle = next_cycle(shared_addr->http1_pri);
} else {
shared_addr->http2_pri.cycle = next_cycle(shared_addr->http2_pri);
proto = PROTO_HTTP2;
shared_addr->http2_pri.cycle = next_cycle(shared_addr->http2_pri);
}
} else if (http1_weight > 0) {
proto = PROTO_HTTP1;
} else {
proto = shared_addr->proto;
proto = PROTO_HTTP2;
}
if (proto == PROTO_NONE) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "No working downstream address found";
}
return nullptr;
}
if (proto == PROTO_HTTP2) {

View File

@ -28,15 +28,26 @@ namespace shrpx {
namespace {
void connect_blocker_cb(struct ev_loop *loop, ev_timer *w, int revents) {
auto connect_blocker = static_cast<ConnectBlocker *>(w->data);
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Unblock";
}
connect_blocker->call_unblock_func();
}
} // namespace
ConnectBlocker::ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop)
: gen_(gen), loop_(loop), fail_count_(0), offline_(false) {
ConnectBlocker::ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop,
std::function<void()> block_func,
std::function<void()> unblock_func)
: gen_(gen),
block_func_(block_func),
unblock_func_(unblock_func),
loop_(loop),
fail_count_(0),
offline_(false) {
ev_timer_init(&timer_, connect_blocker_cb, 0., 0.);
timer_.data = this;
}
ConnectBlocker::~ConnectBlocker() { ev_timer_stop(loop_, &timer_); }
@ -64,6 +75,8 @@ void ConnectBlocker::on_failure() {
return;
}
call_block_func();
++fail_count_;
auto base_backoff = pow(MULTIPLIER, std::min(MAX_BACKOFF_EXP, fail_count_));
@ -85,6 +98,10 @@ void ConnectBlocker::offline() {
return;
}
if (!ev_is_active(&timer_)) {
call_block_func();
}
offline_ = true;
ev_timer_stop(loop_, &timer_);
@ -102,4 +119,8 @@ void ConnectBlocker::online() {
bool ConnectBlocker::in_offline() const { return offline_; }
void ConnectBlocker::call_block_func() { block_func_(); }
void ConnectBlocker::call_unblock_func() { unblock_func_(); }
} // namespace shrpx

View File

@ -35,7 +35,9 @@ namespace shrpx {
class ConnectBlocker {
public:
ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop);
ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop,
std::function<void()> block_func,
std::function<void()> unblock_func);
~ConnectBlocker();
// Returns true if making connection is not allowed.
@ -60,8 +62,15 @@ public:
// Returns true if peer is considered offline.
bool in_offline() const;
void call_block_func();
void call_unblock_func();
private:
std::mt19937 gen_;
// Called when blocking is started
std::function<void()> block_func_;
// Called when unblocked
std::function<void()> unblock_func_;
ev_timer timer_;
struct ev_loop *loop_;
// The number of consecutive connection failure. Reset to 0 on

View File

@ -197,6 +197,10 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
}
if (addr.proto != PROTO_HTTP1) {
if (end == next_downstream) {
return SHRPX_ERR_NETWORK;
}
continue;
}

View File

@ -67,7 +67,7 @@ namespace {
bool match_shared_downstream_addr(
const std::shared_ptr<SharedDownstreamAddr> &lhs,
const std::shared_ptr<SharedDownstreamAddr> &rhs) {
if (lhs->addrs.size() != rhs->addrs.size() || lhs->proto != rhs->proto) {
if (lhs->addrs.size() != rhs->addrs.size()) {
return false;
}
@ -115,7 +115,8 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
cert_tree_(cert_tree),
ticket_keys_(ticket_keys),
downstream_addr_groups_(get_config()->conn.downstream.addr_groups.size()),
connect_blocker_(make_unique<ConnectBlocker>(randgen_, loop_)),
connect_blocker_(
make_unique<ConnectBlocker>(randgen_, loop_, []() {}, []() {})),
graceful_shutdown_(false) {
ev_async_init(&w_, eventcb);
w_.data = this;
@ -147,12 +148,9 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
// does not value initialize SharedDownstreamAddr above.
shared_addr->next = 0;
shared_addr->addrs.resize(src.addrs.size());
shared_addr->proto = PROTO_NONE;
shared_addr->http1_pri = {};
shared_addr->http2_pri = {};
auto mixed_proto = false;
size_t num_http1 = 0;
size_t num_http2 = 0;
@ -171,19 +169,36 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
dst_addr.fall = src_addr.fall;
dst_addr.rise = src_addr.rise;
dst_addr.connect_blocker = make_unique<ConnectBlocker>(randgen_, loop_);
dst_addr.connect_blocker =
make_unique<ConnectBlocker>(randgen_, loop_,
[shared_addr, &dst_addr]() {
switch (dst_addr.proto) {
case PROTO_HTTP1:
--shared_addr->http1_pri.weight;
break;
case PROTO_HTTP2:
--shared_addr->http2_pri.weight;
break;
default:
assert(0);
}
},
[shared_addr, &dst_addr]() {
switch (dst_addr.proto) {
case PROTO_HTTP1:
++shared_addr->http1_pri.weight;
break;
case PROTO_HTTP2:
++shared_addr->http2_pri.weight;
break;
default:
assert(0);
}
});
dst_addr.live_check =
make_unique<LiveCheck>(loop_, cl_ssl_ctx_, this, &dst_addr, randgen_);
if (!mixed_proto) {
if (shared_addr->proto == PROTO_NONE) {
shared_addr->proto = dst_addr.proto;
} else if (shared_addr->proto != dst_addr.proto) {
shared_addr->proto = PROTO_NONE;
mixed_proto = true;
}
}
if (dst_addr.proto == PROTO_HTTP2) {
++num_http2;
} else {
@ -202,14 +217,14 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
});
if (it == end) {
if (shared_addr->proto == PROTO_NONE) {
auto max = std::max(static_cast<size_t>(65536),
std::max(num_http1, num_http2));
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "number of http/1.1 backend: " << num_http1
<< ", number of h2 backend: " << num_http2;
}
shared_addr->http1_pri.iweight = max / num_http1;
shared_addr->http2_pri.iweight = max / num_http2;
shared_addr->max_pri_dist = std::max(shared_addr->http1_pri.iweight,
shared_addr->http2_pri.iweight);
if (num_http2 > 0 && num_http1 > 0) {
shared_addr->http1_pri.weight = num_http1;
shared_addr->http2_pri.weight = num_http2;
}
dst.shared_addr = shared_addr;

View File

@ -114,17 +114,12 @@ struct WeightedPri {
// But with the same theory described in stream priority, it is no
// problem.
uint32_t cycle;
// inverted weight, this is a penalty added to cycle when this item
// is selected.
uint32_t iweight;
// weight, larger weight means more frequent use.
uint32_t weight;
};
struct SharedDownstreamAddr {
std::vector<DownstreamAddr> addrs;
// Application protocol used in this backend addresses. If all
// addresses use a single protocol, this field has that value.
// Otherwise, this value contains PROTO_NONE.
shrpx_proto proto;
// List of Http2Session which is not fully utilized (i.e., the
// server advertized maximum concurrency is not reached). We will
// coalesce as much stream as possible in one Http2Session to fully
@ -143,8 +138,6 @@ struct SharedDownstreamAddr {
// HTTP/1.1. Otherwise, choose HTTP/2.
WeightedPri http1_pri;
WeightedPri http2_pri;
// The maximum penalty added to http2_pri or http1_pri
uint32_t max_pri_dist;
};
struct DownstreamAddrGroup {