Merge branch 'nghttpx-downstream-live-check'
This commit is contained in:
commit
287d4e35f3
|
@ -129,6 +129,8 @@ OPTIONS = [
|
||||||
"backend-tls",
|
"backend-tls",
|
||||||
"backend-connections-per-host",
|
"backend-connections-per-host",
|
||||||
"error-page",
|
"error-page",
|
||||||
|
"backend-fall",
|
||||||
|
"backend-rise",
|
||||||
]
|
]
|
||||||
|
|
||||||
LOGVARS = [
|
LOGVARS = [
|
||||||
|
|
|
@ -120,6 +120,7 @@ NGHTTPX_SRCS = \
|
||||||
shrpx_worker.cc shrpx_worker.h \
|
shrpx_worker.cc shrpx_worker.h \
|
||||||
shrpx_log_config.cc shrpx_log_config.h \
|
shrpx_log_config.cc shrpx_log_config.h \
|
||||||
shrpx_connect_blocker.cc shrpx_connect_blocker.h \
|
shrpx_connect_blocker.cc shrpx_connect_blocker.h \
|
||||||
|
shrpx_live_check.cc shrpx_live_check.h \
|
||||||
shrpx_downstream_connection_pool.cc shrpx_downstream_connection_pool.h \
|
shrpx_downstream_connection_pool.cc shrpx_downstream_connection_pool.h \
|
||||||
shrpx_rate_limit.cc shrpx_rate_limit.h \
|
shrpx_rate_limit.cc shrpx_rate_limit.h \
|
||||||
shrpx_connection.cc shrpx_connection.h \
|
shrpx_connection.cc shrpx_connection.h \
|
||||||
|
|
28
src/shrpx.cc
28
src/shrpx.cc
|
@ -1314,6 +1314,24 @@ Connections:
|
||||||
--backend-write-timeout options.
|
--backend-write-timeout options.
|
||||||
--accept-proxy-protocol
|
--accept-proxy-protocol
|
||||||
Accept PROXY protocol version 1 on frontend connection.
|
Accept PROXY protocol version 1 on frontend connection.
|
||||||
|
--backend-fall=<N>
|
||||||
|
If nghttpx cannot connect to a specific backend <N>
|
||||||
|
times in a row, that backend is assumed to be offline,
|
||||||
|
and it is excluded from load balancing. See also
|
||||||
|
--backend-rise option. If <N> is 0, a backend never be
|
||||||
|
excluded from load balancing whatever times nghttpx
|
||||||
|
cannot connect to it.
|
||||||
|
Default: )" << get_config()->conn.downstream.fall << R"(
|
||||||
|
--backend-rise=<N>
|
||||||
|
As described in --backend-fall, a backend is excluded
|
||||||
|
from load balancing if nghttpx assumes that it is
|
||||||
|
offline. Then nghttpx periodically attempts to make a
|
||||||
|
connection to the failed backend, and if the connection
|
||||||
|
is made successfully <N> times in a row, the backend is
|
||||||
|
assumed to be online, and it is now eligible for load
|
||||||
|
balancing target. If <N> is 0, a backend is permanently
|
||||||
|
offline, once it goes in that state.
|
||||||
|
Default: )" << get_config()->conn.downstream.rise << R"(
|
||||||
|
|
||||||
Performance:
|
Performance:
|
||||||
-n, --workers=<N>
|
-n, --workers=<N>
|
||||||
|
@ -2519,6 +2537,8 @@ int main(int argc, char **argv) {
|
||||||
{SHRPX_OPT_BACKEND_CONNECTIONS_PER_HOST.c_str(), required_argument,
|
{SHRPX_OPT_BACKEND_CONNECTIONS_PER_HOST.c_str(), required_argument,
|
||||||
&flag, 121},
|
&flag, 121},
|
||||||
{SHRPX_OPT_ERROR_PAGE.c_str(), required_argument, &flag, 122},
|
{SHRPX_OPT_ERROR_PAGE.c_str(), required_argument, &flag, 122},
|
||||||
|
{SHRPX_OPT_BACKEND_FALL.c_str(), required_argument, &flag, 123},
|
||||||
|
{SHRPX_OPT_BACKEND_RISE.c_str(), required_argument, &flag, 124},
|
||||||
{nullptr, 0, nullptr, 0}};
|
{nullptr, 0, nullptr, 0}};
|
||||||
|
|
||||||
int option_index = 0;
|
int option_index = 0;
|
||||||
|
@ -3094,6 +3114,14 @@ int main(int argc, char **argv) {
|
||||||
// --error-page
|
// --error-page
|
||||||
cmdcfgs.emplace_back(SHRPX_OPT_ERROR_PAGE, StringRef{optarg});
|
cmdcfgs.emplace_back(SHRPX_OPT_ERROR_PAGE, StringRef{optarg});
|
||||||
break;
|
break;
|
||||||
|
case 123:
|
||||||
|
// --backend-fall
|
||||||
|
cmdcfgs.emplace_back(SHRPX_OPT_BACKEND_FALL, StringRef{optarg});
|
||||||
|
break;
|
||||||
|
case 124:
|
||||||
|
// --backend-rise
|
||||||
|
cmdcfgs.emplace_back(SHRPX_OPT_BACKEND_RISE, StringRef{optarg});
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -893,6 +893,7 @@ enum {
|
||||||
SHRPX_OPTID_BACKEND_ADDRESS_FAMILY,
|
SHRPX_OPTID_BACKEND_ADDRESS_FAMILY,
|
||||||
SHRPX_OPTID_BACKEND_CONNECTIONS_PER_FRONTEND,
|
SHRPX_OPTID_BACKEND_CONNECTIONS_PER_FRONTEND,
|
||||||
SHRPX_OPTID_BACKEND_CONNECTIONS_PER_HOST,
|
SHRPX_OPTID_BACKEND_CONNECTIONS_PER_HOST,
|
||||||
|
SHRPX_OPTID_BACKEND_FALL,
|
||||||
SHRPX_OPTID_BACKEND_HTTP_PROXY_URI,
|
SHRPX_OPTID_BACKEND_HTTP_PROXY_URI,
|
||||||
SHRPX_OPTID_BACKEND_HTTP1_CONNECTIONS_PER_FRONTEND,
|
SHRPX_OPTID_BACKEND_HTTP1_CONNECTIONS_PER_FRONTEND,
|
||||||
SHRPX_OPTID_BACKEND_HTTP1_CONNECTIONS_PER_HOST,
|
SHRPX_OPTID_BACKEND_HTTP1_CONNECTIONS_PER_HOST,
|
||||||
|
@ -908,6 +909,7 @@ enum {
|
||||||
SHRPX_OPTID_BACKEND_READ_TIMEOUT,
|
SHRPX_OPTID_BACKEND_READ_TIMEOUT,
|
||||||
SHRPX_OPTID_BACKEND_REQUEST_BUFFER,
|
SHRPX_OPTID_BACKEND_REQUEST_BUFFER,
|
||||||
SHRPX_OPTID_BACKEND_RESPONSE_BUFFER,
|
SHRPX_OPTID_BACKEND_RESPONSE_BUFFER,
|
||||||
|
SHRPX_OPTID_BACKEND_RISE,
|
||||||
SHRPX_OPTID_BACKEND_TLS,
|
SHRPX_OPTID_BACKEND_TLS,
|
||||||
SHRPX_OPTID_BACKEND_TLS_SNI_FIELD,
|
SHRPX_OPTID_BACKEND_TLS_SNI_FIELD,
|
||||||
SHRPX_OPTID_BACKEND_WRITE_TIMEOUT,
|
SHRPX_OPTID_BACKEND_WRITE_TIMEOUT,
|
||||||
|
@ -1187,6 +1189,9 @@ int option_lookup_token(const char *name, size_t namelen) {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case 'e':
|
case 'e':
|
||||||
|
if (util::strieq_l("backend-ris", name, 11)) {
|
||||||
|
return SHRPX_OPTID_BACKEND_RISE;
|
||||||
|
}
|
||||||
if (util::strieq_l("host-rewrit", name, 11)) {
|
if (util::strieq_l("host-rewrit", name, 11)) {
|
||||||
return SHRPX_OPTID_HOST_REWRITE;
|
return SHRPX_OPTID_HOST_REWRITE;
|
||||||
}
|
}
|
||||||
|
@ -1194,6 +1199,11 @@ int option_lookup_token(const char *name, size_t namelen) {
|
||||||
return SHRPX_OPTID_HTTP2_BRIDGE;
|
return SHRPX_OPTID_HTTP2_BRIDGE;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case 'l':
|
||||||
|
if (util::strieq_l("backend-fal", name, 11)) {
|
||||||
|
return SHRPX_OPTID_BACKEND_FALL;
|
||||||
|
}
|
||||||
|
break;
|
||||||
case 'y':
|
case 'y':
|
||||||
if (util::strieq_l("client-prox", name, 11)) {
|
if (util::strieq_l("client-prox", name, 11)) {
|
||||||
return SHRPX_OPTID_CLIENT_PROXY;
|
return SHRPX_OPTID_CLIENT_PROXY;
|
||||||
|
@ -2672,6 +2682,10 @@ int parse_config(const StringRef &opt, const StringRef &optarg,
|
||||||
opt, optarg);
|
opt, optarg);
|
||||||
case SHRPX_OPTID_ERROR_PAGE:
|
case SHRPX_OPTID_ERROR_PAGE:
|
||||||
return parse_error_page(mod_config()->http.error_pages, opt, optarg);
|
return parse_error_page(mod_config()->http.error_pages, opt, optarg);
|
||||||
|
case SHRPX_OPTID_BACKEND_FALL:
|
||||||
|
return parse_uint(&mod_config()->conn.downstream.fall, opt, optarg);
|
||||||
|
case SHRPX_OPTID_BACKEND_RISE:
|
||||||
|
return parse_uint(&mod_config()->conn.downstream.rise, opt, optarg);
|
||||||
case SHRPX_OPTID_CONF:
|
case SHRPX_OPTID_CONF:
|
||||||
LOG(WARN) << "conf: ignored";
|
LOG(WARN) << "conf: ignored";
|
||||||
|
|
||||||
|
|
|
@ -275,6 +275,8 @@ constexpr auto SHRPX_OPT_BACKEND_TLS = StringRef::from_lit("backend-tls");
|
||||||
constexpr auto SHRPX_OPT_BACKEND_CONNECTIONS_PER_HOST =
|
constexpr auto SHRPX_OPT_BACKEND_CONNECTIONS_PER_HOST =
|
||||||
StringRef::from_lit("backend-connections-per-host");
|
StringRef::from_lit("backend-connections-per-host");
|
||||||
constexpr auto SHRPX_OPT_ERROR_PAGE = StringRef::from_lit("error-page");
|
constexpr auto SHRPX_OPT_ERROR_PAGE = StringRef::from_lit("error-page");
|
||||||
|
constexpr auto SHRPX_OPT_BACKEND_FALL = StringRef::from_lit("backend-fall");
|
||||||
|
constexpr auto SHRPX_OPT_BACKEND_RISE = StringRef::from_lit("backend-rise");
|
||||||
|
|
||||||
constexpr size_t SHRPX_OBFUSCATED_NODE_LENGTH = 8;
|
constexpr size_t SHRPX_OBFUSCATED_NODE_LENGTH = 8;
|
||||||
|
|
||||||
|
@ -615,6 +617,8 @@ struct ConnectionConfig {
|
||||||
size_t connections_per_frontend;
|
size_t connections_per_frontend;
|
||||||
size_t request_buffer_size;
|
size_t request_buffer_size;
|
||||||
size_t response_buffer_size;
|
size_t response_buffer_size;
|
||||||
|
size_t fall;
|
||||||
|
size_t rise;
|
||||||
// Address family of backend connection. One of either AF_INET,
|
// Address family of backend connection. One of either AF_INET,
|
||||||
// AF_INET6 or AF_UNSPEC. This is ignored if backend connection
|
// AF_INET6 or AF_UNSPEC. This is ignored if backend connection
|
||||||
// is made via Unix domain socket.
|
// is made via Unix domain socket.
|
||||||
|
|
|
@ -43,7 +43,13 @@ ConnectBlocker::~ConnectBlocker() { ev_timer_stop(loop_, &timer_); }
|
||||||
|
|
||||||
bool ConnectBlocker::blocked() const { return ev_is_active(&timer_); }
|
bool ConnectBlocker::blocked() const { return ev_is_active(&timer_); }
|
||||||
|
|
||||||
void ConnectBlocker::on_success() { fail_count_ = 0; }
|
void ConnectBlocker::on_success() {
|
||||||
|
if (ev_is_active(&timer_)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
fail_count_ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
// Use the similar backoff algorithm described in
|
// Use the similar backoff algorithm described in
|
||||||
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
|
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
|
||||||
|
@ -72,4 +78,18 @@ void ConnectBlocker::on_failure() {
|
||||||
ev_timer_start(loop_, &timer_);
|
ev_timer_start(loop_, &timer_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t ConnectBlocker::get_fail_count() const { return fail_count_; }
|
||||||
|
|
||||||
|
void ConnectBlocker::offline() {
|
||||||
|
ev_timer_stop(loop_, &timer_);
|
||||||
|
ev_timer_set(&timer_, std::numeric_limits<double>::max(), 0.);
|
||||||
|
ev_timer_start(loop_, &timer_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConnectBlocker::online() {
|
||||||
|
ev_timer_stop(loop_, &timer_);
|
||||||
|
|
||||||
|
fail_count_ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -48,6 +48,15 @@ public:
|
||||||
// backoff.
|
// backoff.
|
||||||
void on_failure();
|
void on_failure();
|
||||||
|
|
||||||
|
size_t get_fail_count() const;
|
||||||
|
|
||||||
|
// Peer is now considered offline. This effectively means that the
|
||||||
|
// connection is blocked until online() is called.
|
||||||
|
void offline();
|
||||||
|
|
||||||
|
// Peer is now considered online
|
||||||
|
void online();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::mt19937 gen_;
|
std::mt19937 gen_;
|
||||||
ev_timer timer_;
|
ev_timer timer_;
|
||||||
|
|
|
@ -1736,7 +1736,7 @@ int Http2Session::connected() {
|
||||||
<< util::to_numeric_addr(&addr_->addr);
|
<< util::to_numeric_addr(&addr_->addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
connect_blocker->on_failure();
|
downstream_failure(addr_);
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1054,7 +1054,7 @@ int HttpDownstreamConnection::connected() {
|
||||||
<< util::to_numeric_addr(&addr_->addr);
|
<< util::to_numeric_addr(&addr_->addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
connect_blocker->on_failure();
|
downstream_failure(addr_);
|
||||||
|
|
||||||
downstream_->set_request_state(Downstream::CONNECT_FAIL);
|
downstream_->set_request_state(Downstream::CONNECT_FAIL);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,365 @@
|
||||||
|
/*
|
||||||
|
* nghttp2 - HTTP/2 C Library
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016 Tatsuhiro Tsujikawa
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person obtaining
|
||||||
|
* a copy of this software and associated documentation files (the
|
||||||
|
* "Software"), to deal in the Software without restriction, including
|
||||||
|
* without limitation the rights to use, copy, modify, merge, publish,
|
||||||
|
* distribute, sublicense, and/or sell copies of the Software, and to
|
||||||
|
* permit persons to whom the Software is furnished to do so, subject to
|
||||||
|
* the following conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be
|
||||||
|
* included in all copies or substantial portions of the Software.
|
||||||
|
*
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||||
|
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||||
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
|
||||||
|
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
||||||
|
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||||
|
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
#include "shrpx_live_check.h"
|
||||||
|
#include "shrpx_worker.h"
|
||||||
|
#include "shrpx_connect_blocker.h"
|
||||||
|
#include "shrpx_ssl.h"
|
||||||
|
|
||||||
|
namespace shrpx {
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
void readcb(struct ev_loop *loop, ev_io *w, int revents) {
|
||||||
|
int rv;
|
||||||
|
auto conn = static_cast<Connection *>(w->data);
|
||||||
|
auto live_check = static_cast<LiveCheck *>(conn->data);
|
||||||
|
|
||||||
|
rv = live_check->do_read();
|
||||||
|
if (rv != 0) {
|
||||||
|
live_check->on_failure();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
void writecb(struct ev_loop *loop, ev_io *w, int revents) {
|
||||||
|
int rv;
|
||||||
|
auto conn = static_cast<Connection *>(w->data);
|
||||||
|
auto live_check = static_cast<LiveCheck *>(conn->data);
|
||||||
|
|
||||||
|
rv = live_check->do_write();
|
||||||
|
if (rv != 0) {
|
||||||
|
live_check->on_failure();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
|
||||||
|
auto conn = static_cast<Connection *>(w->data);
|
||||||
|
auto live_check = static_cast<LiveCheck *>(conn->data);
|
||||||
|
|
||||||
|
live_check->on_failure();
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
void backoff_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
|
||||||
|
int rv;
|
||||||
|
auto live_check = static_cast<LiveCheck *>(w->data);
|
||||||
|
|
||||||
|
rv = live_check->initiate_connection();
|
||||||
|
if (rv != 0) {
|
||||||
|
live_check->on_failure();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
|
||||||
|
DownstreamAddrGroup *group, DownstreamAddr *addr,
|
||||||
|
std::mt19937 &gen)
|
||||||
|
: conn_(loop, -1, nullptr, worker->get_mcpool(),
|
||||||
|
get_config()->conn.downstream.timeout.write,
|
||||||
|
get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb,
|
||||||
|
timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
|
||||||
|
get_config()->tls.dyn_rec.idle_timeout, PROTO_NONE),
|
||||||
|
gen_(gen),
|
||||||
|
read_(&LiveCheck::noop),
|
||||||
|
write_(&LiveCheck::noop),
|
||||||
|
worker_(worker),
|
||||||
|
ssl_ctx_(ssl_ctx),
|
||||||
|
group_(group),
|
||||||
|
addr_(addr),
|
||||||
|
success_count_(0),
|
||||||
|
fail_count_(0) {
|
||||||
|
ev_timer_init(&backoff_timer_, backoff_timeoutcb, 0., 0.);
|
||||||
|
backoff_timer_.data = this;
|
||||||
|
}
|
||||||
|
|
||||||
|
LiveCheck::~LiveCheck() {
|
||||||
|
disconnect();
|
||||||
|
|
||||||
|
ev_timer_stop(conn_.loop, &backoff_timer_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void LiveCheck::disconnect() {
|
||||||
|
conn_.rlimit.stopw();
|
||||||
|
conn_.wlimit.stopw();
|
||||||
|
|
||||||
|
read_ = write_ = &LiveCheck::noop;
|
||||||
|
|
||||||
|
conn_.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use the similar backoff algorithm described in
|
||||||
|
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
|
||||||
|
namespace {
|
||||||
|
constexpr size_t MAX_BACKOFF_EXP = 10;
|
||||||
|
constexpr auto MULTIPLIER = 1.6;
|
||||||
|
constexpr auto JITTER = 0.2;
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
void LiveCheck::schedule() {
|
||||||
|
auto base_backoff = pow(MULTIPLIER, std::min(fail_count_, MAX_BACKOFF_EXP));
|
||||||
|
auto dist = std::uniform_real_distribution<>(-JITTER * base_backoff,
|
||||||
|
JITTER * base_backoff);
|
||||||
|
auto backoff = base_backoff + dist(gen_);
|
||||||
|
|
||||||
|
ev_timer_set(&backoff_timer_, backoff, 0.);
|
||||||
|
ev_timer_start(conn_.loop, &backoff_timer_);
|
||||||
|
}
|
||||||
|
|
||||||
|
int LiveCheck::do_read() { return read_(*this); }
|
||||||
|
|
||||||
|
int LiveCheck::do_write() { return write_(*this); }
|
||||||
|
|
||||||
|
int LiveCheck::initiate_connection() {
|
||||||
|
int rv;
|
||||||
|
|
||||||
|
const auto &shared_addr = group_->shared_addr;
|
||||||
|
|
||||||
|
auto worker_blocker = worker_->get_connect_blocker();
|
||||||
|
if (worker_blocker->blocked()) {
|
||||||
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
LOG(INFO) << "Worker wide backend connection was blocked temporarily";
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ssl_ctx_) {
|
||||||
|
auto ssl = ssl::create_ssl(ssl_ctx_);
|
||||||
|
if (!ssl) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (shared_addr->proto) {
|
||||||
|
case PROTO_HTTP1:
|
||||||
|
ssl::setup_downstream_http1_alpn(ssl);
|
||||||
|
break;
|
||||||
|
case PROTO_HTTP2:
|
||||||
|
ssl::setup_downstream_http2_alpn(ssl);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
conn_.set_ssl(ssl);
|
||||||
|
}
|
||||||
|
|
||||||
|
conn_.fd = util::create_nonblock_socket(addr_->addr.su.storage.ss_family);
|
||||||
|
|
||||||
|
if (conn_.fd == -1) {
|
||||||
|
auto error = errno;
|
||||||
|
LOG(WARN) << "socket() failed; addr=" << util::to_numeric_addr(&addr_->addr)
|
||||||
|
<< ", errno=" << error;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
rv = connect(conn_.fd, &addr_->addr.su.sa, addr_->addr.len);
|
||||||
|
if (rv != 0 && errno != EINPROGRESS) {
|
||||||
|
auto error = errno;
|
||||||
|
LOG(WARN) << "connect() failed; addr="
|
||||||
|
<< util::to_numeric_addr(&addr_->addr) << ", errno=" << error;
|
||||||
|
|
||||||
|
close(conn_.fd);
|
||||||
|
conn_.fd = -1;
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ssl_ctx_) {
|
||||||
|
auto sni_name = !get_config()->tls.backend_sni_name.empty()
|
||||||
|
? StringRef(get_config()->tls.backend_sni_name)
|
||||||
|
: StringRef(addr_->host);
|
||||||
|
if (!util::numeric_host(sni_name.c_str())) {
|
||||||
|
SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
auto session = ssl::reuse_tls_session(addr_);
|
||||||
|
if (session) {
|
||||||
|
SSL_set_session(conn_.tls.ssl, session);
|
||||||
|
SSL_SESSION_free(session);
|
||||||
|
}
|
||||||
|
|
||||||
|
conn_.prepare_client_handshake();
|
||||||
|
}
|
||||||
|
|
||||||
|
write_ = &LiveCheck::connected;
|
||||||
|
|
||||||
|
ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
|
||||||
|
ev_io_set(&conn_.rev, conn_.fd, EV_READ);
|
||||||
|
|
||||||
|
conn_.wlimit.startw();
|
||||||
|
|
||||||
|
// TODO we should have timeout for connection establishment
|
||||||
|
ev_timer_again(conn_.loop, &conn_.wt);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int LiveCheck::connected() {
|
||||||
|
if (!util::check_socket_connected(conn_.fd)) {
|
||||||
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
LOG(INFO) << "Backend connect failed; addr="
|
||||||
|
<< util::to_numeric_addr(&addr_->addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
LOG(INFO) << "Connection established";
|
||||||
|
}
|
||||||
|
|
||||||
|
conn_.rlimit.startw();
|
||||||
|
|
||||||
|
if (conn_.tls.ssl) {
|
||||||
|
read_ = &LiveCheck::tls_handshake;
|
||||||
|
write_ = &LiveCheck::tls_handshake;
|
||||||
|
|
||||||
|
return do_write();
|
||||||
|
}
|
||||||
|
|
||||||
|
on_success();
|
||||||
|
disconnect();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int LiveCheck::tls_handshake() {
|
||||||
|
ev_timer_again(conn_.loop, &conn_.rt);
|
||||||
|
|
||||||
|
ERR_clear_error();
|
||||||
|
|
||||||
|
auto rv = conn_.tls_handshake();
|
||||||
|
|
||||||
|
if (rv == SHRPX_ERR_INPROGRESS) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rv < 0) {
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
LOG(INFO) << "SSL/TLS handshake completed";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!get_config()->tls.insecure &&
|
||||||
|
ssl::check_cert(conn_.tls.ssl, addr_) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!SSL_session_reused(conn_.tls.ssl)) {
|
||||||
|
auto tls_session = SSL_get0_session(conn_.tls.ssl);
|
||||||
|
if (tls_session) {
|
||||||
|
ssl::try_cache_tls_session(addr_, tls_session, ev_now(conn_.loop));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check negotiated ALPN
|
||||||
|
|
||||||
|
const unsigned char *next_proto = nullptr;
|
||||||
|
unsigned int next_proto_len = 0;
|
||||||
|
|
||||||
|
SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len);
|
||||||
|
#if OPENSSL_VERSION_NUMBER >= 0x10002000L
|
||||||
|
if (next_proto == nullptr) {
|
||||||
|
SSL_get0_alpn_selected(conn_.tls.ssl, &next_proto, &next_proto_len);
|
||||||
|
}
|
||||||
|
#endif // OPENSSL_VERSION_NUMBER >= 0x10002000L
|
||||||
|
|
||||||
|
auto proto = StringRef{next_proto, next_proto_len};
|
||||||
|
|
||||||
|
const auto &shared_addr = group_->shared_addr;
|
||||||
|
|
||||||
|
switch (shared_addr->proto) {
|
||||||
|
case PROTO_HTTP1:
|
||||||
|
if (proto.empty() || proto == StringRef::from_lit("http/1.1")) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
case PROTO_HTTP2:
|
||||||
|
if (util::check_h2_is_selected(proto)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
on_success();
|
||||||
|
disconnect();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void LiveCheck::on_failure() {
|
||||||
|
++fail_count_;
|
||||||
|
|
||||||
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
LOG(INFO) << "Liveness check for " << util::to_numeric_addr(&addr_->addr)
|
||||||
|
<< " failed " << fail_count_ << " time(s) in a row";
|
||||||
|
}
|
||||||
|
|
||||||
|
disconnect();
|
||||||
|
|
||||||
|
schedule();
|
||||||
|
}
|
||||||
|
|
||||||
|
void LiveCheck::on_success() {
|
||||||
|
++success_count_;
|
||||||
|
fail_count_ = 0;
|
||||||
|
|
||||||
|
if (LOG_ENABLED(INFO)) {
|
||||||
|
LOG(INFO) << "Liveness check for " << util::to_numeric_addr(&addr_->addr)
|
||||||
|
<< " succeeded " << success_count_ << " time(s) in a row";
|
||||||
|
}
|
||||||
|
|
||||||
|
auto &downstreamconf = get_config()->conn.downstream;
|
||||||
|
|
||||||
|
if (success_count_ < downstreamconf.rise) {
|
||||||
|
disconnect();
|
||||||
|
|
||||||
|
schedule();
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG(NOTICE) << util::to_numeric_addr(&addr_->addr) << " is considered online";
|
||||||
|
|
||||||
|
addr_->connect_blocker->online();
|
||||||
|
|
||||||
|
success_count_ = 0;
|
||||||
|
fail_count_ = 0;
|
||||||
|
|
||||||
|
disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
int LiveCheck::noop() { return 0; }
|
||||||
|
|
||||||
|
} // namespace shrpx
|
|
@ -0,0 +1,90 @@
|
||||||
|
/*
|
||||||
|
* nghttp2 - HTTP/2 C Library
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016 Tatsuhiro Tsujikawa
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person obtaining
|
||||||
|
* a copy of this software and associated documentation files (the
|
||||||
|
* "Software"), to deal in the Software without restriction, including
|
||||||
|
* without limitation the rights to use, copy, modify, merge, publish,
|
||||||
|
* distribute, sublicense, and/or sell copies of the Software, and to
|
||||||
|
* permit persons to whom the Software is furnished to do so, subject to
|
||||||
|
* the following conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be
|
||||||
|
* included in all copies or substantial portions of the Software.
|
||||||
|
*
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||||
|
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||||
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
|
||||||
|
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
||||||
|
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||||
|
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
#ifndef SHRPX_LIVE_CHECK_H
|
||||||
|
#define SHRPX_LIVE_CHECK_H
|
||||||
|
|
||||||
|
#include "shrpx.h"
|
||||||
|
|
||||||
|
#include <functional>
|
||||||
|
#include <random>
|
||||||
|
|
||||||
|
#include <openssl/ssl.h>
|
||||||
|
|
||||||
|
#include <ev.h>
|
||||||
|
|
||||||
|
#include "shrpx_connection.h"
|
||||||
|
|
||||||
|
namespace shrpx {
|
||||||
|
|
||||||
|
class Worker;
|
||||||
|
struct DownstreamAddrGroup;
|
||||||
|
struct DownstreamAddr;
|
||||||
|
|
||||||
|
class LiveCheck {
|
||||||
|
public:
|
||||||
|
LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
|
||||||
|
DownstreamAddrGroup *group, DownstreamAddr *addr,
|
||||||
|
std::mt19937 &gen);
|
||||||
|
~LiveCheck();
|
||||||
|
|
||||||
|
void disconnect();
|
||||||
|
|
||||||
|
void on_success();
|
||||||
|
void on_failure();
|
||||||
|
|
||||||
|
int initiate_connection();
|
||||||
|
|
||||||
|
// Schedules next connection attempt
|
||||||
|
void schedule();
|
||||||
|
|
||||||
|
// Low level I/O operation callback; they are called from do_read()
|
||||||
|
// or do_write().
|
||||||
|
int noop();
|
||||||
|
int connected();
|
||||||
|
int tls_handshake();
|
||||||
|
|
||||||
|
int do_read();
|
||||||
|
int do_write();
|
||||||
|
|
||||||
|
private:
|
||||||
|
Connection conn_;
|
||||||
|
std::mt19937 &gen_;
|
||||||
|
ev_timer backoff_timer_;
|
||||||
|
std::function<int(LiveCheck &)> read_, write_;
|
||||||
|
Worker *worker_;
|
||||||
|
// nullptr if no TLS is configured
|
||||||
|
SSL_CTX *ssl_ctx_;
|
||||||
|
DownstreamAddrGroup *group_;
|
||||||
|
// Address of remote endpoint
|
||||||
|
DownstreamAddr *addr_;
|
||||||
|
// The number of successful connect attempt in a row.
|
||||||
|
size_t success_count_;
|
||||||
|
// The number of unsuccessful connect attempt in a row.
|
||||||
|
size_t fail_count_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace shrpx
|
||||||
|
|
||||||
|
#endif // SHRPX_LIVE_CHECK_H
|
|
@ -36,6 +36,7 @@
|
||||||
#include "shrpx_http2_session.h"
|
#include "shrpx_http2_session.h"
|
||||||
#include "shrpx_log_config.h"
|
#include "shrpx_log_config.h"
|
||||||
#include "shrpx_connect_blocker.h"
|
#include "shrpx_connect_blocker.h"
|
||||||
|
#include "shrpx_live_check.h"
|
||||||
#include "shrpx_memcached_dispatcher.h"
|
#include "shrpx_memcached_dispatcher.h"
|
||||||
#ifdef HAVE_MRUBY
|
#ifdef HAVE_MRUBY
|
||||||
#include "shrpx_mruby.h"
|
#include "shrpx_mruby.h"
|
||||||
|
@ -160,6 +161,9 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
|
||||||
dst_addr.host_unix = src_addr.host_unix;
|
dst_addr.host_unix = src_addr.host_unix;
|
||||||
|
|
||||||
dst_addr.connect_blocker = make_unique<ConnectBlocker>(randgen_, loop_);
|
dst_addr.connect_blocker = make_unique<ConnectBlocker>(randgen_, loop_);
|
||||||
|
dst_addr.live_check = make_unique<LiveCheck>(
|
||||||
|
loop_, shared_addr->tls ? cl_ssl_ctx_ : nullptr, this, &dst,
|
||||||
|
&dst_addr, randgen_);
|
||||||
}
|
}
|
||||||
|
|
||||||
// share the connection if patterns have the same set of backend
|
// share the connection if patterns have the same set of backend
|
||||||
|
@ -464,4 +468,29 @@ size_t match_downstream_addr_group(
|
||||||
groups, catch_all);
|
groups, catch_all);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void downstream_failure(DownstreamAddr *addr) {
|
||||||
|
const auto &connect_blocker = addr->connect_blocker;
|
||||||
|
|
||||||
|
connect_blocker->on_failure();
|
||||||
|
|
||||||
|
auto fail_count = connect_blocker->get_fail_count();
|
||||||
|
|
||||||
|
auto &downstreamconf = get_config()->conn.downstream;
|
||||||
|
|
||||||
|
if (downstreamconf.fall == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fail_count >= downstreamconf.fall) {
|
||||||
|
LOG(WARN) << "Could not connect to " << util::to_numeric_addr(&addr->addr)
|
||||||
|
<< " " << fail_count << " times in a row; considered as offline";
|
||||||
|
|
||||||
|
connect_blocker->offline();
|
||||||
|
|
||||||
|
if (downstreamconf.rise) {
|
||||||
|
addr->live_check->schedule();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
|
@ -52,6 +52,7 @@ namespace shrpx {
|
||||||
|
|
||||||
class Http2Session;
|
class Http2Session;
|
||||||
class ConnectBlocker;
|
class ConnectBlocker;
|
||||||
|
class LiveCheck;
|
||||||
class MemcachedDispatcher;
|
class MemcachedDispatcher;
|
||||||
struct UpstreamAddr;
|
struct UpstreamAddr;
|
||||||
|
|
||||||
|
@ -79,6 +80,7 @@ struct DownstreamAddr {
|
||||||
bool host_unix;
|
bool host_unix;
|
||||||
|
|
||||||
std::unique_ptr<ConnectBlocker> connect_blocker;
|
std::unique_ptr<ConnectBlocker> connect_blocker;
|
||||||
|
std::unique_ptr<LiveCheck> live_check;
|
||||||
// Client side TLS session cache
|
// Client side TLS session cache
|
||||||
TLSSessionCache tls_session_cache;
|
TLSSessionCache tls_session_cache;
|
||||||
// Http2Session object created for this address. This list chains
|
// Http2Session object created for this address. This list chains
|
||||||
|
@ -225,6 +227,8 @@ size_t match_downstream_addr_group(
|
||||||
const StringRef &hostport, const StringRef &path,
|
const StringRef &hostport, const StringRef &path,
|
||||||
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all);
|
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all);
|
||||||
|
|
||||||
|
void downstream_failure(DownstreamAddr *addr);
|
||||||
|
|
||||||
} // namespace shrpx
|
} // namespace shrpx
|
||||||
|
|
||||||
#endif // SHRPX_WORKER_H
|
#endif // SHRPX_WORKER_H
|
||||||
|
|
Loading…
Reference in New Issue