nghttpx: Use consistent hashing for client IP based session affinity

We use technique described in https://github.com/RJ/ketama
This commit is contained in:
Tatsuhiro Tsujikawa 2016-07-06 22:31:28 +09:00
parent 5d3535126e
commit 2bbe4422d2
8 changed files with 165 additions and 20 deletions

View File

@ -386,8 +386,9 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
faddr_(faddr),
worker_(worker),
left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN),
affinity_hash_(-1),
should_close_after_write_(false) {
affinity_hash_(0),
should_close_after_write_(false),
affinity_hash_computed_(false) {
++worker_->get_worker_stat()->num_connections;
@ -674,23 +675,19 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) {
}
namespace {
uint32_t hash32(const StringRef &s) {
/* 32 bit FNV-1a: http://isthe.com/chongo/tech/comp/fnv/ */
uint32_t h = 2166136261u;
size_t i;
// Computes 32bits hash for session affinity for IP address |ip|.
uint32_t compute_affinity_from_ip(const StringRef &ip) {
int rv;
std::array<uint8_t, 32> buf;
for (i = 0; i < s.size(); ++i) {
h ^= s[i];
h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24);
rv = util::sha256(buf.data(), ip);
if (rv != 0) {
// Not sure when sha256 failed. Just fall back to another
// function.
return util::hash32(ip);
}
return h;
}
} // namespace
namespace {
int32_t calculate_affinity_from_ip(const StringRef &ip) {
return hash32(ip) & 0x7fffffff;
return (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
}
} // namespace
@ -918,11 +915,22 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
auto &shared_addr = group->shared_addr;
if (shared_addr->affinity == AFFINITY_IP) {
if (affinity_hash_ == -1) {
affinity_hash_ = calculate_affinity_from_ip(StringRef{ipaddr_});
if (!affinity_hash_computed_) {
affinity_hash_ = compute_affinity_from_ip(StringRef{ipaddr_});
affinity_hash_computed_ = true;
}
auto idx = affinity_hash_ % shared_addr->addrs.size();
const auto &affinity_hash = shared_addr->affinity_hash;
auto it = std::lower_bound(
std::begin(affinity_hash), std::end(affinity_hash), affinity_hash_,
[](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; });
if (it == std::end(affinity_hash)) {
it = std::begin(affinity_hash);
}
auto idx = (*it).idx;
auto &addr = shared_addr->addrs[idx];
if (addr.proto == PROTO_HTTP2) {

View File

@ -174,8 +174,11 @@ private:
Worker *worker_;
// The number of bytes of HTTP/2 client connection header to read
size_t left_connhd_len_;
int32_t affinity_hash_;
// hash for session affinity using client IP
uint32_t affinity_hash_;
bool should_close_after_write_;
// true if affinity_hash_ is computed
bool affinity_hash_computed_;
ReadBuf rb_;
};

View File

@ -2830,12 +2830,49 @@ StringRef strproto(shrpx_proto proto) {
assert(0);
}
namespace {
// Consistent hashing method described in
// https://github.com/RJ/ketama. Generate 160 32-bit hashes per |s|,
// which is usually backend address. The each hash is associated to
// index of backend address. When all hashes for every backend
// address are calculated, sort it in ascending order of hash. To
// choose the index, compute 32-bit hash based on client IP address,
// and do lower bound search in the array. The returned index is the
// backend to use.
int compute_affinity_hash(std::vector<AffinityHash> &res, size_t idx,
const StringRef &s) {
int rv;
std::array<uint8_t, 32> buf;
for (auto i = 0; i < 20; ++i) {
auto t = s.str();
t += i;
rv = util::sha256(buf.data(), StringRef{t});
if (rv != 0) {
return -1;
}
for (int i = 0; i < 8; ++i) {
auto h = (buf[4 * i] << 24) | (buf[4 * i + 1] << 16) |
(buf[4 * i + 2] << 8) | buf[4 * i + 3];
res.emplace_back(idx, h);
}
}
return 0;
}
} // namespace
// Configures the following member in |config|:
// conn.downstream_router, conn.downstream.addr_groups,
// conn.downstream.addr_group_catch_all.
int configure_downstream_group(Config *config, bool http2_proxy,
bool numeric_addr_only,
const TLSConfig &tlsconf) {
int rv;
auto &downstreamconf = *config->conn.downstream;
auto &addr_groups = downstreamconf.addr_groups;
auto &routerconf = downstreamconf.router;
@ -2959,6 +2996,25 @@ int configure_downstream_group(Config *config, bool http2_proxy,
<< util::to_numeric_addr(&addr.addr);
}
}
if (g.affinity == AFFINITY_IP) {
size_t idx = 0;
for (auto &addr : g.addrs) {
auto p = reinterpret_cast<uint8_t *>(&addr.addr.su);
rv = compute_affinity_hash(g.affinity_hash, idx,
StringRef{p, addr.addr.len});
if (rv != 0) {
return -1;
}
++idx;
}
std::sort(std::begin(g.affinity_hash), std::end(g.affinity_hash),
[](const AffinityHash &lhs, const AffinityHash &rhs) {
return lhs.hash < rhs.hash;
});
}
}
return 0;

View File

@ -370,12 +370,24 @@ struct DownstreamAddrConfig {
bool tls;
};
// Mapping hash to idx which is an index into
// DownstreamAddrGroupConfig::addrs.
struct AffinityHash {
AffinityHash(size_t idx, uint32_t hash) : idx(idx), hash(hash) {}
size_t idx;
uint32_t hash;
};
struct DownstreamAddrGroupConfig {
DownstreamAddrGroupConfig(const StringRef &pattern)
: pattern(pattern.c_str(), pattern.size()), affinity(AFFINITY_NONE) {}
ImmutableString pattern;
std::vector<DownstreamAddrConfig> addrs;
// Bunch of session affinity hash. Only used if affinity ==
// AFFINITY_IP.
std::vector<AffinityHash> affinity_hash;
// Session affinity
shrpx_session_affinity affinity;
};

View File

@ -192,6 +192,7 @@ void Worker::replace_downstream_config(
shared_addr->addrs.resize(src.addrs.size());
shared_addr->affinity = src.affinity;
shared_addr->affinity_hash = src.affinity_hash;
size_t num_http1 = 0;
size_t num_http2 = 0;

View File

@ -127,6 +127,9 @@ struct WeightedPri {
struct SharedDownstreamAddr {
std::vector<DownstreamAddr> addrs;
// Bunch of session affinity hash. Only used if affinity ==
// AFFINITY_IP.
std::vector<AffinityHash> affinity_hash;
// List of Http2Session which is not fully utilized (i.e., the
// server advertized maximum concurrency is not reached). We will
// coalesce as much stream as possible in one Http2Session to fully

View File

@ -54,8 +54,11 @@
#include <iostream>
#include <fstream>
#include <openssl/evp.h>
#include <nghttp2/nghttp2.h>
#include "ssl_compat.h"
#include "timegm.h"
namespace nghttp2 {
@ -1325,6 +1328,59 @@ double int_pow(double x, size_t y) {
return res;
}
uint32_t hash32(const StringRef &s) {
/* 32 bit FNV-1a: http://isthe.com/chongo/tech/comp/fnv/ */
uint32_t h = 2166136261u;
size_t i;
for (i = 0; i < s.size(); ++i) {
h ^= s[i];
h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24);
}
return h;
}
#if !OPENSSL_101_API
namespace {
EVP_MD_CTX *EVP_MD_CTX_new(void) { return EVP_MD_CTX_create(); }
} // namespace
namespace {
void EVP_MD_CTX_free(EVP_MD_CTX *ctx) { EVP_MD_CTX_destroy(ctx); }
} // namespace
#endif
int sha256(uint8_t *res, const StringRef &s) {
int rv;
auto ctx = EVP_MD_CTX_new();
if (ctx == nullptr) {
return -1;
}
auto ctx_deleter = defer(EVP_MD_CTX_free, ctx);
rv = EVP_DigestInit_ex(ctx, EVP_sha256(), nullptr);
if (rv != 1) {
return -1;
}
rv = EVP_DigestUpdate(ctx, s.c_str(), s.size());
if (rv != 1) {
return -1;
}
unsigned int mdlen = 32;
rv = EVP_DigestFinal_ex(ctx, res, &mdlen);
if (rv != 1) {
return -1;
}
return 0;
}
} // namespace util
} // namespace nghttp2

View File

@ -680,6 +680,12 @@ OutputIterator copy_lit(OutputIterator it, CharT(&s)[N]) {
// Returns x**y
double int_pow(double x, size_t y);
uint32_t hash32(const StringRef &s);
// Computes SHA-256 of |s|, and stores it in |buf|. This function
// returns 0 if it succeeds, or -1.
int sha256(uint8_t *buf, const StringRef &s);
} // namespace util
} // namespace nghttp2