nghttpx: Separate Downstream address group from config to runtime

This commit is contained in:
Tatsuhiro Tsujikawa 2016-02-27 23:24:14 +09:00
parent 21007da392
commit 8ca3e5f6ba
17 changed files with 271 additions and 262 deletions

View File

@ -2098,23 +2098,23 @@ void process_options(
auto &addr_groups = downstreamconf.addr_groups;
if (addr_groups.empty()) {
DownstreamAddr addr{};
DownstreamAddrConfig addr{};
addr.host = ImmutableString::from_lit(DEFAULT_DOWNSTREAM_HOST);
addr.port = DEFAULT_DOWNSTREAM_PORT;
DownstreamAddrGroup g(StringRef::from_lit("/"));
DownstreamAddrGroupConfig g(StringRef::from_lit("/"));
g.addrs.push_back(std::move(addr));
mod_config()->router.add_route(StringRef{g.pattern}, addr_groups.size());
addr_groups.push_back(std::move(g));
} else if (get_config()->http2_proxy || get_config()->client_proxy) {
// We don't support host mapping in these cases. Move all
// non-catch-all patterns to catch-all pattern.
DownstreamAddrGroup catch_all(StringRef::from_lit("/"));
DownstreamAddrGroupConfig catch_all(StringRef::from_lit("/"));
for (auto &g : addr_groups) {
std::move(std::begin(g.addrs), std::end(g.addrs),
std::back_inserter(catch_all.addrs));
}
std::vector<DownstreamAddrGroup>().swap(addr_groups);
std::vector<DownstreamAddrGroupConfig>().swap(addr_groups);
// maybe not necessary?
mod_config()->router = Router();
mod_config()->router.add_route(StringRef{catch_all.pattern},

View File

@ -637,13 +637,18 @@ void ClientHandler::pool_downstream_connection(
if (!dconn->poolable()) {
return;
}
dconn->set_client_handler(nullptr);
auto group = dconn->get_downstream_addr_group();
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get()
<< " in group " << dconn->get_group();
<< " in group " << group;
}
dconn->set_client_handler(nullptr);
auto dconn_pool = worker_->get_dconn_pool();
dconn_pool->add_downstream_connection(std::move(dconn));
auto &dconn_pool = group->dconn_pool;
dconn_pool.add_downstream_connection(std::move(dconn));
}
void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) {
@ -651,13 +656,13 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) {
CLOG(INFO, this) << "Removing downstream connection DCONN:" << dconn
<< " from pool";
}
auto dconn_pool = worker_->get_dconn_pool();
dconn_pool->remove_downstream_connection(dconn);
auto &dconn_pool = dconn->get_downstream_addr_group()->dconn_pool;
dconn_pool.remove_downstream_connection(dconn);
}
std::unique_ptr<DownstreamConnection>
ClientHandler::get_downstream_connection(Downstream *downstream) {
size_t group;
size_t group_idx;
auto &downstreamconf = get_config()->conn.downstream;
auto catch_all = downstreamconf.addr_group_catch_all;
auto &groups = worker_->get_downstream_addr_groups();
@ -667,26 +672,26 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
// Fast path. If we have one group, it must be catch-all group.
// HTTP/2 and client proxy modes fall in this case.
if (groups.size() == 1) {
group = 0;
group_idx = 0;
} else if (req.method == HTTP_CONNECT) {
// We don't know how to treat CONNECT request in host-path
// mapping. It most likely appears in proxy scenario. Since we
// have dealt with proxy case already, just use catch-all group.
group = catch_all;
group_idx = catch_all;
} else {
auto &router = get_config()->router;
if (!req.authority.empty()) {
group =
group_idx =
match_downstream_addr_group(router, StringRef{req.authority},
StringRef{req.path}, groups, catch_all);
} else {
auto h = req.fs.header(http2::HD_HOST);
if (h) {
group =
group_idx =
match_downstream_addr_group(router, StringRef{h->value},
StringRef{req.path}, groups, catch_all);
} else {
group =
group_idx =
match_downstream_addr_group(router, StringRef::from_lit(""),
StringRef{req.path}, groups, catch_all);
}
@ -694,11 +699,12 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
}
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Downstream address group: " << group;
CLOG(INFO, this) << "Downstream address group_idx: " << group_idx;
}
auto dconn_pool = worker_->get_dconn_pool();
auto dconn = dconn_pool->pop_downstream_connection(group);
auto &group = worker_->get_downstream_addr_groups()[group_idx];
auto &dconn_pool = group.dconn_pool;
auto dconn = dconn_pool.pop_downstream_connection();
if (!dconn) {
if (LOG_ENABLED(INFO)) {
@ -706,21 +712,18 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
<< " Create new one";
}
auto dconn_pool = worker_->get_dconn_pool();
if (downstreamconf.proto == PROTO_HTTP2) {
auto &addr_group = worker_->get_downstream_addr_groups()[group];
if (addr_group.http2_freelist.empty()) {
if (group.http2_freelist.empty()) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this)
<< "http2_freelist is empty; create new Http2Session";
}
auto session = make_unique<Http2Session>(
conn_.loop, worker_->get_cl_ssl_ctx(), worker_, group);
addr_group.http2_freelist.append(session.release());
conn_.loop, worker_->get_cl_ssl_ctx(), worker_, &group);
group.http2_freelist.append(session.release());
}
auto http2session = addr_group.http2_freelist.head;
auto http2session = group.http2_freelist.head;
// TODO max_concurrent_streams option must be independent from
// frontend and backend.
@ -730,13 +733,13 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
<< http2session
<< "). Remove Http2Session from http2_freelist";
}
addr_group.http2_freelist.remove(http2session);
group.http2_freelist.remove(http2session);
}
dconn = make_unique<Http2DownstreamConnection>(dconn_pool, http2session);
dconn = make_unique<Http2DownstreamConnection>(http2session);
} else {
dconn = make_unique<HttpDownstreamConnection>(dconn_pool, group,
conn_.loop, worker_);
dconn =
make_unique<HttpDownstreamConnection>(&group, conn_.loop, worker_);
}
dconn->set_client_handler(this);
return dconn;

View File

@ -575,7 +575,7 @@ namespace {
// config. We will store each host-path pattern found in |src| with
// |addr|. |addr| will be copied accordingly. Also we make a group
// based on the pattern. The "/" pattern is considered as catch-all.
void parse_mapping(const DownstreamAddr &addr, const char *src) {
void parse_mapping(const DownstreamAddrConfig &addr, const char *src) {
// This returns at least 1 element (it could be empty string). We
// will append '/' to all patterns, so it becomes catch-all pattern.
auto mapping = util::split_config_str_list(src, ':');
@ -606,7 +606,7 @@ void parse_mapping(const DownstreamAddr &addr, const char *src) {
if (done) {
continue;
}
DownstreamAddrGroup g(StringRef{pattern});
DownstreamAddrGroupConfig g(StringRef{pattern});
g.addrs.push_back(addr);
mod_config()->router.add_route(StringRef{g.pattern}, addr_groups.size());
@ -1482,7 +1482,7 @@ int parse_config(const char *opt, const char *optarg,
if (!pat_delim) {
pat_delim = optarg + optarglen;
}
DownstreamAddr addr{};
DownstreamAddrConfig addr{};
if (util::istarts_with(optarg, SHRPX_UNIX_PATH_PREFIX)) {
auto path = optarg + str_size(SHRPX_UNIX_PATH_PREFIX);
addr.host = ImmutableString(path, pat_delim);
@ -2493,93 +2493,4 @@ int int_syslog_facility(const char *strfacility) {
return -1;
}
namespace {
size_t match_downstream_addr_group_host(
const Router &router, const StringRef &host, const StringRef &path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all) {
if (path.empty() || path[0] != '/') {
auto group = router.match(host, StringRef::from_lit("/"));
if (group != -1) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found pattern with query " << host
<< ", matched pattern=" << groups[group].pattern;
}
return group;
}
return catch_all;
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Perform mapping selection, using host=" << host
<< ", path=" << path;
}
auto group = router.match(host, path);
if (group != -1) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found pattern with query " << host << path
<< ", matched pattern=" << groups[group].pattern;
}
return group;
}
group = router.match("", path);
if (group != -1) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found pattern with query " << path
<< ", matched pattern=" << groups[group].pattern;
}
return group;
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "None match. Use catch-all pattern";
}
return catch_all;
}
} // namespace
size_t match_downstream_addr_group(
const Router &router, const StringRef &hostport, const StringRef &raw_path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all) {
if (std::find(std::begin(hostport), std::end(hostport), '/') !=
std::end(hostport)) {
// We use '/' specially, and if '/' is included in host, it breaks
// our code. Select catch-all case.
return catch_all;
}
auto fragment = std::find(std::begin(raw_path), std::end(raw_path), '#');
auto query = std::find(std::begin(raw_path), fragment, '?');
auto path = StringRef{std::begin(raw_path), query};
if (hostport.empty()) {
return match_downstream_addr_group_host(router, hostport, path, groups,
catch_all);
}
std::string host;
if (hostport[0] == '[') {
// assume this is IPv6 numeric address
auto p = std::find(std::begin(hostport), std::end(hostport), ']');
if (p == std::end(hostport)) {
return catch_all;
}
if (p + 1 < std::end(hostport) && *(p + 1) != ':') {
return catch_all;
}
host.assign(std::begin(hostport), p + 1);
} else {
auto p = std::find(std::begin(hostport), std::end(hostport), ':');
if (p == std::begin(hostport)) {
return catch_all;
}
host.assign(std::begin(hostport), p);
}
util::inp_strlower(host);
return match_downstream_addr_group_host(router, StringRef{host}, path, groups,
catch_all);
}
} // namespace shrpx

View File

@ -284,37 +284,24 @@ struct TLSSessionCache {
ev_tstamp last_updated;
};
struct DownstreamAddr {
struct DownstreamAddrConfig {
Address addr;
// backend address. If |host_unix| is true, this is UNIX domain
// socket path.
ImmutableString host;
ImmutableString hostport;
ConnectBlocker *connect_blocker;
// Client side TLS session cache
TLSSessionCache tls_session_cache;
// backend port. 0 if |host_unix| is true.
uint16_t port;
// true if |host| contains UNIX domain socket path.
bool host_unix;
};
struct DownstreamAddrGroup {
DownstreamAddrGroup(const StringRef &pattern)
struct DownstreamAddrGroupConfig {
DownstreamAddrGroupConfig(const StringRef &pattern)
: pattern(pattern.c_str(), pattern.size()) {}
ImmutableString pattern;
std::vector<DownstreamAddr> addrs;
// List of Http2Session which is not fully utilized (i.e., the
// server advertized maximum concurrency is not reached). We will
// coalesce as much stream as possible in one Http2Session to fully
// utilize TCP connection.
//
// TODO Verify that this approach performs better in performance
// wise.
DList<Http2Session> http2_freelist;
// Next downstream address index in addrs.
size_t next;
std::vector<DownstreamAddrConfig> addrs;
};
struct TicketKey {
@ -563,7 +550,7 @@ struct ConnectionConfig {
ev_tstamp write;
ev_tstamp idle_read;
} timeout;
std::vector<DownstreamAddrGroup> addr_groups;
std::vector<DownstreamAddrGroupConfig> addr_groups;
// The index of catch-all group in downstream_addr_groups.
size_t addr_group_catch_all;
size_t connections_per_host;
@ -657,16 +644,6 @@ std::unique_ptr<TicketKeys>
read_tls_ticket_key_file(const std::vector<std::string> &files,
const EVP_CIPHER *cipher, const EVP_MD *hmac);
// Selects group based on request's |hostport| and |path|. |hostport|
// is the value taken from :authority or host header field, and may
// contain port. The |path| may contain query part. We require the
// catch-all pattern in place, so this function always selects one
// group. The catch-all group index is given in |catch_all|. All
// patterns are given in |groups|.
size_t match_downstream_addr_group(
const Router &router, const StringRef &hostport, const StringRef &path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all);
} // namespace shrpx
#endif // SHRPX_CONFIG_H

View File

@ -26,12 +26,11 @@
#include "shrpx_client_handler.h"
#include "shrpx_downstream.h"
#include "shrpx_downstream_connection_pool.h"
namespace shrpx {
DownstreamConnection::DownstreamConnection(DownstreamConnectionPool *dconn_pool)
: dconn_pool_(dconn_pool), client_handler_(nullptr), downstream_(nullptr) {}
DownstreamConnection::DownstreamConnection()
: client_handler_(nullptr), downstream_(nullptr) {}
DownstreamConnection::~DownstreamConnection() {}
@ -45,8 +44,4 @@ ClientHandler *DownstreamConnection::get_client_handler() {
Downstream *DownstreamConnection::get_downstream() { return downstream_; }
DownstreamConnectionPool *DownstreamConnection::get_dconn_pool() const {
return dconn_pool_;
}
} // namespace shrpx

View File

@ -34,11 +34,11 @@ namespace shrpx {
class ClientHandler;
class Upstream;
class Downstream;
class DownstreamConnectionPool;
struct DownstreamAddrGroup;
class DownstreamConnection {
public:
DownstreamConnection(DownstreamConnectionPool *dconn_pool);
DownstreamConnection();
virtual ~DownstreamConnection();
virtual int attach_downstream(Downstream *downstream) = 0;
virtual void detach_downstream(Downstream *downstream) = 0;
@ -56,18 +56,17 @@ public:
virtual int on_timeout() { return 0; }
virtual void on_upstream_change(Upstream *uptream) = 0;
virtual size_t get_group() const = 0;
// true if this object is poolable.
virtual bool poolable() const = 0;
virtual DownstreamAddrGroup *get_downstream_addr_group() const = 0;
void set_client_handler(ClientHandler *client_handler);
ClientHandler *get_client_handler();
Downstream *get_downstream();
DownstreamConnectionPool *get_dconn_pool() const;
protected:
DownstreamConnectionPool *dconn_pool_;
ClientHandler *client_handler_;
Downstream *downstream_;
};

View File

@ -27,42 +27,35 @@
namespace shrpx {
DownstreamConnectionPool::DownstreamConnectionPool(size_t num_groups)
: gpool_(num_groups) {}
DownstreamConnectionPool::DownstreamConnectionPool() {}
DownstreamConnectionPool::~DownstreamConnectionPool() {
for (auto &pool : gpool_) {
for (auto dconn : pool) {
delete dconn;
}
for (auto dconn : pool_) {
delete dconn;
}
}
void DownstreamConnectionPool::add_downstream_connection(
std::unique_ptr<DownstreamConnection> dconn) {
auto group = dconn->get_group();
assert(gpool_.size() > group);
gpool_[group].insert(dconn.release());
pool_.insert(dconn.release());
}
std::unique_ptr<DownstreamConnection>
DownstreamConnectionPool::pop_downstream_connection(size_t group) {
assert(gpool_.size() > group);
auto &pool = gpool_[group];
if (pool.empty()) {
DownstreamConnectionPool::pop_downstream_connection() {
if (pool_.empty()) {
return nullptr;
}
auto dconn = std::unique_ptr<DownstreamConnection>(*std::begin(pool));
pool.erase(std::begin(pool));
auto it = std::begin(pool_);
auto dconn = std::unique_ptr<DownstreamConnection>(*it);
pool_.erase(it);
return dconn;
}
void DownstreamConnectionPool::remove_downstream_connection(
DownstreamConnection *dconn) {
auto group = dconn->get_group();
assert(gpool_.size() > group);
gpool_[group].erase(dconn);
pool_.erase(dconn);
delete dconn;
}

View File

@ -36,15 +36,15 @@ class DownstreamConnection;
class DownstreamConnectionPool {
public:
DownstreamConnectionPool(size_t num_groups);
DownstreamConnectionPool();
~DownstreamConnectionPool();
void add_downstream_connection(std::unique_ptr<DownstreamConnection> dconn);
std::unique_ptr<DownstreamConnection> pop_downstream_connection(size_t group);
std::unique_ptr<DownstreamConnection> pop_downstream_connection();
void remove_downstream_connection(DownstreamConnection *dconn);
private:
std::vector<std::set<DownstreamConnection *>> gpool_;
std::set<DownstreamConnection *> pool_;
};
} // namespace shrpx

View File

@ -37,6 +37,7 @@
#include "shrpx_error.h"
#include "shrpx_http.h"
#include "shrpx_http2_session.h"
#include "shrpx_worker.h"
#include "http2.h"
#include "util.h"
@ -44,10 +45,8 @@ using namespace nghttp2;
namespace shrpx {
Http2DownstreamConnection::Http2DownstreamConnection(
DownstreamConnectionPool *dconn_pool, Http2Session *http2session)
: DownstreamConnection(dconn_pool),
dlnext(nullptr),
Http2DownstreamConnection::Http2DownstreamConnection(Http2Session *http2session)
: dlnext(nullptr),
dlprev(nullptr),
http2session_(http2session),
sd_(nullptr) {}
@ -557,10 +556,9 @@ int Http2DownstreamConnection::on_timeout() {
return submit_rst_stream(downstream_, NGHTTP2_NO_ERROR);
}
size_t Http2DownstreamConnection::get_group() const {
// HTTP/2 backend connections are managed by Http2Session object,
// and it stores group index.
return http2session_->get_group();
DownstreamAddrGroup *
Http2DownstreamConnection::get_downstream_addr_group() const {
return http2session_->get_downstream_addr_group();
}
} // namespace shrpx

View File

@ -41,8 +41,7 @@ class DownstreamConnectionPool;
class Http2DownstreamConnection : public DownstreamConnection {
public:
Http2DownstreamConnection(DownstreamConnectionPool *dconn_pool,
Http2Session *http2session);
Http2DownstreamConnection(Http2Session *http2session);
virtual ~Http2DownstreamConnection();
virtual int attach_downstream(Downstream *downstream);
virtual void detach_downstream(Downstream *downstream);
@ -60,12 +59,13 @@ public:
virtual int on_timeout();
virtual void on_upstream_change(Upstream *upstream) {}
virtual size_t get_group() const;
// This object is not poolable because we dont' have facility to
// migrate to another Http2Session object.
virtual bool poolable() const { return false; }
virtual DownstreamAddrGroup *get_downstream_addr_group() const;
int send();
void attach_stream_data(StreamData *sd);

View File

@ -166,7 +166,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
} // namespace
Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
Worker *worker, size_t group)
Worker *worker, DownstreamAddrGroup *group)
: dlnext(nullptr),
dlprev(nullptr),
conn_(loop, -1, nullptr, worker->get_mcpool(),
@ -177,9 +177,9 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
wb_(worker->get_mcpool()),
worker_(worker),
ssl_ctx_(ssl_ctx),
group_(group),
addr_(nullptr),
session_(nullptr),
group_(group),
state_(DISCONNECTED),
connection_check_state_(CONNECTION_CHECK_NONE),
flow_control_(false) {
@ -208,8 +208,7 @@ Http2Session::~Http2Session() {
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Removed from http2_freelist";
}
auto &addr_group = worker_->get_downstream_addr_groups()[group_];
addr_group.http2_freelist.remove(this);
group_->http2_freelist.remove(this);
}
}
@ -281,8 +280,7 @@ int Http2Session::disconnect(bool hard) {
int Http2Session::initiate_connection() {
int rv = 0;
auto &addr_group = worker_->get_downstream_addr_groups()[group_];
auto &addrs = addr_group.addrs;
auto &addrs = group_->addrs;
auto worker_blocker = worker_->get_connect_blocker();
if (state_ == DISCONNECTED) {
@ -294,7 +292,7 @@ int Http2Session::initiate_connection() {
return -1;
}
auto &next_downstream = addr_group.next;
auto &next_downstream = group_->next;
auto end = next_downstream;
for (;;) {
@ -636,8 +634,7 @@ void Http2Session::remove_downstream_connection(
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Append to Http2Session freelist";
}
auto &addr_group = worker_->get_downstream_addr_groups()[group_];
addr_group.http2_freelist.append(this);
group_->http2_freelist.append(this);
}
}
@ -1929,9 +1926,7 @@ bool Http2Session::should_hard_fail() const {
}
}
const DownstreamAddr *Http2Session::get_addr() const { return addr_; }
size_t Http2Session::get_group() const { return group_; }
DownstreamAddr *Http2Session::get_addr() const { return addr_; }
int Http2Session::handle_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id) {
@ -1950,10 +1945,8 @@ int Http2Session::handle_downstream_push_promise(Downstream *downstream,
// promised_downstream->get_stream() still returns 0.
auto handler = upstream->get_client_handler();
auto worker = handler->get_worker();
auto promised_dconn =
make_unique<Http2DownstreamConnection>(worker->get_dconn_pool(), this);
auto promised_dconn = make_unique<Http2DownstreamConnection>(this);
promised_dconn->set_client_handler(handler);
auto ptr = promised_dconn.get();
@ -2028,11 +2021,9 @@ int Http2Session::handle_downstream_push_promise_complete(
size_t Http2Session::get_num_dconns() const { return dconns_.size(); }
bool Http2Session::in_freelist() const {
auto &addr_group = worker_->get_downstream_addr_groups()[group_];
return dlnext != nullptr || dlprev != nullptr ||
addr_group.http2_freelist.head == this ||
addr_group.http2_freelist.tail == this;
group_->http2_freelist.head == this ||
group_->http2_freelist.tail == this;
}
bool Http2Session::max_concurrency_reached(size_t extra) const {
@ -2045,4 +2036,8 @@ bool Http2Session::max_concurrency_reached(size_t extra) const {
session_, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
}
DownstreamAddrGroup *Http2Session::get_downstream_addr_group() const {
return group_;
}
} // namespace shrpx

View File

@ -48,6 +48,8 @@ namespace shrpx {
class Http2DownstreamConnection;
class Worker;
struct DownstreamAddrGroup;
struct DownstreamAddr;
struct StreamData {
StreamData *dlnext, *dlprev;
@ -57,7 +59,7 @@ struct StreamData {
class Http2Session {
public:
Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
size_t group);
DownstreamAddrGroup *group);
~Http2Session();
// If hard is true, all pending requests are abandoned and
@ -145,9 +147,9 @@ public:
void submit_pending_requests();
const DownstreamAddr *get_addr() const;
DownstreamAddr *get_addr() const;
size_t get_group() const;
DownstreamAddrGroup *get_downstream_addr_group() const;
int handle_downstream_push_promise(Downstream *downstream,
int32_t promised_stream_id);
@ -219,10 +221,10 @@ private:
Worker *worker_;
// NULL if no TLS is configured
SSL_CTX *ssl_ctx_;
DownstreamAddrGroup *group_;
// Address of remote endpoint
const DownstreamAddr *addr_;
DownstreamAddr *addr_;
nghttp2_session *session_;
size_t group_;
int state_;
int connection_check_state_;
bool flow_control_;

View File

@ -111,11 +111,10 @@ void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
}
} // namespace
HttpDownstreamConnection::HttpDownstreamConnection(
DownstreamConnectionPool *dconn_pool, size_t group, struct ev_loop *loop,
Worker *worker)
: DownstreamConnection(dconn_pool),
conn_(loop, -1, nullptr, worker->get_mcpool(),
HttpDownstreamConnection::HttpDownstreamConnection(DownstreamAddrGroup *group,
struct ev_loop *loop,
Worker *worker)
: conn_(loop, -1, nullptr, worker->get_mcpool(),
get_config()->conn.downstream.timeout.write,
get_config()->conn.downstream.timeout.read, {}, {}, connectcb,
readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
@ -124,10 +123,10 @@ HttpDownstreamConnection::HttpDownstreamConnection(
do_write_(&HttpDownstreamConnection::noop),
worker_(worker),
ssl_ctx_(worker->get_cl_ssl_ctx()),
group_(group),
addr_(nullptr),
ioctrl_(&conn_.rlimit),
response_htp_{0},
group_(group) {}
response_htp_{0} {}
HttpDownstreamConnection::~HttpDownstreamConnection() {}
@ -157,9 +156,8 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
conn_.set_ssl(ssl);
}
auto &groups = worker_->get_downstream_addr_groups();
auto &addrs = groups[group_].addrs;
auto &next_downstream = groups[group_].next;
auto &addrs = group_->addrs;
auto &next_downstream = group_->next;
auto end = next_downstream;
for (;;) {
auto &addr = addrs[next_downstream];
@ -508,8 +506,8 @@ void idle_readcb(struct ev_loop *loop, ev_io *w, int revents) {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, dconn) << "Idle connection EOF";
}
auto dconn_pool = dconn->get_dconn_pool();
dconn_pool->remove_downstream_connection(dconn);
auto &dconn_pool = dconn->get_downstream_addr_group()->dconn_pool;
dconn_pool.remove_downstream_connection(dconn);
// dconn was deleted
}
} // namespace
@ -521,8 +519,8 @@ void idle_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, dconn) << "Idle connection timeout";
}
auto dconn_pool = dconn->get_dconn_pool();
dconn_pool->remove_downstream_connection(dconn);
auto &dconn_pool = dconn->get_downstream_addr_group()->dconn_pool;
dconn_pool.remove_downstream_connection(dconn);
// dconn was deleted
}
} // namespace
@ -1033,7 +1031,7 @@ int HttpDownstreamConnection::process_input(const uint8_t *data,
}
int HttpDownstreamConnection::connected() {
auto connect_blocker = addr_->connect_blocker;
auto &connect_blocker = addr_->connect_blocker;
if (!util::check_socket_connected(conn_.fd)) {
conn_.wlimit.stopw();
@ -1083,8 +1081,11 @@ void HttpDownstreamConnection::signal_write() {
ev_feed_event(conn_.loop, &conn_.wev, EV_WRITE);
}
size_t HttpDownstreamConnection::get_group() const { return group_; }
int HttpDownstreamConnection::noop() { return 0; }
DownstreamAddrGroup *
HttpDownstreamConnection::get_downstream_addr_group() const {
return group_;
}
} // namespace shrpx

View File

@ -37,11 +37,13 @@ namespace shrpx {
class DownstreamConnectionPool;
class Worker;
struct DownstreamAddrGroup;
struct DownstreamAddr;
class HttpDownstreamConnection : public DownstreamConnection {
public:
HttpDownstreamConnection(DownstreamConnectionPool *dconn_pool, size_t group,
struct ev_loop *loop, Worker *worker);
HttpDownstreamConnection(DownstreamAddrGroup *group, struct ev_loop *loop,
Worker *worker);
virtual ~HttpDownstreamConnection();
virtual int attach_downstream(Downstream *downstream);
virtual void detach_downstream(Downstream *downstream);
@ -58,10 +60,11 @@ public:
virtual int on_write();
virtual void on_upstream_change(Upstream *upstream);
virtual size_t get_group() const;
virtual bool poolable() const { return true; }
virtual DownstreamAddrGroup *get_downstream_addr_group() const;
int read_clear();
int write_clear();
int read_tls();
@ -81,11 +84,11 @@ private:
Worker *worker_;
// nullptr if TLS is not used.
SSL_CTX *ssl_ctx_;
DownstreamAddrGroup *group_;
// Address of remote endpoint
DownstreamAddr *addr_;
IOControl ioctrl_;
http_parser response_htp_;
size_t group_;
};
} // namespace shrpx

View File

@ -71,14 +71,13 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
ssl::CertLookupTree *cert_tree,
const std::shared_ptr<TicketKeys> &ticket_keys)
: randgen_(rd()),
dconn_pool_(get_config()->conn.downstream.addr_groups.size()),
worker_stat_(get_config()->conn.downstream.addr_groups.size()),
worker_stat_{},
loop_(loop),
sv_ssl_ctx_(sv_ssl_ctx),
cl_ssl_ctx_(cl_ssl_ctx),
cert_tree_(cert_tree),
ticket_keys_(ticket_keys),
downstream_addr_groups_(get_config()->conn.downstream.addr_groups),
downstream_addr_groups_(get_config()->conn.downstream.addr_groups.size()),
connect_blocker_(make_unique<ConnectBlocker>(randgen_, loop_)),
graceful_shutdown_(false) {
ev_async_init(&w_, eventcb);
@ -97,9 +96,26 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
StringRef{session_cacheconf.memcached.host}, &mcpool_);
}
for (auto &group : downstream_addr_groups_) {
for (auto &addr : group.addrs) {
addr.connect_blocker = new ConnectBlocker(randgen_, loop_);
auto &downstreamconf = get_config()->conn.downstream;
for (size_t i = 0; i < downstreamconf.addr_groups.size(); ++i) {
auto &src = downstreamconf.addr_groups[i];
auto &dst = downstream_addr_groups_[i];
dst.pattern = src.pattern;
dst.addrs.resize(src.addrs.size());
for (size_t j = 0; j < src.addrs.size(); ++j) {
auto &src_addr = src.addrs[j];
auto &dst_addr = dst.addrs[j];
dst_addr.addr = src_addr.addr;
dst_addr.host = src_addr.host;
dst_addr.hostport = src_addr.hostport;
dst_addr.port = src_addr.port;
dst_addr.host_unix = src_addr.host_unix;
dst_addr.connect_blocker = make_unique<ConnectBlocker>(randgen_, loop_);
}
}
}
@ -107,12 +123,6 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
Worker::~Worker() {
ev_async_stop(loop_, &w_);
ev_timer_stop(loop_, &mcpool_clear_timer_);
for (auto &group : downstream_addr_groups_) {
for (auto &addr : group.addrs) {
delete addr.connect_blocker;
}
}
}
void Worker::schedule_clear_mcpool() {
@ -234,8 +244,6 @@ void Worker::set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys) {
WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }
DownstreamConnectionPool *Worker::get_dconn_pool() { return &dconn_pool_; }
struct ev_loop *Worker::get_loop() const {
return loop_;
}
@ -279,4 +287,93 @@ ConnectBlocker *Worker::get_connect_blocker() const {
return connect_blocker_.get();
}
namespace {
size_t match_downstream_addr_group_host(
const Router &router, const StringRef &host, const StringRef &path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all) {
if (path.empty() || path[0] != '/') {
auto group = router.match(host, StringRef::from_lit("/"));
if (group != -1) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found pattern with query " << host
<< ", matched pattern=" << groups[group].pattern;
}
return group;
}
return catch_all;
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Perform mapping selection, using host=" << host
<< ", path=" << path;
}
auto group = router.match(host, path);
if (group != -1) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found pattern with query " << host << path
<< ", matched pattern=" << groups[group].pattern;
}
return group;
}
group = router.match("", path);
if (group != -1) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found pattern with query " << path
<< ", matched pattern=" << groups[group].pattern;
}
return group;
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "None match. Use catch-all pattern";
}
return catch_all;
}
} // namespace
size_t match_downstream_addr_group(
const Router &router, const StringRef &hostport, const StringRef &raw_path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all) {
if (std::find(std::begin(hostport), std::end(hostport), '/') !=
std::end(hostport)) {
// We use '/' specially, and if '/' is included in host, it breaks
// our code. Select catch-all case.
return catch_all;
}
auto fragment = std::find(std::begin(raw_path), std::end(raw_path), '#');
auto query = std::find(std::begin(raw_path), fragment, '?');
auto path = StringRef{std::begin(raw_path), query};
if (hostport.empty()) {
return match_downstream_addr_group_host(router, hostport, path, groups,
catch_all);
}
std::string host;
if (hostport[0] == '[') {
// assume this is IPv6 numeric address
auto p = std::find(std::begin(hostport), std::end(hostport), ']');
if (p == std::end(hostport)) {
return catch_all;
}
if (p + 1 < std::end(hostport) && *(p + 1) != ':') {
return catch_all;
}
host.assign(std::begin(hostport), p + 1);
} else {
auto p = std::find(std::begin(hostport), std::end(hostport), ':');
if (p == std::begin(hostport)) {
return catch_all;
}
host.assign(std::begin(hostport), p);
}
util::inp_strlower(host);
return match_downstream_addr_group_host(router, StringRef{host}, path, groups,
catch_all);
}
} // namespace shrpx

View File

@ -67,9 +67,39 @@ namespace ssl {
class CertLookupTree;
} // namespace ssl
struct WorkerStat {
WorkerStat(size_t num_groups) : num_connections(0) {}
struct DownstreamAddr {
Address addr;
// backend address. If |host_unix| is true, this is UNIX domain
// socket path.
ImmutableString host;
ImmutableString hostport;
// backend port. 0 if |host_unix| is true.
uint16_t port;
// true if |host| contains UNIX domain socket path.
bool host_unix;
std::unique_ptr<ConnectBlocker> connect_blocker;
// Client side TLS session cache
TLSSessionCache tls_session_cache;
};
struct DownstreamAddrGroup {
ImmutableString pattern;
std::vector<DownstreamAddr> addrs;
// List of Http2Session which is not fully utilized (i.e., the
// server advertized maximum concurrency is not reached). We will
// coalesce as much stream as possible in one Http2Session to fully
// utilize TCP connection.
//
// TODO Verify that this approach performs better in performance
// wise.
DList<Http2Session> http2_freelist;
DownstreamConnectionPool dconn_pool;
// Next downstream address index in addrs.
size_t next;
};
struct WorkerStat {
size_t num_connections;
};
@ -110,7 +140,6 @@ public:
void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);
WorkerStat *get_worker_stat();
DownstreamConnectionPool *get_dconn_pool();
struct ev_loop *get_loop() const;
SSL_CTX *get_sv_ssl_ctx() const;
SSL_CTX *get_cl_ssl_ctx() const;
@ -145,7 +174,6 @@ private:
ev_async w_;
ev_timer mcpool_clear_timer_;
MemchunkPool mcpool_;
DownstreamConnectionPool dconn_pool_;
WorkerStat worker_stat_;
std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;
@ -169,6 +197,16 @@ private:
bool graceful_shutdown_;
};
// Selects group based on request's |hostport| and |path|. |hostport|
// is the value taken from :authority or host header field, and may
// contain port. The |path| may contain query part. We require the
// catch-all pattern in place, so this function always selects one
// group. The catch-all group index is given in |catch_all|. All
// patterns are given in |groups|.
size_t match_downstream_addr_group(
const Router &router, const StringRef &hostport, const StringRef &path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all);
} // namespace shrpx
#endif // SHRPX_WORKER_H

View File

@ -101,11 +101,8 @@ template <typename T, typename F> bool test_flags(T t, F flags) {
template <typename T> struct DList {
DList() : head(nullptr), tail(nullptr), n(0) {}
// We should delete these copy ctor and assignment operator. We
// need to them where copy is required before we add item to it. If
// you doubt, make them delete and try to compile.
DList(const DList &) = default;
DList &operator=(const DList &) = default;
DList(const DList &) = delete;
DList &operator=(const DList &) = delete;
DList(DList &&other) : head(other.head), tail(other.tail), n(other.n) {
other.head = other.tail = nullptr;