diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 64e83c3a..1e05e350 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -386,8 +386,9 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl, faddr_(faddr), worker_(worker), left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN), - affinity_hash_(-1), - should_close_after_write_(false) { + affinity_hash_(0), + should_close_after_write_(false), + affinity_hash_computed_(false) { ++worker_->get_worker_stat()->num_connections; @@ -674,23 +675,19 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) { } namespace { -uint32_t hash32(const StringRef &s) { - /* 32 bit FNV-1a: http://isthe.com/chongo/tech/comp/fnv/ */ - uint32_t h = 2166136261u; - size_t i; +// Computes a 32-bit hash of IP address |ip| for session affinity. +uint32_t compute_affinity_from_ip(const StringRef &ip) { + int rv; + std::array buf; - for (i = 0; i < s.size(); ++i) { - h ^= s[i]; - h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24); + rv = util::sha256(buf.data(), ip); + if (rv != 0) { + // It is unclear when sha256 can fail. If it does, fall back to + // util::hash32. 
+ return util::hash32(ip); } - return h; -} -} // namespace - -namespace { -int32_t calculate_affinity_from_ip(const StringRef &ip) { - return hash32(ip) & 0x7fffffff; + return (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; } } // namespace @@ -918,11 +915,22 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { auto &shared_addr = group->shared_addr; if (shared_addr->affinity == AFFINITY_IP) { - if (affinity_hash_ == -1) { - affinity_hash_ = calculate_affinity_from_ip(StringRef{ipaddr_}); + if (!affinity_hash_computed_) { + affinity_hash_ = compute_affinity_from_ip(StringRef{ipaddr_}); + affinity_hash_computed_ = true; } - auto idx = affinity_hash_ % shared_addr->addrs.size(); + const auto &affinity_hash = shared_addr->affinity_hash; + + auto it = std::lower_bound( + std::begin(affinity_hash), std::end(affinity_hash), affinity_hash_, + [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; }); + + if (it == std::end(affinity_hash)) { + it = std::begin(affinity_hash); + } + + auto idx = (*it).idx; auto &addr = shared_addr->addrs[idx]; if (addr.proto == PROTO_HTTP2) { diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 73117720..bdfdafd2 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -174,8 +174,11 @@ private: Worker *worker_; // The number of bytes of HTTP/2 client connection header to read size_t left_connhd_len_; - int32_t affinity_hash_; + // 32-bit hash of the client IP address used for session affinity + uint32_t affinity_hash_; bool should_close_after_write_; + // true if affinity_hash_ has already been computed + bool affinity_hash_computed_; ReadBuf rb_; }; diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index e059b276..3d27e6f2 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -2830,12 +2830,49 @@ StringRef strproto(shrpx_proto proto) { assert(0); } +namespace { +// Consistent hashing method described in +// https://github.com/RJ/ketama. 
Generate 160 32-bit hashes per |s|, +// which is usually a backend address, and append each one to |res| +// together with |idx|, the index of that backend address. After +// every backend address is processed, the caller sorts |res| in +// ascending order of hash. A backend is then chosen by lower bound +// search of the array with a 32-bit hash of the client IP address. +// This function returns 0 if it succeeds, or -1 if sha256 fails. +int compute_affinity_hash(std::vector &res, size_t idx, + const StringRef &s) { + int rv; + std::array buf; + + for (auto i = 0; i < 20; ++i) { + auto t = s.str(); + t += i; + + rv = util::sha256(buf.data(), StringRef{t}); + if (rv != 0) { + return -1; + } + + for (int i = 0; i < 8; ++i) { + auto h = (buf[4 * i] << 24) | (buf[4 * i + 1] << 16) | + (buf[4 * i + 2] << 8) | buf[4 * i + 3]; + + res.emplace_back(idx, h); + } + } + + return 0; +} +} // namespace + // Configures the following member in |config|: // conn.downstream_router, conn.downstream.addr_groups, // conn.downstream.addr_group_catch_all. 
int configure_downstream_group(Config *config, bool http2_proxy, bool numeric_addr_only, const TLSConfig &tlsconf) { + int rv; + auto &downstreamconf = *config->conn.downstream; auto &addr_groups = downstreamconf.addr_groups; auto &routerconf = downstreamconf.router; @@ -2959,6 +2996,25 @@ int configure_downstream_group(Config *config, bool http2_proxy, << util::to_numeric_addr(&addr.addr); } } + + if (g.affinity == AFFINITY_IP) { + size_t idx = 0; + for (auto &addr : g.addrs) { + auto p = reinterpret_cast(&addr.addr.su); + rv = compute_affinity_hash(g.affinity_hash, idx, + StringRef{p, addr.addr.len}); + if (rv != 0) { + return -1; + } + + ++idx; + } + + std::sort(std::begin(g.affinity_hash), std::end(g.affinity_hash), + [](const AffinityHash &lhs, const AffinityHash &rhs) { + return lhs.hash < rhs.hash; + }); + } } return 0; diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 25a34dac..2ea76015 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -370,12 +370,24 @@ struct DownstreamAddrConfig { bool tls; }; +// Maps a session affinity hash to idx, which is an index into +// DownstreamAddrGroupConfig::addrs. +struct AffinityHash { + AffinityHash(size_t idx, uint32_t hash) : idx(idx), hash(hash) {} + + size_t idx; + uint32_t hash; +}; + struct DownstreamAddrGroupConfig { DownstreamAddrGroupConfig(const StringRef &pattern) : pattern(pattern.c_str(), pattern.size()), affinity(AFFINITY_NONE) {} ImmutableString pattern; std::vector addrs; + // Session affinity hashes, sorted in ascending order of hash. + // Only used if affinity == AFFINITY_IP. 
+ std::vector affinity_hash; // Session affinity shrpx_session_affinity affinity; }; diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index f815149b..5927a8d5 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -192,6 +192,7 @@ void Worker::replace_downstream_config( shared_addr->addrs.resize(src.addrs.size()); shared_addr->affinity = src.affinity; + shared_addr->affinity_hash = src.affinity_hash; size_t num_http1 = 0; size_t num_http2 = 0; diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index d187743c..3503d0f7 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -127,6 +127,9 @@ struct WeightedPri { struct SharedDownstreamAddr { std::vector addrs; + // Session affinity hashes copied from the downstream config. + // Only used if affinity == AFFINITY_IP. + std::vector affinity_hash; // List of Http2Session which is not fully utilized (i.e., the // server advertized maximum concurrency is not reached). We will // coalesce as much stream as possible in one Http2Session to fully diff --git a/src/util.cc b/src/util.cc index d24c5c8d..af7f187d 100644 --- a/src/util.cc +++ b/src/util.cc @@ -54,8 +54,11 @@ #include #include +#include + #include +#include "ssl_compat.h" #include "timegm.h" namespace nghttp2 { @@ -1325,6 +1328,59 @@ double int_pow(double x, size_t y) { return res; } +uint32_t hash32(const StringRef &s) { + /* 32 bit FNV-1a: http://isthe.com/chongo/tech/comp/fnv/ */ + uint32_t h = 2166136261u; + size_t i; + + for (i = 0; i < s.size(); ++i) { + h ^= s[i]; + h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24); + } + + return h; +} + +#if !OPENSSL_101_API +namespace { +EVP_MD_CTX *EVP_MD_CTX_new(void) { return EVP_MD_CTX_create(); } +} // namespace + +namespace { +void EVP_MD_CTX_free(EVP_MD_CTX *ctx) { EVP_MD_CTX_destroy(ctx); } +} // namespace +#endif + +int sha256(uint8_t *res, const StringRef &s) { + int rv; + + auto ctx = EVP_MD_CTX_new(); + if (ctx == nullptr) { + return -1; + } + + auto ctx_deleter = defer(EVP_MD_CTX_free, ctx); + + rv = 
EVP_DigestInit_ex(ctx, EVP_sha256(), nullptr); + if (rv != 1) { + return -1; + } + + rv = EVP_DigestUpdate(ctx, s.c_str(), s.size()); + if (rv != 1) { + return -1; + } + + unsigned int mdlen = 32; + + rv = EVP_DigestFinal_ex(ctx, res, &mdlen); + if (rv != 1) { + return -1; + } + + return 0; +} + } // namespace util } // namespace nghttp2 diff --git a/src/util.h b/src/util.h index 375dacfd..5ae8cfb5 100644 --- a/src/util.h +++ b/src/util.h @@ -680,6 +680,12 @@ OutputIterator copy_lit(OutputIterator it, CharT(&s)[N]) { // Returns x**y double int_pow(double x, size_t y); +uint32_t hash32(const StringRef &s); + +// Computes SHA-256 of |s|, and stores the 32 byte digest in |buf|. +// This function returns 0 if it succeeds, or -1 on failure. +int sha256(uint8_t *buf, const StringRef &s); + } // namespace util } // namespace nghttp2