nghttpx: Implement client IP based session affinity

This commit is contained in:
Tatsuhiro Tsujikawa 2016-06-09 22:35:59 +09:00
parent ac97c122d4
commit 143d0b69b7
9 changed files with 191 additions and 15 deletions

View File

@ -1284,12 +1284,13 @@ Connections:
Several parameters <PARAM> are accepted after <PATTERN>.
The parameters are delimited by ";". The available
parameters are: "proto=<PROTO>", "tls",
"sni=<SNI_HOST>", "fall=<N>", and "rise=<N>". The
parameter consists of keyword, and optionally followed
by "=" and value. For example, the parameter "proto=h2"
consists of the keyword "proto" and value "h2". The
parameter "tls" consists of the keyword "tls" without
value. Each parameter is described as follows.
"sni=<SNI_HOST>", "fall=<N>", "rise=<N>", and
"affinity=<METHOD>". The parameter consists of keyword,
and optionally followed by "=" and value. For example,
the parameter "proto=h2" consists of the keyword "proto"
and value "h2". The parameter "tls" consists of the
keyword "tls" without value. Each parameter is
described as follows.
The backend application protocol can be specified using
optional "proto" parameter, and in the form of
@ -1324,6 +1325,20 @@ Connections:
backend is permanently offline, once it goes in that
state, and this is the default behaviour.
The session affinity is enabled using
"affinity=<METHOD>" parameter. If "ip" is given in
<METHOD>, client IP based session affinity is enabled.
If "none" is given in <METHOD>, session affinity is
disabled, and this is the default. The session affinity
is enabled per <PATTERN>. If at least one backend has
"affinity" parameter, and its <METHOD> is not "none",
session affinity is enabled for all backend servers
sharing the same <PATTERN>. It is advised to set
"affinity" parameter to all backends explicitly if
session affinity is desired. The session affinity may
break if one of the backends gets unreachable, or backend
settings are reloaded or replaced by API.
Since ";" and ":" are used as delimiter, <PATTERN> must
not contain these characters. Since ";" has special
meaning in shell, the option value must be quoted.

View File

@ -407,6 +407,7 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
faddr_(faddr),
worker_(worker),
left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN),
affinity_hash_(-1),
should_close_after_write_(false),
reset_conn_rtimer_required_(false) {
@ -682,6 +683,68 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) {
dconn_pool.remove_downstream_connection(dconn);
}
namespace {
// Returns the 32 bit FNV-1a hash of |s|
// (http://isthe.com/chongo/tech/comp/fnv/).
//
// FNV-1a is defined over octets, so each element of |s| is converted
// to uint8_t before it is mixed in.  Without the conversion, an
// element with the high bit set would be sign-extended when the
// element type is a signed char, and would flip the upper 24 bits of
// the hash.  Input consisting only of 7 bit characters (e.g., textual
// IP addresses) hashes to the same value either way.
uint32_t hash32(const StringRef &s) {
  // 32 bit FNV offset basis.
  uint32_t h = 2166136261u;

  for (size_t i = 0; i < s.size(); ++i) {
    h ^= static_cast<uint8_t>(s[i]);
    // Multiply by the 32 bit FNV prime 16777619 (0x01000193), spelled
    // out as shifts and adds: 1 + 2 + 16 + 128 + 256 + 2^24.
    h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24);
  }

  return h;
}
} // namespace
namespace {
int32_t calculate_affinity_from_ip(const StringRef &ip) {
return hash32(ip) & 0x7fffffff;
}
} // namespace
// Returns an Http2Session to use for the backend address |addr|,
// which belongs to |group|.
//
// If |addr|'s http2_extra_freelist is non-empty, its head session is
// detached from the freelist and returned; it is put on the avail
// freelist unless max_concurrency_reached(1) reports true.  Otherwise
// a new Http2Session is created for |addr|, put on the avail
// freelist, and returned.
//
// NOTE(review): the new session is returned as a raw pointer; who
// eventually deletes it is not visible from this function -- confirm
// against Http2Session's lifetime handling.
Http2Session *ClientHandler::select_http2_session_with_affinity(
    const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr) {
  auto &shared_addr = group->shared_addr;

  if (LOG_ENABLED(INFO)) {
    CLOG(INFO, this) << "Selected DownstreamAddr=" << addr
                     << ", index=" << (addr - shared_addr->addrs.data());
  }

  auto &extra_freelist = addr->http2_extra_freelist;

  if (extra_freelist.size() == 0) {
    // Nothing to reuse; open a fresh session towards |addr|.
    auto fresh = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(),
                                  worker_, group, addr);

    if (LOG_ENABLED(INFO)) {
      CLOG(INFO, this) << "Create new Http2Session " << fresh;
    }

    fresh->add_to_avail_freelist();

    return fresh;
  }

  auto reused = extra_freelist.head;
  reused->remove_from_freelist();

  if (LOG_ENABLED(INFO)) {
    CLOG(INFO, this) << "Use Http2Session " << reused
                     << " from http2_extra_freelist";
  }

  if (!reused->max_concurrency_reached(1)) {
    reused->add_to_avail_freelist();
    return reused;
  }

  // The session is left off the avail freelist in this case.
  if (LOG_ENABLED(INFO)) {
    CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
                     << reused << ").";
  }

  return reused;
}
namespace {
// Returns true if load of |lhs| is lighter than that of |rhs|.
// Currently, we assume that lesser streams means lesser load.
@ -863,11 +926,42 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
auto &group = groups[group_idx];
auto &shared_addr = group->shared_addr;
auto proto = PROTO_NONE;
if (shared_addr->affinity == AFFINITY_IP) {
if (affinity_hash_ == -1) {
affinity_hash_ = calculate_affinity_from_ip(StringRef{ipaddr_});
}
auto idx = affinity_hash_ % shared_addr->addrs.size();
auto &addr = shared_addr->addrs[idx];
if (addr.proto == PROTO_HTTP2) {
auto http2session = select_http2_session_with_affinity(group, &addr);
auto dconn = make_unique<Http2DownstreamConnection>(http2session);
dconn->set_client_handler(this);
return std::move(dconn);
}
auto &dconn_pool = addr.dconn_pool;
auto dconn = dconn_pool->pop_downstream_connection();
if (!dconn) {
dconn = make_unique<HttpDownstreamConnection>(group, idx, conn_.loop,
worker_);
}
dconn->set_client_handler(this);
return dconn;
}
auto http1_weight = shared_addr->http1_pri.weight;
auto http2_weight = shared_addr->http2_pri.weight;
auto proto = PROTO_NONE;
if (http1_weight > 0 && http2_weight > 0) {
// We only advance cycle if both weight has nonzero to keep its
// distance under WEIGHT_MAX.
@ -927,7 +1021,8 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
<< " Create new one";
}
dconn = make_unique<HttpDownstreamConnection>(group, conn_.loop, worker_);
dconn =
make_unique<HttpDownstreamConnection>(group, -1, conn_.loop, worker_);
}
dconn->set_client_handler(this);

View File

@ -50,6 +50,7 @@ class DownstreamConnectionPool;
class Worker;
struct WorkerStat;
struct DownstreamAddrGroup;
struct DownstreamAddr;
class ClientHandler {
public:
@ -145,6 +146,9 @@ public:
Http2Session *
select_http2_session(const std::shared_ptr<DownstreamAddrGroup> &group);
Http2Session *select_http2_session_with_affinity(
const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr);
const UpstreamAddr *get_upstream_addr() const;
private:
@ -167,6 +171,7 @@ private:
Worker *worker_;
// The number of bytes of HTTP/2 client connection header to read
size_t left_connhd_len_;
int32_t affinity_hash_;
bool should_close_after_write_;
bool reset_conn_rtimer_required_;
ReadBuf rb_;

View File

@ -651,6 +651,7 @@ struct DownstreamParams {
size_t fall;
size_t rise;
shrpx_proto proto;
shrpx_session_affinity affinity;
bool tls;
};
@ -715,6 +716,16 @@ int parse_downstream_params(DownstreamParams &out,
out.tls = false;
} else if (util::istarts_with_l(param, "sni=")) {
out.sni = StringRef{first + str_size("sni="), end};
} else if (util::istarts_with_l(param, "affinity=")) {
auto valstr = StringRef{first + str_size("affinity="), end};
if (util::strieq_l("none", valstr)) {
out.affinity = AFFINITY_NONE;
} else if (util::strieq_l("ip", valstr)) {
out.affinity = AFFINITY_IP;
} else {
LOG(ERROR) << "backend: affinity: value must be either none or ip";
return -1;
}
} else if (!param.empty()) {
LOG(ERROR) << "backend: " << param << ": unknown keyword";
return -1;
@ -778,6 +789,11 @@ int parse_mapping(Config *config, DownstreamAddrConfig addr,
}
for (auto &g : addr_groups) {
if (g.pattern == pattern) {
// Last value wins if we have multiple different affinity
// value under one group.
if (params.affinity != AFFINITY_NONE) {
g.affinity = params.affinity;
}
g.addrs.push_back(addr);
done = true;
break;
@ -791,6 +807,7 @@ int parse_mapping(Config *config, DownstreamAddrConfig addr,
addr_groups.emplace_back(StringRef{pattern});
auto &g = addr_groups.back();
g.addrs.push_back(addr);
g.affinity = params.affinity;
if (pattern[0] == '*') {
// wildcard pattern

View File

@ -290,6 +290,13 @@ constexpr int16_t DEFAULT_DOWNSTREAM_PORT = 80;
enum shrpx_proto { PROTO_NONE, PROTO_HTTP1, PROTO_HTTP2, PROTO_MEMCACHED };
// Session affinity method of a backend group.  It is selected by the
// "affinity=<METHOD>" backend parameter.
enum shrpx_session_affinity {
// No session affinity ("affinity=none").  A backend group starts
// with this value.
AFFINITY_NONE,
// Client IP based session affinity ("affinity=ip").
AFFINITY_IP,
};
enum shrpx_forwarded_param {
FORWARDED_NONE = 0,
FORWARDED_BY = 0x1,
@ -354,10 +361,12 @@ struct DownstreamAddrConfig {
struct DownstreamAddrGroupConfig {
DownstreamAddrGroupConfig(const StringRef &pattern)
: pattern(pattern.c_str(), pattern.size()) {}
: pattern(pattern.c_str(), pattern.size()), affinity(AFFINITY_NONE) {}
ImmutableString pattern;
std::vector<DownstreamAddrConfig> addrs;
// Session affinity
shrpx_session_affinity affinity;
};
struct TicketKey {

View File

@ -148,8 +148,8 @@ void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
} // namespace
HttpDownstreamConnection::HttpDownstreamConnection(
const std::shared_ptr<DownstreamAddrGroup> &group, struct ev_loop *loop,
Worker *worker)
const std::shared_ptr<DownstreamAddrGroup> &group, ssize_t initial_addr_idx,
struct ev_loop *loop, Worker *worker)
: conn_(loop, -1, nullptr, worker->get_mcpool(),
worker->get_downstream_config()->timeout.write,
worker->get_downstream_config()->timeout.read, {}, {}, connectcb,
@ -164,7 +164,8 @@ HttpDownstreamConnection::HttpDownstreamConnection(
group_(group),
addr_(nullptr),
ioctrl_(&conn_.rlimit),
response_htp_{0} {}
response_htp_{0},
initial_addr_idx_(initial_addr_idx) {}
HttpDownstreamConnection::~HttpDownstreamConnection() {
if (LOG_ENABLED(INFO)) {
@ -191,7 +192,13 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
if (conn_.fd == -1) {
auto &shared_addr = group_->shared_addr;
auto &addrs = shared_addr->addrs;
auto &next_downstream = shared_addr->next;
// If session affinity is enabled, we always start with address at
// initial_addr_idx_.
size_t temp_idx = initial_addr_idx_;
auto &next_downstream =
shared_addr->affinity == AFFINITY_NONE ? shared_addr->next : temp_idx;
auto end = next_downstream;
for (;;) {
auto &addr = addrs[next_downstream];

View File

@ -43,7 +43,8 @@ struct DownstreamAddr;
class HttpDownstreamConnection : public DownstreamConnection {
public:
HttpDownstreamConnection(const std::shared_ptr<DownstreamAddrGroup> &group,
struct ev_loop *loop, Worker *worker);
ssize_t initial_addr_idx, struct ev_loop *loop,
Worker *worker);
virtual ~HttpDownstreamConnection();
virtual int attach_downstream(Downstream *downstream);
virtual void detach_downstream(Downstream *downstream);
@ -93,6 +94,7 @@ private:
DownstreamAddr *addr_;
IOControl ioctrl_;
http_parser response_htp_;
ssize_t initial_addr_idx_;
};
} // namespace shrpx

View File

@ -71,6 +71,10 @@ bool match_shared_downstream_addr(
return false;
}
if (lhs->affinity != rhs->affinity) {
return false;
}
auto used = std::vector<bool>(lhs->addrs.size());
for (auto &a : lhs->addrs) {
@ -143,7 +147,17 @@ void Worker::replace_downstream_config(
std::shared_ptr<DownstreamConfig> downstreamconf) {
for (auto &g : downstream_addr_groups_) {
g->retired = true;
g->shared_addr->dconn_pool.remove_all();
auto &shared_addr = g->shared_addr;
if (shared_addr->affinity == AFFINITY_NONE) {
shared_addr->dconn_pool.remove_all();
continue;
}
for (auto &addr : shared_addr->addrs) {
addr.dconn_pool->remove_all();
}
}
downstreamconf_ = downstreamconf;
@ -163,6 +177,7 @@ void Worker::replace_downstream_config(
auto shared_addr = std::make_shared<SharedDownstreamAddr>();
shared_addr->addrs.resize(src.addrs.size());
shared_addr->affinity = src.affinity;
size_t num_http1 = 0;
size_t num_http2 = 0;
@ -240,6 +255,12 @@ void Worker::replace_downstream_config(
shared_addr->http1_pri.weight = num_http1;
shared_addr->http2_pri.weight = num_http2;
if (shared_addr->affinity != AFFINITY_NONE) {
for (auto &addr : shared_addr->addrs) {
addr.dconn_pool = make_unique<DownstreamConnectionPool>();
}
}
dst->shared_addr = shared_addr;
} else {
if (LOG_ENABLED(INFO)) {

View File

@ -86,6 +86,9 @@ struct DownstreamAddr {
std::unique_ptr<ConnectBlocker> connect_blocker;
std::unique_ptr<LiveCheck> live_check;
// Connection pool for this particular address if session affinity
// is enabled
std::unique_ptr<DownstreamConnectionPool> dconn_pool;
size_t fall;
size_t rise;
// Client side TLS session cache
@ -139,6 +142,8 @@ struct SharedDownstreamAddr {
// HTTP/1.1. Otherwise, choose HTTP/2.
WeightedPri http1_pri;
WeightedPri http2_pri;
// Session affinity
shrpx_session_affinity affinity;
};
struct DownstreamAddrGroup {