diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 4d063647..6e89a5c4 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -389,7 +389,7 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl, pinned_http2sessions_( get_config()->conn.downstream.proto == PROTO_HTTP2 ? make_unique>( - get_config()->conn.downstream.addr_groups.size(), -1) + worker->get_downstream_addr_groups().size(), -1) : nullptr), ipaddr_(ipaddr), port_(port), @@ -664,8 +664,8 @@ std::unique_ptr ClientHandler::get_downstream_connection(Downstream *downstream) { size_t group; auto &downstreamconf = get_config()->conn.downstream; - auto &groups = downstreamconf.addr_groups; auto catch_all = downstreamconf.addr_group_catch_all; + auto &groups = worker_->get_downstream_addr_groups(); const auto &req = downstream->request(); @@ -746,10 +746,6 @@ MemchunkPool *ClientHandler::get_mcpool() { return worker_->get_mcpool(); } SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; } -ConnectBlocker *ClientHandler::get_connect_blocker() const { - return worker_->get_connect_blocker(); -} - void ClientHandler::direct_http2_upgrade() { upstream_ = make_unique(this); alpn_ = NGHTTP2_CLEARTEXT_PROTO_VERSION_ID; diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 4830e93e..9e932ae2 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -99,7 +99,6 @@ public: get_downstream_connection(Downstream *downstream); MemchunkPool *get_mcpool(); SSL *get_ssl() const; - ConnectBlocker *get_connect_blocker() const; // Call this function when HTTP/2 connection header is received at // the start of the connection. void direct_http2_upgrade(); diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 80f0e06e..7913fa19 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -59,6 +59,7 @@ using namespace nghttp2; namespace shrpx { struct LogFragment; +class ConnectBlocker; namespace ssl { @@ -294,6 +295,7 @@ struct DownstreamAddr { // socket path. ImmutableString host; ImmutableString hostport; + ConnectBlocker *connect_blocker; // backend port. 0 if |host_unix| is true. uint16_t port; // true if |host| contains UNIX domain socket path. diff --git a/src/shrpx_connect_blocker.cc b/src/shrpx_connect_blocker.cc index 3eb6de16..cdab59a9 100644 --- a/src/shrpx_connect_blocker.cc +++ b/src/shrpx_connect_blocker.cc @@ -26,20 +26,16 @@ namespace shrpx { -namespace { -const ev_tstamp INITIAL_SLEEP = 2.; -} // namespace - namespace { void connect_blocker_cb(struct ev_loop *loop, ev_timer *w, int revents) { if (LOG_ENABLED(INFO)) { - LOG(INFO) << "unblock downstream connection"; + LOG(INFO) << "Unblock"; } } } // namespace -ConnectBlocker::ConnectBlocker(struct ev_loop *loop) - : loop_(loop), sleep_(INITIAL_SLEEP) { +ConnectBlocker::ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop) + : gen_(gen), loop_(loop), fail_count_(0) { ev_timer_init(&timer_, connect_blocker_cb, 0., 0.); } @@ -47,18 +43,27 @@ ConnectBlocker::~ConnectBlocker() { ev_timer_stop(loop_, &timer_); } bool ConnectBlocker::blocked() const { return ev_is_active(&timer_); } -void ConnectBlocker::on_success() { sleep_ = INITIAL_SLEEP; } +void ConnectBlocker::on_success() { fail_count_ = 0; } + +namespace { +constexpr size_t MAX_BACKOFF_EXP = 10; +} // namespace void ConnectBlocker::on_failure() { if (ev_is_active(&timer_)) { return; } - sleep_ = std::min(128., sleep_ * 2); + ++fail_count_; - LOG(WARN) << "connect failure, start sleeping " << sleep_; + auto max_backoff = (1 << std::min(MAX_BACKOFF_EXP, fail_count_)) - 1; + auto dist = std::uniform_int_distribution<>(0, max_backoff); + auto backoff = dist(gen_); - ev_timer_set(&timer_, sleep_, 0.); + LOG(WARN) << "Could not connect " << fail_count_ + << " times in a row; sleep for " << backoff << " seconds"; + + ev_timer_set(&timer_, backoff, 0.); ev_timer_start(loop_, &timer_); } diff --git a/src/shrpx_connect_blocker.h b/src/shrpx_connect_blocker.h index af445644..63a1e3f9 100644 --- a/src/shrpx_connect_blocker.h +++ b/src/shrpx_connect_blocker.h @@ -27,13 +27,15 @@ #include "shrpx.h" +#include + #include namespace shrpx { class ConnectBlocker { public: - ConnectBlocker(struct ev_loop *loop); + ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop); ~ConnectBlocker(); // Returns true if making connection is not allowed. @@ -41,14 +43,18 @@ public: // Call this function if connect operation succeeded. This will // reset sleep_ to minimum value. void on_success(); - // Call this function if connect operation failed. This will start - // timer and blocks connection establishment for sleep_ seconds. + // Call this function if connect operations failed. This will start + // timer and blocks connection establishment with exponential + // backoff. void on_failure(); private: + std::mt19937 gen_; ev_timer timer_; struct ev_loop *loop_; - ev_tstamp sleep_; + // The number of consecutive connection failure. Reset to 0 on + // success. + size_t fail_count_; }; } // namespace diff --git a/src/shrpx_http2_downstream_connection.cc b/src/shrpx_http2_downstream_connection.cc index 02982b8a..da2d953d 100644 --- a/src/shrpx_http2_downstream_connection.cc +++ b/src/shrpx_http2_downstream_connection.cc @@ -98,6 +98,9 @@ int Http2DownstreamConnection::attach_downstream(Downstream *downstream) { http2session_->add_downstream_connection(this); if (http2session_->get_state() == Http2Session::DISCONNECTED) { http2session_->signal_write(); + if (http2session_->get_state() == Http2Session::DISCONNECTED) { + return -1; + } } downstream_ = downstream; diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index d11e3fd7..1f660649 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -147,8 +147,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { } // namespace Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, - ConnectBlocker *connect_blocker, Worker *worker, - size_t group, size_t idx) + Worker *worker, size_t group, size_t idx) : conn_(loop, -1, nullptr, worker->get_mcpool(), get_config()->conn.downstream.timeout.write, get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb, @@ -156,7 +155,6 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, get_config()->tls.dyn_rec.idle_timeout), wb_(worker->get_mcpool()), worker_(worker), - connect_blocker_(connect_blocker), ssl_ctx_(ssl_ctx), addr_(nullptr), session_(nullptr), @@ -254,30 +252,49 @@ int Http2Session::disconnect(bool hard) { int Http2Session::initiate_connection() { int rv = 0; - auto &addrs = get_config()->conn.downstream.addr_groups[group_].addrs; + auto &groups = worker_->get_downstream_addr_groups(); + auto &addrs = groups[group_].addrs; if (state_ == DISCONNECTED) { - if (connect_blocker_->blocked()) { - if (LOG_ENABLED(INFO)) { - DCLOG(INFO, this) - << "Downstream connection was blocked by connect_blocker"; - } - return -1; - } - auto &next_downstream = worker_->get_dgrp(group_)->next; - addr_ = &addrs[next_downstream]; + auto end = next_downstream; - if (LOG_ENABLED(INFO)) { - SSLOG(INFO, this) << "Using downstream address idx=" << next_downstream - << " out of " << addrs.size(); - } + for (;;) { + auto &addr = addrs[next_downstream]; - if (++next_downstream >= addrs.size()) { - next_downstream = 0; + if (++next_downstream >= addrs.size()) { + next_downstream = 0; + } + + auto &connect_blocker = addr.connect_blocker; + + if (connect_blocker->blocked()) { + if (LOG_ENABLED(INFO)) { + DCLOG(INFO, this) << "Backend server " + << (addr.host_unix ? addr.host : addr.hostport) + << " was not available temporarily"; + } + + if (end == next_downstream) { + return -1; + } + + continue; + } + + if (LOG_ENABLED(INFO)) { + SSLOG(INFO, this) << "Using downstream address idx=" << next_downstream + << " out of " << addrs.size(); + } + + addr_ = &addr; + + break; } } + auto &connect_blocker = addr_->connect_blocker; + const auto &proxy = get_config()->downstream_http_proxy; if (!proxy.host.empty() && state_ == DISCONNECTED) { if (LOG_ENABLED(INFO)) { @@ -288,7 +305,7 @@ int Http2Session::initiate_connection() { conn_.fd = util::create_nonblock_socket(proxy.addr.su.storage.ss_family); if (conn_.fd == -1) { - connect_blocker_->on_failure(); + connect_blocker->on_failure(); return -1; } @@ -296,7 +313,7 @@ int Http2Session::initiate_connection() { if (rv != 0 && errno != EINPROGRESS) { SSLOG(ERROR, this) << "Failed to connect to the proxy " << proxy.host << ":" << proxy.port; - connect_blocker_->on_failure(); + connect_blocker->on_failure(); return -1; } @@ -356,7 +373,7 @@ int Http2Session::initiate_connection() { conn_.fd = util::create_nonblock_socket(addr_->addr.su.storage.ss_family); if (conn_.fd == -1) { - connect_blocker_->on_failure(); + connect_blocker->on_failure(); return -1; } @@ -365,7 +382,7 @@ int Http2Session::initiate_connection() { const_cast(&addr_->addr.su.sa), addr_->addr.len); if (rv != 0 && errno != EINPROGRESS) { - connect_blocker_->on_failure(); + connect_blocker->on_failure(); return -1; } @@ -383,14 +400,14 @@ int Http2Session::initiate_connection() { util::create_nonblock_socket(addr_->addr.su.storage.ss_family); if (conn_.fd == -1) { - connect_blocker_->on_failure(); + connect_blocker->on_failure(); return -1; } rv = connect(conn_.fd, const_cast(&addr_->addr.su.sa), addr_->addr.len); if (rv != 0 && errno != EINPROGRESS) { - connect_blocker_->on_failure(); + connect_blocker->on_failure(); return -1; } @@ -1615,11 +1632,15 @@ int Http2Session::read_noop(const uint8_t *data, size_t datalen) { return 0; } int Http2Session::write_noop() { return 0; } int Http2Session::connected() { + auto &connect_blocker = addr_->connect_blocker; + if (!util::check_socket_connected(conn_.fd)) { + connect_blocker->on_failure(); + return -1; } - connect_blocker_->on_success(); + connect_blocker->on_success(); if (LOG_ENABLED(INFO)) { SSLOG(INFO, this) << "Connection established"; diff --git a/src/shrpx_http2_session.h b/src/shrpx_http2_session.h index a858b1bc..62e1235c 100644 --- a/src/shrpx_http2_session.h +++ b/src/shrpx_http2_session.h @@ -48,7 +48,6 @@ namespace shrpx { class Http2DownstreamConnection; class Worker; -class ConnectBlocker; struct StreamData { StreamData *dlnext, *dlprev; @@ -57,9 +56,8 @@ struct StreamData { class Http2Session { public: - Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, - ConnectBlocker *connect_blocker, Worker *worker, size_t group, - size_t idx); + Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker, + size_t group, size_t idx); ~Http2Session(); // If hard is true, all pending requests are abandoned and @@ -203,7 +201,6 @@ private: // Used to parse the response from HTTP proxy std::unique_ptr proxy_htp_; Worker *worker_; - ConnectBlocker *connect_blocker_; // NULL if no TLS is configured SSL_CTX *ssl_ctx_; // Address of remote endpoint diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 9e55f39b..151bd45b 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -147,16 +147,6 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { auto &downstreamconf = get_config()->conn.downstream; if (conn_.fd == -1) { - auto connect_blocker = client_handler_->get_connect_blocker(); - - if (connect_blocker->blocked()) { - if (LOG_ENABLED(INFO)) { - DCLOG(INFO, this) - << "Downstream connection was blocked by connect_blocker"; - } - return -1; - } - if (ssl_ctx_) { auto ssl = ssl::create_ssl(ssl_ctx_); if (!ssl) { @@ -168,7 +158,8 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { auto &next_downstream = worker_->get_dgrp(group_)->next; auto end = next_downstream; - auto &addrs = downstreamconf.addr_groups[group_].addrs; + auto &groups = worker_->get_downstream_addr_groups(); + auto &addrs = groups[group_].addrs; for (;;) { auto &addr = addrs[next_downstream]; @@ -176,6 +167,22 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { next_downstream = 0; } + auto &connect_blocker = addr.connect_blocker; + + if (connect_blocker->blocked()) { + if (LOG_ENABLED(INFO)) { + DCLOG(INFO, this) << "Backend server " + << (addr.host_unix ? addr.host : addr.hostport) + << " was not available temporarily"; + } + + if (end == next_downstream) { + return SHRPX_ERR_NETWORK; + } + + continue; + } + conn_.fd = util::create_nonblock_socket(addr.addr.su.storage.ss_family); if (conn_.fd == -1) { @@ -1012,7 +1019,7 @@ int HttpDownstreamConnection::process_input(const uint8_t *data, } int HttpDownstreamConnection::connected() { - auto connect_blocker = client_handler_->get_connect_blocker(); + auto connect_blocker = addr_->connect_blocker; if (!util::check_socket_connected(conn_.fd)) { conn_.wlimit.stopw(); @@ -1021,6 +1028,8 @@ int HttpDownstreamConnection::connected() { DLOG(INFO, this) << "downstream connect failed"; } + connect_blocker->on_failure(); + downstream_->set_request_state(Downstream::CONNECT_FAIL); return -1; diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 604f9058..566ac334 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -80,7 +80,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, cl_ssl_ctx_(cl_ssl_ctx), cert_tree_(cert_tree), ticket_keys_(ticket_keys), - connect_blocker_(make_unique(loop_)), + downstream_addr_groups_(get_config()->conn.downstream.addr_groups), graceful_shutdown_(false) { ev_async_init(&w_, eventcb); w_.data = this; @@ -109,17 +109,29 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, m = downstreamconf.addr_groups[group].addrs.size(); } for (size_t idx = 0; idx < m; ++idx) { - dgrp.http2sessions.push_back(make_unique( - loop_, cl_ssl_ctx, connect_blocker_.get(), this, group, idx)); + dgrp.http2sessions.push_back( + make_unique(loop_, cl_ssl_ctx, this, group, idx)); } ++group; } } + + for (auto &group : downstream_addr_groups_) { + for (auto &addr : group.addrs) { + addr.connect_blocker = new ConnectBlocker(randgen_, loop_); + } + } } Worker::~Worker() { ev_async_stop(loop_, &w_); ev_timer_stop(loop_, &mcpool_clear_timer_); + + for (auto &group : downstream_addr_groups_) { + for (auto &addr : group.addrs) { + delete addr.connect_blocker; + } + } } void Worker::schedule_clear_mcpool() { @@ -259,10 +271,6 @@ Http2Session *Worker::next_http2_session(size_t group) { return res; } -ConnectBlocker *Worker::get_connect_blocker() const { - return connect_blocker_.get(); -} - struct ev_loop *Worker::get_loop() const { return loop_; } @@ -361,4 +369,8 @@ SSL_SESSION *Worker::reuse_client_tls_session(const Address *addr) { return d2i_SSL_SESSION(nullptr, &p, ent.session_data.size()); } +std::vector &Worker::get_downstream_addr_groups() { + return downstream_addr_groups_; +} + } // namespace shrpx diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index 8d352429..e56595c1 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -131,7 +131,6 @@ public: WorkerStat *get_worker_stat(); DownstreamConnectionPool *get_dconn_pool(); Http2Session *next_http2_session(size_t group); - ConnectBlocker *get_connect_blocker() const; struct ev_loop *get_loop() const; SSL_CTX *get_sv_ssl_ctx() const; SSL_CTX *get_cl_ssl_ctx() const; @@ -164,6 +163,8 @@ public: // found associated to |addr|, nullptr will be returned. SSL_SESSION *reuse_client_tls_session(const Address *addr); + std::vector &get_downstream_addr_groups(); + private: #ifndef NOTHREADS std::future fut_; @@ -196,7 +197,7 @@ private: ssl::CertLookupTree *cert_tree_; std::shared_ptr ticket_keys_; - std::unique_ptr connect_blocker_; + std::vector downstream_addr_groups_; bool graceful_shutdown_; };