nghttpx: Detect online/offline state of backend servers

This commit is contained in:
Tatsuhiro Tsujikawa 2016-04-08 01:04:16 +09:00
parent ffddefc177
commit f9b872ab78
9 changed files with 462 additions and 3 deletions

View File

@ -120,6 +120,7 @@ NGHTTPX_SRCS = \
shrpx_worker.cc shrpx_worker.h \
shrpx_log_config.cc shrpx_log_config.h \
shrpx_connect_blocker.cc shrpx_connect_blocker.h \
shrpx_live_check.cc shrpx_live_check.h \
shrpx_downstream_connection_pool.cc shrpx_downstream_connection_pool.h \
shrpx_rate_limit.cc shrpx_rate_limit.h \
shrpx_connection.cc shrpx_connection.h \

View File

@ -43,7 +43,13 @@ ConnectBlocker::~ConnectBlocker() { ev_timer_stop(loop_, &timer_); }
bool ConnectBlocker::blocked() const { return ev_is_active(&timer_); }
void ConnectBlocker::on_success() { fail_count_ = 0; }
void ConnectBlocker::on_success() {
if (ev_is_active(&timer_)) {
return;
}
fail_count_ = 0;
}
// Use the similar backoff algorithm described in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
@ -72,4 +78,18 @@ void ConnectBlocker::on_failure() {
ev_timer_start(loop_, &timer_);
}
size_t ConnectBlocker::get_fail_count() const { return fail_count_; }
void ConnectBlocker::offline() {
ev_timer_stop(loop_, &timer_);
ev_timer_set(&timer_, std::numeric_limits<double>::max(), 0.);
ev_timer_start(loop_, &timer_);
}
void ConnectBlocker::online() {
ev_timer_stop(loop_, &timer_);
fail_count_ = 0;
}
} // namespace shrpx

View File

@ -48,6 +48,15 @@ public:
// backoff.
void on_failure();
size_t get_fail_count() const;
// Peer is now considered offline. This effectively means that the
// connection is blocked until online() is called.
void offline();
// Peer is now considered online
void online();
private:
std::mt19937 gen_;
ev_timer timer_;

View File

@ -1736,7 +1736,7 @@ int Http2Session::connected() {
<< util::to_numeric_addr(&addr_->addr);
}
connect_blocker->on_failure();
downstream_failure(addr_);
return -1;
}

View File

@ -1054,7 +1054,7 @@ int HttpDownstreamConnection::connected() {
<< util::to_numeric_addr(&addr_->addr);
}
connect_blocker->on_failure();
downstream_failure(addr_);
downstream_->set_request_state(Downstream::CONNECT_FAIL);

317
src/shrpx_live_check.cc Normal file
View File

@ -0,0 +1,317 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2016 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_live_check.h"
#include "shrpx_worker.h"
#include "shrpx_connect_blocker.h"
#include "shrpx_ssl.h"
namespace shrpx {
namespace {
void readcb(struct ev_loop *loop, ev_io *w, int revents) {
int rv;
auto conn = static_cast<Connection *>(w->data);
auto live_check = static_cast<LiveCheck *>(conn->data);
rv = live_check->do_read();
if (rv != 0) {
live_check->on_failure();
return;
}
}
} // namespace
namespace {
void writecb(struct ev_loop *loop, ev_io *w, int revents) {
int rv;
auto conn = static_cast<Connection *>(w->data);
auto live_check = static_cast<LiveCheck *>(conn->data);
rv = live_check->do_write();
if (rv != 0) {
live_check->on_failure();
return;
}
}
} // namespace
namespace {
void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
auto conn = static_cast<Connection *>(w->data);
auto live_check = static_cast<LiveCheck *>(conn->data);
live_check->on_failure();
}
} // namespace
namespace {
void backoff_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
int rv;
auto live_check = static_cast<LiveCheck *>(w->data);
rv = live_check->initiate_connection();
if (rv != 0) {
live_check->on_failure();
return;
}
}
} // namespace
LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
DownstreamAddrGroup *group, DownstreamAddr *addr)
: conn_(loop, -1, nullptr, worker->get_mcpool(),
get_config()->conn.downstream.timeout.write,
get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb,
timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
get_config()->tls.dyn_rec.idle_timeout, PROTO_NONE),
read_(&LiveCheck::noop),
write_(&LiveCheck::noop),
worker_(worker),
ssl_ctx_(ssl_ctx),
group_(group),
addr_(addr),
success_count_(0),
fail_count_(0) {
ev_timer_init(&backoff_timer_, backoff_timeoutcb, 0., 0.);
backoff_timer_.data = this;
}
LiveCheck::~LiveCheck() {
disconnect();
ev_timer_stop(conn_.loop, &backoff_timer_);
}
void LiveCheck::disconnect() {
conn_.rlimit.stopw();
conn_.wlimit.stopw();
read_ = write_ = &LiveCheck::noop;
conn_.disconnect();
}
void LiveCheck::schedule() {
// TODO use exponential backoff based on fail_count_.
ev_timer_set(&backoff_timer_, 1.6, 0.);
ev_timer_start(conn_.loop, &backoff_timer_);
}
int LiveCheck::do_read() { return read_(*this); }
int LiveCheck::do_write() { return write_(*this); }
int LiveCheck::initiate_connection() {
int rv;
const auto &shared_addr = group_->shared_addr;
auto worker_blocker = worker_->get_connect_blocker();
if (worker_blocker->blocked()) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Worker wide backend connection was blocked temporarily";
}
return -1;
}
if (ssl_ctx_) {
auto ssl = ssl::create_ssl(ssl_ctx_);
if (!ssl) {
return -1;
}
switch (shared_addr->proto) {
case PROTO_HTTP1:
ssl::setup_downstream_http1_alpn(ssl);
break;
case PROTO_HTTP2:
ssl::setup_downstream_http2_alpn(ssl);
break;
default:
assert(0);
}
conn_.set_ssl(ssl);
}
conn_.fd = util::create_nonblock_socket(addr_->addr.su.storage.ss_family);
if (conn_.fd == -1) {
auto error = errno;
LOG(WARN) << "socket() failed; addr=" << util::to_numeric_addr(&addr_->addr)
<< ", errno=" << error;
return -1;
}
rv = connect(conn_.fd, &addr_->addr.su.sa, addr_->addr.len);
if (rv != 0 && errno != EINPROGRESS) {
auto error = errno;
LOG(WARN) << "connect() failed; addr="
<< util::to_numeric_addr(&addr_->addr) << ", errno=" << error;
LOG(WARN) << strerror(error);
close(conn_.fd);
conn_.fd = -1;
return -1;
}
if (ssl_ctx_) {
auto sni_name = !get_config()->tls.backend_sni_name.empty()
? StringRef(get_config()->tls.backend_sni_name)
: StringRef(addr_->host);
if (!util::numeric_host(sni_name.c_str())) {
SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name.c_str());
}
auto session = ssl::reuse_tls_session(addr_);
if (session) {
SSL_set_session(conn_.tls.ssl, session);
SSL_SESSION_free(session);
}
conn_.prepare_client_handshake();
}
write_ = &LiveCheck::connected;
ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
ev_io_set(&conn_.rev, conn_.fd, EV_READ);
conn_.wlimit.startw();
// TODO we should have timeout for connection establishment
ev_timer_again(conn_.loop, &conn_.wt);
return 0;
}
int LiveCheck::connected() {
if (!util::check_socket_connected(conn_.fd)) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Backend connect failed; addr="
<< util::to_numeric_addr(&addr_->addr);
}
return -1;
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Connection established";
}
conn_.rlimit.startw();
if (conn_.tls.ssl) {
read_ = &LiveCheck::tls_handshake;
write_ = &LiveCheck::tls_handshake;
return do_write();
}
on_success();
disconnect();
return 0;
}
int LiveCheck::tls_handshake() {
ev_timer_again(conn_.loop, &conn_.rt);
ERR_clear_error();
auto rv = conn_.tls_handshake();
if (rv == SHRPX_ERR_INPROGRESS) {
return 0;
}
if (rv < 0) {
return rv;
}
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "SSL/TLS handshake completed";
}
if (!get_config()->tls.insecure &&
ssl::check_cert(conn_.tls.ssl, addr_) != 0) {
return -1;
}
if (!SSL_session_reused(conn_.tls.ssl)) {
auto tls_session = SSL_get0_session(conn_.tls.ssl);
if (tls_session) {
ssl::try_cache_tls_session(addr_, tls_session, ev_now(conn_.loop));
}
}
on_success();
disconnect();
// TODO Check ALPN identifier here
return 0;
}
void LiveCheck::on_failure() {
++fail_count_;
LOG(WARN) << "Liveness check for " << util::to_numeric_addr(&addr_->addr)
<< " failed " << fail_count_ << " time(s) in a row";
disconnect();
schedule();
}
void LiveCheck::on_success() {
++success_count_;
fail_count_ = 0;
LOG(WARN) << "Liveness check for " << util::to_numeric_addr(&addr_->addr)
<< " succeeded " << success_count_ << " time(s) in a row";
if (success_count_ < 3) {
disconnect();
schedule();
return;
}
LOG(NOTICE) << util::to_numeric_addr(&addr_->addr) << " is considered online";
addr_->connect_blocker->online();
success_count_ = 0;
fail_count_ = 0;
disconnect();
}
int LiveCheck::noop() { return 0; }
} // namespace shrpx

88
src/shrpx_live_check.h Normal file
View File

@ -0,0 +1,88 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2016 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_LIVE_CHECK_H
#define SHRPX_LIVE_CHECK_H
#include "shrpx.h"
#include <functional>
#include <openssl/ssl.h>
#include <ev.h>
#include "shrpx_connection.h"
namespace shrpx {
class Worker;
struct DownstreamAddrGroup;
struct DownstreamAddr;
class LiveCheck {
public:
LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
DownstreamAddrGroup *group, DownstreamAddr *addr);
~LiveCheck();
void disconnect();
void on_success();
void on_failure();
int initiate_connection();
void schedule();
// Low level I/O operation callback; they are called from do_read()
// or do_write().
int noop();
int connected();
int tls_handshake();
int do_read();
int do_write();
void signal_write();
private:
Connection conn_;
ev_timer backoff_timer_;
std::function<int(LiveCheck &)> read_, write_;
Worker *worker_;
// nullptr if no TLS is configured
SSL_CTX *ssl_ctx_;
DownstreamAddrGroup *group_;
// Address of remote endpoint
DownstreamAddr *addr_;
// The number of successful connect attempt in a row.
size_t success_count_;
// The number of unsuccessful connect attempt in a row.
size_t fail_count_;
};
} // namespace shrpx
#endif // SHRPX_LIVE_CHECK_H

View File

@ -36,6 +36,7 @@
#include "shrpx_http2_session.h"
#include "shrpx_log_config.h"
#include "shrpx_connect_blocker.h"
#include "shrpx_live_check.h"
#include "shrpx_memcached_dispatcher.h"
#ifdef HAVE_MRUBY
#include "shrpx_mruby.h"
@ -160,6 +161,9 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
dst_addr.host_unix = src_addr.host_unix;
dst_addr.connect_blocker = make_unique<ConnectBlocker>(randgen_, loop_);
dst_addr.live_check = make_unique<LiveCheck>(
loop_, shared_addr->tls ? cl_ssl_ctx_ : nullptr, this, &dst,
&dst_addr);
}
// share the connection if patterns have the same set of backend
@ -464,4 +468,20 @@ size_t match_downstream_addr_group(
groups, catch_all);
}
void downstream_failure(DownstreamAddr *addr) {
const auto &connect_blocker = addr->connect_blocker;
connect_blocker->on_failure();
auto fail_count = connect_blocker->get_fail_count();
if (fail_count >= 3) {
LOG(WARN) << "Could not connect to " << util::to_numeric_addr(&addr->addr)
<< " " << fail_count << " times in a row; considered as offline";
connect_blocker->offline();
addr->live_check->schedule();
}
}
} // namespace shrpx

View File

@ -52,6 +52,7 @@ namespace shrpx {
class Http2Session;
class ConnectBlocker;
class LiveCheck;
class MemcachedDispatcher;
struct UpstreamAddr;
@ -79,6 +80,7 @@ struct DownstreamAddr {
bool host_unix;
std::unique_ptr<ConnectBlocker> connect_blocker;
std::unique_ptr<LiveCheck> live_check;
// Client side TLS session cache
TLSSessionCache tls_session_cache;
// Http2Session object created for this address. This list chains
@ -225,6 +227,8 @@ size_t match_downstream_addr_group(
const StringRef &hostport, const StringRef &path,
const std::vector<DownstreamAddrGroup> &groups, size_t catch_all);
void downstream_failure(DownstreamAddr *addr);
} // namespace shrpx
#endif // SHRPX_WORKER_H