nghttpx: Use std::shared_ptr for downstream addresses so that we can swap them

This commit is contained in:
Tatsuhiro Tsujikawa 2016-06-03 01:20:49 +09:00
parent 2fd095d036
commit fe58614b23
11 changed files with 93 additions and 56 deletions

View File

@ -1036,6 +1036,7 @@ void fill_default_config() {
mod_config()->num_worker = 1;
mod_config()->conf_path = "/etc/nghttpx/nghttpx.conf";
mod_config()->pid = getpid();
mod_config()->downstream_router = std::make_shared<DownstreamRouter>();
if (ev_supported_backends() & ~ev_recommended_backends() & EVBACKEND_KQUEUE) {
mod_config()->ev_loop_flags = ev_recommended_backends() | EVBACKEND_KQUEUE;

View File

@ -690,8 +690,9 @@ bool load_lighter(const DownstreamAddr *lhs, const DownstreamAddr *rhs) {
}
} // namespace
Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) {
auto &shared_addr = group.shared_addr;
Http2Session *ClientHandler::select_http2_session(
const std::shared_ptr<DownstreamAddrGroup> &group) {
auto &shared_addr = group->shared_addr;
// First count the working backend addresses.
size_t min = 0;
@ -779,7 +780,7 @@ Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) {
}
auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(),
worker_, &group, selected_addr);
worker_, group, selected_addr);
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Create new Http2Session " << session;
@ -815,9 +816,9 @@ uint32_t next_cycle(const WeightedPri &pri) {
std::unique_ptr<DownstreamConnection>
ClientHandler::get_downstream_connection(Downstream *downstream) {
size_t group_idx;
auto &downstreamconf = get_config()->conn.downstream;
auto catch_all = downstreamconf.addr_group_catch_all;
auto catch_all = worker_->get_addr_group_catch_all();
auto &groups = worker_->get_downstream_addr_groups();
auto downstream_router = worker_->get_downstream_router();
const auto &req = downstream->request();
@ -835,8 +836,8 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
// have dealt with proxy case already, just use catch-all group.
group_idx = catch_all;
} else {
auto &router = get_config()->router;
auto &wildcard_patterns = get_config()->wildcard_patterns;
auto &router = downstream_router->router;
auto &wildcard_patterns = downstream_router->wildcard_patterns;
if (!req.authority.empty()) {
group_idx =
match_downstream_addr_group(router, wildcard_patterns, req.authority,
@ -859,7 +860,7 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
}
auto &group = worker_->get_downstream_addr_groups()[group_idx];
auto &shared_addr = group.shared_addr;
auto &shared_addr = group->shared_addr;
auto proto = PROTO_NONE;
@ -925,7 +926,7 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
<< " Create new one";
}
dconn = make_unique<HttpDownstreamConnection>(&group, conn_.loop, worker_);
dconn = make_unique<HttpDownstreamConnection>(group, conn_.loop, worker_);
}
dconn->set_client_handler(this);

View File

@ -142,7 +142,8 @@ public:
// header field.
StringRef get_forwarded_for() const;
Http2Session *select_http2_session(DownstreamAddrGroup &group);
Http2Session *
select_http2_session(const std::shared_ptr<DownstreamAddrGroup> &group);
const UpstreamAddr *get_upstream_addr() const;

View File

@ -788,6 +788,8 @@ int parse_mapping(Config *config, DownstreamAddrConfig addr,
DownstreamAddrGroupConfig g(StringRef{pattern});
g.addrs.push_back(addr);
auto &downstream_router = config->downstream_router;
if (pattern[0] == '*') {
// wildcard pattern
auto path_first =
@ -796,23 +798,24 @@ int parse_mapping(Config *config, DownstreamAddrConfig addr,
auto host = StringRef{std::begin(g.pattern) + 1, path_first};
auto path = StringRef{path_first, std::end(g.pattern)};
auto &wildcard_patterns = config->wildcard_patterns;
auto &wildcard_patterns = downstream_router->wildcard_patterns;
auto it = std::find_if(
std::begin(wildcard_patterns), std::end(wildcard_patterns),
[&host](const WildcardPattern &wp) { return wp.host == host; });
if (it == std::end(wildcard_patterns)) {
config->wildcard_patterns.push_back(
wildcard_patterns.push_back(
{ImmutableString{std::begin(host), std::end(host)}});
auto &router = config->wildcard_patterns.back().router;
auto &router = wildcard_patterns.back().router;
router.add_route(path, addr_groups.size());
} else {
(*it).router.add_route(path, addr_groups.size());
}
} else {
config->router.add_route(StringRef{g.pattern}, addr_groups.size());
downstream_router->router.add_route(StringRef{g.pattern},
addr_groups.size());
}
addr_groups.push_back(std::move(g));
@ -2904,13 +2907,15 @@ StringRef strproto(shrpx_proto proto) {
assert(0);
}
// Configures the following member in |config|: router,
// conn.downstream.addr_groups, wildcard_patterns,
// Configures the following member in |config|:
// conn.downstream_router, conn.downstream.addr_groups,
// conn.downstream.addr_group_catch_all.
int configure_downstream_group(Config *config, bool http2_proxy,
bool numeric_addr_only,
const TLSConfig &tlsconf) {
auto &downstreamconf = config->conn.downstream;
auto &addr_groups = downstreamconf.addr_groups;
auto &downstream_router = config->downstream_router;
if (addr_groups.empty()) {
DownstreamAddrConfig addr{};
@ -2920,7 +2925,8 @@ int configure_downstream_group(Config *config, bool http2_proxy,
DownstreamAddrGroupConfig g(StringRef::from_lit("/"));
g.addrs.push_back(std::move(addr));
config->router.add_route(StringRef{g.pattern}, addr_groups.size());
downstream_router->router.add_route(StringRef{g.pattern},
addr_groups.size());
addr_groups.push_back(std::move(g));
} else if (http2_proxy) {
// We don't support host mapping in these cases. Move all
@ -2931,13 +2937,13 @@ int configure_downstream_group(Config *config, bool http2_proxy,
std::back_inserter(catch_all.addrs));
}
std::vector<DownstreamAddrGroupConfig>().swap(addr_groups);
std::vector<WildcardPattern>().swap(config->wildcard_patterns);
// maybe not necessary?
config->router = Router();
config->router.add_route(StringRef{catch_all.pattern}, addr_groups.size());
downstream_router = std::make_shared<DownstreamRouter>();
downstream_router->router.add_route(StringRef{catch_all.pattern},
addr_groups.size());
addr_groups.push_back(std::move(catch_all));
} else {
auto &wildcard_patterns = config->wildcard_patterns;
auto &wildcard_patterns = downstream_router->wildcard_patterns;
std::sort(std::begin(wildcard_patterns), std::end(wildcard_patterns),
[](const WildcardPattern &lhs, const WildcardPattern &rhs) {
return std::lexicographical_compare(
@ -2947,7 +2953,7 @@ int configure_downstream_group(Config *config, bool http2_proxy,
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Reverse sorted wildcard hosts (compared from tail to head, "
"and sorted in reverse order):";
for (auto &wp : config->wildcard_patterns) {
for (auto &wp : wildcard_patterns) {
LOG(INFO) << wp.host;
}
}

View File

@ -642,9 +642,13 @@ struct WildcardPattern {
Router router;
};
struct Config {
struct DownstreamRouter {
Router router;
std::vector<WildcardPattern> wildcard_patterns;
};
struct Config {
std::shared_ptr<DownstreamRouter> downstream_router;
HttpProxy downstream_http_proxy;
HttpConfig http;
Http2Config http2;

View File

@ -172,7 +172,8 @@ void initiate_connection_cb(struct ev_loop *loop, ev_timer *w, int revents) {
} // namespace
Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
Worker *worker, DownstreamAddrGroup *group,
Worker *worker,
const std::shared_ptr<DownstreamAddrGroup> &group,
DownstreamAddr *addr)
: dlnext(nullptr),
dlprev(nullptr),
@ -2111,7 +2112,7 @@ bool Http2Session::max_concurrency_reached(size_t extra) const {
}
DownstreamAddrGroup *Http2Session::get_downstream_addr_group() const {
return group_;
return group_.get();
}
void Http2Session::add_to_avail_freelist() {
@ -2120,8 +2121,8 @@ void Http2Session::add_to_avail_freelist() {
}
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Append to http2_avail_freelist, group=" << group_
<< ", freelist.size="
SSLOG(INFO, this) << "Append to http2_avail_freelist, group="
<< group_.get() << ", freelist.size="
<< group_->shared_addr->http2_avail_freelist.size();
}

View File

@ -73,7 +73,8 @@ enum FreelistZone {
class Http2Session {
public:
Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
DownstreamAddrGroup *group, DownstreamAddr *addr);
const std::shared_ptr<DownstreamAddrGroup> &group,
DownstreamAddr *addr);
~Http2Session();
// If hard is true, all pending requests are abandoned and
@ -250,7 +251,7 @@ private:
Worker *worker_;
// NULL if no TLS is configured
SSL_CTX *ssl_ctx_;
DownstreamAddrGroup *group_;
std::shared_ptr<DownstreamAddrGroup> group_;
// Address of remote endpoint
DownstreamAddr *addr_;
nghttp2_session *session_;

View File

@ -147,8 +147,8 @@ void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
}
} // namespace
HttpDownstreamConnection::HttpDownstreamConnection(DownstreamAddrGroup *group,
struct ev_loop *loop,
HttpDownstreamConnection::HttpDownstreamConnection(
const std::shared_ptr<DownstreamAddrGroup> &group, struct ev_loop *loop,
Worker *worker)
: conn_(loop, -1, nullptr, worker->get_mcpool(),
get_config()->conn.downstream.timeout.write,
@ -1164,7 +1164,7 @@ int HttpDownstreamConnection::noop() { return 0; }
DownstreamAddrGroup *
HttpDownstreamConnection::get_downstream_addr_group() const {
return group_;
return group_.get();
}
DownstreamAddr *HttpDownstreamConnection::get_addr() const { return addr_; }

View File

@ -42,8 +42,8 @@ struct DownstreamAddr;
class HttpDownstreamConnection : public DownstreamConnection {
public:
HttpDownstreamConnection(DownstreamAddrGroup *group, struct ev_loop *loop,
Worker *worker);
HttpDownstreamConnection(const std::shared_ptr<DownstreamAddrGroup> &group,
struct ev_loop *loop, Worker *worker);
virtual ~HttpDownstreamConnection();
virtual int attach_downstream(Downstream *downstream);
virtual void detach_downstream(Downstream *downstream);
@ -88,7 +88,7 @@ private:
Worker *worker_;
// nullptr if TLS is not used.
SSL_CTX *ssl_ctx_;
DownstreamAddrGroup *group_;
const std::shared_ptr<DownstreamAddrGroup> &group_;
// Address of remote endpoint
DownstreamAddr *addr_;
IOControl ioctrl_;

View File

@ -109,6 +109,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
const std::shared_ptr<TicketKeys> &ticket_keys)
: randgen_(rd()),
worker_stat_{},
downstream_router_(get_config()->downstream_router),
loop_(loop),
sv_ssl_ctx_(sv_ssl_ctx),
cl_ssl_ctx_(cl_ssl_ctx),
@ -117,6 +118,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
downstream_addr_groups_(get_config()->conn.downstream.addr_groups.size()),
connect_blocker_(
make_unique<ConnectBlocker>(randgen_, loop_, []() {}, []() {})),
addr_group_catch_all_(get_config()->conn.downstream.addr_group_catch_all),
graceful_shutdown_(false) {
ev_async_init(&w_, eventcb);
w_.data = this;
@ -140,7 +142,8 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
auto &src = downstreamconf.addr_groups[i];
auto &dst = downstream_addr_groups_[i];
dst.pattern = src.pattern;
dst = std::make_shared<DownstreamAddrGroup>();
dst->pattern = src.pattern;
auto shared_addr = std::make_shared<SharedDownstreamAddr>();
@ -210,10 +213,10 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
// share the connection if patterns have the same set of backend
// addresses.
auto end = std::begin(downstream_addr_groups_) + i;
auto it = std::find_if(std::begin(downstream_addr_groups_), end,
[&shared_addr](const DownstreamAddrGroup &group) {
return match_shared_downstream_addr(
group.shared_addr, shared_addr);
auto it = std::find_if(
std::begin(downstream_addr_groups_), end,
[&shared_addr](const std::shared_ptr<DownstreamAddrGroup> &group) {
return match_shared_downstream_addr(group->shared_addr, shared_addr);
});
if (it == end) {
@ -225,13 +228,13 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
shared_addr->http1_pri.weight = num_http1;
shared_addr->http2_pri.weight = num_http2;
dst.shared_addr = shared_addr;
dst->shared_addr = shared_addr;
} else {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << dst.pattern << " shares the same backend group with "
<< (*it).pattern;
LOG(INFO) << dst->pattern << " shares the same backend group with "
<< (*it)->pattern;
}
dst.shared_addr = (*it).shared_addr;
dst->shared_addr = (*it)->shared_addr;
}
}
}
@ -395,7 +398,8 @@ mruby::MRubyContext *Worker::get_mruby_context() const {
}
#endif // HAVE_MRUBY
std::vector<DownstreamAddrGroup> &Worker::get_downstream_addr_groups() {
std::vector<std::shared_ptr<DownstreamAddrGroup>> &
Worker::get_downstream_addr_groups() {
return downstream_addr_groups_;
}
@ -403,17 +407,26 @@ ConnectBlocker *Worker::get_connect_blocker() const {
return connect_blocker_.get();
}
const DownstreamRouter *Worker::get_downstream_router() const {
return downstream_router_.get();
}
size_t Worker::get_addr_group_catch_all() const {
return addr_group_catch_all_;
}
namespace {
size_t match_downstream_addr_group_host(
const Router &router, const std::vector<WildcardPattern> &wildcard_patterns,
const StringRef &host, const StringRef &path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all) {
const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
size_t catch_all) {
if (path.empty() || path[0] != '/') {
auto group = router.match(host, StringRef::from_lit("/"));
if (group != -1) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found pattern with query " << host
<< ", matched pattern=" << groups[group].pattern;
<< ", matched pattern=" << groups[group]->pattern;
}
return group;
}
@ -429,7 +442,7 @@ size_t match_downstream_addr_group_host(
if (group != -1) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found pattern with query " << host << path
<< ", matched pattern=" << groups[group].pattern;
<< ", matched pattern=" << groups[group]->pattern;
}
return group;
}
@ -448,7 +461,7 @@ size_t match_downstream_addr_group_host(
// longest host pattern.
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found wildcard pattern with query " << host << path
<< ", matched pattern=" << groups[group].pattern;
<< ", matched pattern=" << groups[group]->pattern;
}
return group;
}
@ -458,7 +471,7 @@ size_t match_downstream_addr_group_host(
if (group != -1) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found pattern with query " << path
<< ", matched pattern=" << groups[group].pattern;
<< ", matched pattern=" << groups[group]->pattern;
}
return group;
}
@ -473,7 +486,8 @@ size_t match_downstream_addr_group_host(
size_t match_downstream_addr_group(
const Router &router, const std::vector<WildcardPattern> &wildcard_patterns,
const StringRef &hostport, const StringRef &raw_path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all) {
const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
size_t catch_all) {
if (std::find(std::begin(hostport), std::end(hostport), '/') !=
std::end(hostport)) {
// We use '/' specially, and if '/' is included in host, it breaks

View File

@ -206,10 +206,14 @@ public:
mruby::MRubyContext *get_mruby_context() const;
#endif // HAVE_MRUBY
std::vector<DownstreamAddrGroup> &get_downstream_addr_groups();
std::vector<std::shared_ptr<DownstreamAddrGroup>> &
get_downstream_addr_groups();
ConnectBlocker *get_connect_blocker() const;
const DownstreamRouter *get_downstream_router() const;
size_t get_addr_group_catch_all() const;
private:
#ifndef NOTHREADS
std::future<void> fut_;
@ -222,6 +226,7 @@ private:
MemchunkPool mcpool_;
WorkerStat worker_stat_;
std::shared_ptr<DownstreamRouter> downstream_router_;
std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;
#ifdef HAVE_MRUBY
std::unique_ptr<mruby::MRubyContext> mruby_ctx_;
@ -235,11 +240,13 @@ private:
ssl::CertLookupTree *cert_tree_;
std::shared_ptr<TicketKeys> ticket_keys_;
std::vector<DownstreamAddrGroup> downstream_addr_groups_;
std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_;
// Worker level blocker for downstream connection. For example,
// this is used when file decriptor is exhausted.
std::unique_ptr<ConnectBlocker> connect_blocker_;
size_t addr_group_catch_all_;
bool graceful_shutdown_;
};
@ -252,7 +259,8 @@ private:
size_t match_downstream_addr_group(
const Router &router, const std::vector<WildcardPattern> &wildcard_patterns,
const StringRef &hostport, const StringRef &path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all);
const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
size_t catch_all);
void downstream_failure(DownstreamAddr *addr);