Merge branch 'nghttpx-api-endpoint'

This commit is contained in:
Tatsuhiro Tsujikawa 2016-06-04 18:51:56 +09:00
commit 2c33da36cc
36 changed files with 1454 additions and 684 deletions

View File

@ -462,6 +462,42 @@ addresses:
App.new
API ENDPOINTS
-------------
nghttpx exposes API endpoints to manipulate it via HTTP based API. By
default, API endpoint is disabled. To enable it, add a dedicated
frontend for API using :option:`--frontend` option with "api"
parameter. All requests which come from this frontend address, will
be treated as API request.
The following section describes available API endpoints.
PUT /api/v1beta/backend/replace
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This API replaces the current set of backend servers with the
requested ones. The request must carry request body with method PUT
or POST. The request body must be nghttpx configuration file format.
For configuration file format, see `FILES`_ section. The line
separator inside the request body must be single LF (0x0A).
Currently, only :option:`backend <--backend>` option is parsed, the
others are simply ignored. The semantics of this API is replace the
current backend with the backend options in request body. Describe
the desired set of backend severs, and nghttpx makes it happen. If
there is no :option:`backend <--backend>` option is found in request
body, the current set of backend is replaced with the :option:`backend
<--backend>` option's default value, which is ``127.0.0.1,80``.
The replacement is done instantly without breaking existing
connections or requests. It also avoids any process creation as is
the case with hot swapping with signals.
The one limitation is that only numeric IP address is allowd in
:option:`backend <--backend>` in request body while non numeric
hostname is allowed in command-line or configuration file is read
using :option:`--conf`.
SEE ALSO
--------

View File

@ -132,6 +132,7 @@ OPTIONS = [
"no-kqueue",
"frontend-http2-settings-timeout",
"backend-http2-settings-timeout",
"api-max-request-body",
]
LOGVARS = [

View File

@ -109,6 +109,7 @@ if(ENABLE_APP)
shrpx_worker_process.cc
shrpx_signal.cc
shrpx_router.cc
shrpx_api_downstream_connection.cc
)
if(HAVE_SPDYLAY)
list(APPEND NGHTTPX_SRCS

View File

@ -132,6 +132,7 @@ NGHTTPX_SRCS = \
shrpx_process.h \
shrpx_signal.cc shrpx_signal.h \
shrpx_router.cc shrpx_router.h \
shrpx_api_downstream_connection.cc shrpx_api_downstream_connection.h \
buffer.h memchunk.h template.h allocator.h
if HAVE_SPDYLAY

View File

@ -54,20 +54,45 @@ struct BlockAllocator {
block_size(block_size),
isolation_threshold(std::min(block_size, isolation_threshold)) {}
~BlockAllocator() {
~BlockAllocator() { reset(); }
BlockAllocator(BlockAllocator &&other) noexcept
: retain(other.retain),
head(other.head),
block_size(other.block_size),
isolation_threshold(other.isolation_threshold) {
other.retain = nullptr;
other.head = nullptr;
}
BlockAllocator &operator=(BlockAllocator &&other) noexcept {
reset();
retain = other.retain;
head = other.head;
block_size = other.block_size;
isolation_threshold = other.isolation_threshold;
other.retain = nullptr;
other.head = nullptr;
return *this;
}
BlockAllocator(const BlockAllocator &) = delete;
BlockAllocator &operator=(const BlockAllocator &) = delete;
void reset() {
for (auto mb = retain; mb;) {
auto next = mb->next;
delete[] reinterpret_cast<uint8_t *>(mb);
mb = next;
}
retain = nullptr;
head = nullptr;
}
BlockAllocator(BlockAllocator &&) = default;
BlockAllocator &operator=(BlockAllocator &&) = default;
BlockAllocator(const BlockAllocator &) = delete;
BlockAllocator &operator=(const BlockAllocator &) = delete;
MemBlock *alloc_mem_block(size_t size) {
auto block = new uint8_t[sizeof(MemBlock) + size];
auto mb = reinterpret_cast<MemBlock *>(block);

View File

@ -146,52 +146,6 @@ struct SignalServer {
pid_t worker_process_pid;
};
namespace {
int resolve_hostname(Address *addr, const char *hostname, uint16_t port,
int family) {
int rv;
auto service = util::utos(port);
addrinfo hints{};
hints.ai_family = family;
hints.ai_socktype = SOCK_STREAM;
#ifdef AI_ADDRCONFIG
hints.ai_flags |= AI_ADDRCONFIG;
#endif // AI_ADDRCONFIG
addrinfo *res;
rv = getaddrinfo(hostname, service.c_str(), &hints, &res);
if (rv != 0) {
LOG(FATAL) << "Unable to resolve address for " << hostname << ": "
<< gai_strerror(rv);
return -1;
}
auto res_d = defer(freeaddrinfo, res);
char host[NI_MAXHOST];
rv = getnameinfo(res->ai_addr, res->ai_addrlen, host, sizeof(host), nullptr,
0, NI_NUMERICHOST);
if (rv != 0) {
LOG(FATAL) << "Address resolution for " << hostname
<< " failed: " << gai_strerror(rv);
return -1;
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Address resolution for " << hostname
<< " succeeded: " << host;
}
memcpy(&addr->su, res->ai_addr, res->ai_addrlen);
addr->len = res->ai_addrlen;
return 0;
}
} // namespace
namespace {
int chown_to_running_user(const char *path) {
return chown(path, get_config()->uid, get_config()->gid);
@ -1075,11 +1029,6 @@ constexpr auto DEFAULT_ACCESSLOG_FORMAT = StringRef::from_lit(
R"("$http_referer" "$http_user_agent")");
} // namespace
namespace {
constexpr char DEFAULT_DOWNSTREAM_HOST[] = "127.0.0.1";
constexpr int16_t DEFAULT_DOWNSTREAM_PORT = 80;
} // namespace;
namespace {
void fill_default_config() {
*mod_config() = {};
@ -1157,6 +1106,11 @@ void fill_default_config() {
nghttp2_option_new(&upstreamconf.option);
nghttp2_option_set_no_auto_window_update(upstreamconf.option, 1);
nghttp2_option_set_no_recv_client_magic(upstreamconf.option, 1);
// For API endpoint, we enable automatic window update. This is
// because we are a sink.
nghttp2_option_new(&upstreamconf.api_option);
nghttp2_option_set_no_recv_client_magic(upstreamconf.api_option, 1);
}
{
@ -1213,7 +1167,8 @@ void fill_default_config() {
}
{
auto &downstreamconf = connconf.downstream;
connconf.downstream = std::make_shared<DownstreamConfig>();
auto &downstreamconf = *connconf.downstream;
{
auto &timeoutconf = downstreamconf.timeout;
// Read/Write timeouts for downstream connection
@ -1228,6 +1183,9 @@ void fill_default_config() {
downstreamconf.response_buffer_size = 128_k;
downstreamconf.family = AF_UNSPEC;
}
auto &apiconf = mod_config()->api;
apiconf.max_request_body = 16_k;
}
} // namespace
@ -1383,6 +1341,13 @@ Connections:
Optionally, TLS can be disabled by specifying "no-tls"
parameter. TLS is enabled by default.
To make this frontend as API endpoint, specify "api"
parameter. This is disabled by default. It is
important to limit the access to the API frontend.
Otherwise, someone may change the backend server, and
break your services, or expose confidential information
to the outside the world.
Default: *,3000
--backlog=<N>
Set listen backlog size.
@ -1469,7 +1434,7 @@ Performance:
HTTP/2). To limit the number of connections per
frontend for default mode, use
--backend-connections-per-frontend.
Default: )" << get_config()->conn.downstream.connections_per_host
Default: )" << get_config()->conn.downstream->connections_per_host
<< R"(
--backend-connections-per-frontend=<N>
Set maximum number of backend concurrent connections
@ -1479,7 +1444,7 @@ Performance:
with --http2-proxy option, use
--backend-connections-per-host.
Default: )"
<< get_config()->conn.downstream.connections_per_frontend << R"(
<< get_config()->conn.downstream->connections_per_frontend << R"(
--rlimit-nofile=<N>
Set maximum number of open files (RLIMIT_NOFILE) to <N>.
If 0 is given, nghttpx does not set the limit.
@ -1487,12 +1452,12 @@ Performance:
--backend-request-buffer=<SIZE>
Set buffer size used to store backend request.
Default: )"
<< util::utos_unit(get_config()->conn.downstream.request_buffer_size)
<< util::utos_unit(get_config()->conn.downstream->request_buffer_size)
<< R"(
--backend-response-buffer=<SIZE>
Set buffer size used to store backend response.
Default: )"
<< util::utos_unit(get_config()->conn.downstream.response_buffer_size)
<< util::utos_unit(get_config()->conn.downstream->response_buffer_size)
<< R"(
--fastopen=<N>
Enables "TCP Fast Open" for the listening socket and
@ -1532,15 +1497,15 @@ Timeout:
--backend-read-timeout=<DURATION>
Specify read timeout for backend connection.
Default: )"
<< util::duration_str(get_config()->conn.downstream.timeout.read) << R"(
<< util::duration_str(get_config()->conn.downstream->timeout.read) << R"(
--backend-write-timeout=<DURATION>
Specify write timeout for backend connection.
Default: )"
<< util::duration_str(get_config()->conn.downstream.timeout.write) << R"(
<< util::duration_str(get_config()->conn.downstream->timeout.write) << R"(
--backend-keep-alive-timeout=<DURATION>
Specify keep-alive timeout for backend connection.
Default: )"
<< util::duration_str(get_config()->conn.downstream.timeout.idle_read)
<< util::duration_str(get_config()->conn.downstream->timeout.idle_read)
<< R"(
--listener-disable-timeout=<DURATION>
After accepting connection failed, connection listener
@ -1959,6 +1924,12 @@ HTTP:
HTTP status code. If error status code comes from
backend server, the custom error pages are not used.
API:
--api-max-request-body=<SIZE>
Set the maximum size of request body for API request.
Default: )" << util::utos_unit(get_config()->api.max_request_body)
<< R"(
Debug:
--frontend-http2-dump-request-header=<PATH>
Dumps request headers received by HTTP/2 frontend to the
@ -2041,7 +2012,7 @@ void process_options(int argc, char **argv,
std::set<StringRef> include_set;
for (auto &p : cmdcfgs) {
if (parse_config(p.first, p.second, include_set) == -1) {
if (parse_config(mod_config(), p.first, p.second, include_set) == -1) {
LOG(FATAL) << "Failed to parse command-line argument.";
exit(EXIT_FAILURE);
}
@ -2146,7 +2117,6 @@ void process_options(int argc, char **argv,
auto &listenerconf = mod_config()->conn.listener;
auto &upstreamconf = mod_config()->conn.upstream;
auto &downstreamconf = mod_config()->conn.downstream;
if (listenerconf.addrs.empty()) {
UpstreamAddr addr{};
@ -2180,140 +2150,11 @@ void process_options(int argc, char **argv,
}
}
auto &addr_groups = downstreamconf.addr_groups;
if (addr_groups.empty()) {
DownstreamAddrConfig addr{};
addr.host = ImmutableString::from_lit(DEFAULT_DOWNSTREAM_HOST);
addr.port = DEFAULT_DOWNSTREAM_PORT;
addr.proto = PROTO_HTTP1;
DownstreamAddrGroupConfig g(StringRef::from_lit("/"));
g.addrs.push_back(std::move(addr));
mod_config()->router.add_route(StringRef{g.pattern}, addr_groups.size());
addr_groups.push_back(std::move(g));
} else if (get_config()->http2_proxy) {
// We don't support host mapping in these cases. Move all
// non-catch-all patterns to catch-all pattern.
DownstreamAddrGroupConfig catch_all(StringRef::from_lit("/"));
for (auto &g : addr_groups) {
std::move(std::begin(g.addrs), std::end(g.addrs),
std::back_inserter(catch_all.addrs));
}
std::vector<DownstreamAddrGroupConfig>().swap(addr_groups);
std::vector<WildcardPattern>().swap(mod_config()->wildcard_patterns);
// maybe not necessary?
mod_config()->router = Router();
mod_config()->router.add_route(StringRef{catch_all.pattern},
addr_groups.size());
addr_groups.push_back(std::move(catch_all));
} else {
auto &wildcard_patterns = mod_config()->wildcard_patterns;
std::sort(std::begin(wildcard_patterns), std::end(wildcard_patterns),
[](const WildcardPattern &lhs, const WildcardPattern &rhs) {
return std::lexicographical_compare(
rhs.host.rbegin(), rhs.host.rend(), lhs.host.rbegin(),
lhs.host.rend());
});
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Reverse sorted wildcard hosts (compared from tail to head, "
"and sorted in reverse order):";
for (auto &wp : mod_config()->wildcard_patterns) {
LOG(INFO) << wp.host;
}
}
}
// backward compatibility: override all SNI fields with the option
// value --backend-tls-sni-field
if (!tlsconf.backend_sni_name.empty()) {
auto &sni = tlsconf.backend_sni_name;
for (auto &addr_group : addr_groups) {
for (auto &addr : addr_group.addrs) {
addr.sni = sni;
}
}
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Resolving backend address";
}
ssize_t catch_all_group = -1;
for (size_t i = 0; i < addr_groups.size(); ++i) {
auto &g = addr_groups[i];
if (g.pattern == StringRef::from_lit("/")) {
catch_all_group = i;
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Host-path pattern: group " << i << ": '" << g.pattern
<< "'";
for (auto &addr : g.addrs) {
LOG(INFO) << "group " << i << " -> " << addr.host.c_str()
<< (addr.host_unix ? "" : ":" + util::utos(addr.port))
<< ", proto=" << strproto(addr.proto)
<< (addr.tls ? ", tls" : "");
}
}
}
if (catch_all_group == -1) {
LOG(FATAL) << "backend: No catch-all backend address is configured";
if (configure_downstream_group(mod_config(), get_config()->http2_proxy, false,
tlsconf) != 0) {
exit(EXIT_FAILURE);
}
downstreamconf.addr_group_catch_all = catch_all_group;
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Catch-all pattern is group " << catch_all_group;
}
for (auto &g : addr_groups) {
for (auto &addr : g.addrs) {
if (addr.host_unix) {
// for AF_UNIX socket, we use "localhost" as host for backend
// hostport. This is used as Host header field to backend and
// not going to be passed to any syscalls.
addr.hostport = "localhost";
auto path = addr.host.c_str();
auto pathlen = addr.host.size();
if (pathlen + 1 > sizeof(addr.addr.su.un.sun_path)) {
LOG(FATAL) << "UNIX domain socket path " << path << " is too long > "
<< sizeof(addr.addr.su.un.sun_path);
exit(EXIT_FAILURE);
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Use UNIX domain socket path " << path
<< " for backend connection";
}
addr.addr.su.un.sun_family = AF_UNIX;
// copy path including terminal NULL
std::copy_n(path, pathlen + 1, addr.addr.su.un.sun_path);
addr.addr.len = sizeof(addr.addr.su.un);
continue;
}
addr.hostport = ImmutableString(
util::make_http_hostport(StringRef(addr.host), addr.port));
auto hostport = util::make_hostport(StringRef{addr.host}, addr.port);
if (resolve_hostname(&addr.addr, addr.host.c_str(), addr.port,
downstreamconf.family) == -1) {
LOG(FATAL) << "Resolving backend address failed: " << hostport;
exit(EXIT_FAILURE);
}
LOG(NOTICE) << "Resolved backend address: " << hostport << " -> "
<< util::to_numeric_addr(&addr.addr);
}
}
auto &proxy = mod_config()->downstream_http_proxy;
if (!proxy.host.empty()) {
auto hostport = util::make_hostport(StringRef{proxy.host}, proxy.port);
@ -2622,6 +2463,7 @@ int main(int argc, char **argv) {
&flag, 124},
{SHRPX_OPT_BACKEND_HTTP2_SETTINGS_TIMEOUT.c_str(), required_argument,
&flag, 125},
{SHRPX_OPT_API_MAX_REQUEST_BODY.c_str(), required_argument, &flag, 126},
{nullptr, 0, nullptr, 0}};
int option_index = 0;
@ -3211,6 +3053,10 @@ int main(int argc, char **argv) {
cmdcfgs.emplace_back(SHRPX_OPT_BACKEND_HTTP2_SETTINGS_TIMEOUT,
StringRef{optarg});
break;
case 126:
// --api-max-request-body
cmdcfgs.emplace_back(SHRPX_OPT_API_MAX_REQUEST_BODY, StringRef{optarg});
break;
default:
break;
}

View File

@ -0,0 +1,305 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2016 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_api_downstream_connection.h"
#include "shrpx_client_handler.h"
#include "shrpx_upstream.h"
#include "shrpx_downstream.h"
#include "shrpx_worker.h"
#include "shrpx_connection_handler.h"
namespace shrpx {
APIDownstreamConnection::APIDownstreamConnection(Worker *worker)
: worker_(worker), abandoned_(false) {}
APIDownstreamConnection::~APIDownstreamConnection() {}
int APIDownstreamConnection::attach_downstream(Downstream *downstream) {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "Attaching to DOWNSTREAM:" << downstream;
}
downstream_ = downstream;
return 0;
}
void APIDownstreamConnection::detach_downstream(Downstream *downstream) {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "Detaching from DOWNSTREAM:" << downstream;
}
downstream_ = nullptr;
}
// API status, which is independent from HTTP status code. But
// generally, 2xx code for API_SUCCESS, and otherwise API_FAILURE.
enum {
API_SUCCESS,
API_FAILURE,
};
int APIDownstreamConnection::send_reply(unsigned int http_status,
int api_status) {
abandoned_ = true;
auto upstream = downstream_->get_upstream();
auto &resp = downstream_->response();
resp.http_status = http_status;
auto &balloc = downstream_->get_block_allocator();
StringRef api_status_str;
switch (api_status) {
case API_SUCCESS:
api_status_str = StringRef::from_lit("Success");
break;
case API_FAILURE:
api_status_str = StringRef::from_lit("Failure");
break;
default:
assert(0);
}
constexpr auto M1 = StringRef::from_lit("{\"status\":\"");
constexpr auto M2 = StringRef::from_lit("\",\"code\":");
constexpr auto M3 = StringRef::from_lit("}");
// 3 is the number of digits in http_status, assuming it is 3 digits
// number.
auto buflen = M1.size() + M2.size() + M3.size() + api_status_str.size() + 3;
auto buf = make_byte_ref(balloc, buflen);
auto p = buf.base;
p = std::copy(std::begin(M1), std::end(M1), p);
p = std::copy(std::begin(api_status_str), std::end(api_status_str), p);
p = std::copy(std::begin(M2), std::end(M2), p);
p = util::utos(p, http_status);
p = std::copy(std::begin(M3), std::end(M3), p);
buf.len = p - buf.base;
auto content_length = util::make_string_ref_uint(balloc, buf.len);
resp.fs.add_header_token(StringRef::from_lit("content-length"),
content_length, false, http2::HD_CONTENT_LENGTH);
switch (http_status) {
case 400:
case 405:
case 413:
resp.fs.add_header_token(StringRef::from_lit("connection"),
StringRef::from_lit("close"), false,
http2::HD_CONNECTION);
break;
}
if (upstream->send_reply(downstream_, buf.base, buf.len) != 0) {
return -1;
}
return 0;
}
int APIDownstreamConnection::push_request_headers() {
auto &req = downstream_->request();
auto &resp = downstream_->response();
if (req.path != StringRef::from_lit("/api/v1beta1/backend/replace")) {
send_reply(404, API_FAILURE);
return 0;
}
if (req.method != HTTP_POST && req.method != HTTP_PUT) {
resp.fs.add_header_token(StringRef::from_lit("allow"),
StringRef::from_lit("POST, PUT"), false, -1);
send_reply(405, API_FAILURE);
return 0;
}
// This works with req.fs.content_length == -1
if (req.fs.content_length >
static_cast<int64_t>(get_config()->api.max_request_body)) {
send_reply(413, API_FAILURE);
return 0;
}
return 0;
}
int APIDownstreamConnection::push_upload_data_chunk(const uint8_t *data,
size_t datalen) {
if (abandoned_) {
return 0;
}
auto output = downstream_->get_request_buf();
auto &apiconf = get_config()->api;
if (output->rleft() + datalen > apiconf.max_request_body) {
send_reply(413, API_FAILURE);
return 0;
}
output->append(data, datalen);
// We don't have to call Upstream::resume_read() here, because
// request buffer is effectively unlimited. Actually, we cannot
// call it here since it could recursively call this function again.
return 0;
}
int APIDownstreamConnection::end_upload_data() {
if (abandoned_) {
return 0;
}
auto output = downstream_->get_request_buf();
struct iovec iov;
auto iovcnt = output->riovec(&iov, 2);
if (iovcnt == 0) {
send_reply(200, API_SUCCESS);
return 0;
}
std::unique_ptr<uint8_t[]> large_buf;
// If data spans in multiple chunks, pull them up into one
// contiguous buffer.
if (iovcnt > 1) {
large_buf = make_unique<uint8_t[]>(output->rleft());
auto len = output->rleft();
output->remove(large_buf.get(), len);
iov.iov_base = large_buf.get();
iov.iov_len = len;
}
Config config{};
config.conn.downstream = std::make_shared<DownstreamConfig>();
const auto &downstreamconf = config.conn.downstream;
auto &src = get_config()->conn.downstream;
downstreamconf->timeout = src->timeout;
downstreamconf->connections_per_host = src->connections_per_host;
downstreamconf->connections_per_frontend = src->connections_per_frontend;
downstreamconf->request_buffer_size = src->request_buffer_size;
downstreamconf->response_buffer_size = src->response_buffer_size;
downstreamconf->family = src->family;
std::set<StringRef> include_set;
for (auto first = reinterpret_cast<const uint8_t *>(iov.iov_base),
last = first + iov.iov_len;
first != last;) {
auto eol = std::find(first, last, '\n');
if (eol == last) {
break;
}
if (first == eol || *first == '#') {
first = ++eol;
continue;
}
auto eq = std::find(first, eol, '=');
if (eq == eol) {
send_reply(400, API_FAILURE);
return 0;
}
auto opt = StringRef{first, eq};
auto optval = StringRef{eq + 1, eol};
auto optid = option_lookup_token(opt.c_str(), opt.size());
switch (optid) {
case SHRPX_OPTID_BACKEND:
break;
default:
first = ++eol;
continue;
}
if (parse_config(&config, optid, opt, optval, include_set) != 0) {
send_reply(400, API_FAILURE);
return 0;
}
first = ++eol;
}
auto &tlsconf = get_config()->tls;
if (configure_downstream_group(&config, get_config()->http2_proxy, true,
tlsconf) != 0) {
send_reply(400, API_FAILURE);
return 0;
}
auto conn_handler = worker_->get_connection_handler();
conn_handler->send_replace_downstream(downstreamconf);
send_reply(200, API_SUCCESS);
return 0;
}
void APIDownstreamConnection::pause_read(IOCtrlReason reason) {}
int APIDownstreamConnection::resume_read(IOCtrlReason reason, size_t consumed) {
return 0;
}
void APIDownstreamConnection::force_resume_read() {}
int APIDownstreamConnection::on_read() { return 0; }
int APIDownstreamConnection::on_write() { return 0; }
void APIDownstreamConnection::on_upstream_change(Upstream *uptream) {}
bool APIDownstreamConnection::poolable() const { return false; }
DownstreamAddrGroup *
APIDownstreamConnection::get_downstream_addr_group() const {
return nullptr;
}
} // namespace shrpx

View File

@ -0,0 +1,68 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2016 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_API_DOWNSTREAM_CONNECTION_H
#define SHRPX_API_DOWNSTREAM_CONNECTION_H
#include "shrpx_downstream_connection.h"
namespace shrpx {
class Worker;
class APIDownstreamConnection : public DownstreamConnection {
public:
APIDownstreamConnection(Worker *worker);
virtual ~APIDownstreamConnection();
virtual int attach_downstream(Downstream *downstream);
virtual void detach_downstream(Downstream *downstream);
virtual int push_request_headers();
virtual int push_upload_data_chunk(const uint8_t *data, size_t datalen);
virtual int end_upload_data();
virtual void pause_read(IOCtrlReason reason);
virtual int resume_read(IOCtrlReason reason, size_t consumed);
virtual void force_resume_read();
virtual int on_read();
virtual int on_write();
virtual void on_upstream_change(Upstream *uptream);
// true if this object is poolable.
virtual bool poolable() const;
virtual DownstreamAddrGroup *get_downstream_addr_group() const;
int send_reply(unsigned int http_status, int api_status);
private:
Worker *worker_;
bool abandoned_;
};
} // namespace shrpx
#endif // SHRPX_API_DOWNSTREAM_CONNECTION_H

View File

@ -48,6 +48,7 @@
#include "shrpx_downstream.h"
#include "shrpx_http2_session.h"
#include "shrpx_connect_blocker.h"
#include "shrpx_api_downstream_connection.h"
#ifdef HAVE_SPDYLAY
#include "shrpx_spdy_upstream.h"
#endif // HAVE_SPDYLAY
@ -689,8 +690,9 @@ bool load_lighter(const DownstreamAddr *lhs, const DownstreamAddr *rhs) {
}
} // namespace
Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) {
auto &shared_addr = group.shared_addr;
Http2Session *ClientHandler::select_http2_session(
const std::shared_ptr<DownstreamAddrGroup> &group) {
auto &shared_addr = group->shared_addr;
// First count the working backend addresses.
size_t min = 0;
@ -778,7 +780,7 @@ Http2Session *ClientHandler::select_http2_session(DownstreamAddrGroup &group) {
}
auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(),
worker_, &group, selected_addr);
worker_, group, selected_addr);
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Create new Http2Session " << session;
@ -814,12 +816,17 @@ uint32_t next_cycle(const WeightedPri &pri) {
std::unique_ptr<DownstreamConnection>
ClientHandler::get_downstream_connection(Downstream *downstream) {
size_t group_idx;
auto &downstreamconf = get_config()->conn.downstream;
auto &downstreamconf = *worker_->get_downstream_config();
auto catch_all = downstreamconf.addr_group_catch_all;
auto &groups = worker_->get_downstream_addr_groups();
const auto &req = downstream->request();
if (faddr_->api) {
return make_unique<APIDownstreamConnection>(worker_);
}
// Fast path. If we have one group, it must be catch-all group.
// proxy mode falls in this case.
if (groups.size() == 1) {
@ -830,8 +837,8 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
// have dealt with proxy case already, just use catch-all group.
group_idx = catch_all;
} else {
auto &router = get_config()->router;
auto &wildcard_patterns = get_config()->wildcard_patterns;
auto &router = downstreamconf.router;
auto &wildcard_patterns = downstreamconf.wildcard_patterns;
if (!req.authority.empty()) {
group_idx =
match_downstream_addr_group(router, wildcard_patterns, req.authority,
@ -853,8 +860,8 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
CLOG(INFO, this) << "Downstream address group_idx: " << group_idx;
}
auto &group = worker_->get_downstream_addr_groups()[group_idx];
auto &shared_addr = group.shared_addr;
auto &group = groups[group_idx];
auto &shared_addr = group->shared_addr;
auto proto = PROTO_NONE;
@ -920,7 +927,7 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
<< " Create new one";
}
dconn = make_unique<HttpDownstreamConnection>(&group, conn_.loop, worker_);
dconn = make_unique<HttpDownstreamConnection>(group, conn_.loop, worker_);
}
dconn->set_client_handler(this);

View File

@ -142,7 +142,8 @@ public:
// header field.
StringRef get_forwarded_for() const;
Http2Session *select_http2_session(DownstreamAddrGroup &group);
Http2Session *
select_http2_session(const std::shared_ptr<DownstreamAddrGroup> &group);
const UpstreamAddr *get_upstream_addr() const;

File diff suppressed because it is too large Load Diff

View File

@ -280,9 +280,14 @@ constexpr auto SHRPX_OPT_FRONTEND_HTTP2_SETTINGS_TIMEOUT =
StringRef::from_lit("frontend-http2-settings-timeout");
constexpr auto SHRPX_OPT_BACKEND_HTTP2_SETTINGS_TIMEOUT =
StringRef::from_lit("backend-http2-settings-timeout");
constexpr auto SHRPX_OPT_API_MAX_REQUEST_BODY =
StringRef::from_lit("api-max-request-body");
constexpr size_t SHRPX_OBFUSCATED_NODE_LENGTH = 8;
constexpr char DEFAULT_DOWNSTREAM_HOST[] = "127.0.0.1";
constexpr int16_t DEFAULT_DOWNSTREAM_PORT = 80;
enum shrpx_proto { PROTO_NONE, PROTO_HTTP1, PROTO_HTTP2, PROTO_MEMCACHED };
enum shrpx_forwarded_param {
@ -321,6 +326,8 @@ struct UpstreamAddr {
bool host_unix;
// true if TLS is enabled.
bool tls;
// true if this is an API endpoint.
bool api;
int fd;
};
@ -538,6 +545,7 @@ struct Http2Config {
ev_tstamp settings;
} timeout;
nghttp2_option *option;
nghttp2_option *api_option;
nghttp2_session_callbacks *callbacks;
size_t window_bits;
size_t connection_window_bits;
@ -581,6 +589,38 @@ struct RateLimitConfig {
size_t burst;
};
// Wildcard host pattern routing. We strips left most '*' from host
// field. router includes all path pattern sharing same wildcard
// host.
struct WildcardPattern {
WildcardPattern(const StringRef &host)
: host(std::begin(host), std::end(host)) {}
ImmutableString host;
Router router;
};
struct DownstreamConfig {
struct {
ev_tstamp read;
ev_tstamp write;
ev_tstamp idle_read;
} timeout;
Router router;
std::vector<WildcardPattern> wildcard_patterns;
std::vector<DownstreamAddrGroupConfig> addr_groups;
// The index of catch-all group in downstream_addr_groups.
size_t addr_group_catch_all;
size_t connections_per_host;
size_t connections_per_frontend;
size_t request_buffer_size;
size_t response_buffer_size;
// Address family of backend connection. One of either AF_INET,
// AF_INET6 or AF_UNSPEC. This is ignored if backend connection
// is made via Unix domain socket.
int family;
};
struct ConnectionConfig {
struct {
struct {
@ -608,43 +648,22 @@ struct ConnectionConfig {
bool accept_proxy_protocol;
} upstream;
struct {
struct {
ev_tstamp read;
ev_tstamp write;
ev_tstamp idle_read;
} timeout;
std::vector<DownstreamAddrGroupConfig> addr_groups;
// The index of catch-all group in downstream_addr_groups.
size_t addr_group_catch_all;
size_t connections_per_host;
size_t connections_per_frontend;
size_t request_buffer_size;
size_t response_buffer_size;
// Address family of backend connection. One of either AF_INET,
// AF_INET6 or AF_UNSPEC. This is ignored if backend connection
// is made via Unix domain socket.
int family;
} downstream;
std::shared_ptr<DownstreamConfig> downstream;
};
// Wildcard host pattern routing. We strips left most '*' from host
// field. router includes all path pattern sharing same wildcard
// host.
struct WildcardPattern {
ImmutableString host;
Router router;
struct APIConfig {
// Maximum request body size for one API request
size_t max_request_body;
};
struct Config {
Router router;
std::vector<WildcardPattern> wildcard_patterns;
HttpProxy downstream_http_proxy;
HttpConfig http;
Http2Config http2;
TLSConfig tls;
LoggingConfig logging;
ConnectionConfig conn;
APIConfig api;
ImmutableString pid_file;
ImmutableString conf_path;
ImmutableString user;
@ -670,14 +689,157 @@ const Config *get_config();
Config *mod_config();
void create_config();
// generated by gennghttpxfun.py
enum {
SHRPX_OPTID_ACCEPT_PROXY_PROTOCOL,
SHRPX_OPTID_ACCESSLOG_FILE,
SHRPX_OPTID_ACCESSLOG_FORMAT,
SHRPX_OPTID_ACCESSLOG_SYSLOG,
SHRPX_OPTID_ADD_FORWARDED,
SHRPX_OPTID_ADD_REQUEST_HEADER,
SHRPX_OPTID_ADD_RESPONSE_HEADER,
SHRPX_OPTID_ADD_X_FORWARDED_FOR,
SHRPX_OPTID_ALTSVC,
SHRPX_OPTID_API_MAX_REQUEST_BODY,
SHRPX_OPTID_BACKEND,
SHRPX_OPTID_BACKEND_ADDRESS_FAMILY,
SHRPX_OPTID_BACKEND_CONNECTIONS_PER_FRONTEND,
SHRPX_OPTID_BACKEND_CONNECTIONS_PER_HOST,
SHRPX_OPTID_BACKEND_HTTP_PROXY_URI,
SHRPX_OPTID_BACKEND_HTTP1_CONNECTIONS_PER_FRONTEND,
SHRPX_OPTID_BACKEND_HTTP1_CONNECTIONS_PER_HOST,
SHRPX_OPTID_BACKEND_HTTP1_TLS,
SHRPX_OPTID_BACKEND_HTTP2_CONNECTION_WINDOW_BITS,
SHRPX_OPTID_BACKEND_HTTP2_CONNECTIONS_PER_WORKER,
SHRPX_OPTID_BACKEND_HTTP2_MAX_CONCURRENT_STREAMS,
SHRPX_OPTID_BACKEND_HTTP2_SETTINGS_TIMEOUT,
SHRPX_OPTID_BACKEND_HTTP2_WINDOW_BITS,
SHRPX_OPTID_BACKEND_IPV4,
SHRPX_OPTID_BACKEND_IPV6,
SHRPX_OPTID_BACKEND_KEEP_ALIVE_TIMEOUT,
SHRPX_OPTID_BACKEND_NO_TLS,
SHRPX_OPTID_BACKEND_READ_TIMEOUT,
SHRPX_OPTID_BACKEND_REQUEST_BUFFER,
SHRPX_OPTID_BACKEND_RESPONSE_BUFFER,
SHRPX_OPTID_BACKEND_TLS,
SHRPX_OPTID_BACKEND_TLS_SNI_FIELD,
SHRPX_OPTID_BACKEND_WRITE_TIMEOUT,
SHRPX_OPTID_BACKLOG,
SHRPX_OPTID_CACERT,
SHRPX_OPTID_CERTIFICATE_FILE,
SHRPX_OPTID_CIPHERS,
SHRPX_OPTID_CLIENT,
SHRPX_OPTID_CLIENT_CERT_FILE,
SHRPX_OPTID_CLIENT_PRIVATE_KEY_FILE,
SHRPX_OPTID_CLIENT_PROXY,
SHRPX_OPTID_CONF,
SHRPX_OPTID_DAEMON,
SHRPX_OPTID_DH_PARAM_FILE,
SHRPX_OPTID_ERROR_PAGE,
SHRPX_OPTID_ERRORLOG_FILE,
SHRPX_OPTID_ERRORLOG_SYSLOG,
SHRPX_OPTID_FASTOPEN,
SHRPX_OPTID_FETCH_OCSP_RESPONSE_FILE,
SHRPX_OPTID_FORWARDED_BY,
SHRPX_OPTID_FORWARDED_FOR,
SHRPX_OPTID_FRONTEND,
SHRPX_OPTID_FRONTEND_FRAME_DEBUG,
SHRPX_OPTID_FRONTEND_HTTP2_CONNECTION_WINDOW_BITS,
SHRPX_OPTID_FRONTEND_HTTP2_DUMP_REQUEST_HEADER,
SHRPX_OPTID_FRONTEND_HTTP2_DUMP_RESPONSE_HEADER,
SHRPX_OPTID_FRONTEND_HTTP2_MAX_CONCURRENT_STREAMS,
SHRPX_OPTID_FRONTEND_HTTP2_READ_TIMEOUT,
SHRPX_OPTID_FRONTEND_HTTP2_SETTINGS_TIMEOUT,
SHRPX_OPTID_FRONTEND_HTTP2_WINDOW_BITS,
SHRPX_OPTID_FRONTEND_NO_TLS,
SHRPX_OPTID_FRONTEND_READ_TIMEOUT,
SHRPX_OPTID_FRONTEND_WRITE_TIMEOUT,
SHRPX_OPTID_HEADER_FIELD_BUFFER,
SHRPX_OPTID_HOST_REWRITE,
SHRPX_OPTID_HTTP2_BRIDGE,
SHRPX_OPTID_HTTP2_MAX_CONCURRENT_STREAMS,
SHRPX_OPTID_HTTP2_NO_COOKIE_CRUMBLING,
SHRPX_OPTID_HTTP2_PROXY,
SHRPX_OPTID_INCLUDE,
SHRPX_OPTID_INSECURE,
SHRPX_OPTID_LISTENER_DISABLE_TIMEOUT,
SHRPX_OPTID_LOG_LEVEL,
SHRPX_OPTID_MAX_HEADER_FIELDS,
SHRPX_OPTID_MAX_REQUEST_HEADER_FIELDS,
SHRPX_OPTID_MAX_RESPONSE_HEADER_FIELDS,
SHRPX_OPTID_MRUBY_FILE,
SHRPX_OPTID_NO_HOST_REWRITE,
SHRPX_OPTID_NO_HTTP2_CIPHER_BLACK_LIST,
SHRPX_OPTID_NO_KQUEUE,
SHRPX_OPTID_NO_LOCATION_REWRITE,
SHRPX_OPTID_NO_OCSP,
SHRPX_OPTID_NO_SERVER_PUSH,
SHRPX_OPTID_NO_VIA,
SHRPX_OPTID_NPN_LIST,
SHRPX_OPTID_OCSP_UPDATE_INTERVAL,
SHRPX_OPTID_PADDING,
SHRPX_OPTID_PID_FILE,
SHRPX_OPTID_PRIVATE_KEY_FILE,
SHRPX_OPTID_PRIVATE_KEY_PASSWD_FILE,
SHRPX_OPTID_READ_BURST,
SHRPX_OPTID_READ_RATE,
SHRPX_OPTID_REQUEST_HEADER_FIELD_BUFFER,
SHRPX_OPTID_RESPONSE_HEADER_FIELD_BUFFER,
SHRPX_OPTID_RLIMIT_NOFILE,
SHRPX_OPTID_STREAM_READ_TIMEOUT,
SHRPX_OPTID_STREAM_WRITE_TIMEOUT,
SHRPX_OPTID_STRIP_INCOMING_FORWARDED,
SHRPX_OPTID_STRIP_INCOMING_X_FORWARDED_FOR,
SHRPX_OPTID_SUBCERT,
SHRPX_OPTID_SYSLOG_FACILITY,
SHRPX_OPTID_TLS_DYN_REC_IDLE_TIMEOUT,
SHRPX_OPTID_TLS_DYN_REC_WARMUP_THRESHOLD,
SHRPX_OPTID_TLS_PROTO_LIST,
SHRPX_OPTID_TLS_SESSION_CACHE_MEMCACHED,
SHRPX_OPTID_TLS_SESSION_CACHE_MEMCACHED_ADDRESS_FAMILY,
SHRPX_OPTID_TLS_SESSION_CACHE_MEMCACHED_CERT_FILE,
SHRPX_OPTID_TLS_SESSION_CACHE_MEMCACHED_PRIVATE_KEY_FILE,
SHRPX_OPTID_TLS_SESSION_CACHE_MEMCACHED_TLS,
SHRPX_OPTID_TLS_TICKET_KEY_CIPHER,
SHRPX_OPTID_TLS_TICKET_KEY_FILE,
SHRPX_OPTID_TLS_TICKET_KEY_MEMCACHED,
SHRPX_OPTID_TLS_TICKET_KEY_MEMCACHED_ADDRESS_FAMILY,
SHRPX_OPTID_TLS_TICKET_KEY_MEMCACHED_CERT_FILE,
SHRPX_OPTID_TLS_TICKET_KEY_MEMCACHED_INTERVAL,
SHRPX_OPTID_TLS_TICKET_KEY_MEMCACHED_MAX_FAIL,
SHRPX_OPTID_TLS_TICKET_KEY_MEMCACHED_MAX_RETRY,
SHRPX_OPTID_TLS_TICKET_KEY_MEMCACHED_PRIVATE_KEY_FILE,
SHRPX_OPTID_TLS_TICKET_KEY_MEMCACHED_TLS,
SHRPX_OPTID_USER,
SHRPX_OPTID_VERIFY_CLIENT,
SHRPX_OPTID_VERIFY_CLIENT_CACERT,
SHRPX_OPTID_WORKER_FRONTEND_CONNECTIONS,
SHRPX_OPTID_WORKER_READ_BURST,
SHRPX_OPTID_WORKER_READ_RATE,
SHRPX_OPTID_WORKER_WRITE_BURST,
SHRPX_OPTID_WORKER_WRITE_RATE,
SHRPX_OPTID_WORKERS,
SHRPX_OPTID_WRITE_BURST,
SHRPX_OPTID_WRITE_RATE,
SHRPX_OPTID_MAXIDX,
};
// Looks up token for given option name |name| of length |namelen|.
int option_lookup_token(const char *name, size_t namelen);
// Parses option name |opt| and value |optarg|. The results are
// stored into statically allocated Config object. This function
// returns 0 if it succeeds, or -1. The |included_set| contains the
// all paths already included while processing this configuration, to
// avoid loop in --include option.
int parse_config(const StringRef &opt, const StringRef &optarg,
// stored into the object pointed by |config|. This function returns 0
// if it succeeds, or -1. The |included_set| contains the all paths
// already included while processing this configuration, to avoid loop
// in --include option.
int parse_config(Config *config, const StringRef &opt, const StringRef &optarg,
std::set<StringRef> &included_set);
// Similar to parse_config() above, but additional |optid| which
// should be the return value of option_lookup_token(opt).
int parse_config(Config *config, int optid, const StringRef &opt,
const StringRef &optarg, std::set<StringRef> &included_set);
// Loads configurations from |filename| and stores them in statically
// allocated Config object. This function returns 0 if it succeeds, or
// -1. See parse_config() for |include_set|.
@ -710,6 +872,13 @@ read_tls_ticket_key_file(const std::vector<std::string> &files,
// Returns string representation of |proto|.
StringRef strproto(shrpx_proto proto);
int configure_downstream_group(Config *config, bool http2_proxy,
bool numeric_addr_only,
const TLSConfig &tlsconf);
int resolve_hostname(Address *addr, const char *hostname, uint16_t port,
int family, int additional_flags = 0);
} // namespace shrpx
#endif // SHRPX_CONFIG_H

View File

@ -102,6 +102,14 @@ void thread_join_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
}
} // namespace
namespace {
void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
auto h = static_cast<ConnectionHandler *>(w->data);
h->handle_serial_event();
}
} // namespace
namespace {
std::random_device rd;
} // namespace
@ -125,6 +133,11 @@ ConnectionHandler::ConnectionHandler(struct ev_loop *loop)
ev_async_init(&thread_join_asyncev_, thread_join_async_cb);
ev_async_init(&serial_event_asyncev_, serial_event_async_cb);
serial_event_asyncev_.data = this;
ev_async_start(loop_, &serial_event_asyncev_);
ev_child_init(&ocsp_.chldev, ocsp_chld_cb, 0, 0);
ocsp_.chldev.data = this;
@ -136,6 +149,7 @@ ConnectionHandler::ConnectionHandler(struct ev_loop *loop)
ConnectionHandler::~ConnectionHandler() {
ev_child_stop(loop_, &ocsp_.chldev);
ev_async_stop(loop_, &serial_event_asyncev_);
ev_async_stop(loop_, &thread_join_asyncev_);
ev_io_stop(loop_, &ocsp_.rev);
ev_timer_stop(loop_, &ocsp_timer_);
@ -175,6 +189,18 @@ void ConnectionHandler::worker_reopen_log_files() {
}
}
void ConnectionHandler::worker_replace_downstream(
std::shared_ptr<DownstreamConfig> downstreamconf) {
WorkerEvent wev{};
wev.type = REPLACE_DOWNSTREAM;
wev.downstreamconf = std::move(downstreamconf);
for (auto &worker : workers_) {
worker->send(wev);
}
}
int ConnectionHandler::create_single_worker() {
auto cert_tree = ssl::create_cert_lookup_tree();
auto sv_ssl_ctx = ssl::setup_server_ssl_context(all_ssl_ctx_, cert_tree
@ -207,9 +233,9 @@ int ConnectionHandler::create_single_worker() {
all_ssl_ctx_.push_back(session_cache_ssl_ctx);
}
single_worker_ =
make_unique<Worker>(loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx,
cert_tree, ticket_keys_);
single_worker_ = make_unique<Worker>(
loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree,
ticket_keys_, this, get_config()->conn.downstream);
#ifdef HAVE_MRUBY
if (single_worker_->create_mruby_context() != 0) {
return -1;
@ -256,9 +282,9 @@ int ConnectionHandler::create_worker_thread(size_t num) {
StringRef{memcachedconf.private_key_file}, nullptr);
all_ssl_ctx_.push_back(session_cache_ssl_ctx);
}
auto worker =
make_unique<Worker>(loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx,
cert_tree, ticket_keys_);
auto worker = make_unique<Worker>(
loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree,
ticket_keys_, this, get_config()->conn.downstream);
#ifdef HAVE_MRUBY
if (worker->create_mruby_context() != 0) {
return -1;
@ -782,4 +808,46 @@ neverbleed_t *ConnectionHandler::get_neverbleed() const { return nb_.get(); }
#endif // HAVE_NEVERBLEED
void ConnectionHandler::handle_serial_event() {
std::vector<SerialEvent> q;
{
std::lock_guard<std::mutex> g(serial_event_mu_);
q.swap(serial_events_);
}
for (auto &sev : q) {
switch (sev.type) {
case SEV_REPLACE_DOWNSTREAM:
// TODO make sure that none of worker uses
// get_config()->conn.downstream
mod_config()->conn.downstream = sev.downstreamconf;
if (single_worker_) {
single_worker_->replace_downstream_config(sev.downstreamconf);
break;
}
worker_replace_downstream(sev.downstreamconf);
break;
}
}
}
void ConnectionHandler::send_replace_downstream(
const std::shared_ptr<DownstreamConfig> &downstreamconf) {
send_serial_event(SerialEvent(SEV_REPLACE_DOWNSTREAM, downstreamconf));
}
void ConnectionHandler::send_serial_event(SerialEvent ev) {
{
std::lock_guard<std::mutex> g(serial_event_mu_);
serial_events_.push_back(std::move(ev));
}
ev_async_send(loop_, &serial_event_asyncev_);
}
} // namespace shrpx

View File

@ -48,6 +48,7 @@
#endif // HAVE_NEVERBLEED
#include "shrpx_downstream_connection_pool.h"
#include "shrpx_config.h"
namespace shrpx {
@ -76,6 +77,21 @@ struct OCSPUpdateContext {
pid_t pid;
};
// SerialEvent is an event sent from Worker thread.
enum SerialEventType {
SEV_NONE,
SEV_REPLACE_DOWNSTREAM,
};
struct SerialEvent {
// ctor for event uses DownstreamConfig
SerialEvent(int type, const std::shared_ptr<DownstreamConfig> &downstreamconf)
: type(type), downstreamconf(downstreamconf) {}
int type;
std::shared_ptr<DownstreamConfig> downstreamconf;
};
class ConnectionHandler {
public:
ConnectionHandler(struct ev_loop *loop);
@ -136,6 +152,17 @@ public:
neverbleed_t *get_neverbleed() const;
#endif // HAVE_NEVERBLEED
// Send SerialEvent SEV_REPLACE_DOWNSTREAM to this object.
void send_replace_downstream(
const std::shared_ptr<DownstreamConfig> &downstreamconf);
// Internal function to send |ev| to this object.
void send_serial_event(SerialEvent ev);
// Handles SerialEvents received.
void handle_serial_event();
// Sends WorkerEvent to make them replace downstream.
void
worker_replace_downstream(std::shared_ptr<DownstreamConfig> downstreamconf);
private:
// Stores all SSL_CTX objects.
std::vector<SSL_CTX *> all_ssl_ctx_;
@ -145,6 +172,10 @@ private:
std::vector<struct ev_loop *> worker_loops_;
// Worker instances when multi threaded mode (-nN, N >= 2) is used.
std::vector<std::unique_ptr<Worker>> workers_;
// mutex for serial event resive buffer handling
std::mutex serial_event_mu_;
// SerialEvent receive buffer
std::vector<SerialEvent> serial_events_;
// Worker instance used when single threaded mode (-n1) is used.
// Otherwise, nullptr and workers_ has instances of Worker instead.
std::unique_ptr<Worker> single_worker_;
@ -161,6 +192,7 @@ private:
ev_timer disable_acceptor_timer_;
ev_timer ocsp_timer_;
ev_async thread_join_asyncev_;
ev_async serial_event_asyncev_;
#ifndef NOTHREADS
std::future<void> thread_join_fut_;
#endif // NOTHREADS

View File

@ -500,12 +500,21 @@ bool Downstream::get_chunked_request() const { return chunked_request_; }
void Downstream::set_chunked_request(bool f) { chunked_request_ = f; }
bool Downstream::request_buf_full() {
if (dconn_) {
return request_buf_.rleft() >=
get_config()->conn.downstream.request_buffer_size;
} else {
auto handler = upstream_->get_client_handler();
auto faddr = handler->get_upstream_addr();
auto worker = handler->get_worker();
// We don't check buffer size here for API endpoint.
if (faddr->api) {
return false;
}
if (dconn_) {
auto &downstreamconf = *worker->get_downstream_config();
return request_buf_.rleft() >= downstreamconf.request_buffer_size;
}
return false;
}
DefaultMemchunks *Downstream::get_request_buf() { return &request_buf_; }
@ -593,11 +602,14 @@ DefaultMemchunks *Downstream::get_response_buf() { return &response_buf_; }
bool Downstream::response_buf_full() {
if (dconn_) {
return response_buf_.rleft() >=
get_config()->conn.downstream.response_buffer_size;
} else {
return false;
auto handler = upstream_->get_client_handler();
auto worker = handler->get_worker();
auto &downstreamconf = *worker->get_downstream_config();
return response_buf_.rleft() >= downstreamconf.response_buffer_size;
}
return false;
}
bool Downstream::validate_request_recv_body_length() const {

View File

@ -29,10 +29,14 @@ namespace shrpx {
DownstreamConnectionPool::DownstreamConnectionPool() {}
DownstreamConnectionPool::~DownstreamConnectionPool() {
DownstreamConnectionPool::~DownstreamConnectionPool() { remove_all(); }
void DownstreamConnectionPool::remove_all() {
for (auto dconn : pool_) {
delete dconn;
}
pool_.clear();
}
void DownstreamConnectionPool::add_downstream_connection(

View File

@ -42,6 +42,7 @@ public:
void add_downstream_connection(std::unique_ptr<DownstreamConnection> dconn);
std::unique_ptr<DownstreamConnection> pop_downstream_connection();
void remove_downstream_connection(DownstreamConnection *dconn);
void remove_all();
private:
std::set<DownstreamConnection *> pool_;

View File

@ -479,8 +479,7 @@ int Http2DownstreamConnection::resume_read(IOCtrlReason reason,
size_t consumed) {
int rv;
if (http2session_->get_state() != Http2Session::CONNECTED ||
!http2session_->get_flow_control()) {
if (http2session_->get_state() != Http2Session::CONNECTED) {
return 0;
}

View File

@ -171,15 +171,23 @@ void initiate_connection_cb(struct ev_loop *loop, ev_timer *w, int revents) {
}
} // namespace
namespace {
void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) {
auto http2session = static_cast<Http2Session *>(w->data);
http2session->check_retire();
}
} // namespace
Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
Worker *worker, DownstreamAddrGroup *group,
Worker *worker,
const std::shared_ptr<DownstreamAddrGroup> &group,
DownstreamAddr *addr)
: dlnext(nullptr),
dlprev(nullptr),
conn_(loop, -1, nullptr, worker->get_mcpool(),
get_config()->conn.downstream.timeout.write,
get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb,
timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
worker->get_downstream_config()->timeout.write,
worker->get_downstream_config()->timeout.read, {}, {}, writecb,
readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
get_config()->tls.dyn_rec.idle_timeout, PROTO_HTTP2),
wb_(worker->get_mcpool()),
worker_(worker),
@ -189,8 +197,7 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
session_(nullptr),
state_(DISCONNECTED),
connection_check_state_(CONNECTION_CHECK_NONE),
freelist_zone_(FREELIST_ZONE_NONE),
flow_control_(false) {
freelist_zone_(FREELIST_ZONE_NONE) {
read_ = write_ = &Http2Session::noop;
on_read_ = &Http2Session::read_noop;
@ -210,6 +217,10 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
ev_timer_init(&initiate_connection_timer_, initiate_connection_cb, 0., 0.);
initiate_connection_timer_.data = this;
ev_prepare_init(&prep_, prepare_cb);
prep_.data = this;
ev_prepare_start(loop, &prep_);
}
Http2Session::~Http2Session() {
@ -229,6 +240,8 @@ int Http2Session::disconnect(bool hard) {
conn_.rlimit.stopw();
conn_.wlimit.stopw();
ev_prepare_stop(conn_.loop, &prep_);
ev_timer_stop(conn_.loop, &initiate_connection_timer_);
ev_timer_stop(conn_.loop, &settings_timer_);
ev_timer_stop(conn_.loop, &connchk_timer_);
@ -661,8 +674,6 @@ int Http2Session::submit_rst_stream(int32_t stream_id, uint32_t error_code) {
nghttp2_session *Http2Session::get_session() const { return session_; }
bool Http2Session::get_flow_control() const { return flow_control_; }
int Http2Session::resume_data(Http2DownstreamConnection *dconn) {
assert(state_ == CONNECTED);
auto downstream = dconn->get_downstream();
@ -1511,8 +1522,6 @@ int Http2Session::connection_made() {
return -1;
}
flow_control_ = true;
std::array<nghttp2_settings_entry, 3> entry;
size_t nentry = 2;
entry[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
@ -2111,7 +2120,7 @@ bool Http2Session::max_concurrency_reached(size_t extra) const {
}
DownstreamAddrGroup *Http2Session::get_downstream_addr_group() const {
return group_;
return group_.get();
}
void Http2Session::add_to_avail_freelist() {
@ -2120,8 +2129,8 @@ void Http2Session::add_to_avail_freelist() {
}
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Append to http2_avail_freelist, group=" << group_
<< ", freelist.size="
SSLOG(INFO, this) << "Append to http2_avail_freelist, group="
<< group_.get() << ", freelist.size="
<< group_->shared_addr->http2_avail_freelist.size();
}
@ -2194,4 +2203,18 @@ void Http2Session::on_timeout() {
}
}
void Http2Session::check_retire() {
if (!group_->retired) {
return;
}
ev_prepare_stop(conn_.loop, &prep_);
auto last_stream_id = nghttp2_session_get_last_proc_stream_id(session_);
nghttp2_submit_goaway(session_, NGHTTP2_FLAG_NONE, last_stream_id,
NGHTTP2_NO_ERROR, nullptr, 0);
signal_write();
}
} // namespace shrpx

View File

@ -73,7 +73,8 @@ enum FreelistZone {
class Http2Session {
public:
Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
DownstreamAddrGroup *group, DownstreamAddr *addr);
const std::shared_ptr<DownstreamAddrGroup> &group,
DownstreamAddr *addr);
~Http2Session();
// If hard is true, all pending requests are abandoned and
@ -95,8 +96,6 @@ public:
nghttp2_session *get_session() const;
bool get_flow_control() const;
int resume_data(Http2DownstreamConnection *dconn);
int connection_made();
@ -199,6 +198,11 @@ public:
void on_timeout();
// This is called periodically using ev_prepare watcher, and if
// group_ is retired (backend has been replaced), send GOAWAY to
// shutdown the connection.
void check_retire();
enum {
// Disconnected
DISCONNECTED,
@ -240,6 +244,7 @@ private:
ev_timer connchk_timer_;
// timer to initiate connection. usually, this fires immediately.
ev_timer initiate_connection_timer_;
ev_prepare prep_;
DList<Http2DownstreamConnection> dconns_;
DList<StreamData> streams_;
std::function<int(Http2Session &)> read_, write_;
@ -250,14 +255,13 @@ private:
Worker *worker_;
// NULL if no TLS is configured
SSL_CTX *ssl_ctx_;
DownstreamAddrGroup *group_;
std::shared_ptr<DownstreamAddrGroup> group_;
// Address of remote endpoint
DownstreamAddr *addr_;
nghttp2_session *session_;
int state_;
int connection_check_state_;
int freelist_zone_;
bool flow_control_;
};
nghttp2_session_callbacks *create_http2_downstream_callbacks();

View File

@ -413,6 +413,13 @@ void Http2Upstream::initiate_downstream(Downstream *downstream) {
downstream_queue_.mark_active(downstream);
auto &req = downstream->request();
if (!req.http2_expect_body) {
downstream->end_upload_data();
// TODO is this necessary?
handler_->signal_write();
}
return;
}
@ -780,23 +787,25 @@ void Http2Upstream::submit_goaway() {
void Http2Upstream::check_shutdown() {
int rv;
if (shutdown_handled_) {
return;
}
auto worker = handler_->get_worker();
if (worker->get_graceful_shutdown()) {
shutdown_handled_ = true;
rv = nghttp2_submit_shutdown_notice(session_);
if (rv != 0) {
ULOG(FATAL, this) << "nghttp2_submit_shutdown_notice() failed: "
<< nghttp2_strerror(rv);
return;
}
handler_->signal_write();
ev_timer_start(handler_->get_loop(), &shutdown_timer_);
if (!worker->get_graceful_shutdown()) {
return;
}
ev_prepare_stop(handler_->get_loop(), &prep_);
rv = nghttp2_submit_shutdown_notice(session_);
if (rv != 0) {
ULOG(FATAL, this) << "nghttp2_submit_shutdown_notice() failed: "
<< nghttp2_strerror(rv);
return;
}
handler_->signal_write();
ev_timer_start(handler_->get_loop(), &shutdown_timer_);
}
nghttp2_session_callbacks *create_http2_upstream_callbacks() {
@ -846,23 +855,33 @@ nghttp2_session_callbacks *create_http2_upstream_callbacks() {
return callbacks;
}
namespace {
size_t downstream_queue_size(Worker *worker) {
auto &downstreamconf = *worker->get_downstream_config();
if (get_config()->http2_proxy) {
return downstreamconf.connections_per_host;
}
return downstreamconf.connections_per_frontend;
}
} // namespace
Http2Upstream::Http2Upstream(ClientHandler *handler)
: wb_(handler->get_worker()->get_mcpool()),
downstream_queue_(
get_config()->http2_proxy
? get_config()->conn.downstream.connections_per_host
: get_config()->conn.downstream.connections_per_frontend,
!get_config()->http2_proxy),
downstream_queue_(downstream_queue_size(handler->get_worker()),
!get_config()->http2_proxy),
handler_(handler),
session_(nullptr),
shutdown_handled_(false) {
session_(nullptr) {
int rv;
auto &http2conf = get_config()->http2;
rv = nghttp2_session_server_new2(&session_, http2conf.upstream.callbacks,
this, http2conf.upstream.option);
auto faddr = handler_->get_upstream_addr();
rv = nghttp2_session_server_new2(
&session_, http2conf.upstream.callbacks, this,
faddr->api ? http2conf.upstream.api_option : http2conf.upstream.option);
assert(rv == 0);
@ -874,7 +893,11 @@ Http2Upstream::Http2Upstream(ClientHandler *handler)
entry[0].value = http2conf.upstream.max_concurrent_streams;
entry[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
entry[1].value = (1 << http2conf.upstream.window_bits) - 1;
if (faddr->api) {
entry[1].value = (1u << 31) - 1;
} else {
entry[1].value = (1 << http2conf.upstream.window_bits) - 1;
}
rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, entry.data(),
entry.size());
@ -883,8 +906,11 @@ Http2Upstream::Http2Upstream(ClientHandler *handler)
<< nghttp2_strerror(rv);
}
if (http2conf.upstream.connection_window_bits != 16) {
int32_t window_size = (1 << http2conf.upstream.connection_window_bits) - 1;
int32_t window_bits =
faddr->api ? 31 : http2conf.upstream.connection_window_bits;
if (window_bits != 16) {
int32_t window_size = (1u << window_bits) - 1;
rv = nghttp2_session_set_local_window_size(session_, NGHTTP2_FLAG_NONE, 0,
window_size);
@ -1675,6 +1701,12 @@ int Http2Upstream::on_downstream_abort_request(Downstream *downstream,
int Http2Upstream::consume(int32_t stream_id, size_t len) {
int rv;
auto faddr = handler_->get_upstream_addr();
if (faddr->api) {
return 0;
}
rv = nghttp2_session_consume(session_, stream_id, len);
if (rv != 0) {

View File

@ -132,7 +132,6 @@ private:
ClientHandler *handler_;
nghttp2_session *session_;
bool flow_control_;
bool shutdown_handled_;
};
nghttp2_session_callbacks *create_http2_upstream_callbacks();

View File

@ -147,12 +147,12 @@ void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
}
} // namespace
HttpDownstreamConnection::HttpDownstreamConnection(DownstreamAddrGroup *group,
struct ev_loop *loop,
Worker *worker)
HttpDownstreamConnection::HttpDownstreamConnection(
const std::shared_ptr<DownstreamAddrGroup> &group, struct ev_loop *loop,
Worker *worker)
: conn_(loop, -1, nullptr, worker->get_mcpool(),
get_config()->conn.downstream.timeout.write,
get_config()->conn.downstream.timeout.read, {}, {}, connectcb,
worker->get_downstream_config()->timeout.write,
worker->get_downstream_config()->timeout.read, {}, {}, connectcb,
readcb, connect_timeoutcb, this,
get_config()->tls.dyn_rec.warmup_threshold,
get_config()->tls.dyn_rec.idle_timeout, PROTO_HTTP1),
@ -166,7 +166,11 @@ HttpDownstreamConnection::HttpDownstreamConnection(DownstreamAddrGroup *group,
ioctrl_(&conn_.rlimit),
response_htp_{0} {}
HttpDownstreamConnection::~HttpDownstreamConnection() {}
HttpDownstreamConnection::~HttpDownstreamConnection() {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "Deleted";
}
}
int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
if (LOG_ENABLED(INFO)) {
@ -182,7 +186,7 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
return SHRPX_ERR_NETWORK;
}
auto &downstreamconf = get_config()->conn.downstream;
auto &downstreamconf = *worker_->get_downstream_config();
if (conn_.fd == -1) {
auto &shared_addr = group_->shared_addr;
@ -588,7 +592,9 @@ void HttpDownstreamConnection::detach_downstream(Downstream *downstream) {
ev_set_cb(&conn_.rev, idle_readcb);
ioctrl_.force_resume_read();
conn_.rt.repeat = get_config()->conn.downstream.timeout.idle_read;
auto &downstreamconf = *worker_->get_downstream_config();
conn_.rt.repeat = downstreamconf.timeout.idle_read;
ev_set_cb(&conn_.rt, idle_timeoutcb);
ev_timer_again(conn_.loop, &conn_.rt);
@ -602,8 +608,10 @@ void HttpDownstreamConnection::pause_read(IOCtrlReason reason) {
int HttpDownstreamConnection::resume_read(IOCtrlReason reason,
size_t consumed) {
auto &downstreamconf = *worker_->get_downstream_config();
if (downstream_->get_response_buf()->rleft() <=
get_config()->conn.downstream.request_buffer_size / 2) {
downstreamconf.request_buffer_size / 2) {
ioctrl_.resume_read(reason);
}
@ -1164,9 +1172,11 @@ int HttpDownstreamConnection::noop() { return 0; }
DownstreamAddrGroup *
HttpDownstreamConnection::get_downstream_addr_group() const {
return group_;
return group_.get();
}
DownstreamAddr *HttpDownstreamConnection::get_addr() const { return addr_; }
bool HttpDownstreamConnection::poolable() const { return !group_->retired; }
} // namespace shrpx

View File

@ -42,8 +42,8 @@ struct DownstreamAddr;
class HttpDownstreamConnection : public DownstreamConnection {
public:
HttpDownstreamConnection(DownstreamAddrGroup *group, struct ev_loop *loop,
Worker *worker);
HttpDownstreamConnection(const std::shared_ptr<DownstreamAddrGroup> &group,
struct ev_loop *loop, Worker *worker);
virtual ~HttpDownstreamConnection();
virtual int attach_downstream(Downstream *downstream);
virtual void detach_downstream(Downstream *downstream);
@ -61,7 +61,7 @@ public:
virtual void on_upstream_change(Upstream *upstream);
virtual bool poolable() const { return true; }
virtual bool poolable() const;
virtual DownstreamAddrGroup *get_downstream_addr_group() const;
@ -88,7 +88,7 @@ private:
Worker *worker_;
// nullptr if TLS is not used.
SSL_CTX *ssl_ctx_;
DownstreamAddrGroup *group_;
const std::shared_ptr<DownstreamAddrGroup> &group_;
// Address of remote endpoint
DownstreamAddr *addr_;
IOControl ioctrl_;

View File

@ -411,6 +411,23 @@ int htp_hdrs_completecb(http_parser *htp) {
return -1;
}
auto faddr = handler->get_upstream_addr();
if (faddr->api) {
// Normally, we forward expect: 100-continue to backend server,
// and let them decide whether responds with 100 Continue or not.
// For API endpoint, we have no backend, so just send 100 Continue
// here to make the client happy.
auto expect = req.fs.header(http2::HD_EXPECT);
if (expect &&
util::strieq(expect->value, StringRef::from_lit("100-continue"))) {
auto output = downstream->get_response_buf();
constexpr auto res = StringRef::from_lit("HTTP/1.1 100 Continue\r\n\r\n");
output->append(res);
handler->signal_write();
}
}
return 0;
}
} // namespace
@ -450,7 +467,7 @@ int htp_msg_completecb(http_parser *htp) {
// reason why end_upload_data() failed is when we sent response
// in request phase hook. We only delete and proceed to the
// next request handling (if we don't close the connection). We
// first pause parser here jsut as we normally do, and call
// first pause parser here just as we normally do, and call
// signal_write() to run on_write().
http_parser_pause(htp, 1);

View File

@ -98,9 +98,9 @@ void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
DownstreamAddr *addr, std::mt19937 &gen)
: conn_(loop, -1, nullptr, worker->get_mcpool(),
get_config()->conn.downstream.timeout.write,
get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb,
timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
worker->get_downstream_config()->timeout.write,
worker->get_downstream_config()->timeout.read, {}, {}, writecb,
readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
get_config()->tls.dyn_rec.idle_timeout, PROTO_NONE),
wb_(worker->get_mcpool()),
gen_(gen),

View File

@ -35,7 +35,9 @@ RNode::RNode() : s(nullptr), len(0), index(-1) {}
RNode::RNode(const char *s, size_t len, size_t index)
: s(s), len(len), index(index) {}
Router::Router() : root_{} {}
Router::Router() : balloc_(1024, 1024), root_{} {}
Router::~Router() {}
namespace {
RNode *find_next_node(const RNode *node, char c) {
@ -62,7 +64,8 @@ void add_next_node(RNode *node, std::unique_ptr<RNode> new_node) {
void Router::add_node(RNode *node, const char *pattern, size_t patlen,
size_t index) {
auto new_node = make_unique<RNode>(pattern, patlen, index);
auto pat = make_string_ref(balloc_, StringRef{pattern, patlen});
auto new_node = make_unique<RNode>(pat.c_str(), pat.size(), index);
add_next_node(node, std::move(new_node));
}

View File

@ -30,6 +30,8 @@
#include <vector>
#include <memory>
#include "allocator.h"
namespace shrpx {
struct RNode {
@ -55,6 +57,12 @@ struct RNode {
class Router {
public:
Router();
~Router();
Router(Router &&) = default;
Router(const Router &) = delete;
Router &operator=(Router &&) = default;
Router &operator=(const Router &) = delete;
// Adds route |pattern| with its |index|.
bool add_route(const StringRef &pattern, size_t index);
// Returns the matched index of pattern. -1 if there is no match.
@ -65,6 +73,7 @@ public:
void dump() const;
private:
BlockAllocator balloc_;
// The root node of Patricia tree. This is special node and its s
// field is nulptr, and len field is 0.
RNode root_;

View File

@ -512,13 +512,22 @@ uint32_t infer_upstream_rst_stream_status_code(uint32_t downstream_error_code) {
}
} // namespace
namespace {
size_t downstream_queue_size(Worker *worker) {
auto &downstreamconf = *worker->get_downstream_config();
if (get_config()->http2_proxy) {
return downstreamconf.connections_per_host;
}
return downstreamconf.connections_per_frontend;
}
} // namespace
SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler)
: wb_(handler->get_worker()->get_mcpool()),
downstream_queue_(
get_config()->http2_proxy
? get_config()->conn.downstream.connections_per_host
: get_config()->conn.downstream.connections_per_frontend,
!get_config()->http2_proxy),
downstream_queue_(downstream_queue_size(handler->get_worker()),
!get_config()->http2_proxy),
handler_(handler),
session_(nullptr) {
spdylay_session_callbacks callbacks{};

View File

@ -1390,26 +1390,11 @@ SSL_CTX *setup_server_ssl_context(std::vector<SSL_CTX *> &all_ssl_ctx,
return ssl_ctx;
}
bool downstream_tls_enabled() {
const auto &groups = get_config()->conn.downstream.addr_groups;
return std::any_of(std::begin(groups), std::end(groups),
[](const DownstreamAddrGroupConfig &g) {
return std::any_of(
std::begin(g.addrs), std::end(g.addrs),
[](const DownstreamAddrConfig &a) { return a.tls; });
});
}
SSL_CTX *setup_downstream_client_ssl_context(
#ifdef HAVE_NEVERBLEED
neverbleed_t *nb
#endif // HAVE_NEVERBLEED
) {
if (!downstream_tls_enabled()) {
return nullptr;
}
auto &tlsconf = get_config()->tls;
return ssl::create_ssl_client_context(

View File

@ -201,9 +201,7 @@ SSL_CTX *setup_server_ssl_context(std::vector<SSL_CTX *> &all_ssl_ctx,
#endif // HAVE_NEVERBLEED
);
// Setups client side SSL_CTX. This function inspects get_config()
// and if TLS is disabled in all downstreams, returns nullptr.
// Otherwise, only construct SSL_CTX.
// Setups client side SSL_CTX.
SSL_CTX *setup_downstream_client_ssl_context(
#ifdef HAVE_NEVERBLEED
neverbleed_t *nb
@ -224,9 +222,6 @@ SSL *create_ssl(SSL_CTX *ssl_ctx);
// Returns true if SSL/TLS is enabled on upstream
bool upstream_tls_enabled();
// Returns true if SSL/TLS is enabled on downstream
bool downstream_tls_enabled();
// Performs TLS hostname match. |pattern| can contain wildcard
// character '*', which matches prefix of target hostname. There are
// several restrictions to make wildcard work. The matching algorithm

View File

@ -106,15 +106,17 @@ std::random_device rd;
Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
SSL_CTX *tls_session_cache_memcached_ssl_ctx,
ssl::CertLookupTree *cert_tree,
const std::shared_ptr<TicketKeys> &ticket_keys)
const std::shared_ptr<TicketKeys> &ticket_keys,
ConnectionHandler *conn_handler,
std::shared_ptr<DownstreamConfig> downstreamconf)
: randgen_(rd()),
worker_stat_{},
loop_(loop),
sv_ssl_ctx_(sv_ssl_ctx),
cl_ssl_ctx_(cl_ssl_ctx),
cert_tree_(cert_tree),
conn_handler_(conn_handler),
ticket_keys_(ticket_keys),
downstream_addr_groups_(get_config()->conn.downstream.addr_groups.size()),
connect_blocker_(
make_unique<ConnectBlocker>(randgen_, loop_, []() {}, []() {})),
graceful_shutdown_(false) {
@ -134,22 +136,33 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
StringRef{session_cacheconf.memcached.host}, &mcpool_);
}
auto &downstreamconf = get_config()->conn.downstream;
replace_downstream_config(std::move(downstreamconf));
}
for (size_t i = 0; i < downstreamconf.addr_groups.size(); ++i) {
auto &src = downstreamconf.addr_groups[i];
void Worker::replace_downstream_config(
std::shared_ptr<DownstreamConfig> downstreamconf) {
for (auto &g : downstream_addr_groups_) {
g->retired = true;
g->shared_addr->dconn_pool.remove_all();
}
downstreamconf_ = downstreamconf;
downstream_addr_groups_ = std::vector<std::shared_ptr<DownstreamAddrGroup>>(
downstreamconf->addr_groups.size());
for (size_t i = 0; i < downstreamconf->addr_groups.size(); ++i) {
auto &src = downstreamconf->addr_groups[i];
auto &dst = downstream_addr_groups_[i];
dst.pattern = src.pattern;
auto shared_addr = std::make_shared<SharedDownstreamAddr>();
dst = std::make_shared<DownstreamAddrGroup>();
dst->pattern = src.pattern;
// TODO for some reason, clang-3.6 which comes with Ubuntu 15.10
// does not value initialize SharedDownstreamAddr above.
shared_addr->next = 0;
// does not value initialize with std::make_shared.
auto shared_addr = std::make_shared<SharedDownstreamAddr>();
shared_addr->addrs.resize(src.addrs.size());
shared_addr->http1_pri = {};
shared_addr->http2_pri = {};
size_t num_http1 = 0;
size_t num_http2 = 0;
@ -210,11 +223,11 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
// share the connection if patterns have the same set of backend
// addresses.
auto end = std::begin(downstream_addr_groups_) + i;
auto it = std::find_if(std::begin(downstream_addr_groups_), end,
[&shared_addr](const DownstreamAddrGroup &group) {
return match_shared_downstream_addr(
group.shared_addr, shared_addr);
});
auto it = std::find_if(
std::begin(downstream_addr_groups_), end,
[&shared_addr](const std::shared_ptr<DownstreamAddrGroup> &group) {
return match_shared_downstream_addr(group->shared_addr, shared_addr);
});
if (it == end) {
if (LOG_ENABLED(INFO)) {
@ -225,13 +238,13 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
shared_addr->http1_pri.weight = num_http1;
shared_addr->http2_pri.weight = num_http2;
dst.shared_addr = shared_addr;
dst->shared_addr = shared_addr;
} else {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << dst.pattern << " shares the same backend group with "
<< (*it).pattern;
LOG(INFO) << dst->pattern << " shares the same backend group with "
<< (*it)->pattern;
}
dst.shared_addr = (*it).shared_addr;
dst->shared_addr = (*it)->shared_addr;
}
}
}
@ -337,6 +350,12 @@ void Worker::process_events() {
return;
}
break;
case REPLACE_DOWNSTREAM:
WLOG(NOTICE, this) << "Replace downstream";
replace_downstream_config(wev.downstreamconf);
break;
default:
if (LOG_ENABLED(INFO)) {
@ -395,7 +414,8 @@ mruby::MRubyContext *Worker::get_mruby_context() const {
}
#endif // HAVE_MRUBY
std::vector<DownstreamAddrGroup> &Worker::get_downstream_addr_groups() {
std::vector<std::shared_ptr<DownstreamAddrGroup>> &
Worker::get_downstream_addr_groups() {
return downstream_addr_groups_;
}
@ -403,17 +423,26 @@ ConnectBlocker *Worker::get_connect_blocker() const {
return connect_blocker_.get();
}
const DownstreamConfig *Worker::get_downstream_config() const {
return downstreamconf_.get();
}
ConnectionHandler *Worker::get_connection_handler() const {
return conn_handler_;
}
namespace {
size_t match_downstream_addr_group_host(
const Router &router, const std::vector<WildcardPattern> &wildcard_patterns,
const StringRef &host, const StringRef &path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all) {
const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
size_t catch_all) {
if (path.empty() || path[0] != '/') {
auto group = router.match(host, StringRef::from_lit("/"));
if (group != -1) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found pattern with query " << host
<< ", matched pattern=" << groups[group].pattern;
<< ", matched pattern=" << groups[group]->pattern;
}
return group;
}
@ -429,7 +458,7 @@ size_t match_downstream_addr_group_host(
if (group != -1) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found pattern with query " << host << path
<< ", matched pattern=" << groups[group].pattern;
<< ", matched pattern=" << groups[group]->pattern;
}
return group;
}
@ -448,7 +477,7 @@ size_t match_downstream_addr_group_host(
// longest host pattern.
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found wildcard pattern with query " << host << path
<< ", matched pattern=" << groups[group].pattern;
<< ", matched pattern=" << groups[group]->pattern;
}
return group;
}
@ -458,7 +487,7 @@ size_t match_downstream_addr_group_host(
if (group != -1) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Found pattern with query " << path
<< ", matched pattern=" << groups[group].pattern;
<< ", matched pattern=" << groups[group]->pattern;
}
return group;
}
@ -473,7 +502,8 @@ size_t match_downstream_addr_group_host(
size_t match_downstream_addr_group(
const Router &router, const std::vector<WildcardPattern> &wildcard_patterns,
const StringRef &hostport, const StringRef &raw_path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all) {
const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
size_t catch_all) {
if (std::find(std::begin(hostport), std::end(hostport), '/') !=
std::end(hostport)) {
// We use '/' specially, and if '/' is included in host, it breaks

View File

@ -56,6 +56,7 @@ class ConnectBlocker;
class LiveCheck;
class MemcachedDispatcher;
struct UpstreamAddr;
class ConnectionHandler;
#ifdef HAVE_MRUBY
namespace mruby {
@ -143,6 +144,10 @@ struct SharedDownstreamAddr {
struct DownstreamAddrGroup {
ImmutableString pattern;
std::shared_ptr<SharedDownstreamAddr> shared_addr;
// true if this group is no longer used for new request. If this is
// true, the connection made using one of address in shared_addr
// must not be pooled.
bool retired;
};
struct WorkerStat {
@ -153,6 +158,7 @@ enum WorkerEventType {
NEW_CONNECTION = 0x01,
REOPEN_LOG = 0x02,
GRACEFUL_SHUTDOWN = 0x03,
REPLACE_DOWNSTREAM = 0x04,
};
struct WorkerEvent {
@ -164,6 +170,7 @@ struct WorkerEvent {
const UpstreamAddr *faddr;
};
std::shared_ptr<TicketKeys> ticket_keys;
std::shared_ptr<DownstreamConfig> downstreamconf;
};
class Worker {
@ -171,7 +178,9 @@ public:
Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
SSL_CTX *tls_session_cache_memcached_ssl_ctx,
ssl::CertLookupTree *cert_tree,
const std::shared_ptr<TicketKeys> &ticket_keys);
const std::shared_ptr<TicketKeys> &ticket_keys,
ConnectionHandler *conn_handler,
std::shared_ptr<DownstreamConfig> downstreamconf);
~Worker();
void run_async();
void wait();
@ -206,10 +215,18 @@ public:
mruby::MRubyContext *get_mruby_context() const;
#endif // HAVE_MRUBY
std::vector<DownstreamAddrGroup> &get_downstream_addr_groups();
std::vector<std::shared_ptr<DownstreamAddrGroup>> &
get_downstream_addr_groups();
ConnectBlocker *get_connect_blocker() const;
const DownstreamConfig *get_downstream_config() const;
void
replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf);
ConnectionHandler *get_connection_handler() const;
private:
#ifndef NOTHREADS
std::future<void> fut_;
@ -222,6 +239,7 @@ private:
MemchunkPool mcpool_;
WorkerStat worker_stat_;
std::shared_ptr<DownstreamConfig> downstreamconf_;
std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;
#ifdef HAVE_MRUBY
std::unique_ptr<mruby::MRubyContext> mruby_ctx_;
@ -233,9 +251,10 @@ private:
SSL_CTX *sv_ssl_ctx_;
SSL_CTX *cl_ssl_ctx_;
ssl::CertLookupTree *cert_tree_;
ConnectionHandler *conn_handler_;
std::shared_ptr<TicketKeys> ticket_keys_;
std::vector<DownstreamAddrGroup> downstream_addr_groups_;
std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_;
// Worker level blocker for downstream connection. For example,
// this is used when file decriptor is exhausted.
std::unique_ptr<ConnectBlocker> connect_blocker_;
@ -252,7 +271,8 @@ private:
size_t match_downstream_addr_group(
const Router &router, const std::vector<WildcardPattern> &wildcard_patterns,
const StringRef &hostport, const StringRef &path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all);
const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
size_t catch_all);
void downstream_failure(DownstreamAddr *addr);

View File

@ -394,7 +394,7 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) {
}
#ifdef HAVE_NEVERBLEED
if (ssl::upstream_tls_enabled() || ssl::downstream_tls_enabled()) {
{
std::array<char, NEVERBLEED_ERRBUF_SIZE> errbuf;
auto nb = make_unique<neverbleed_t>();
if (neverbleed_init(nb.get(), errbuf.data()) != 0) {

View File

@ -38,20 +38,22 @@
namespace shrpx {
void test_shrpx_worker_match_downstream_addr_group(void) {
auto groups = std::vector<DownstreamAddrGroup>();
auto groups = std::vector<std::shared_ptr<DownstreamAddrGroup>>();
for (auto &s : {"nghttp2.org/", "nghttp2.org/alpha/bravo/",
"nghttp2.org/alpha/charlie", "nghttp2.org/delta%3A",
"www.nghttp2.org/", "[::1]/", "nghttp2.org/alpha/bravo/delta",
// Check that match is done in the single node
"example.com/alpha/bravo", "192.168.0.1/alpha/", "/golf/"}) {
groups.push_back(DownstreamAddrGroup{ImmutableString(s)});
auto g = std::make_shared<DownstreamAddrGroup>();
g->pattern = ImmutableString(s);
groups.push_back(std::move(g));
}
Router router;
for (size_t i = 0; i < groups.size(); ++i) {
auto &g = groups[i];
router.add_route(StringRef{g.pattern}, i);
router.add_route(StringRef{g->pattern}, i);
}
std::vector<WildcardPattern> wp;
@ -176,15 +178,18 @@ void test_shrpx_worker_match_downstream_addr_group(void) {
StringRef::from_lit("/"), groups, 255));
// Test for wildcard hosts
groups.push_back(
DownstreamAddrGroup{ImmutableString::from_lit("git.nghttp2.org")});
groups.push_back(
DownstreamAddrGroup{ImmutableString::from_lit(".nghttp2.org")});
auto g1 = std::make_shared<DownstreamAddrGroup>();
g1->pattern = ImmutableString::from_lit("git.nghttp2.org");
groups.push_back(std::move(g1));
wp.push_back({ImmutableString("git.nghttp2.org")});
auto g2 = std::make_shared<DownstreamAddrGroup>();
g2->pattern = ImmutableString::from_lit(".nghttp2.org");
groups.push_back(std::move(g2));
wp.emplace_back(StringRef::from_lit("git.nghttp2.org"));
wp.back().router.add_route(StringRef::from_lit("/echo/"), 10);
wp.push_back({ImmutableString(".nghttp2.org")});
wp.emplace_back(StringRef::from_lit(".nghttp2.org"));
wp.back().router.add_route(StringRef::from_lit("/echo/"), 11);
wp.back().router.add_route(StringRef::from_lit("/echo/foxtrot"), 12);

View File

@ -502,6 +502,10 @@ inline bool operator==(const char *lhs, const StringRef &rhs) {
return rhs == lhs;
}
inline bool operator!=(const StringRef &lhs, const StringRef &rhs) {
return !(lhs == rhs);
}
inline bool operator!=(const StringRef &lhs, const std::string &rhs) {
return !(lhs == rhs);
}