nghttpx: Share downstream config object
This is the unit of sharing configurations to change
This commit is contained in:
parent
fe58614b23
commit
845aa7a710
18
src/shrpx.cc
18
src/shrpx.cc
|
@ -1036,7 +1036,6 @@ void fill_default_config() {
|
|||
mod_config()->num_worker = 1;
|
||||
mod_config()->conf_path = "/etc/nghttpx/nghttpx.conf";
|
||||
mod_config()->pid = getpid();
|
||||
mod_config()->downstream_router = std::make_shared<DownstreamRouter>();
|
||||
|
||||
if (ev_supported_backends() & ~ev_recommended_backends() & EVBACKEND_KQUEUE) {
|
||||
mod_config()->ev_loop_flags = ev_recommended_backends() | EVBACKEND_KQUEUE;
|
||||
|
@ -1168,7 +1167,8 @@ void fill_default_config() {
|
|||
}
|
||||
|
||||
{
|
||||
auto &downstreamconf = connconf.downstream;
|
||||
connconf.downstream = std::make_shared<DownstreamConfig>();
|
||||
auto &downstreamconf = *connconf.downstream;
|
||||
{
|
||||
auto &timeoutconf = downstreamconf.timeout;
|
||||
// Read/Write timeouts for downstream connection
|
||||
|
@ -1424,7 +1424,7 @@ Performance:
|
|||
HTTP/2). To limit the number of connections per
|
||||
frontend for default mode, use
|
||||
--backend-connections-per-frontend.
|
||||
Default: )" << get_config()->conn.downstream.connections_per_host
|
||||
Default: )" << get_config()->conn.downstream->connections_per_host
|
||||
<< R"(
|
||||
--backend-connections-per-frontend=<N>
|
||||
Set maximum number of backend concurrent connections
|
||||
|
@ -1434,7 +1434,7 @@ Performance:
|
|||
with --http2-proxy option, use
|
||||
--backend-connections-per-host.
|
||||
Default: )"
|
||||
<< get_config()->conn.downstream.connections_per_frontend << R"(
|
||||
<< get_config()->conn.downstream->connections_per_frontend << R"(
|
||||
--rlimit-nofile=<N>
|
||||
Set maximum number of open files (RLIMIT_NOFILE) to <N>.
|
||||
If 0 is given, nghttpx does not set the limit.
|
||||
|
@ -1442,12 +1442,12 @@ Performance:
|
|||
--backend-request-buffer=<SIZE>
|
||||
Set buffer size used to store backend request.
|
||||
Default: )"
|
||||
<< util::utos_unit(get_config()->conn.downstream.request_buffer_size)
|
||||
<< util::utos_unit(get_config()->conn.downstream->request_buffer_size)
|
||||
<< R"(
|
||||
--backend-response-buffer=<SIZE>
|
||||
Set buffer size used to store backend response.
|
||||
Default: )"
|
||||
<< util::utos_unit(get_config()->conn.downstream.response_buffer_size)
|
||||
<< util::utos_unit(get_config()->conn.downstream->response_buffer_size)
|
||||
<< R"(
|
||||
--fastopen=<N>
|
||||
Enables "TCP Fast Open" for the listening socket and
|
||||
|
@ -1487,15 +1487,15 @@ Timeout:
|
|||
--backend-read-timeout=<DURATION>
|
||||
Specify read timeout for backend connection.
|
||||
Default: )"
|
||||
<< util::duration_str(get_config()->conn.downstream.timeout.read) << R"(
|
||||
<< util::duration_str(get_config()->conn.downstream->timeout.read) << R"(
|
||||
--backend-write-timeout=<DURATION>
|
||||
Specify write timeout for backend connection.
|
||||
Default: )"
|
||||
<< util::duration_str(get_config()->conn.downstream.timeout.write) << R"(
|
||||
<< util::duration_str(get_config()->conn.downstream->timeout.write) << R"(
|
||||
--backend-keep-alive-timeout=<DURATION>
|
||||
Specify keep-alive timeout for backend connection.
|
||||
Default: )"
|
||||
<< util::duration_str(get_config()->conn.downstream.timeout.idle_read)
|
||||
<< util::duration_str(get_config()->conn.downstream->timeout.idle_read)
|
||||
<< R"(
|
||||
--listener-disable-timeout=<DURATION>
|
||||
After accepting connection failed, connection listener
|
||||
|
|
|
@ -80,6 +80,77 @@ int APIDownstreamConnection::push_upload_data_chunk(const uint8_t *data,
|
|||
int APIDownstreamConnection::end_upload_data() {
|
||||
// TODO process request payload here
|
||||
(void)worker_;
|
||||
|
||||
auto upstream = downstream_->get_upstream();
|
||||
auto output = downstream_->get_request_buf();
|
||||
auto &resp = downstream_->response();
|
||||
struct iovec iov;
|
||||
|
||||
auto iovcnt = output->riovec(&iov, 1);
|
||||
|
||||
constexpr auto body = StringRef::from_lit("OK");
|
||||
|
||||
if (iovcnt == 0) {
|
||||
resp.http_status = 200;
|
||||
|
||||
upstream->send_reply(downstream_, body.byte(), body.size());
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
Config config{};
|
||||
config.conn.downstream = std::make_shared<DownstreamConfig>();
|
||||
|
||||
std::set<StringRef> include_set;
|
||||
|
||||
constexpr auto error_body = StringRef::from_lit("invalid configuration");
|
||||
|
||||
for (auto first = reinterpret_cast<const uint8_t *>(iov.iov_base),
|
||||
last = first + iov.iov_len;
|
||||
first != last;) {
|
||||
auto eol = std::find(first, last, '\n');
|
||||
if (eol == last) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (first == eol || *first == '#') {
|
||||
first = ++eol;
|
||||
continue;
|
||||
}
|
||||
|
||||
auto eq = std::find(first, eol, '=');
|
||||
if (eq == eol) {
|
||||
resp.http_status = 500;
|
||||
|
||||
upstream->send_reply(downstream_, error_body.byte(), error_body.size());
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (parse_config(&config, StringRef{first, eq}, StringRef{eq + 1, eol},
|
||||
include_set) != 0) {
|
||||
resp.http_status = 500;
|
||||
|
||||
upstream->send_reply(downstream_, error_body.byte(), error_body.size());
|
||||
return 0;
|
||||
}
|
||||
|
||||
first = ++eol;
|
||||
}
|
||||
|
||||
auto &tlsconf = get_config()->tls;
|
||||
if (configure_downstream_group(&config, get_config()->http2_proxy, true,
|
||||
tlsconf) != 0) {
|
||||
resp.http_status = 500;
|
||||
upstream->send_reply(downstream_, error_body.byte(), error_body.size());
|
||||
return 0;
|
||||
}
|
||||
|
||||
worker_->replace_downstream_config(config.conn.downstream);
|
||||
|
||||
resp.http_status = 200;
|
||||
|
||||
upstream->send_reply(downstream_, body.byte(), body.size());
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -816,9 +816,10 @@ uint32_t next_cycle(const WeightedPri &pri) {
|
|||
std::unique_ptr<DownstreamConnection>
|
||||
ClientHandler::get_downstream_connection(Downstream *downstream) {
|
||||
size_t group_idx;
|
||||
auto catch_all = worker_->get_addr_group_catch_all();
|
||||
auto &downstreamconf = *worker_->get_downstream_config();
|
||||
|
||||
auto catch_all = downstreamconf.addr_group_catch_all;
|
||||
auto &groups = worker_->get_downstream_addr_groups();
|
||||
auto downstream_router = worker_->get_downstream_router();
|
||||
|
||||
const auto &req = downstream->request();
|
||||
|
||||
|
@ -836,8 +837,8 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
|
|||
// have dealt with proxy case already, just use catch-all group.
|
||||
group_idx = catch_all;
|
||||
} else {
|
||||
auto &router = downstream_router->router;
|
||||
auto &wildcard_patterns = downstream_router->wildcard_patterns;
|
||||
auto &router = downstreamconf.router;
|
||||
auto &wildcard_patterns = downstreamconf.wildcard_patterns;
|
||||
if (!req.authority.empty()) {
|
||||
group_idx =
|
||||
match_downstream_addr_group(router, wildcard_patterns, req.authority,
|
||||
|
@ -859,7 +860,7 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
|
|||
CLOG(INFO, this) << "Downstream address group_idx: " << group_idx;
|
||||
}
|
||||
|
||||
auto &group = worker_->get_downstream_addr_groups()[group_idx];
|
||||
auto &group = groups[group_idx];
|
||||
auto &shared_addr = group->shared_addr;
|
||||
|
||||
auto proto = PROTO_NONE;
|
||||
|
|
|
@ -745,7 +745,8 @@ int parse_mapping(Config *config, DownstreamAddrConfig addr,
|
|||
// will append '/' to all patterns, so it becomes catch-all pattern.
|
||||
auto mapping = util::split_str(src_pattern, ':');
|
||||
assert(!mapping.empty());
|
||||
auto &addr_groups = config->conn.downstream.addr_groups;
|
||||
auto &downstreamconf = *config->conn.downstream;
|
||||
auto &addr_groups = downstreamconf.addr_groups;
|
||||
|
||||
DownstreamParams params{};
|
||||
params.proto = PROTO_HTTP1;
|
||||
|
@ -788,8 +789,6 @@ int parse_mapping(Config *config, DownstreamAddrConfig addr,
|
|||
DownstreamAddrGroupConfig g(StringRef{pattern});
|
||||
g.addrs.push_back(addr);
|
||||
|
||||
auto &downstream_router = config->downstream_router;
|
||||
|
||||
if (pattern[0] == '*') {
|
||||
// wildcard pattern
|
||||
auto path_first =
|
||||
|
@ -798,7 +797,7 @@ int parse_mapping(Config *config, DownstreamAddrConfig addr,
|
|||
auto host = StringRef{std::begin(g.pattern) + 1, path_first};
|
||||
auto path = StringRef{path_first, std::end(g.pattern)};
|
||||
|
||||
auto &wildcard_patterns = downstream_router->wildcard_patterns;
|
||||
auto &wildcard_patterns = downstreamconf.wildcard_patterns;
|
||||
|
||||
auto it = std::find_if(
|
||||
std::begin(wildcard_patterns), std::end(wildcard_patterns),
|
||||
|
@ -814,8 +813,7 @@ int parse_mapping(Config *config, DownstreamAddrConfig addr,
|
|||
(*it).router.add_route(path, addr_groups.size());
|
||||
}
|
||||
} else {
|
||||
downstream_router->router.add_route(StringRef{g.pattern},
|
||||
addr_groups.size());
|
||||
downstreamconf.router.add_route(StringRef{g.pattern}, addr_groups.size());
|
||||
}
|
||||
|
||||
addr_groups.push_back(std::move(g));
|
||||
|
@ -1944,9 +1942,9 @@ int parse_config(Config *config, const StringRef &opt, const StringRef &optarg,
|
|||
case SHRPX_OPTID_FRONTEND_WRITE_TIMEOUT:
|
||||
return parse_duration(&config->conn.upstream.timeout.write, opt, optarg);
|
||||
case SHRPX_OPTID_BACKEND_READ_TIMEOUT:
|
||||
return parse_duration(&config->conn.downstream.timeout.read, opt, optarg);
|
||||
return parse_duration(&config->conn.downstream->timeout.read, opt, optarg);
|
||||
case SHRPX_OPTID_BACKEND_WRITE_TIMEOUT:
|
||||
return parse_duration(&config->conn.downstream.timeout.write, opt, optarg);
|
||||
return parse_duration(&config->conn.downstream->timeout.write, opt, optarg);
|
||||
case SHRPX_OPTID_STREAM_READ_TIMEOUT:
|
||||
return parse_duration(&config->http2.timeout.stream_read, opt, optarg);
|
||||
case SHRPX_OPTID_STREAM_WRITE_TIMEOUT:
|
||||
|
@ -1989,7 +1987,7 @@ int parse_config(Config *config, const StringRef &opt, const StringRef &optarg,
|
|||
return 0;
|
||||
}
|
||||
case SHRPX_OPTID_BACKEND_KEEP_ALIVE_TIMEOUT:
|
||||
return parse_duration(&config->conn.downstream.timeout.idle_read, opt,
|
||||
return parse_duration(&config->conn.downstream->timeout.idle_read, opt,
|
||||
optarg);
|
||||
case SHRPX_OPTID_FRONTEND_HTTP2_WINDOW_BITS:
|
||||
case SHRPX_OPTID_BACKEND_HTTP2_WINDOW_BITS: {
|
||||
|
@ -2176,14 +2174,14 @@ int parse_config(Config *config, const StringRef &opt, const StringRef &optarg,
|
|||
LOG(WARN) << opt
|
||||
<< ": deprecated. Use backend-address-family=IPv4 instead.";
|
||||
|
||||
config->conn.downstream.family = AF_INET;
|
||||
config->conn.downstream->family = AF_INET;
|
||||
|
||||
return 0;
|
||||
case SHRPX_OPTID_BACKEND_IPV6:
|
||||
LOG(WARN) << opt
|
||||
<< ": deprecated. Use backend-address-family=IPv6 instead.";
|
||||
|
||||
config->conn.downstream.family = AF_INET6;
|
||||
config->conn.downstream->family = AF_INET6;
|
||||
|
||||
return 0;
|
||||
case SHRPX_OPTID_BACKEND_HTTP_PROXY_URI: {
|
||||
|
@ -2383,7 +2381,7 @@ int parse_config(Config *config, const StringRef &opt, const StringRef &optarg,
|
|||
return -1;
|
||||
}
|
||||
|
||||
config->conn.downstream.connections_per_host = n;
|
||||
config->conn.downstream->connections_per_host = n;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -2392,7 +2390,7 @@ int parse_config(Config *config, const StringRef &opt, const StringRef &optarg,
|
|||
<< SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND << " instead.";
|
||||
// fall through
|
||||
case SHRPX_OPTID_BACKEND_CONNECTIONS_PER_FRONTEND:
|
||||
return parse_uint(&config->conn.downstream.connections_per_frontend, opt,
|
||||
return parse_uint(&config->conn.downstream->connections_per_frontend, opt,
|
||||
optarg);
|
||||
case SHRPX_OPTID_LISTENER_DISABLE_TIMEOUT:
|
||||
return parse_duration(&config->conn.listener.timeout.sleep, opt, optarg);
|
||||
|
@ -2430,9 +2428,9 @@ int parse_config(Config *config, const StringRef &opt, const StringRef &optarg,
|
|||
}
|
||||
|
||||
if (optid == SHRPX_OPTID_BACKEND_REQUEST_BUFFER) {
|
||||
config->conn.downstream.request_buffer_size = n;
|
||||
config->conn.downstream->request_buffer_size = n;
|
||||
} else {
|
||||
config->conn.downstream.response_buffer_size = n;
|
||||
config->conn.downstream->response_buffer_size = n;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -2690,7 +2688,7 @@ int parse_config(Config *config, const StringRef &opt, const StringRef &optarg,
|
|||
return parse_address_family(&config->tls.session_cache.memcached.family,
|
||||
opt, optarg);
|
||||
case SHRPX_OPTID_BACKEND_ADDRESS_FAMILY:
|
||||
return parse_address_family(&config->conn.downstream.family, opt, optarg);
|
||||
return parse_address_family(&config->conn.downstream->family, opt, optarg);
|
||||
case SHRPX_OPTID_FRONTEND_HTTP2_MAX_CONCURRENT_STREAMS:
|
||||
return parse_uint(&config->http2.upstream.max_concurrent_streams, opt,
|
||||
optarg);
|
||||
|
@ -2913,9 +2911,9 @@ StringRef strproto(shrpx_proto proto) {
|
|||
int configure_downstream_group(Config *config, bool http2_proxy,
|
||||
bool numeric_addr_only,
|
||||
const TLSConfig &tlsconf) {
|
||||
auto &downstreamconf = config->conn.downstream;
|
||||
auto &downstreamconf = *config->conn.downstream;
|
||||
auto &addr_groups = downstreamconf.addr_groups;
|
||||
auto &downstream_router = config->downstream_router;
|
||||
auto &router = downstreamconf.router;
|
||||
|
||||
if (addr_groups.empty()) {
|
||||
DownstreamAddrConfig addr{};
|
||||
|
@ -2925,8 +2923,7 @@ int configure_downstream_group(Config *config, bool http2_proxy,
|
|||
|
||||
DownstreamAddrGroupConfig g(StringRef::from_lit("/"));
|
||||
g.addrs.push_back(std::move(addr));
|
||||
downstream_router->router.add_route(StringRef{g.pattern},
|
||||
addr_groups.size());
|
||||
router.add_route(StringRef{g.pattern}, addr_groups.size());
|
||||
addr_groups.push_back(std::move(g));
|
||||
} else if (http2_proxy) {
|
||||
// We don't support host mapping in these cases. Move all
|
||||
|
@ -2938,12 +2935,11 @@ int configure_downstream_group(Config *config, bool http2_proxy,
|
|||
}
|
||||
std::vector<DownstreamAddrGroupConfig>().swap(addr_groups);
|
||||
// maybe not necessary?
|
||||
downstream_router = std::make_shared<DownstreamRouter>();
|
||||
downstream_router->router.add_route(StringRef{catch_all.pattern},
|
||||
addr_groups.size());
|
||||
router = Router();
|
||||
router.add_route(StringRef{catch_all.pattern}, addr_groups.size());
|
||||
addr_groups.push_back(std::move(catch_all));
|
||||
} else {
|
||||
auto &wildcard_patterns = downstream_router->wildcard_patterns;
|
||||
auto &wildcard_patterns = downstreamconf.wildcard_patterns;
|
||||
std::sort(std::begin(wildcard_patterns), std::end(wildcard_patterns),
|
||||
[](const WildcardPattern &lhs, const WildcardPattern &rhs) {
|
||||
return std::lexicographical_compare(
|
||||
|
|
|
@ -587,6 +587,35 @@ struct RateLimitConfig {
|
|||
size_t burst;
|
||||
};
|
||||
|
||||
// Wildcard host pattern routing. We strips left most '*' from host
|
||||
// field. router includes all path pattern sharing same wildcard
|
||||
// host.
|
||||
struct WildcardPattern {
|
||||
ImmutableString host;
|
||||
Router router;
|
||||
};
|
||||
|
||||
struct DownstreamConfig {
|
||||
struct {
|
||||
ev_tstamp read;
|
||||
ev_tstamp write;
|
||||
ev_tstamp idle_read;
|
||||
} timeout;
|
||||
Router router;
|
||||
std::vector<WildcardPattern> wildcard_patterns;
|
||||
std::vector<DownstreamAddrGroupConfig> addr_groups;
|
||||
// The index of catch-all group in downstream_addr_groups.
|
||||
size_t addr_group_catch_all;
|
||||
size_t connections_per_host;
|
||||
size_t connections_per_frontend;
|
||||
size_t request_buffer_size;
|
||||
size_t response_buffer_size;
|
||||
// Address family of backend connection. One of either AF_INET,
|
||||
// AF_INET6 or AF_UNSPEC. This is ignored if backend connection
|
||||
// is made via Unix domain socket.
|
||||
int family;
|
||||
};
|
||||
|
||||
struct ConnectionConfig {
|
||||
struct {
|
||||
struct {
|
||||
|
@ -614,41 +643,10 @@ struct ConnectionConfig {
|
|||
bool accept_proxy_protocol;
|
||||
} upstream;
|
||||
|
||||
struct {
|
||||
struct {
|
||||
ev_tstamp read;
|
||||
ev_tstamp write;
|
||||
ev_tstamp idle_read;
|
||||
} timeout;
|
||||
std::vector<DownstreamAddrGroupConfig> addr_groups;
|
||||
// The index of catch-all group in downstream_addr_groups.
|
||||
size_t addr_group_catch_all;
|
||||
size_t connections_per_host;
|
||||
size_t connections_per_frontend;
|
||||
size_t request_buffer_size;
|
||||
size_t response_buffer_size;
|
||||
// Address family of backend connection. One of either AF_INET,
|
||||
// AF_INET6 or AF_UNSPEC. This is ignored if backend connection
|
||||
// is made via Unix domain socket.
|
||||
int family;
|
||||
} downstream;
|
||||
};
|
||||
|
||||
// Wildcard host pattern routing. We strips left most '*' from host
|
||||
// field. router includes all path pattern sharing same wildcard
|
||||
// host.
|
||||
struct WildcardPattern {
|
||||
ImmutableString host;
|
||||
Router router;
|
||||
};
|
||||
|
||||
struct DownstreamRouter {
|
||||
Router router;
|
||||
std::vector<WildcardPattern> wildcard_patterns;
|
||||
std::shared_ptr<DownstreamConfig> downstream;
|
||||
};
|
||||
|
||||
struct Config {
|
||||
std::shared_ptr<DownstreamRouter> downstream_router;
|
||||
HttpProxy downstream_http_proxy;
|
||||
HttpConfig http;
|
||||
Http2Config http2;
|
||||
|
|
|
@ -502,6 +502,7 @@ void Downstream::set_chunked_request(bool f) { chunked_request_ = f; }
|
|||
bool Downstream::request_buf_full() {
|
||||
auto handler = upstream_->get_client_handler();
|
||||
auto faddr = handler->get_upstream_addr();
|
||||
auto worker = handler->get_worker();
|
||||
|
||||
// We don't check buffer size here for API endpoint.
|
||||
if (faddr->api) {
|
||||
|
@ -509,11 +510,11 @@ bool Downstream::request_buf_full() {
|
|||
}
|
||||
|
||||
if (dconn_) {
|
||||
return request_buf_.rleft() >=
|
||||
get_config()->conn.downstream.request_buffer_size;
|
||||
} else {
|
||||
return false;
|
||||
auto downstreamconf = worker->get_downstream_config();
|
||||
return request_buf_.rleft() >= downstreamconf->request_buffer_size;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
DefaultMemchunks *Downstream::get_request_buf() { return &request_buf_; }
|
||||
|
@ -601,11 +602,14 @@ DefaultMemchunks *Downstream::get_response_buf() { return &response_buf_; }
|
|||
|
||||
bool Downstream::response_buf_full() {
|
||||
if (dconn_) {
|
||||
return response_buf_.rleft() >=
|
||||
get_config()->conn.downstream.response_buffer_size;
|
||||
} else {
|
||||
return false;
|
||||
auto handler = upstream_->get_client_handler();
|
||||
auto worker = handler->get_worker();
|
||||
auto downstreamconf = worker->get_downstream_config();
|
||||
|
||||
return response_buf_.rleft() >= downstreamconf->response_buffer_size;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool Downstream::validate_request_recv_body_length() const {
|
||||
|
|
|
@ -178,9 +178,9 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
|
|||
: dlnext(nullptr),
|
||||
dlprev(nullptr),
|
||||
conn_(loop, -1, nullptr, worker->get_mcpool(),
|
||||
get_config()->conn.downstream.timeout.write,
|
||||
get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb,
|
||||
timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
|
||||
worker->get_downstream_config()->timeout.write,
|
||||
worker->get_downstream_config()->timeout.read, {}, {}, writecb,
|
||||
readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
|
||||
get_config()->tls.dyn_rec.idle_timeout, PROTO_HTTP2),
|
||||
wb_(worker->get_mcpool()),
|
||||
worker_(worker),
|
||||
|
|
|
@ -413,6 +413,13 @@ void Http2Upstream::initiate_downstream(Downstream *downstream) {
|
|||
|
||||
downstream_queue_.mark_active(downstream);
|
||||
|
||||
auto &req = downstream->request();
|
||||
if (!req.http2_expect_body) {
|
||||
downstream->end_upload_data();
|
||||
// TODO is this necessary?
|
||||
handler_->signal_write();
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -846,12 +853,21 @@ nghttp2_session_callbacks *create_http2_upstream_callbacks() {
|
|||
return callbacks;
|
||||
}
|
||||
|
||||
namespace {
|
||||
size_t downstream_queue_size(Worker *worker) {
|
||||
auto downstreamconf = worker->get_downstream_config();
|
||||
|
||||
if (get_config()->http2_proxy) {
|
||||
return downstreamconf->connections_per_host;
|
||||
}
|
||||
|
||||
return downstreamconf->connections_per_frontend;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
Http2Upstream::Http2Upstream(ClientHandler *handler)
|
||||
: wb_(handler->get_worker()->get_mcpool()),
|
||||
downstream_queue_(
|
||||
get_config()->http2_proxy
|
||||
? get_config()->conn.downstream.connections_per_host
|
||||
: get_config()->conn.downstream.connections_per_frontend,
|
||||
downstream_queue_(downstream_queue_size(handler->get_worker()),
|
||||
!get_config()->http2_proxy),
|
||||
handler_(handler),
|
||||
session_(nullptr),
|
||||
|
|
|
@ -151,8 +151,8 @@ HttpDownstreamConnection::HttpDownstreamConnection(
|
|||
const std::shared_ptr<DownstreamAddrGroup> &group, struct ev_loop *loop,
|
||||
Worker *worker)
|
||||
: conn_(loop, -1, nullptr, worker->get_mcpool(),
|
||||
get_config()->conn.downstream.timeout.write,
|
||||
get_config()->conn.downstream.timeout.read, {}, {}, connectcb,
|
||||
worker->get_downstream_config()->timeout.write,
|
||||
worker->get_downstream_config()->timeout.read, {}, {}, connectcb,
|
||||
readcb, connect_timeoutcb, this,
|
||||
get_config()->tls.dyn_rec.warmup_threshold,
|
||||
get_config()->tls.dyn_rec.idle_timeout, PROTO_HTTP1),
|
||||
|
@ -182,7 +182,7 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
|
|||
return SHRPX_ERR_NETWORK;
|
||||
}
|
||||
|
||||
auto &downstreamconf = get_config()->conn.downstream;
|
||||
auto &downstreamconf = *worker_->get_downstream_config();
|
||||
|
||||
if (conn_.fd == -1) {
|
||||
auto &shared_addr = group_->shared_addr;
|
||||
|
@ -588,7 +588,9 @@ void HttpDownstreamConnection::detach_downstream(Downstream *downstream) {
|
|||
ev_set_cb(&conn_.rev, idle_readcb);
|
||||
ioctrl_.force_resume_read();
|
||||
|
||||
conn_.rt.repeat = get_config()->conn.downstream.timeout.idle_read;
|
||||
auto &downstreamconf = *worker_->get_downstream_config();
|
||||
|
||||
conn_.rt.repeat = downstreamconf.timeout.idle_read;
|
||||
ev_set_cb(&conn_.rt, idle_timeoutcb);
|
||||
ev_timer_again(conn_.loop, &conn_.rt);
|
||||
|
||||
|
@ -602,8 +604,10 @@ void HttpDownstreamConnection::pause_read(IOCtrlReason reason) {
|
|||
|
||||
int HttpDownstreamConnection::resume_read(IOCtrlReason reason,
|
||||
size_t consumed) {
|
||||
auto &downstreamconf = *worker_->get_downstream_config();
|
||||
|
||||
if (downstream_->get_response_buf()->rleft() <=
|
||||
get_config()->conn.downstream.request_buffer_size / 2) {
|
||||
downstreamconf.request_buffer_size / 2) {
|
||||
ioctrl_.resume_read(reason);
|
||||
}
|
||||
|
||||
|
|
|
@ -98,9 +98,9 @@ void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
|
|||
LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
|
||||
DownstreamAddr *addr, std::mt19937 &gen)
|
||||
: conn_(loop, -1, nullptr, worker->get_mcpool(),
|
||||
get_config()->conn.downstream.timeout.write,
|
||||
get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb,
|
||||
timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
|
||||
worker->get_downstream_config()->timeout.write,
|
||||
worker->get_downstream_config()->timeout.read, {}, {}, writecb,
|
||||
readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
|
||||
get_config()->tls.dyn_rec.idle_timeout, PROTO_NONE),
|
||||
wb_(worker->get_mcpool()),
|
||||
gen_(gen),
|
||||
|
|
|
@ -512,12 +512,21 @@ uint32_t infer_upstream_rst_stream_status_code(uint32_t downstream_error_code) {
|
|||
}
|
||||
} // namespace
|
||||
|
||||
namespace {
|
||||
size_t downstream_queue_size(Worker *worker) {
|
||||
auto downstreamconf = worker->get_downstream_config();
|
||||
|
||||
if (get_config()->http2_proxy) {
|
||||
return downstreamconf->connections_per_host;
|
||||
}
|
||||
|
||||
return downstreamconf->connections_per_frontend;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler)
|
||||
: wb_(handler->get_worker()->get_mcpool()),
|
||||
downstream_queue_(
|
||||
get_config()->http2_proxy
|
||||
? get_config()->conn.downstream.connections_per_host
|
||||
: get_config()->conn.downstream.connections_per_frontend,
|
||||
downstream_queue_(downstream_queue_size(handler->get_worker()),
|
||||
!get_config()->http2_proxy),
|
||||
handler_(handler),
|
||||
session_(nullptr) {
|
||||
|
|
|
@ -1390,26 +1390,11 @@ SSL_CTX *setup_server_ssl_context(std::vector<SSL_CTX *> &all_ssl_ctx,
|
|||
return ssl_ctx;
|
||||
}
|
||||
|
||||
bool downstream_tls_enabled() {
|
||||
const auto &groups = get_config()->conn.downstream.addr_groups;
|
||||
|
||||
return std::any_of(std::begin(groups), std::end(groups),
|
||||
[](const DownstreamAddrGroupConfig &g) {
|
||||
return std::any_of(
|
||||
std::begin(g.addrs), std::end(g.addrs),
|
||||
[](const DownstreamAddrConfig &a) { return a.tls; });
|
||||
});
|
||||
}
|
||||
|
||||
SSL_CTX *setup_downstream_client_ssl_context(
|
||||
#ifdef HAVE_NEVERBLEED
|
||||
neverbleed_t *nb
|
||||
#endif // HAVE_NEVERBLEED
|
||||
) {
|
||||
if (!downstream_tls_enabled()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
auto &tlsconf = get_config()->tls;
|
||||
|
||||
return ssl::create_ssl_client_context(
|
||||
|
|
|
@ -201,9 +201,7 @@ SSL_CTX *setup_server_ssl_context(std::vector<SSL_CTX *> &all_ssl_ctx,
|
|||
#endif // HAVE_NEVERBLEED
|
||||
);
|
||||
|
||||
// Setups client side SSL_CTX. This function inspects get_config()
|
||||
// and if TLS is disabled in all downstreams, returns nullptr.
|
||||
// Otherwise, only construct SSL_CTX.
|
||||
// Setups client side SSL_CTX.
|
||||
SSL_CTX *setup_downstream_client_ssl_context(
|
||||
#ifdef HAVE_NEVERBLEED
|
||||
neverbleed_t *nb
|
||||
|
@ -224,9 +222,6 @@ SSL *create_ssl(SSL_CTX *ssl_ctx);
|
|||
// Returns true if SSL/TLS is enabled on upstream
|
||||
bool upstream_tls_enabled();
|
||||
|
||||
// Returns true if SSL/TLS is enabled on downstream
|
||||
bool downstream_tls_enabled();
|
||||
|
||||
// Performs TLS hostname match. |pattern| can contain wildcard
|
||||
// character '*', which matches prefix of target hostname. There are
|
||||
// several restrictions to make wildcard work. The matching algorithm
|
||||
|
|
|
@ -109,16 +109,13 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
|
|||
const std::shared_ptr<TicketKeys> &ticket_keys)
|
||||
: randgen_(rd()),
|
||||
worker_stat_{},
|
||||
downstream_router_(get_config()->downstream_router),
|
||||
loop_(loop),
|
||||
sv_ssl_ctx_(sv_ssl_ctx),
|
||||
cl_ssl_ctx_(cl_ssl_ctx),
|
||||
cert_tree_(cert_tree),
|
||||
ticket_keys_(ticket_keys),
|
||||
downstream_addr_groups_(get_config()->conn.downstream.addr_groups.size()),
|
||||
connect_blocker_(
|
||||
make_unique<ConnectBlocker>(randgen_, loop_, []() {}, []() {})),
|
||||
addr_group_catch_all_(get_config()->conn.downstream.addr_group_catch_all),
|
||||
graceful_shutdown_(false) {
|
||||
ev_async_init(&w_, eventcb);
|
||||
w_.data = this;
|
||||
|
@ -136,10 +133,18 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
|
|||
StringRef{session_cacheconf.memcached.host}, &mcpool_);
|
||||
}
|
||||
|
||||
auto &downstreamconf = get_config()->conn.downstream;
|
||||
replace_downstream_config(get_config()->conn.downstream);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < downstreamconf.addr_groups.size(); ++i) {
|
||||
auto &src = downstreamconf.addr_groups[i];
|
||||
void Worker::replace_downstream_config(
|
||||
const std::shared_ptr<DownstreamConfig> &downstreamconf) {
|
||||
downstreamconf_ = downstreamconf;
|
||||
|
||||
downstream_addr_groups_ = std::vector<std::shared_ptr<DownstreamAddrGroup>>(
|
||||
downstreamconf->addr_groups.size());
|
||||
|
||||
for (size_t i = 0; i < downstreamconf->addr_groups.size(); ++i) {
|
||||
auto &src = downstreamconf->addr_groups[i];
|
||||
auto &dst = downstream_addr_groups_[i];
|
||||
|
||||
dst = std::make_shared<DownstreamAddrGroup>();
|
||||
|
@ -407,12 +412,8 @@ ConnectBlocker *Worker::get_connect_blocker() const {
|
|||
return connect_blocker_.get();
|
||||
}
|
||||
|
||||
const DownstreamRouter *Worker::get_downstream_router() const {
|
||||
return downstream_router_.get();
|
||||
}
|
||||
|
||||
size_t Worker::get_addr_group_catch_all() const {
|
||||
return addr_group_catch_all_;
|
||||
const DownstreamConfig *Worker::get_downstream_config() const {
|
||||
return downstreamconf_.get();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
|
|
@ -211,8 +211,10 @@ public:
|
|||
|
||||
ConnectBlocker *get_connect_blocker() const;
|
||||
|
||||
const DownstreamRouter *get_downstream_router() const;
|
||||
size_t get_addr_group_catch_all() const;
|
||||
const DownstreamConfig *get_downstream_config() const;
|
||||
|
||||
void replace_downstream_config(
|
||||
const std::shared_ptr<DownstreamConfig> &downstreamconf);
|
||||
|
||||
private:
|
||||
#ifndef NOTHREADS
|
||||
|
@ -226,7 +228,7 @@ private:
|
|||
MemchunkPool mcpool_;
|
||||
WorkerStat worker_stat_;
|
||||
|
||||
std::shared_ptr<DownstreamRouter> downstream_router_;
|
||||
std::shared_ptr<DownstreamConfig> downstreamconf_;
|
||||
std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;
|
||||
#ifdef HAVE_MRUBY
|
||||
std::unique_ptr<mruby::MRubyContext> mruby_ctx_;
|
||||
|
@ -245,8 +247,6 @@ private:
|
|||
// this is used when file decriptor is exhausted.
|
||||
std::unique_ptr<ConnectBlocker> connect_blocker_;
|
||||
|
||||
size_t addr_group_catch_all_;
|
||||
|
||||
bool graceful_shutdown_;
|
||||
};
|
||||
|
||||
|
|
|
@ -394,7 +394,7 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) {
|
|||
}
|
||||
|
||||
#ifdef HAVE_NEVERBLEED
|
||||
if (ssl::upstream_tls_enabled() || ssl::downstream_tls_enabled()) {
|
||||
{
|
||||
std::array<char, NEVERBLEED_ERRBUF_SIZE> errbuf;
|
||||
auto nb = make_unique<neverbleed_t>();
|
||||
if (neverbleed_init(nb.get(), errbuf.data()) != 0) {
|
||||
|
|
Loading…
Reference in New Issue