diff --git a/src/Makefile.am b/src/Makefile.am index 6658e05c..7f1cabe7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -120,6 +120,7 @@ NGHTTPX_SRCS = \ shrpx_worker.cc shrpx_worker.h \ shrpx_log_config.cc shrpx_log_config.h \ shrpx_connect_blocker.cc shrpx_connect_blocker.h \ + shrpx_live_check.cc shrpx_live_check.h \ shrpx_downstream_connection_pool.cc shrpx_downstream_connection_pool.h \ shrpx_rate_limit.cc shrpx_rate_limit.h \ shrpx_connection.cc shrpx_connection.h \ diff --git a/src/shrpx_connect_blocker.cc b/src/shrpx_connect_blocker.cc index 82f2b455..ae6a6c60 100644 --- a/src/shrpx_connect_blocker.cc +++ b/src/shrpx_connect_blocker.cc @@ -43,7 +43,13 @@ ConnectBlocker::~ConnectBlocker() { ev_timer_stop(loop_, &timer_); } bool ConnectBlocker::blocked() const { return ev_is_active(&timer_); } -void ConnectBlocker::on_success() { fail_count_ = 0; } +void ConnectBlocker::on_success() { + if (ev_is_active(&timer_)) { + return; + } + + fail_count_ = 0; +} // Use the similar backoff algorithm described in // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md @@ -72,4 +78,18 @@ void ConnectBlocker::on_failure() { ev_timer_start(loop_, &timer_); } +size_t ConnectBlocker::get_fail_count() const { return fail_count_; } + +void ConnectBlocker::offline() { + ev_timer_stop(loop_, &timer_); + ev_timer_set(&timer_, std::numeric_limits::max(), 0.); + ev_timer_start(loop_, &timer_); +} + +void ConnectBlocker::online() { + ev_timer_stop(loop_, &timer_); + + fail_count_ = 0; +} + } // namespace shrpx diff --git a/src/shrpx_connect_blocker.h b/src/shrpx_connect_blocker.h index 63a1e3f9..9d60697f 100644 --- a/src/shrpx_connect_blocker.h +++ b/src/shrpx_connect_blocker.h @@ -48,6 +48,15 @@ public: // backoff. void on_failure(); + size_t get_fail_count() const; + + // Peer is now considered offline. This effectively means that the + // connection is blocked until online() is called. + void offline(); + + // Peer is now considered online + void online(); + private: std::mt19937 gen_; ev_timer timer_; diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index f2076117..aecfa134 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -1736,7 +1736,7 @@ int Http2Session::connected() { << util::to_numeric_addr(&addr_->addr); } - connect_blocker->on_failure(); + downstream_failure(addr_); return -1; } diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 4474b33c..e572feb2 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -1054,7 +1054,7 @@ int HttpDownstreamConnection::connected() { << util::to_numeric_addr(&addr_->addr); } - connect_blocker->on_failure(); + downstream_failure(addr_); downstream_->set_request_state(Downstream::CONNECT_FAIL); diff --git a/src/shrpx_live_check.cc b/src/shrpx_live_check.cc new file mode 100644 index 00000000..2cf22619 --- /dev/null +++ b/src/shrpx_live_check.cc @@ -0,0 +1,317 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2016 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "shrpx_live_check.h" +#include "shrpx_worker.h" +#include "shrpx_connect_blocker.h" +#include "shrpx_ssl.h" + +namespace shrpx { + +namespace { +void readcb(struct ev_loop *loop, ev_io *w, int revents) { + int rv; + auto conn = static_cast(w->data); + auto live_check = static_cast(conn->data); + + rv = live_check->do_read(); + if (rv != 0) { + live_check->on_failure(); + return; + } +} +} // namespace + +namespace { +void writecb(struct ev_loop *loop, ev_io *w, int revents) { + int rv; + auto conn = static_cast(w->data); + auto live_check = static_cast(conn->data); + + rv = live_check->do_write(); + if (rv != 0) { + live_check->on_failure(); + return; + } +} +} // namespace + +namespace { +void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { + auto conn = static_cast(w->data); + auto live_check = static_cast(conn->data); + + live_check->on_failure(); +} +} // namespace + +namespace { +void backoff_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { + int rv; + auto live_check = static_cast(w->data); + + rv = live_check->initiate_connection(); + if (rv != 0) { + live_check->on_failure(); + return; + } +} +} // namespace + +LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, + DownstreamAddrGroup *group, DownstreamAddr *addr) + : conn_(loop, -1, nullptr, worker->get_mcpool(), + get_config()->conn.downstream.timeout.write, + get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb, + timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold, + get_config()->tls.dyn_rec.idle_timeout, PROTO_NONE), + read_(&LiveCheck::noop), + write_(&LiveCheck::noop), + worker_(worker), + ssl_ctx_(ssl_ctx), + group_(group), + addr_(addr), + success_count_(0), + fail_count_(0) { + ev_timer_init(&backoff_timer_, backoff_timeoutcb, 0., 0.); + backoff_timer_.data = this; +} + +LiveCheck::~LiveCheck() { + disconnect(); + + ev_timer_stop(conn_.loop, &backoff_timer_); +} + +void LiveCheck::disconnect() { + conn_.rlimit.stopw(); + conn_.wlimit.stopw(); + + read_ = write_ = &LiveCheck::noop; + + conn_.disconnect(); +} + +void LiveCheck::schedule() { + // TODO use exponential backoff based on fail_count_. + ev_timer_set(&backoff_timer_, 1.6, 0.); + ev_timer_start(conn_.loop, &backoff_timer_); +} + +int LiveCheck::do_read() { return read_(*this); } + +int LiveCheck::do_write() { return write_(*this); } + +int LiveCheck::initiate_connection() { + int rv; + + const auto &shared_addr = group_->shared_addr; + + auto worker_blocker = worker_->get_connect_blocker(); + if (worker_blocker->blocked()) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Worker wide backend connection was blocked temporarily"; + } + return -1; + } + + if (ssl_ctx_) { + auto ssl = ssl::create_ssl(ssl_ctx_); + if (!ssl) { + return -1; + } + + switch (shared_addr->proto) { + case PROTO_HTTP1: + ssl::setup_downstream_http1_alpn(ssl); + break; + case PROTO_HTTP2: + ssl::setup_downstream_http2_alpn(ssl); + break; + default: + assert(0); + } + + conn_.set_ssl(ssl); + } + + conn_.fd = util::create_nonblock_socket(addr_->addr.su.storage.ss_family); + + if (conn_.fd == -1) { + auto error = errno; + LOG(WARN) << "socket() failed; addr=" << util::to_numeric_addr(&addr_->addr) + << ", errno=" << error; + return -1; + } + + rv = connect(conn_.fd, &addr_->addr.su.sa, addr_->addr.len); + if (rv != 0 && errno != EINPROGRESS) { + auto error = errno; + LOG(WARN) << "connect() failed; addr=" + << util::to_numeric_addr(&addr_->addr) << ", errno=" << error; + LOG(WARN) << strerror(error); + + close(conn_.fd); + conn_.fd = -1; + + return -1; + } + + if (ssl_ctx_) { + auto sni_name = !get_config()->tls.backend_sni_name.empty() + ? StringRef(get_config()->tls.backend_sni_name) + : StringRef(addr_->host); + if (!util::numeric_host(sni_name.c_str())) { + SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name.c_str()); + } + + auto session = ssl::reuse_tls_session(addr_); + if (session) { + SSL_set_session(conn_.tls.ssl, session); + SSL_SESSION_free(session); + } + + conn_.prepare_client_handshake(); + } + + write_ = &LiveCheck::connected; + + ev_io_set(&conn_.wev, conn_.fd, EV_WRITE); + ev_io_set(&conn_.rev, conn_.fd, EV_READ); + + conn_.wlimit.startw(); + + // TODO we should have timeout for connection establishment + ev_timer_again(conn_.loop, &conn_.wt); + + return 0; +} + +int LiveCheck::connected() { + if (!util::check_socket_connected(conn_.fd)) { + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Backend connect failed; addr=" + << util::to_numeric_addr(&addr_->addr); + } + + return -1; + } + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "Connection established"; + } + + conn_.rlimit.startw(); + + if (conn_.tls.ssl) { + read_ = &LiveCheck::tls_handshake; + write_ = &LiveCheck::tls_handshake; + + return do_write(); + } + + on_success(); + disconnect(); + + return 0; +} + +int LiveCheck::tls_handshake() { + ev_timer_again(conn_.loop, &conn_.rt); + + ERR_clear_error(); + + auto rv = conn_.tls_handshake(); + + if (rv == SHRPX_ERR_INPROGRESS) { + return 0; + } + + if (rv < 0) { + return rv; + } + + if (LOG_ENABLED(INFO)) { + LOG(INFO) << "SSL/TLS handshake completed"; + } + + if (!get_config()->tls.insecure && + ssl::check_cert(conn_.tls.ssl, addr_) != 0) { + return -1; + } + + if (!SSL_session_reused(conn_.tls.ssl)) { + auto tls_session = SSL_get0_session(conn_.tls.ssl); + if (tls_session) { + ssl::try_cache_tls_session(addr_, tls_session, ev_now(conn_.loop)); + } + } + + on_success(); + disconnect(); + + // TODO Check ALPN identifier here + + return 0; +} + +void LiveCheck::on_failure() { + ++fail_count_; + + LOG(WARN) << "Liveness check for " << util::to_numeric_addr(&addr_->addr) + << " failed " << fail_count_ << " time(s) in a row"; + + disconnect(); + + schedule(); +} + +void LiveCheck::on_success() { + ++success_count_; + fail_count_ = 0; + + LOG(WARN) << "Liveness check for " << util::to_numeric_addr(&addr_->addr) + << " succeeded " << success_count_ << " time(s) in a row"; + + if (success_count_ < 3) { + disconnect(); + + schedule(); + + return; + } + + LOG(NOTICE) << util::to_numeric_addr(&addr_->addr) << " is considered online"; + + addr_->connect_blocker->online(); + + success_count_ = 0; + fail_count_ = 0; + + disconnect(); +} + +int LiveCheck::noop() { return 0; } + +} // namespace shrpx diff --git a/src/shrpx_live_check.h b/src/shrpx_live_check.h new file mode 100644 index 00000000..f42f8bcb --- /dev/null +++ b/src/shrpx_live_check.h @@ -0,0 +1,88 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2016 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef SHRPX_LIVE_CHECK_H +#define SHRPX_LIVE_CHECK_H + +#include "shrpx.h" + +#include + +#include + +#include + +#include "shrpx_connection.h" + +namespace shrpx { + +class Worker; +struct DownstreamAddrGroup; +struct DownstreamAddr; + +class LiveCheck { +public: + LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, + DownstreamAddrGroup *group, DownstreamAddr *addr); + ~LiveCheck(); + + void disconnect(); + + void on_success(); + void on_failure(); + + int initiate_connection(); + + void schedule(); + + // Low level I/O operation callback; they are called from do_read() + // or do_write(). + int noop(); + int connected(); + int tls_handshake(); + + int do_read(); + int do_write(); + + void signal_write(); + +private: + Connection conn_; + ev_timer backoff_timer_; + std::function read_, write_; + Worker *worker_; + // nullptr if no TLS is configured + SSL_CTX *ssl_ctx_; + DownstreamAddrGroup *group_; + // Address of remote endpoint + DownstreamAddr *addr_; + // The number of successful connect attempt in a row. + size_t success_count_; + // The number of unsuccessful connect attempt in a row. + size_t fail_count_; +}; + +} // namespace shrpx + +#endif // SHRPX_LIVE_CHECK_H diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 4ce6d29b..c354d1aa 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -36,6 +36,7 @@ #include "shrpx_http2_session.h" #include "shrpx_log_config.h" #include "shrpx_connect_blocker.h" +#include "shrpx_live_check.h" #include "shrpx_memcached_dispatcher.h" #ifdef HAVE_MRUBY #include "shrpx_mruby.h" @@ -160,6 +161,9 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, dst_addr.host_unix = src_addr.host_unix; dst_addr.connect_blocker = make_unique(randgen_, loop_); + dst_addr.live_check = make_unique( + loop_, shared_addr->tls ? cl_ssl_ctx_ : nullptr, this, &dst, + &dst_addr); } // share the connection if patterns have the same set of backend @@ -464,4 +468,20 @@ size_t match_downstream_addr_group( groups, catch_all); } +void downstream_failure(DownstreamAddr *addr) { + const auto &connect_blocker = addr->connect_blocker; + + connect_blocker->on_failure(); + + auto fail_count = connect_blocker->get_fail_count(); + + if (fail_count >= 3) { + LOG(WARN) << "Could not connect to " << util::to_numeric_addr(&addr->addr) + << " " << fail_count << " times in a row; considered as offline"; + + connect_blocker->offline(); + addr->live_check->schedule(); + } +} + } // namespace shrpx diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index 8224c497..b6f1834a 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -52,6 +52,7 @@ namespace shrpx { class Http2Session; class ConnectBlocker; +class LiveCheck; class MemcachedDispatcher; struct UpstreamAddr; @@ -79,6 +80,7 @@ struct DownstreamAddr { bool host_unix; std::unique_ptr connect_blocker; + std::unique_ptr live_check; // Client side TLS session cache TLSSessionCache tls_session_cache; // Http2Session object created for this address. This list chains @@ -225,6 +227,8 @@ size_t match_downstream_addr_group( const StringRef &hostport, const StringRef &path, const std::vector &groups, size_t catch_all); +void downstream_failure(DownstreamAddr *addr); + } // namespace shrpx #endif // SHRPX_WORKER_H