nghttpx: Better load balancing between backend HTTP/2 servers

This commit is contained in:
Tatsuhiro Tsujikawa 2016-04-02 23:11:03 +09:00
parent 1816af4fb2
commit 46514074a4
8 changed files with 250 additions and 110 deletions

View File

@ -47,6 +47,7 @@
#include "shrpx_downstream_connection_pool.h"
#include "shrpx_downstream.h"
#include "shrpx_http2_session.h"
#include "shrpx_connect_blocker.h"
#ifdef HAVE_SPDYLAY
#include "shrpx_spdy_upstream.h"
#endif // HAVE_SPDYLAY
@ -680,6 +681,116 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) {
dconn_pool.remove_downstream_connection(dconn);
}
namespace {
// Returns true if load of |lhs| is lighter than that of |rhs|.
// Currently, we assume that lesser streams means lesser load.
bool load_lighter(const DownstreamAddr *lhs, const DownstreamAddr *rhs) {
return lhs->num_dconn < rhs->num_dconn;
}
} // namespace
Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) {
auto &shared_addr = group.shared_addr;
// First count the working backend addresses.
size_t min = 0;
for (const auto &addr : shared_addr->addrs) {
if (addr.connect_blocker->blocked()) {
continue;
}
++min;
}
if (min == 0) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "No working backend address found";
}
return nullptr;
}
auto &http2_avail_freelist = shared_addr->http2_avail_freelist;
if (http2_avail_freelist.size() >= min) {
auto session = http2_avail_freelist.head;
session->remove_from_freelist();
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Use Http2Session " << session
<< " from http2_avail_freelist";
}
if (session->max_concurrency_reached(1)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
<< session << ").";
}
} else {
session->add_to_avail_freelist();
}
return session;
}
DownstreamAddr *selected_addr = nullptr;
for (auto &addr : shared_addr->addrs) {
if (addr.http2_extra_freelist.size() == 0 &&
addr.connect_blocker->blocked()) {
continue;
}
if (addr.in_avail) {
continue;
}
if (selected_addr == nullptr || load_lighter(&addr, selected_addr)) {
selected_addr = &addr;
}
}
assert(selected_addr);
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Selected DownstreamAddr=" << selected_addr
<< ", index="
<< (selected_addr - shared_addr->addrs.data()) /
sizeof(*selected_addr);
}
if (selected_addr->http2_extra_freelist.size()) {
auto session = selected_addr->http2_extra_freelist.head;
session->remove_from_freelist();
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Use Http2Session " << session
<< " from http2_extra_freelist";
}
if (session->max_concurrency_reached(1)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
<< session << ").";
}
} else {
session->add_to_avail_freelist();
}
return session;
}
auto session = new Http2Session(
conn_.loop, shared_addr->tls ? worker_->get_cl_ssl_ctx() : nullptr,
worker_, &group, selected_addr);
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Create new Http2Session " << session;
}
session->add_to_avail_freelist();
return session;
}
std::unique_ptr<DownstreamConnection>
ClientHandler::get_downstream_connection(Downstream *downstream) {
size_t group_idx;
@ -735,38 +846,10 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
}
if (shared_addr->proto == PROTO_HTTP2) {
auto &http2_freelist = shared_addr->http2_freelist;
auto http2session = select_http2_session(group);
Http2Session *http2session;
if (http2_freelist.empty() ||
http2_freelist.size() < shared_addr->addrs.size()) {
if (LOG_ENABLED(INFO)) {
if (http2_freelist.empty()) {
CLOG(INFO, this)
<< "http2_freelist is empty; create new Http2Session";
} else {
CLOG(INFO, this) << "Create new Http2Session; current "
<< http2_freelist.size() << ", min "
<< shared_addr->addrs.size();
}
}
http2session = new Http2Session(
conn_.loop, shared_addr->tls ? worker_->get_cl_ssl_ctx() : nullptr,
worker_, &group);
} else {
http2session = http2_freelist.head;
http2_freelist.remove(http2session);
}
if (http2session->max_concurrency_reached(1)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
<< http2session
<< "). Remove Http2Session from http2_freelist";
}
} else {
http2_freelist.append(http2session);
if (http2session == nullptr) {
return nullptr;
}
dconn = make_unique<Http2DownstreamConnection>(http2session);

View File

@ -49,6 +49,7 @@ class ConnectBlocker;
class DownstreamConnectionPool;
class Worker;
struct WorkerStat;
struct DownstreamAddrGroup;
class ClientHandler {
public:
@ -141,6 +142,8 @@ public:
// header field.
StringRef get_forwarded_for() const;
Http2Session *select_http2_session(DownstreamAddrGroup &group);
private:
Connection conn_;
ev_timer reneg_shutdown_timer_;

View File

@ -182,7 +182,8 @@ void initiate_connection_cb(struct ev_loop *loop, ev_timer *w, int revents) {
} // namespace
Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
Worker *worker, DownstreamAddrGroup *group)
Worker *worker, DownstreamAddrGroup *group,
DownstreamAddr *addr)
: dlnext(nullptr),
dlprev(nullptr),
conn_(loop, -1, nullptr, worker->get_mcpool(),
@ -194,10 +195,11 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
worker_(worker),
ssl_ctx_(ssl_ctx),
group_(group),
addr_(nullptr),
addr_(addr),
session_(nullptr),
state_(DISCONNECTED),
connection_check_state_(CONNECTION_CHECK_NONE),
freelist_zone_(FREELIST_ZONE_NONE),
flow_control_(false) {
read_ = write_ = &Http2Session::noop;
@ -223,12 +225,7 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
Http2Session::~Http2Session() {
disconnect(true);
if (in_freelist()) {
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Removed from http2_freelist";
}
group_->shared_addr->http2_freelist.remove(this);
}
remove_from_freelist();
}
int Http2Session::disconnect(bool hard) {
@ -254,8 +251,6 @@ int Http2Session::disconnect(bool hard) {
conn_.disconnect();
addr_ = nullptr;
if (proxy_htp_) {
proxy_htp_.reset();
}
@ -298,8 +293,6 @@ int Http2Session::disconnect(bool hard) {
int Http2Session::initiate_connection() {
int rv = 0;
auto &shared_addr = group_->shared_addr;
auto &addrs = shared_addr->addrs;
auto worker_blocker = worker_->get_connect_blocker();
if (state_ == DISCONNECTED) {
@ -310,42 +303,6 @@ int Http2Session::initiate_connection() {
}
return -1;
}
auto &next_downstream = shared_addr->next;
auto end = next_downstream;
for (;;) {
auto &addr = addrs[next_downstream];
if (++next_downstream >= addrs.size()) {
next_downstream = 0;
}
auto &connect_blocker = addr.connect_blocker;
if (connect_blocker->blocked()) {
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Backend server "
<< util::to_numeric_addr(&addr.addr)
<< " was not available temporarily";
}
if (end == next_downstream) {
return -1;
}
continue;
}
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Using downstream address idx=" << next_downstream
<< " out of " << addrs.size();
}
addr_ = &addr;
break;
}
}
auto &connect_blocker = addr_->connect_blocker;
@ -647,10 +604,12 @@ int Http2Session::downstream_connect_proxy() {
void Http2Session::add_downstream_connection(Http2DownstreamConnection *dconn) {
dconns_.append(dconn);
++addr_->num_dconn;
}
void Http2Session::remove_downstream_connection(
Http2DownstreamConnection *dconn) {
--addr_->num_dconn;
dconns_.remove(dconn);
dconn->detach_stream_data();
@ -658,11 +617,14 @@ void Http2Session::remove_downstream_connection(
SSLOG(INFO, this) << "Remove downstream";
}
if (!in_freelist() && !max_concurrency_reached()) {
if (freelist_zone_ == FREELIST_ZONE_NONE && !max_concurrency_reached()) {
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Append to Http2Session freelist";
SSLOG(INFO, this) << "Append to http2_extra_freelist, addr=" << addr_
<< ", freelist.size="
<< addr_->http2_extra_freelist.size();
}
group_->shared_addr->http2_freelist.append(this);
add_to_extra_freelist();
}
}
@ -2087,14 +2049,6 @@ int Http2Session::handle_downstream_push_promise_complete(
size_t Http2Session::get_num_dconns() const { return dconns_.size(); }
bool Http2Session::in_freelist() const {
auto &shared_addr = group_->shared_addr;
auto &http2_freelist = shared_addr->http2_freelist;
return dlnext != nullptr || dlprev != nullptr ||
http2_freelist.head == this || http2_freelist.tail == this;
}
bool Http2Session::max_concurrency_reached(size_t extra) const {
if (!session_) {
return dconns_.size() + extra >= 100;
@ -2112,4 +2066,57 @@ DownstreamAddrGroup *Http2Session::get_downstream_addr_group() const {
return group_;
}
void Http2Session::add_to_avail_freelist() {
assert(freelist_zone_ == FREELIST_ZONE_NONE);
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Append to http2_avail_freelist, group=" << group_
<< ", freelist.size="
<< group_->shared_addr->http2_avail_freelist.size();
}
freelist_zone_ = FREELIST_ZONE_AVAIL;
group_->shared_addr->http2_avail_freelist.append(this);
addr_->in_avail = true;
}
void Http2Session::add_to_extra_freelist() {
assert(freelist_zone_ == FREELIST_ZONE_NONE);
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Append to http2_extra_freelist, addr=" << addr_
<< ", freelist.size="
<< addr_->http2_extra_freelist.size();
}
freelist_zone_ = FREELIST_ZONE_EXTRA;
addr_->http2_extra_freelist.append(this);
}
void Http2Session::remove_from_freelist() {
switch (freelist_zone_) {
case FREELIST_ZONE_NONE:
return;
case FREELIST_ZONE_AVAIL:
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Remove from http2_avail_freelist, group=" << group_
<< ", freelist.size="
<< group_->shared_addr->http2_avail_freelist.size();
}
group_->shared_addr->http2_avail_freelist.remove(this);
addr_->in_avail = false;
break;
case FREELIST_ZONE_EXTRA:
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Remove from http2_extra_freelist, addr=" << addr_
<< ", freelist.size="
<< addr_->http2_extra_freelist.size();
}
addr_->http2_extra_freelist.remove(this);
break;
}
freelist_zone_ = FREELIST_ZONE_NONE;
}
} // namespace shrpx

View File

@ -56,10 +56,21 @@ struct StreamData {
Http2DownstreamConnection *dconn;
};
enum FreelistZone {
// Http2Session object is not linked in any freelist.
FREELIST_ZONE_NONE,
// Http2Session object is linked in group scope
// http2_avail_freelist.
FREELIST_ZONE_AVAIL,
// Http2Session object is linked in address scope
// http2_extra_freelist.
FREELIST_ZONE_EXTRA
};
class Http2Session {
public:
Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
DownstreamAddrGroup *group);
DownstreamAddrGroup *group, DownstreamAddr *addr);
~Http2Session();
// If hard is true, all pending requests are abandoned and
@ -160,9 +171,14 @@ public:
// streams.
size_t get_num_dconns() const;
// Returns true if this object is included in freelist. See
// DownstreamAddrGroup object.
bool in_freelist() const;
// Adds to group scope http2_avail_freelist.
void add_to_avail_freelist();
// Adds to address scope http2_extra_freelist.
void add_to_extra_freelist();
// Removes this object from any freelist. If this object is not
// linked from any freelist, this function does nothing.
void remove_from_freelist();
// Returns true if the maximum concurrency is reached. In other
// words, the number of currently participated streams in this
@ -229,6 +245,7 @@ private:
nghttp2_session *session_;
int state_;
int connection_check_state_;
int freelist_zone_;
bool flow_control_;
};

View File

@ -385,9 +385,9 @@ void Http2Upstream::start_downstream(Downstream *downstream) {
void Http2Upstream::initiate_downstream(Downstream *downstream) {
int rv;
rv = downstream->attach_downstream_connection(
handler_->get_downstream_connection(downstream));
if (rv != 0) {
auto dconn = handler_->get_downstream_connection(downstream);
if (!dconn ||
(rv = downstream->attach_downstream_connection(std::move(dconn))) != 0) {
// downstream connection fails, send error page
if (error_reply(downstream, 503) != 0) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
@ -1739,6 +1739,8 @@ int Http2Upstream::on_downstream_reset(bool no_retry) {
downstream->add_retry();
std::unique_ptr<DownstreamConnection> dconn;
if (no_retry || downstream->no_more_retry()) {
goto fail;
}
@ -1746,8 +1748,12 @@ int Http2Upstream::on_downstream_reset(bool no_retry) {
// downstream connection is clean; we can retry with new
// downstream connection.
rv = downstream->attach_downstream_connection(
handler_->get_downstream_connection(downstream));
dconn = handler_->get_downstream_connection(downstream);
if (!dconn) {
goto fail;
}
rv = downstream->attach_downstream_connection(std::move(dconn));
if (rv != 0) {
goto fail;
}

View File

@ -394,10 +394,10 @@ int htp_hdrs_completecb(http_parser *htp) {
return 0;
}
rv = downstream->attach_downstream_connection(
handler->get_downstream_connection(downstream));
auto dconn = handler->get_downstream_connection(downstream);
if (rv != 0) {
if (!dconn ||
(rv = downstream->attach_downstream_connection(std::move(dconn))) != 0) {
downstream->set_request_state(Downstream::CONNECT_FAIL);
return -1;
@ -1172,6 +1172,7 @@ void HttpsUpstream::on_handler_delete() {
int HttpsUpstream::on_downstream_reset(bool no_retry) {
int rv;
std::unique_ptr<DownstreamConnection> dconn;
if (!downstream_->request_submission_ready()) {
// Return error so that caller can delete handler
@ -1186,8 +1187,12 @@ int HttpsUpstream::on_downstream_reset(bool no_retry) {
goto fail;
}
rv = downstream_->attach_downstream_connection(
handler_->get_downstream_connection(downstream_.get()));
dconn = handler_->get_downstream_connection(downstream_.get());
if (!dconn) {
goto fail;
}
rv = downstream_->attach_downstream_connection(std::move(dconn));
if (rv != 0) {
goto fail;
}

View File

@ -335,9 +335,12 @@ void SpdyUpstream::start_downstream(Downstream *downstream) {
}
void SpdyUpstream::initiate_downstream(Downstream *downstream) {
int rv = downstream->attach_downstream_connection(
handler_->get_downstream_connection(downstream));
if (rv != 0) {
int rv;
auto dconn = handler_->get_downstream_connection(downstream);
if (!dconn ||
(rv = downstream->attach_downstream_connection(std::move(dconn))) != 0) {
// If downstream connection fails, issue RST_STREAM.
rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
downstream->set_request_state(Downstream::CONNECT_FAIL);
@ -1247,6 +1250,8 @@ int SpdyUpstream::on_downstream_reset(bool no_retry) {
downstream->add_retry();
std::unique_ptr<DownstreamConnection> dconn;
if (no_retry || downstream->no_more_retry()) {
goto fail;
}
@ -1254,8 +1259,12 @@ int SpdyUpstream::on_downstream_reset(bool no_retry) {
// downstream connection is clean; we can retry with new
// downstream connection.
rv = downstream->attach_downstream_connection(
handler_->get_downstream_connection(downstream));
dconn = handler_->get_downstream_connection(downstream);
if (!dconn) {
goto fail;
}
rv = downstream->attach_downstream_connection(std::move(dconn));
if (rv != 0) {
goto fail;
}

View File

@ -81,6 +81,16 @@ struct DownstreamAddr {
std::unique_ptr<ConnectBlocker> connect_blocker;
// Client side TLS session cache
TLSSessionCache tls_session_cache;
// Http2Session object created for this address. This list chains
// all Http2Session objects that is not in group scope
// http2_avail_freelist, and is not reached in maximum concurrency.
DList<Http2Session> http2_extra_freelist;
// true if Http2Session for this address is in group scope
// SharedDownstreamAddr.http2_avail_freelist
bool in_avail;
// total number of streams created in HTTP/2 connections for this
// address.
size_t num_dconn;
};
struct SharedDownstreamAddr {
@ -94,7 +104,7 @@ struct SharedDownstreamAddr {
//
// TODO Verify that this approach performs better in performance
// wise.
DList<Http2Session> http2_freelist;
DList<Http2Session> http2_avail_freelist;
DownstreamConnectionPool dconn_pool;
// Next downstream address index in addrs.
size_t next;