diff --git a/src/shrpx_connection_handler.cc b/src/shrpx_connection_handler.cc index af919514..50f54664 100644 --- a/src/shrpx_connection_handler.cc +++ b/src/shrpx_connection_handler.cc @@ -110,12 +110,8 @@ void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) { } } // namespace -namespace { -std::random_device rd; -} // namespace - -ConnectionHandler::ConnectionHandler(struct ev_loop *loop) - : gen_(rd()), +ConnectionHandler::ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen) + : gen_(gen), single_worker_(nullptr), loop_(loop), tls_ticket_key_memcached_get_retry_count_(0), @@ -736,6 +732,14 @@ ConnectionHandler::get_tls_ticket_key_memcached_dispatcher() const { return tls_ticket_key_memcached_dispatcher_.get(); } +// Use the similar backoff algorithm described in +// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md +namespace { +constexpr size_t MAX_BACKOFF_EXP = 10; +constexpr auto MULTIPLIER = 3.2; +constexpr auto JITTER = 0.2; +} // namespace + void ConnectionHandler::on_tls_ticket_key_network_error(ev_timer *w) { if (++tls_ticket_key_memcached_get_retry_count_ >= get_config()->tls.ticket.memcached.max_retry) { @@ -746,15 +750,19 @@ void ConnectionHandler::on_tls_ticket_key_network_error(ev_timer *w) { return; } - auto dist = std::uniform_int_distribution( - 1, std::min(60, 1 << tls_ticket_key_memcached_get_retry_count_)); - auto t = dist(gen_); + auto base_backoff = util::int_pow( + MULTIPLIER, + std::min(MAX_BACKOFF_EXP, tls_ticket_key_memcached_get_retry_count_)); + auto dist = std::uniform_real_distribution<>(-JITTER * base_backoff, + JITTER * base_backoff); + + auto backoff = base_backoff + dist(gen_); LOG(WARN) << "Memcached: tls ticket get failed due to network error, retrying in " - << t << " seconds"; + << backoff << " seconds"; - ev_timer_set(w, t, 0.); + ev_timer_set(w, backoff, 0.); ev_timer_start(loop_, w); } diff --git a/src/shrpx_connection_handler.h b/src/shrpx_connection_handler.h index c45c7462..f798b73b 100644 --- a/src/shrpx_connection_handler.h +++ b/src/shrpx_connection_handler.h @@ -101,7 +101,7 @@ struct SerialEvent { class ConnectionHandler { public: - ConnectionHandler(struct ev_loop *loop); + ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen); ~ConnectionHandler(); int handle_connection(int fd, sockaddr *addr, int addrlen, const UpstreamAddr *faddr); diff --git a/src/shrpx_memcached_connection.cc b/src/shrpx_memcached_connection.cc index 98554076..d28b5fab 100644 --- a/src/shrpx_memcached_connection.cc +++ b/src/shrpx_memcached_connection.cc @@ -56,7 +56,7 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) { auto mconn = static_cast(conn->data); if (mconn->on_read() != 0) { - mconn->disconnect(); + mconn->reconnect_or_fail(); return; } } @@ -68,7 +68,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { auto mconn = static_cast(conn->data); if (mconn->on_write() != 0) { - mconn->disconnect(); + mconn->reconnect_or_fail(); return; } } @@ -88,22 +88,25 @@ void connectcb(struct ev_loop *loop, ev_io *w, int revents) { } } // namespace -constexpr ev_tstamp write_timeout = 10.; -constexpr ev_tstamp read_timeout = 10.; +constexpr auto write_timeout = 10_s; +constexpr auto read_timeout = 10_s; MemcachedConnection::MemcachedConnection(const Address *addr, struct ev_loop *loop, SSL_CTX *ssl_ctx, const StringRef &sni_name, - MemchunkPool *mcpool) + MemchunkPool *mcpool, + std::mt19937 &gen) : conn_(loop, -1, nullptr, mcpool, write_timeout, read_timeout, {}, {}, connectcb, readcb, timeoutcb, this, 0, 0., PROTO_MEMCACHED), do_read_(&MemcachedConnection::noop), do_write_(&MemcachedConnection::noop), sni_name_(sni_name.str()), + connect_blocker_(gen, loop, [] {}, [] {}), parse_state_{}, addr_(addr), ssl_ctx_(ssl_ctx), sendsum_(0), + try_count_(0), connected_(false) {} MemcachedConnection::~MemcachedConnection() { disconnect(); } @@ -201,6 +204,8 @@ int MemcachedConnection::initiate_connection() { int MemcachedConnection::connected() { if (!util::check_socket_connected(conn_.fd)) { + connect_blocker_.on_failure(); + conn_.wlimit.stopw(); if (LOG_ENABLED(INFO)) { @@ -214,10 +219,7 @@ int MemcachedConnection::connected() { MCLOG(INFO, this) << "connected to memcached server"; } - connected_ = true; - conn_.rlimit.startw(); - ev_timer_again(conn_.loop, &conn_.rt); ev_set_cb(&conn_.wev, writecb); @@ -228,6 +230,12 @@ int MemcachedConnection::connected() { return 0; } + ev_timer_stop(conn_.loop, &conn_.wt); + + connected_ = true; + + connect_blocker_.on_success(); + do_read_ = &MemcachedConnection::read_clear; do_write_ = &MemcachedConnection::write_clear; @@ -248,6 +256,7 @@ int MemcachedConnection::tls_handshake() { } if (rv < 0) { + connect_blocker_.on_failure(); return rv; } @@ -259,6 +268,7 @@ int MemcachedConnection::tls_handshake() { if (!tlsconf.insecure && ssl::check_cert(conn_.tls.ssl, addr_, StringRef(sni_name_)) != 0) { + connect_blocker_.on_failure(); return -1; } @@ -270,6 +280,13 @@ int MemcachedConnection::tls_handshake() { } } + ev_timer_stop(conn_.loop, &conn_.rt); + ev_timer_stop(conn_.loop, &conn_.wt); + + connected_ = true; + + connect_blocker_.on_success(); + do_read_ = &MemcachedConnection::read_tls; do_write_ = &MemcachedConnection::write_tls; @@ -325,7 +342,9 @@ int MemcachedConnection::read_tls() { return 0; } - ev_timer_again(conn_.loop, &conn_.rt); + if (ev_is_active(&conn_.rt)) { + ev_timer_again(conn_.loop, &conn_.rt); + } for (;;) { auto nread = conn_.read_tls(recvbuf_.last, recvbuf_.wleft()); @@ -386,7 +405,9 @@ int MemcachedConnection::read_clear() { return 0; } - ev_timer_again(conn_.loop, &conn_.rt); + if (ev_is_active(&conn_.rt)) { + ev_timer_again(conn_.loop, &conn_.rt); + } for (;;) { auto nread = conn_.read_clear(recvbuf_.last, recvbuf_.wleft()); @@ -535,9 +556,16 @@ int MemcachedConnection::parse_packet() { } } + // We require at least one complete response to clear try count. + try_count_ = 0; + auto req = std::move(recvq_.front()); recvq_.pop_front(); + if (sendq_.empty() && recvq_.empty()) { + ev_timer_stop(conn_.loop, &conn_.rt); + } + if (!req->canceled && req->cb) { req->cb(req.get(), MemcachedResult(parse_state_.status_code, std::move(parse_state_.value))); @@ -635,6 +663,13 @@ void MemcachedConnection::drain_send_queue(size_t nwrite) { recvq_.push_back(std::move(sendq_.front())); sendq_.pop_front(); } + + // start read timer only when we wait for responses. + if (recvq_.empty()) { + ev_timer_stop(conn_.loop, &conn_.rt); + } else if (!ev_is_active(&conn_.rt)) { + ev_timer_again(conn_.loop, &conn_.rt); + } } size_t MemcachedConnection::serialized_size(MemcachedRequest *req) { @@ -676,6 +711,10 @@ void MemcachedConnection::make_request(MemcachedSendbuf *sendbuf, } int MemcachedConnection::add_request(std::unique_ptr req) { + if (connect_blocker_.blocked()) { + return -1; + } + sendq_.push_back(std::move(req)); if (connected_) { @@ -684,6 +723,7 @@ int MemcachedConnection::add_request(std::unique_ptr req) { } if (conn_.fd == -1 && initiate_connection() != 0) { + connect_blocker_.on_failure(); disconnect(); return -1; } @@ -696,4 +736,50 @@ void MemcachedConnection::signal_write() { conn_.wlimit.startw(); } int MemcachedConnection::noop() { return 0; } +void MemcachedConnection::reconnect_or_fail() { + if (!connected_ || (recvq_.empty() && sendq_.empty())) { + disconnect(); + return; + } + + constexpr size_t MAX_TRY_COUNT = 3; + + if (++try_count_ >= MAX_TRY_COUNT) { + if (LOG_ENABLED(INFO)) { + MCLOG(INFO, this) << "Tried " << MAX_TRY_COUNT + << " times, and all failed. Aborting"; + } + try_count_ = 0; + disconnect(); + return; + } + + std::vector> q; + q.reserve(recvq_.size() + sendq_.size()); + + if (LOG_ENABLED(INFO)) { + MCLOG(INFO, this) << "Retry connection, enqueue " + << recvq_.size() + sendq_.size() << " request(s) again"; + } + + q.insert(std::end(q), std::make_move_iterator(std::begin(recvq_)), + std::make_move_iterator(std::end(recvq_))); + q.insert(std::end(q), std::make_move_iterator(std::begin(sendq_)), + std::make_move_iterator(std::end(sendq_))); + + recvq_.clear(); + sendq_.clear(); + + disconnect(); + + sendq_.insert(std::end(sendq_), std::make_move_iterator(std::begin(q)), + std::make_move_iterator(std::end(q))); + + if (initiate_connection() != 0) { + connect_blocker_.on_failure(); + disconnect(); + return; + } +} + } // namespace shrpx diff --git a/src/shrpx_memcached_connection.h b/src/shrpx_memcached_connection.h index 290442f0..858d3001 100644 --- a/src/shrpx_memcached_connection.h +++ b/src/shrpx_memcached_connection.h @@ -34,6 +34,7 @@ #include "shrpx_connection.h" #include "shrpx_ssl.h" +#include "shrpx_connect_blocker.h" #include "buffer.h" #include "network.h" @@ -96,7 +97,7 @@ class MemcachedConnection { public: MemcachedConnection(const Address *addr, struct ev_loop *loop, SSL_CTX *ssl_ctx, const StringRef &sni_name, - MemchunkPool *mcpool); + MemchunkPool *mcpool, std::mt19937 &gen); ~MemcachedConnection(); void disconnect(); @@ -126,6 +127,8 @@ public: int noop(); + void reconnect_or_fail(); + private: Connection conn_; std::deque> recvq_; @@ -134,11 +137,13 @@ private: std::function do_read_, do_write_; std::string sni_name_; ssl::TLSSessionCache tls_session_cache_; + ConnectBlocker connect_blocker_; MemcachedParseState parse_state_; const Address *addr_; SSL_CTX *ssl_ctx_; // Sum of the bytes to be transmitted in sendbufv_. size_t sendsum_; + size_t try_count_; bool connected_; Buffer<8_k> recvbuf_; }; diff --git a/src/shrpx_memcached_dispatcher.cc b/src/shrpx_memcached_dispatcher.cc index 796fef8e..a3f1357c 100644 --- a/src/shrpx_memcached_dispatcher.cc +++ b/src/shrpx_memcached_dispatcher.cc @@ -33,10 +33,11 @@ namespace shrpx { MemcachedDispatcher::MemcachedDispatcher(const Address *addr, struct ev_loop *loop, SSL_CTX *ssl_ctx, const StringRef &sni_name, - MemchunkPool *mcpool) + MemchunkPool *mcpool, + std::mt19937 &gen) : loop_(loop), mconn_(make_unique(addr, loop_, ssl_ctx, sni_name, - mcpool)) {} + mcpool, gen)) {} MemcachedDispatcher::~MemcachedDispatcher() {} diff --git a/src/shrpx_memcached_dispatcher.h b/src/shrpx_memcached_dispatcher.h index 7cca3bcb..e7b78d27 100644 --- a/src/shrpx_memcached_dispatcher.h +++ b/src/shrpx_memcached_dispatcher.h @@ -28,6 +28,7 @@ #include "shrpx.h" #include +#include #include @@ -45,7 +46,7 @@ class MemcachedDispatcher { public: MemcachedDispatcher(const Address *addr, struct ev_loop *loop, SSL_CTX *ssl_ctx, const StringRef &sni_name, - MemchunkPool *mcpool); + MemchunkPool *mcpool, std::mt19937 &gen); ~MemcachedDispatcher(); int add_request(std::unique_ptr req); diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 5927a8d5..0d4a113a 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -147,7 +147,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, session_cache_memcached_dispatcher_ = make_unique( &session_cacheconf.memcached.addr, loop, tls_session_cache_memcached_ssl_ctx, - StringRef{session_cacheconf.memcached.host}, &mcpool_); + StringRef{session_cacheconf.memcached.host}, &mcpool_, randgen_); } replace_downstream_config(std::move(downstreamconf)); diff --git a/src/shrpx_worker_process.cc b/src/shrpx_worker_process.cc index 8f7e817f..f7c84e96 100644 --- a/src/shrpx_worker_process.cc +++ b/src/shrpx_worker_process.cc @@ -379,6 +379,10 @@ void nb_child_cb(struct ev_loop *loop, ev_child *w, int revents) { } // namespace #endif // HAVE_NEVERBLEED +namespace { +std::random_device rd; +} // namespace + int worker_process_event_loop(WorkerProcessConfig *wpconf) { if (reopen_log_files() != 0) { LOG(FATAL) << "Failed to open log file"; @@ -387,7 +391,9 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) { auto loop = EV_DEFAULT; - ConnectionHandler conn_handler(loop); + auto gen = std::mt19937(rd()); + + ConnectionHandler conn_handler(loop, gen); for (auto &addr : get_config()->conn.listener.addrs) { conn_handler.add_acceptor(make_unique(&addr, &conn_handler)); @@ -435,7 +441,7 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) { conn_handler.set_tls_ticket_key_memcached_dispatcher( make_unique( &ticketconf.memcached.addr, loop, ssl_ctx, - StringRef{memcachedconf.host}, &mcpool)); + StringRef{memcachedconf.host}, &mcpool, gen)); ev_timer_init(&renew_ticket_key_timer, memcached_get_ticket_key_cb, 0., 0.);