nghttpx: Rewrite backend HTTP/2 connection coalesce strategy

Previously, we used one Http2Session object per DownstreamAddrGroup.
This was not flexible, and we had to provision in advance how many
HTTP/2 connections were required.  The new strategy is to add
Http2Session objects on demand.  We compare the number of attached
downstream connection objects with the server-advertised concurrency
limit.  As long as the former is smaller than the latter, we attach new
downstream connections to the session.  Once the limit is reached, we
create a new Http2Session object.  If the number drops below the limit,
we start to share the Http2Session object again.
This commit is contained in:
Tatsuhiro Tsujikawa 2016-02-27 19:39:03 +09:00
parent 9eeac27966
commit 21007da392
9 changed files with 141 additions and 92 deletions

View File

@ -386,11 +386,6 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
get_config()->conn.upstream.ratelimit.read, writecb, readcb,
timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
get_config()->tls.dyn_rec.idle_timeout),
pinned_http2sessions_(
get_config()->conn.downstream.proto == PROTO_HTTP2
? make_unique<std::vector<ssize_t>>(
worker->get_downstream_addr_groups().size(), -1)
: nullptr),
ipaddr_(ipaddr),
port_(port),
faddr_(faddr),
@ -714,15 +709,30 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
auto dconn_pool = worker_->get_dconn_pool();
if (downstreamconf.proto == PROTO_HTTP2) {
Http2Session *http2session;
auto &pinned = (*pinned_http2sessions_)[group];
if (pinned == -1) {
http2session = worker_->next_http2_session(group);
pinned = http2session->get_index();
} else {
auto dgrp = worker_->get_dgrp(group);
http2session = dgrp->http2sessions[pinned].get();
auto &addr_group = worker_->get_downstream_addr_groups()[group];
if (addr_group.http2_freelist.empty()) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this)
<< "http2_freelist is empty; create new Http2Session";
}
auto session = make_unique<Http2Session>(
conn_.loop, worker_->get_cl_ssl_ctx(), worker_, group);
addr_group.http2_freelist.append(session.release());
}
auto http2session = addr_group.http2_freelist.head;
// TODO max_concurrent_streams option must be independent from
// frontend and backend.
if (http2session->max_concurrency_reached(1)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
<< http2session
<< "). Remove Http2Session from http2_freelist";
}
addr_group.http2_freelist.remove(http2session);
}
dconn = make_unique<Http2DownstreamConnection>(dconn_pool, http2session);
} else {
dconn = make_unique<HttpDownstreamConnection>(dconn_pool, group,

View File

@ -144,7 +144,6 @@ private:
Connection conn_;
ev_timer reneg_shutdown_timer_;
std::unique_ptr<Upstream> upstream_;
std::unique_ptr<std::vector<ssize_t>> pinned_http2sessions_;
// IP address of client. If UNIX domain socket is used, this is
// "localhost".
std::string ipaddr_;

View File

@ -61,6 +61,7 @@ namespace shrpx {
struct LogFragment;
class ConnectBlocker;
class Http2Session;
namespace ssl {
@ -304,6 +305,16 @@ struct DownstreamAddrGroup {
ImmutableString pattern;
std::vector<DownstreamAddr> addrs;
// List of Http2Session objects which are not fully utilized (i.e.,
// the server-advertised maximum concurrency is not reached).  We
// will coalesce as many streams as possible into one Http2Session
// to fully utilize the TCP connection.
//
// TODO Verify that this approach actually performs better.
DList<Http2Session> http2_freelist;
// Next downstream address index in addrs.
size_t next;
};
struct TicketKey {

View File

@ -74,6 +74,10 @@ void connchk_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
SSLOG(INFO, http2session) << "ping timeout";
}
http2session->disconnect();
if (http2session->get_num_dconns() == 0) {
delete http2session;
}
return;
default:
if (LOG_ENABLED(INFO)) {
@ -92,6 +96,9 @@ void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
SSLOG(INFO, http2session) << "SETTINGS timeout";
if (http2session->terminate_session(NGHTTP2_SETTINGS_TIMEOUT) != 0) {
http2session->disconnect();
if (http2session->get_num_dconns() == 0) {
delete http2session;
}
return;
}
http2session->signal_write();
@ -109,6 +116,9 @@ void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
http2session->disconnect(http2session->get_state() ==
Http2Session::CONNECTING);
if (http2session->get_num_dconns() == 0) {
delete http2session;
}
}
} // namespace
@ -120,6 +130,9 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) {
rv = http2session->do_read();
if (rv != 0) {
http2session->disconnect(http2session->should_hard_fail());
if (http2session->get_num_dconns() == 0) {
delete http2session;
}
return;
}
http2session->connection_alive();
@ -127,6 +140,9 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) {
rv = http2session->do_write();
if (rv != 0) {
http2session->disconnect(http2session->should_hard_fail());
if (http2session->get_num_dconns() == 0) {
delete http2session;
}
return;
}
}
@ -140,6 +156,9 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
rv = http2session->do_write();
if (rv != 0) {
http2session->disconnect(http2session->should_hard_fail());
if (http2session->get_num_dconns() == 0) {
delete http2session;
}
return;
}
http2session->reset_connection_check_timer_if_not_checking();
@ -147,8 +166,10 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
} // namespace
Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
Worker *worker, size_t group, size_t idx)
: conn_(loop, -1, nullptr, worker->get_mcpool(),
Worker *worker, size_t group)
: dlnext(nullptr),
dlprev(nullptr),
conn_(loop, -1, nullptr, worker->get_mcpool(),
get_config()->conn.downstream.timeout.write,
get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb,
timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
@ -159,11 +180,9 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
addr_(nullptr),
session_(nullptr),
group_(group),
index_(idx),
state_(DISCONNECTED),
connection_check_state_(CONNECTION_CHECK_NONE),
flow_control_(false) {
read_ = write_ = &Http2Session::noop;
on_read_ = &Http2Session::read_noop;
@ -182,7 +201,17 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
settings_timer_.data = this;
}
Http2Session::~Http2Session() { disconnect(); }
// Tears down the connection and, if this session is still linked into
// its address group's http2_freelist, unlinks it so the list does not
// keep a pointer to a destroyed object.
Http2Session::~Http2Session() {
  disconnect();

  if (!in_freelist()) {
    return;
  }

  if (LOG_ENABLED(INFO)) {
    SSLOG(INFO, this) << "Removed from http2_freelist";
  }

  auto &groups = worker_->get_downstream_addr_groups();
  groups[group_].http2_freelist.remove(this);
}
int Http2Session::disconnect(bool hard) {
if (LOG_ENABLED(INFO)) {
@ -252,8 +281,8 @@ int Http2Session::disconnect(bool hard) {
int Http2Session::initiate_connection() {
int rv = 0;
auto &groups = worker_->get_downstream_addr_groups();
auto &addrs = groups[group_].addrs;
auto &addr_group = worker_->get_downstream_addr_groups()[group_];
auto &addrs = addr_group.addrs;
auto worker_blocker = worker_->get_connect_blocker();
if (state_ == DISCONNECTED) {
@ -265,7 +294,7 @@ int Http2Session::initiate_connection() {
return -1;
}
auto &next_downstream = worker_->get_dgrp(group_)->next;
auto &next_downstream = addr_group.next;
auto end = next_downstream;
for (;;) {
@ -598,6 +627,18 @@ void Http2Session::remove_downstream_connection(
Http2DownstreamConnection *dconn) {
dconns_.remove(dconn);
dconn->detach_stream_data();
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Remove downstream";
}
if (!in_freelist() && !max_concurrency_reached()) {
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Append to Http2Session freelist";
}
auto &addr_group = worker_->get_downstream_addr_groups()[group_];
addr_group.http2_freelist.append(this);
}
}
void Http2Session::remove_stream_data(StreamData *sd) {
@ -1892,8 +1933,6 @@ const DownstreamAddr *Http2Session::get_addr() const { return addr_; }
// Returns the value of group_ stored by the constructor.
size_t Http2Session::get_group() const {
  return group_;
}
size_t Http2Session::get_index() const { return index_; }
int Http2Session::handle_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id) {
auto upstream = downstream->get_upstream();
@ -1986,4 +2025,24 @@ int Http2Session::handle_downstream_push_promise_complete(
return 0;
}
// Returns the number of entries currently held in dconns_.
size_t Http2Session::get_num_dconns() const {
  return dconns_.size();
}
// Reports whether this session is currently linked into the
// http2_freelist of the DownstreamAddrGroup identified by group_.
bool Http2Session::in_freelist() const {
  const auto &freelist =
      worker_->get_downstream_addr_groups()[group_].http2_freelist;

  // Any neighbour link means we are part of a list.
  if (dlnext != nullptr || dlprev != nullptr) {
    return true;
  }

  // With no neighbour links we can still be in the list if we sit at
  // one of its ends (presumably the single-element case).
  return freelist.head == this || freelist.tail == this;
}
// Returns true if the number of attached downstream connections plus
// |extra| is equal to or greater than the concurrency limit.  When an
// nghttp2 session exists, the limit is the remote
// SETTINGS_MAX_CONCURRENT_STREAMS value; otherwise a fixed limit of
// 100 is used.
bool Http2Session::max_concurrency_reached(size_t extra) const {
  const auto num_streams = dconns_.size() + extra;

  if (session_) {
    return num_streams >=
           nghttp2_session_get_remote_settings(
               session_, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
  }

  // No nghttp2 session yet, so there is no remote setting to consult.
  return num_streams >= 100;
}
} // namespace shrpx

View File

@ -57,7 +57,7 @@ struct StreamData {
class Http2Session {
public:
Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
size_t group, size_t idx);
size_t group);
~Http2Session();
// If hard is true, all pending requests are abandoned and
@ -149,13 +149,27 @@ public:
size_t get_group() const;
size_t get_index() const;
int handle_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id);
int handle_downstream_push_promise_complete(Downstream *downstream,
Downstream *promised_downstream);
// Returns number of downstream connections, including pushed
// streams.
size_t get_num_dconns() const;
// Returns true if this object is included in freelist. See
// DownstreamAddrGroup object.
bool in_freelist() const;
// Returns true if the maximum concurrency is reached.  In other
// words, the number of streams currently participating in this
// session is equal to or greater than the max concurrent streams
// limit advertised by the server.  If |extra| is nonzero, it is added
// to the number of current concurrent streams when comparing against
// the server-advertised concurrency limit.
bool max_concurrency_reached(size_t extra = 0) const;
enum {
// Disconnected
DISCONNECTED,
@ -184,6 +198,8 @@ public:
using ReadBuf = Buffer<8_k>;
Http2Session *dlnext, *dlprev;
private:
Connection conn_;
DefaultMemchunks wb_;
@ -207,9 +223,6 @@ private:
const DownstreamAddr *addr_;
nghttp2_session *session_;
size_t group_;
// index inside group, this is used to pin frontend to certain
// HTTP/2 backend for better throughput.
size_t index_;
int state_;
int connection_check_state_;
bool flow_control_;

View File

@ -157,10 +157,10 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
conn_.set_ssl(ssl);
}
auto &next_downstream = worker_->get_dgrp(group_)->next;
auto end = next_downstream;
auto &groups = worker_->get_downstream_addr_groups();
auto &addrs = groups[group_].addrs;
auto &next_downstream = groups[group_].next;
auto end = next_downstream;
for (;;) {
auto &addr = addrs[next_downstream];

View File

@ -73,7 +73,6 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
: randgen_(rd()),
dconn_pool_(get_config()->conn.downstream.addr_groups.size()),
worker_stat_(get_config()->conn.downstream.addr_groups.size()),
dgrps_(get_config()->conn.downstream.addr_groups.size()),
loop_(loop),
sv_ssl_ctx_(sv_ssl_ctx),
cl_ssl_ctx_(cl_ssl_ctx),
@ -98,24 +97,6 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
StringRef{session_cacheconf.memcached.host}, &mcpool_);
}
auto &downstreamconf = get_config()->conn.downstream;
if (downstreamconf.proto == PROTO_HTTP2) {
auto n = get_config()->http2.downstream.connections_per_worker;
size_t group = 0;
for (auto &dgrp : dgrps_) {
auto m = n;
if (m == 0) {
m = downstreamconf.addr_groups[group].addrs.size();
}
for (size_t idx = 0; idx < m; ++idx) {
dgrp.http2sessions.push_back(
make_unique<Http2Session>(loop_, cl_ssl_ctx, this, group, idx));
}
++group;
}
}
for (auto &group : downstream_addr_groups_) {
for (auto &addr : group.addrs) {
addr.connect_blocker = new ConnectBlocker(randgen_, loop_);
@ -255,22 +236,6 @@ WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }
// Returns a pointer to this worker's dconn_pool_ member.
DownstreamConnectionPool *Worker::get_dconn_pool() {
  return &dconn_pool_;
}
// Returns the next Http2Session of |group| in round-robin order, or
// nullptr if the group has no sessions.  The group's cursor is
// advanced and wraps to 0 after the last session.
Http2Session *Worker::next_http2_session(size_t group) {
  auto &dgrp = dgrps_[group];
  auto &sessions = dgrp.http2sessions;

  if (sessions.empty()) {
    return nullptr;
  }

  auto &cursor = dgrp.next_http2session;
  auto session = sessions[cursor].get();

  if (++cursor >= sessions.size()) {
    cursor = 0;
  }

  return session;
}
// Returns the loop_ pointer held by this worker.
struct ev_loop *Worker::get_loop() const { return loop_; }
@ -285,11 +250,6 @@ bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; }
// Returns a pointer to this worker's mcpool_ member.
MemchunkPool *Worker::get_mcpool() {
  return &mcpool_;
}
// Returns a pointer to the DownstreamGroup at index |group|.  |group|
// must be a valid index into dgrps_; this is checked with assert.
DownstreamGroup *Worker::get_dgrp(size_t group) {
  assert(group < dgrps_.size());

  auto &dgrp = dgrps_[group];
  return &dgrp;
}
// Returns the raw pointer held by session_cache_memcached_dispatcher_
// (nullptr if none is set).
MemcachedDispatcher *Worker::get_session_cache_memcached_dispatcher() {
  auto dispatcher = session_cache_memcached_dispatcher_.get();
  return dispatcher;
}

View File

@ -67,17 +67,6 @@ namespace ssl {
class CertLookupTree;
} // namespace ssl
// Per-group state: the HTTP/2 sessions the group owns plus two
// round-robin cursors, both starting at 0.
struct DownstreamGroup {
  DownstreamGroup() : next_http2session{0}, next{0} {}

  // Sessions owned by this group.
  std::vector<std::unique_ptr<Http2Session>> http2sessions;
  // Index into http2sessions of the session to hand out next.
  size_t next_http2session;
  // Index of the next downstream address to use, corresponding to
  // Config::downstream_addr_groups[].
  size_t next;
};
struct WorkerStat {
WorkerStat(size_t num_groups) : num_connections(0) {}
@ -122,7 +111,6 @@ public:
WorkerStat *get_worker_stat();
DownstreamConnectionPool *get_dconn_pool();
Http2Session *next_http2_session(size_t group);
struct ev_loop *get_loop() const;
SSL_CTX *get_sv_ssl_ctx() const;
SSL_CTX *get_cl_ssl_ctx() const;
@ -133,8 +121,6 @@ public:
MemchunkPool *get_mcpool();
void schedule_clear_mcpool();
DownstreamGroup *get_dgrp(size_t group);
MemcachedDispatcher *get_session_cache_memcached_dispatcher();
std::mt19937 &get_randgen();
@ -161,7 +147,6 @@ private:
MemchunkPool mcpool_;
DownstreamConnectionPool dconn_pool_;
WorkerStat worker_stat_;
std::vector<DownstreamGroup> dgrps_;
std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;
#ifdef HAVE_MRUBY

View File

@ -99,14 +99,17 @@ template <typename T, typename F> bool test_flags(T t, F flags) {
// T *dlnext, which point to previous element and next element in the
// list respectively.
template <typename T> struct DList {
DList() : head(nullptr), tail(nullptr) {}
DList() : head(nullptr), tail(nullptr), n(0) {}
DList(const DList &) = delete;
// We should delete the copy ctor and copy assignment operator.  We
// still need them in places where a copy is required before any item
// is added to the list.  If in doubt, mark them deleted and try to
// compile.
DList(const DList &) = default;
DList &operator=(const DList &) = default;
DList &operator=(const DList &) = delete;
DList(DList &&other) : head(other.head), tail(other.tail) {
DList(DList &&other) : head(other.head), tail(other.tail), n(other.n) {
other.head = other.tail = nullptr;
other.n = 0;
}
DList &operator=(DList &&other) {
@ -115,11 +118,16 @@ template <typename T> struct DList {
}
head = other.head;
tail = other.tail;
n = other.n;
other.head = other.tail = nullptr;
other.n = 0;
return *this;
}
void append(T *t) {
++n;
if (tail) {
tail->dlnext = t;
t->dlprev = tail;
@ -130,6 +138,7 @@ template <typename T> struct DList {
}
void remove(T *t) {
--n;
auto p = t->dlprev;
auto n = t->dlnext;
if (p) {
@ -149,7 +158,10 @@ template <typename T> struct DList {
bool empty() const { return head == nullptr; }
size_t size() const { return n; }
T *head, *tail;
size_t n;
};
template <typename T> void dlist_delete_all(DList<T> &dl) {