From 5d3535126ed9ed21c935f8006cd49c6052a6110e Mon Sep 17 00:00:00 2001 From: Tomasz Buchert Date: Sun, 3 Jul 2016 23:00:15 +0200 Subject: [PATCH 1/6] Fix FTBFS on armel by explicitly including the header. --- src/shrpx_connection_handler.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/shrpx_connection_handler.h b/src/shrpx_connection_handler.h index 1f9986af..c45c7462 100644 --- a/src/shrpx_connection_handler.h +++ b/src/shrpx_connection_handler.h @@ -32,6 +32,7 @@ #include #endif // HAVE_SYS_SOCKET_H +#include #include #include #include From 2bbe4422d2c694b6cd50976352b5f1347bebe3d1 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Wed, 6 Jul 2016 22:31:28 +0900 Subject: [PATCH 2/6] nghttpx: Use consistent hashing for client IP based session affinity We use technique described in https://github.com/RJ/ketama --- src/shrpx_client_handler.cc | 46 +++++++++++++++++------------- src/shrpx_client_handler.h | 5 +++- src/shrpx_config.cc | 56 +++++++++++++++++++++++++++++++++++++ src/shrpx_config.h | 12 ++++++++ src/shrpx_worker.cc | 1 + src/shrpx_worker.h | 3 ++ src/util.cc | 56 +++++++++++++++++++++++++++++++++++++ src/util.h | 6 ++++ 8 files changed, 165 insertions(+), 20 deletions(-) diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 64e83c3a..1e05e350 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -386,8 +386,9 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl, faddr_(faddr), worker_(worker), left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN), - affinity_hash_(-1), - should_close_after_write_(false) { + affinity_hash_(0), + should_close_after_write_(false), + affinity_hash_computed_(false) { ++worker_->get_worker_stat()->num_connections; @@ -674,23 +675,19 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) { } namespace { -uint32_t hash32(const StringRef &s) { - /* 32 bit FNV-1a: http://isthe.com/chongo/tech/comp/fnv/ */ - uint32_t h = 2166136261u; - size_t i; +// Computes 32bits hash for session affinity for IP address |ip|. +uint32_t compute_affinity_from_ip(const StringRef &ip) { + int rv; + std::array buf; - for (i = 0; i < s.size(); ++i) { - h ^= s[i]; - h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24); + rv = util::sha256(buf.data(), ip); + if (rv != 0) { + // Not sure when sha256 failed. Just fall back to another + // function. + return util::hash32(ip); } - return h; -} -} // namespace - -namespace { -int32_t calculate_affinity_from_ip(const StringRef &ip) { - return hash32(ip) & 0x7fffffff; + return (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; } } // namespace @@ -918,11 +915,22 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { auto &shared_addr = group->shared_addr; if (shared_addr->affinity == AFFINITY_IP) { - if (affinity_hash_ == -1) { - affinity_hash_ = calculate_affinity_from_ip(StringRef{ipaddr_}); + if (!affinity_hash_computed_) { + affinity_hash_ = compute_affinity_from_ip(StringRef{ipaddr_}); + affinity_hash_computed_ = true; } - auto idx = affinity_hash_ % shared_addr->addrs.size(); + const auto &affinity_hash = shared_addr->affinity_hash; + + auto it = std::lower_bound( + std::begin(affinity_hash), std::end(affinity_hash), affinity_hash_, + [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; }); + + if (it == std::end(affinity_hash)) { + it = std::begin(affinity_hash); + } + + auto idx = (*it).idx; auto &addr = shared_addr->addrs[idx]; if (addr.proto == PROTO_HTTP2) { diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 73117720..bdfdafd2 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -174,8 +174,11 @@ private: Worker *worker_; // The number of bytes of HTTP/2 client connection header to read size_t left_connhd_len_; - int32_t affinity_hash_; + // hash for session affinity using client IP + uint32_t affinity_hash_; bool should_close_after_write_; + // true if affinity_hash_ is computed + bool affinity_hash_computed_; ReadBuf rb_; }; diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index e059b276..3d27e6f2 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -2830,12 +2830,49 @@ StringRef strproto(shrpx_proto proto) { assert(0); } +namespace { +// Consistent hashing method described in +// https://github.com/RJ/ketama. Generate 160 32-bit hashes per |s|, +// which is usually backend address. The each hash is associated to +// index of backend address. When all hashes for every backend +// address are calculated, sort it in ascending order of hash. To +// choose the index, compute 32-bit hash based on client IP address, +// and do lower bound search in the array. The returned index is the +// backend to use. +int compute_affinity_hash(std::vector &res, size_t idx, + const StringRef &s) { + int rv; + std::array buf; + + for (auto i = 0; i < 20; ++i) { + auto t = s.str(); + t += i; + + rv = util::sha256(buf.data(), StringRef{t}); + if (rv != 0) { + return -1; + } + + for (int i = 0; i < 8; ++i) { + auto h = (buf[4 * i] << 24) | (buf[4 * i + 1] << 16) | + (buf[4 * i + 2] << 8) | buf[4 * i + 3]; + + res.emplace_back(idx, h); + } + } + + return 0; +} +} // namespace + // Configures the following member in |config|: // conn.downstream_router, conn.downstream.addr_groups, // conn.downstream.addr_group_catch_all. int configure_downstream_group(Config *config, bool http2_proxy, bool numeric_addr_only, const TLSConfig &tlsconf) { + int rv; + auto &downstreamconf = *config->conn.downstream; auto &addr_groups = downstreamconf.addr_groups; auto &routerconf = downstreamconf.router; @@ -2959,6 +2996,25 @@ int configure_downstream_group(Config *config, bool http2_proxy, << util::to_numeric_addr(&addr.addr); } } + + if (g.affinity == AFFINITY_IP) { + size_t idx = 0; + for (auto &addr : g.addrs) { + auto p = reinterpret_cast(&addr.addr.su); + rv = compute_affinity_hash(g.affinity_hash, idx, + StringRef{p, addr.addr.len}); + if (rv != 0) { + return -1; + } + + ++idx; + } + + std::sort(std::begin(g.affinity_hash), std::end(g.affinity_hash), + [](const AffinityHash &lhs, const AffinityHash &rhs) { + return lhs.hash < rhs.hash; + }); + } } return 0; diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 25a34dac..2ea76015 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -370,12 +370,24 @@ struct DownstreamAddrConfig { bool tls; }; +// Mapping hash to idx which is an index into +// DownstreamAddrGroupConfig::addrs. +struct AffinityHash { + AffinityHash(size_t idx, uint32_t hash) : idx(idx), hash(hash) {} + + size_t idx; + uint32_t hash; +}; + struct DownstreamAddrGroupConfig { DownstreamAddrGroupConfig(const StringRef &pattern) : pattern(pattern.c_str(), pattern.size()), affinity(AFFINITY_NONE) {} ImmutableString pattern; std::vector addrs; + // Bunch of session affinity hash. Only used if affinity == + // AFFINITY_IP. + std::vector affinity_hash; // Session affinity shrpx_session_affinity affinity; }; diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index f815149b..5927a8d5 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -192,6 +192,7 @@ void Worker::replace_downstream_config( shared_addr->addrs.resize(src.addrs.size()); shared_addr->affinity = src.affinity; + shared_addr->affinity_hash = src.affinity_hash; size_t num_http1 = 0; size_t num_http2 = 0; diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index d187743c..3503d0f7 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -127,6 +127,9 @@ struct WeightedPri { struct SharedDownstreamAddr { std::vector addrs; + // Bunch of session affinity hash. Only used if affinity == + // AFFINITY_IP. + std::vector affinity_hash; // List of Http2Session which is not fully utilized (i.e., the // server advertized maximum concurrency is not reached). We will // coalesce as much stream as possible in one Http2Session to fully diff --git a/src/util.cc b/src/util.cc index d24c5c8d..af7f187d 100644 --- a/src/util.cc +++ b/src/util.cc @@ -54,8 +54,11 @@ #include #include +#include + #include +#include "ssl_compat.h" #include "timegm.h" namespace nghttp2 { @@ -1325,6 +1328,59 @@ double int_pow(double x, size_t y) { return res; } +uint32_t hash32(const StringRef &s) { + /* 32 bit FNV-1a: http://isthe.com/chongo/tech/comp/fnv/ */ + uint32_t h = 2166136261u; + size_t i; + + for (i = 0; i < s.size(); ++i) { + h ^= s[i]; + h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24); + } + + return h; +} + +#if !OPENSSL_101_API +namespace { +EVP_MD_CTX *EVP_MD_CTX_new(void) { return EVP_MD_CTX_create(); } +} // namespace + +namespace { +void EVP_MD_CTX_free(EVP_MD_CTX *ctx) { EVP_MD_CTX_destroy(ctx); } +} // namespace +#endif + +int sha256(uint8_t *res, const StringRef &s) { + int rv; + + auto ctx = EVP_MD_CTX_new(); + if (ctx == nullptr) { + return -1; + } + + auto ctx_deleter = defer(EVP_MD_CTX_free, ctx); + + rv = EVP_DigestInit_ex(ctx, EVP_sha256(), nullptr); + if (rv != 1) { + return -1; + } + + rv = EVP_DigestUpdate(ctx, s.c_str(), s.size()); + if (rv != 1) { + return -1; + } + + unsigned int mdlen = 32; + + rv = EVP_DigestFinal_ex(ctx, res, &mdlen); + if (rv != 1) { + return -1; + } + + return 0; +} + } // namespace util } // namespace nghttp2 diff --git a/src/util.h b/src/util.h index 375dacfd..5ae8cfb5 100644 --- a/src/util.h +++ b/src/util.h @@ -680,6 +680,12 @@ OutputIterator copy_lit(OutputIterator it, CharT(&s)[N]) { // Returns x**y double int_pow(double x, size_t y); +uint32_t hash32(const StringRef &s); + +// Computes SHA-256 of |s|, and stores it in |buf|. This function +// returns 0 if it succeeds, or -1. +int sha256(uint8_t *buf, const StringRef &s); + } // namespace util } // namespace nghttp2 From ca39c71ac363addbfe349ddf5f95cdee30c54909 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Wed, 6 Jul 2016 23:32:50 +0900 Subject: [PATCH 3/6] examples: Fix compile error with OpenSSL v1.1.0-beta2 --- examples/client.c | 3 --- examples/libevent-client.c | 3 --- examples/libevent-server.c | 3 --- 3 files changed, 9 deletions(-) diff --git a/examples/client.c b/examples/client.c index 7fbdf3f6..9496430e 100644 --- a/examples/client.c +++ b/examples/client.c @@ -695,9 +695,6 @@ int main(int argc, char **argv) { act.sa_handler = SIG_IGN; sigaction(SIGPIPE, &act, 0); -#ifndef OPENSSL_IS_BORINGSSL - OPENSSL_config(NULL); -#endif /* OPENSSL_IS_BORINGSSL */ SSL_load_error_strings(); SSL_library_init(); diff --git a/examples/libevent-client.c b/examples/libevent-client.c index 0b03f7de..4e3eb8da 100644 --- a/examples/libevent-client.c +++ b/examples/libevent-client.c @@ -594,9 +594,6 @@ int main(int argc, char **argv) { act.sa_handler = SIG_IGN; sigaction(SIGPIPE, &act, NULL); -#ifndef OPENSSL_IS_BORINGSSL - OPENSSL_config(NULL); -#endif /* OPENSSL_IS_BORINGSSL */ SSL_load_error_strings(); SSL_library_init(); diff --git a/examples/libevent-server.c b/examples/libevent-server.c index 09f52f99..16b8f871 100644 --- a/examples/libevent-server.c +++ b/examples/libevent-server.c @@ -781,9 +781,6 @@ int main(int argc, char **argv) { act.sa_handler = SIG_IGN; sigaction(SIGPIPE, &act, NULL); -#ifndef OPENSSL_IS_BORINGSSL - OPENSSL_config(NULL); -#endif /* OPENSSL_IS_BORINGSSL */ SSL_load_error_strings(); SSL_library_init(); From 30f26a2b9d69c0629b339cc215891268b801f089 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Wed, 6 Jul 2016 23:58:53 +0900 Subject: [PATCH 4/6] nghttpx: Explicitly cast to uint32_t for hash calculation --- src/shrpx_client_handler.cc | 4 +++- src/shrpx_config.cc | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 1e05e350..2c9b2a10 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -687,7 +687,9 @@ uint32_t compute_affinity_from_ip(const StringRef &ip) { return util::hash32(ip); } - return (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; + return (static_cast(buf[0]) << 24) | + (static_cast(buf[1]) << 16) | + (static_cast(buf[2]) << 8) | static_cast(buf[3]); } } // namespace diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index 3d27e6f2..f356a3fb 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -2854,8 +2854,10 @@ int compute_affinity_hash(std::vector &res, size_t idx, } for (int i = 0; i < 8; ++i) { - auto h = (buf[4 * i] << 24) | (buf[4 * i + 1] << 16) | - (buf[4 * i + 2] << 8) | buf[4 * i + 3]; + auto h = (static_cast(buf[4 * i]) << 24) | + (static_cast(buf[4 * i + 1]) << 16) | + (static_cast(buf[4 * i + 2]) << 8) | + static_cast(buf[4 * i + 3]); res.emplace_back(idx, h); } From 2c500b62fd8634613b14f85357a173c89fd90bad Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Thu, 7 Jul 2016 23:26:15 +0900 Subject: [PATCH 5/6] Update doc --- README.rst | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/README.rst b/README.rst index 96390c70..4d83a0ac 100644 --- a/README.rst +++ b/README.rst @@ -176,6 +176,23 @@ To compile the source code, gcc >= 4.8.3 or clang >= 3.4 is required. applications were not built, then using ``--enable-app`` may find that cause, such as the missing dependency. +Notes for building on Windows (MSVC) +------------------------------------ + +The easiest way to build native Windows nghttp2 dll is use +[cmake](https://cmake.org/). The free version of [Visual C++ Build +Tools](http://landinghub.visualstudio.com/visual-cpp-build-tools) +works fine. + +1. Install cmake for windows +2. Open "Visual C++ ... Native Build Tool Command Prompt", and inside + nghttp2 directly, run ``cmake``. +3. Then run ``cmake --build`` to build library. +4. nghttp2.dll, nghttp2.lib, nghttp2.exp are placed under lib directory. + +Note that the above steps most likely produce nghttp2 library only. +No bundled applications are compiled. + Notes for building on Windows (Mingw/Cygwin) -------------------------------------------- From 33153010c5080ef731a7b1f4c069c351e43308cb Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Fri, 8 Jul 2016 23:41:53 +0900 Subject: [PATCH 6/6] nghttpx: Retry memcached connection Previously, we didn't retry request on connection failure. Sometimes we hit the edge case where connection is about to lost just when we write request. To avoid this situation, we now retry request to failed attempt. We also add ConnectBlocker to MemcachedConnection not to attempt to connect to memcached if connection could not be made previously. --- src/shrpx_connection_handler.cc | 30 +++++---- src/shrpx_connection_handler.h | 2 +- src/shrpx_memcached_connection.cc | 106 +++++++++++++++++++++++++++--- src/shrpx_memcached_connection.h | 7 +- src/shrpx_memcached_dispatcher.cc | 5 +- src/shrpx_memcached_dispatcher.h | 3 +- src/shrpx_worker.cc | 2 +- src/shrpx_worker_process.cc | 10 ++- 8 files changed, 136 insertions(+), 29 deletions(-) diff --git a/src/shrpx_connection_handler.cc b/src/shrpx_connection_handler.cc index af919514..50f54664 100644 --- a/src/shrpx_connection_handler.cc +++ b/src/shrpx_connection_handler.cc @@ -110,12 +110,8 @@ void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) { } } // namespace -namespace { -std::random_device rd; -} // namespace - -ConnectionHandler::ConnectionHandler(struct ev_loop *loop) - : gen_(rd()), +ConnectionHandler::ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen) + : gen_(gen), single_worker_(nullptr), loop_(loop), tls_ticket_key_memcached_get_retry_count_(0), @@ -736,6 +732,14 @@ ConnectionHandler::get_tls_ticket_key_memcached_dispatcher() const { return tls_ticket_key_memcached_dispatcher_.get(); } +// Use the similar backoff algorithm described in +// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md +namespace { +constexpr size_t MAX_BACKOFF_EXP = 10; +constexpr auto MULTIPLIER = 3.2; +constexpr auto JITTER = 0.2; +} // namespace + void ConnectionHandler::on_tls_ticket_key_network_error(ev_timer *w) { if (++tls_ticket_key_memcached_get_retry_count_ >= get_config()->tls.ticket.memcached.max_retry) { @@ -746,15 +750,19 @@ void ConnectionHandler::on_tls_ticket_key_network_error(ev_timer *w) { return; } - auto dist = std::uniform_int_distribution( - 1, std::min(60, 1 << tls_ticket_key_memcached_get_retry_count_)); - auto t = dist(gen_); + auto base_backoff = util::int_pow( + MULTIPLIER, + std::min(MAX_BACKOFF_EXP, tls_ticket_key_memcached_get_retry_count_)); + auto dist = std::uniform_real_distribution<>(-JITTER * base_backoff, + JITTER * base_backoff); + + auto backoff = base_backoff + dist(gen_); LOG(WARN) << "Memcached: tls ticket get failed due to network error, retrying in " - << t << " seconds"; + << backoff << " seconds"; - ev_timer_set(w, t, 0.); + ev_timer_set(w, backoff, 0.); ev_timer_start(loop_, w); } diff --git a/src/shrpx_connection_handler.h b/src/shrpx_connection_handler.h index c45c7462..f798b73b 100644 --- a/src/shrpx_connection_handler.h +++ b/src/shrpx_connection_handler.h @@ -101,7 +101,7 @@ struct SerialEvent { class ConnectionHandler { public: - ConnectionHandler(struct ev_loop *loop); + ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen); ~ConnectionHandler(); int handle_connection(int fd, sockaddr *addr, int addrlen, const UpstreamAddr *faddr); diff --git a/src/shrpx_memcached_connection.cc b/src/shrpx_memcached_connection.cc index 98554076..d28b5fab 100644 --- a/src/shrpx_memcached_connection.cc +++ b/src/shrpx_memcached_connection.cc @@ -56,7 +56,7 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) { auto mconn = static_cast(conn->data); if (mconn->on_read() != 0) { - mconn->disconnect(); + mconn->reconnect_or_fail(); return; } } @@ -68,7 +68,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) { auto mconn = static_cast(conn->data); if (mconn->on_write() != 0) { - mconn->disconnect(); + mconn->reconnect_or_fail(); return; } } @@ -88,22 +88,25 @@ void connectcb(struct ev_loop *loop, ev_io *w, int revents) { } } // namespace -constexpr ev_tstamp write_timeout = 10.; -constexpr ev_tstamp read_timeout = 10.; +constexpr auto write_timeout = 10_s; +constexpr auto read_timeout = 10_s; MemcachedConnection::MemcachedConnection(const Address *addr, struct ev_loop *loop, SSL_CTX *ssl_ctx, const StringRef &sni_name, - MemchunkPool *mcpool) + MemchunkPool *mcpool, + std::mt19937 &gen) : conn_(loop, -1, nullptr, mcpool, write_timeout, read_timeout, {}, {}, connectcb, readcb, timeoutcb, this, 0, 0., PROTO_MEMCACHED), do_read_(&MemcachedConnection::noop), do_write_(&MemcachedConnection::noop), sni_name_(sni_name.str()), + connect_blocker_(gen, loop, [] {}, [] {}), parse_state_{}, addr_(addr), ssl_ctx_(ssl_ctx), sendsum_(0), + try_count_(0), connected_(false) {} MemcachedConnection::~MemcachedConnection() { disconnect(); } @@ -201,6 +204,8 @@ int MemcachedConnection::initiate_connection() { int MemcachedConnection::connected() { if (!util::check_socket_connected(conn_.fd)) { + connect_blocker_.on_failure(); + conn_.wlimit.stopw(); if (LOG_ENABLED(INFO)) { @@ -214,10 +219,7 @@ int MemcachedConnection::connected() { MCLOG(INFO, this) << "connected to memcached server"; } - connected_ = true; - conn_.rlimit.startw(); - ev_timer_again(conn_.loop, &conn_.rt); ev_set_cb(&conn_.wev, writecb); @@ -228,6 +230,12 @@ int MemcachedConnection::connected() { return 0; } + ev_timer_stop(conn_.loop, &conn_.wt); + + connected_ = true; + + connect_blocker_.on_success(); + do_read_ = &MemcachedConnection::read_clear; do_write_ = &MemcachedConnection::write_clear; @@ -248,6 +256,7 @@ int MemcachedConnection::tls_handshake() { } if (rv < 0) { + connect_blocker_.on_failure(); return rv; } @@ -259,6 +268,7 @@ int MemcachedConnection::tls_handshake() { if (!tlsconf.insecure && ssl::check_cert(conn_.tls.ssl, addr_, StringRef(sni_name_)) != 0) { + connect_blocker_.on_failure(); return -1; } @@ -270,6 +280,13 @@ int MemcachedConnection::tls_handshake() { } } + ev_timer_stop(conn_.loop, &conn_.rt); + ev_timer_stop(conn_.loop, &conn_.wt); + + connected_ = true; + + connect_blocker_.on_success(); + do_read_ = &MemcachedConnection::read_tls; do_write_ = &MemcachedConnection::write_tls; @@ -325,7 +342,9 @@ int MemcachedConnection::read_tls() { return 0; } - ev_timer_again(conn_.loop, &conn_.rt); + if (ev_is_active(&conn_.rt)) { + ev_timer_again(conn_.loop, &conn_.rt); + } for (;;) { auto nread = conn_.read_tls(recvbuf_.last, recvbuf_.wleft()); @@ -386,7 +405,9 @@ int MemcachedConnection::read_clear() { return 0; } - ev_timer_again(conn_.loop, &conn_.rt); + if (ev_is_active(&conn_.rt)) { + ev_timer_again(conn_.loop, &conn_.rt); + } for (;;) { auto nread = conn_.read_clear(recvbuf_.last, recvbuf_.wleft()); @@ -535,9 +556,16 @@ int MemcachedConnection::parse_packet() { } } + // We require at least one complete response to clear try count. + try_count_ = 0; + auto req = std::move(recvq_.front()); recvq_.pop_front(); + if (sendq_.empty() && recvq_.empty()) { + ev_timer_stop(conn_.loop, &conn_.rt); + } + if (!req->canceled && req->cb) { req->cb(req.get(), MemcachedResult(parse_state_.status_code, std::move(parse_state_.value))); @@ -635,6 +663,13 @@ void MemcachedConnection::drain_send_queue(size_t nwrite) { recvq_.push_back(std::move(sendq_.front())); sendq_.pop_front(); } + + // start read timer only when we wait for responses. + if (recvq_.empty()) { + ev_timer_stop(conn_.loop, &conn_.rt); + } else if (!ev_is_active(&conn_.rt)) { + ev_timer_again(conn_.loop, &conn_.rt); + } } size_t MemcachedConnection::serialized_size(MemcachedRequest *req) { @@ -676,6 +711,10 @@ void MemcachedConnection::make_request(MemcachedSendbuf *sendbuf, } int MemcachedConnection::add_request(std::unique_ptr req) { + if (connect_blocker_.blocked()) { + return -1; + } + sendq_.push_back(std::move(req)); if (connected_) { @@ -684,6 +723,7 @@ int MemcachedConnection::add_request(std::unique_ptr req) { } if (conn_.fd == -1 && initiate_connection() != 0) { + connect_blocker_.on_failure(); disconnect(); return -1; } @@ -696,4 +736,50 @@ void MemcachedConnection::signal_write() { conn_.wlimit.startw(); } int MemcachedConnection::noop() { return 0; } +void MemcachedConnection::reconnect_or_fail() { + if (!connected_ || (recvq_.empty() && sendq_.empty())) { + disconnect(); + return; + } + + constexpr size_t MAX_TRY_COUNT = 3; + + if (++try_count_ >= MAX_TRY_COUNT) { + if (LOG_ENABLED(INFO)) { + MCLOG(INFO, this) << "Tried " << MAX_TRY_COUNT + << " times, and all failed. Aborting"; + } + try_count_ = 0; + disconnect(); + return; + } + + std::vector> q; + q.reserve(recvq_.size() + sendq_.size()); + + if (LOG_ENABLED(INFO)) { + MCLOG(INFO, this) << "Retry connection, enqueue " + << recvq_.size() + sendq_.size() << " request(s) again"; + } + + q.insert(std::end(q), std::make_move_iterator(std::begin(recvq_)), + std::make_move_iterator(std::end(recvq_))); + q.insert(std::end(q), std::make_move_iterator(std::begin(sendq_)), + std::make_move_iterator(std::end(sendq_))); + + recvq_.clear(); + sendq_.clear(); + + disconnect(); + + sendq_.insert(std::end(sendq_), std::make_move_iterator(std::begin(q)), + std::make_move_iterator(std::end(q))); + + if (initiate_connection() != 0) { + connect_blocker_.on_failure(); + disconnect(); + return; + } +} + } // namespace shrpx diff --git a/src/shrpx_memcached_connection.h b/src/shrpx_memcached_connection.h index 290442f0..858d3001 100644 --- a/src/shrpx_memcached_connection.h +++ b/src/shrpx_memcached_connection.h @@ -34,6 +34,7 @@ #include "shrpx_connection.h" #include "shrpx_ssl.h" +#include "shrpx_connect_blocker.h" #include "buffer.h" #include "network.h" @@ -96,7 +97,7 @@ class MemcachedConnection { public: MemcachedConnection(const Address *addr, struct ev_loop *loop, SSL_CTX *ssl_ctx, const StringRef &sni_name, - MemchunkPool *mcpool); + MemchunkPool *mcpool, std::mt19937 &gen); ~MemcachedConnection(); void disconnect(); @@ -126,6 +127,8 @@ public: int noop(); + void reconnect_or_fail(); + private: Connection conn_; std::deque> recvq_; @@ -134,11 +137,13 @@ private: std::function do_read_, do_write_; std::string sni_name_; ssl::TLSSessionCache tls_session_cache_; + ConnectBlocker connect_blocker_; MemcachedParseState parse_state_; const Address *addr_; SSL_CTX *ssl_ctx_; // Sum of the bytes to be transmitted in sendbufv_. size_t sendsum_; + size_t try_count_; bool connected_; Buffer<8_k> recvbuf_; }; diff --git a/src/shrpx_memcached_dispatcher.cc b/src/shrpx_memcached_dispatcher.cc index 796fef8e..a3f1357c 100644 --- a/src/shrpx_memcached_dispatcher.cc +++ b/src/shrpx_memcached_dispatcher.cc @@ -33,10 +33,11 @@ namespace shrpx { MemcachedDispatcher::MemcachedDispatcher(const Address *addr, struct ev_loop *loop, SSL_CTX *ssl_ctx, const StringRef &sni_name, - MemchunkPool *mcpool) + MemchunkPool *mcpool, + std::mt19937 &gen) : loop_(loop), mconn_(make_unique(addr, loop_, ssl_ctx, sni_name, - mcpool)) {} + mcpool, gen)) {} MemcachedDispatcher::~MemcachedDispatcher() {} diff --git a/src/shrpx_memcached_dispatcher.h b/src/shrpx_memcached_dispatcher.h index 7cca3bcb..e7b78d27 100644 --- a/src/shrpx_memcached_dispatcher.h +++ b/src/shrpx_memcached_dispatcher.h @@ -28,6 +28,7 @@ #include "shrpx.h" #include +#include #include @@ -45,7 +46,7 @@ class MemcachedDispatcher { public: MemcachedDispatcher(const Address *addr, struct ev_loop *loop, SSL_CTX *ssl_ctx, const StringRef &sni_name, - MemchunkPool *mcpool); + MemchunkPool *mcpool, std::mt19937 &gen); ~MemcachedDispatcher(); int add_request(std::unique_ptr req); diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 5927a8d5..0d4a113a 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -147,7 +147,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, session_cache_memcached_dispatcher_ = make_unique( &session_cacheconf.memcached.addr, loop, tls_session_cache_memcached_ssl_ctx, - StringRef{session_cacheconf.memcached.host}, &mcpool_); + StringRef{session_cacheconf.memcached.host}, &mcpool_, randgen_); } replace_downstream_config(std::move(downstreamconf)); diff --git a/src/shrpx_worker_process.cc b/src/shrpx_worker_process.cc index 8f7e817f..f7c84e96 100644 --- a/src/shrpx_worker_process.cc +++ b/src/shrpx_worker_process.cc @@ -379,6 +379,10 @@ void nb_child_cb(struct ev_loop *loop, ev_child *w, int revents) { } // namespace #endif // HAVE_NEVERBLEED +namespace { +std::random_device rd; +} // namespace + int worker_process_event_loop(WorkerProcessConfig *wpconf) { if (reopen_log_files() != 0) { LOG(FATAL) << "Failed to open log file"; @@ -387,7 +391,9 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) { auto loop = EV_DEFAULT; - ConnectionHandler conn_handler(loop); + auto gen = std::mt19937(rd()); + + ConnectionHandler conn_handler(loop, gen); for (auto &addr : get_config()->conn.listener.addrs) { conn_handler.add_acceptor(make_unique(&addr, &conn_handler)); @@ -435,7 +441,7 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) { conn_handler.set_tls_ticket_key_memcached_dispatcher( make_unique( &ticketconf.memcached.addr, loop, ssl_ctx, - StringRef{memcachedconf.host}, &mcpool)); + StringRef{memcachedconf.host}, &mcpool, gen)); ev_timer_init(&renew_ticket_key_timer, memcached_get_ticket_key_cb, 0., 0.);