diff --git a/src/shrpx.cc b/src/shrpx.cc index 65d41635..959c15bf 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -1284,12 +1284,13 @@ Connections: Several parameters are accepted after . The parameters are delimited by ";". The available parameters are: "proto=", "tls", - "sni=", "fall=", and "rise=". The - parameter consists of keyword, and optionally followed - by "=" and value. For example, the parameter "proto=h2" - consists of the keyword "proto" and value "h2". The - parameter "tls" consists of the keyword "tls" without - value. Each parameter is described as follows. + "sni=", "fall=", "rise=", and + "affinity=". The parameter consists of keyword, + and optionally followed by "=" and value. For example, + the parameter "proto=h2" consists of the keyword "proto" + and value "h2". The parameter "tls" consists of the + keyword "tls" without value. Each parameter is + described as follows. The backend application protocol can be specified using optional "proto" parameter, and in the form of @@ -1324,6 +1325,20 @@ Connections: backend is permanently offline, once it goes in that state, and this is the default behaviour. + The session affinity is enabled using + "affinity=" parameter. If "ip" is given in + , client IP based session affinity is enabled. + If "none" is given in , session affinity is + disabled, and this is the default. The session affinity + is enabled per . If at least one backend has + "affinity" parameter, and its is not "none", + session affinity is enabled for all backend servers + sharing the same . It is advised to set + "affinity" parameter to all backend explicitly if + session affinity is desired. The session affinity may + break if one of the backend gets unreachable, or backend + settings are reload or replaced by API. + Since ";" and ":" are used as delimiter, must not contain these characters. Since ";" has special meaning in shell, the option value must be quoted. diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 5f4eb6e5..3f98bc36 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -407,6 +407,7 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl, faddr_(faddr), worker_(worker), left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN), + affinity_hash_(-1), should_close_after_write_(false), reset_conn_rtimer_required_(false) { @@ -682,6 +683,68 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) { dconn_pool.remove_downstream_connection(dconn); } +namespace { +uint32_t hash32(const StringRef &s) { + /* 32 bit FNV-1a: http://isthe.com/chongo/tech/comp/fnv/ */ + uint32_t h = 2166136261u; + size_t i; + + for (i = 0; i < s.size(); ++i) { + h ^= s[i]; + h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24); + } + + return h; +} +} // namespace + +namespace { +int32_t calculate_affinity_from_ip(const StringRef &ip) { + return hash32(ip) & 0x7fffffff; +} +} // namespace + +Http2Session *ClientHandler::select_http2_session_with_affinity( + const std::shared_ptr &group, DownstreamAddr *addr) { + auto &shared_addr = group->shared_addr; + + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Selected DownstreamAddr=" << addr + << ", index=" << (addr - shared_addr->addrs.data()); + } + + if (addr->http2_extra_freelist.size()) { + auto session = addr->http2_extra_freelist.head; + session->remove_from_freelist(); + + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Use Http2Session " << session + << " from http2_extra_freelist"; + } + + if (session->max_concurrency_reached(1)) { + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Maximum streams are reached for Http2Session(" + << session << ")."; + } + } else { + session->add_to_avail_freelist(); + } + return session; + } + + auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(), + worker_, group, addr); + + if (LOG_ENABLED(INFO)) { + CLOG(INFO, this) << "Create new Http2Session " << session; + } + + session->add_to_avail_freelist(); + + return session; +} + namespace { // Returns true if load of |lhs| is lighter than that of |rhs|. // Currently, we assume that lesser streams means lesser load. @@ -863,11 +926,42 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { auto &group = groups[group_idx]; auto &shared_addr = group->shared_addr; - auto proto = PROTO_NONE; + if (shared_addr->affinity == AFFINITY_IP) { + if (affinity_hash_ == -1) { + affinity_hash_ = calculate_affinity_from_ip(StringRef{ipaddr_}); + } + + auto idx = affinity_hash_ % shared_addr->addrs.size(); + + auto &addr = shared_addr->addrs[idx]; + if (addr.proto == PROTO_HTTP2) { + auto http2session = select_http2_session_with_affinity(group, &addr); + + auto dconn = make_unique(http2session); + + dconn->set_client_handler(this); + + return std::move(dconn); + } + + auto &dconn_pool = addr.dconn_pool; + auto dconn = dconn_pool->pop_downstream_connection(); + + if (!dconn) { + dconn = make_unique(group, idx, conn_.loop, + worker_); + } + + dconn->set_client_handler(this); + + return dconn; + } auto http1_weight = shared_addr->http1_pri.weight; auto http2_weight = shared_addr->http2_pri.weight; + auto proto = PROTO_NONE; + if (http1_weight > 0 && http2_weight > 0) { // We only advance cycle if both weight has nonzero to keep its // distance under WEIGHT_MAX. @@ -927,7 +1021,8 @@ ClientHandler::get_downstream_connection(Downstream *downstream) { << " Create new one"; } - dconn = make_unique(group, conn_.loop, worker_); + dconn = + make_unique(group, -1, conn_.loop, worker_); } dconn->set_client_handler(this); diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 93f7b845..a207956d 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -50,6 +50,7 @@ class DownstreamConnectionPool; class Worker; struct WorkerStat; struct DownstreamAddrGroup; +struct DownstreamAddr; class ClientHandler { public: @@ -145,6 +146,9 @@ public: Http2Session * select_http2_session(const std::shared_ptr &group); + Http2Session *select_http2_session_with_affinity( + const std::shared_ptr &group, DownstreamAddr *addr); + const UpstreamAddr *get_upstream_addr() const; private: @@ -167,6 +171,7 @@ private: Worker *worker_; // The number of bytes of HTTP/2 client connection header to read size_t left_connhd_len_; + int32_t affinity_hash_; bool should_close_after_write_; bool reset_conn_rtimer_required_; ReadBuf rb_; diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index 0fa7082f..d0e4c9b5 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -651,6 +651,7 @@ struct DownstreamParams { size_t fall; size_t rise; shrpx_proto proto; + shrpx_session_affinity affinity; bool tls; }; @@ -715,6 +716,16 @@ int parse_downstream_params(DownstreamParams &out, out.tls = false; } else if (util::istarts_with_l(param, "sni=")) { out.sni = StringRef{first + str_size("sni="), end}; + } else if (util::istarts_with_l(param, "affinity=")) { + auto valstr = StringRef{first + str_size("affinity="), end}; + if (util::strieq_l("none", valstr)) { + out.affinity = AFFINITY_NONE; + } else if (util::strieq_l("ip", valstr)) { + out.affinity = AFFINITY_IP; + } else { + LOG(ERROR) << "backend: affinity: value must be either none or ip"; + return -1; + } } else if (!param.empty()) { LOG(ERROR) << "backend: " << param << ": unknown keyword"; return -1; @@ -778,6 +789,11 @@ int parse_mapping(Config *config, DownstreamAddrConfig addr, } for (auto &g : addr_groups) { if (g.pattern == pattern) { + // Last value wins if we have multiple different affinity + // value under one group. + if (params.affinity != AFFINITY_NONE) { + g.affinity = params.affinity; + } g.addrs.push_back(addr); done = true; break; @@ -791,6 +807,7 @@ int parse_mapping(Config *config, DownstreamAddrConfig addr, addr_groups.emplace_back(StringRef{pattern}); auto &g = addr_groups.back(); g.addrs.push_back(addr); + g.affinity = params.affinity; if (pattern[0] == '*') { // wildcard pattern diff --git a/src/shrpx_config.h b/src/shrpx_config.h index c22835cb..7bfe2990 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -290,6 +290,13 @@ constexpr int16_t DEFAULT_DOWNSTREAM_PORT = 80; enum shrpx_proto { PROTO_NONE, PROTO_HTTP1, PROTO_HTTP2, PROTO_MEMCACHED }; +enum shrpx_session_affinity { + // No session affinity + AFFINITY_NONE, + // Client IP affinity + AFFINITY_IP, +}; + enum shrpx_forwarded_param { FORWARDED_NONE = 0, FORWARDED_BY = 0x1, @@ -354,10 +361,12 @@ struct DownstreamAddrConfig { struct DownstreamAddrGroupConfig { DownstreamAddrGroupConfig(const StringRef &pattern) - : pattern(pattern.c_str(), pattern.size()) {} + : pattern(pattern.c_str(), pattern.size()), affinity(AFFINITY_NONE) {} ImmutableString pattern; std::vector addrs; + // Session affinity + shrpx_session_affinity affinity; }; struct TicketKey { diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 14d1e000..77770a09 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -148,8 +148,8 @@ void connectcb(struct ev_loop *loop, ev_io *w, int revents) { } // namespace HttpDownstreamConnection::HttpDownstreamConnection( - const std::shared_ptr &group, struct ev_loop *loop, - Worker *worker) + const std::shared_ptr &group, ssize_t initial_addr_idx, + struct ev_loop *loop, Worker *worker) : conn_(loop, -1, nullptr, worker->get_mcpool(), worker->get_downstream_config()->timeout.write, worker->get_downstream_config()->timeout.read, {}, {}, connectcb, @@ -164,7 +164,8 @@ HttpDownstreamConnection::HttpDownstreamConnection( group_(group), addr_(nullptr), ioctrl_(&conn_.rlimit), - response_htp_{0} {} + response_htp_{0}, + initial_addr_idx_(initial_addr_idx) {} HttpDownstreamConnection::~HttpDownstreamConnection() { if (LOG_ENABLED(INFO)) { @@ -191,7 +192,13 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) { if (conn_.fd == -1) { auto &shared_addr = group_->shared_addr; auto &addrs = shared_addr->addrs; - auto &next_downstream = shared_addr->next; + + // If session affinity is enabled, we always start with address at + // initial_addr_idx_. + size_t temp_idx = initial_addr_idx_; + + auto &next_downstream = + shared_addr->affinity == AFFINITY_NONE ? shared_addr->next : temp_idx; auto end = next_downstream; for (;;) { auto &addr = addrs[next_downstream]; diff --git a/src/shrpx_http_downstream_connection.h b/src/shrpx_http_downstream_connection.h index 40b53f44..48241e2d 100644 --- a/src/shrpx_http_downstream_connection.h +++ b/src/shrpx_http_downstream_connection.h @@ -43,7 +43,8 @@ struct DownstreamAddr; class HttpDownstreamConnection : public DownstreamConnection { public: HttpDownstreamConnection(const std::shared_ptr &group, - struct ev_loop *loop, Worker *worker); + ssize_t initial_addr_idx, struct ev_loop *loop, + Worker *worker); virtual ~HttpDownstreamConnection(); virtual int attach_downstream(Downstream *downstream); virtual void detach_downstream(Downstream *downstream); @@ -93,6 +94,7 @@ private: DownstreamAddr *addr_; IOControl ioctrl_; http_parser response_htp_; + ssize_t initial_addr_idx_; }; } // namespace shrpx diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 745de073..c0f9ee01 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -71,6 +71,10 @@ bool match_shared_downstream_addr( return false; } + if (lhs->affinity != rhs->affinity) { + return false; + } + auto used = std::vector(lhs->addrs.size()); for (auto &a : lhs->addrs) { @@ -143,7 +147,17 @@ void Worker::replace_downstream_config( std::shared_ptr downstreamconf) { for (auto &g : downstream_addr_groups_) { g->retired = true; - g->shared_addr->dconn_pool.remove_all(); + + auto &shared_addr = g->shared_addr; + + if (shared_addr->affinity == AFFINITY_NONE) { + shared_addr->dconn_pool.remove_all(); + continue; + } + + for (auto &addr : shared_addr->addrs) { + addr.dconn_pool->remove_all(); + } } downstreamconf_ = downstreamconf; @@ -163,6 +177,7 @@ void Worker::replace_downstream_config( auto shared_addr = std::make_shared(); shared_addr->addrs.resize(src.addrs.size()); + shared_addr->affinity = src.affinity; size_t num_http1 = 0; size_t num_http2 = 0; @@ -240,6 +255,12 @@ void Worker::replace_downstream_config( shared_addr->http1_pri.weight = num_http1; shared_addr->http2_pri.weight = num_http2; + if (shared_addr->affinity != AFFINITY_NONE) { + for (auto &addr : shared_addr->addrs) { + addr.dconn_pool = make_unique(); + } + } + dst->shared_addr = shared_addr; } else { if (LOG_ENABLED(INFO)) { diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index 1a334723..26aabe38 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -86,6 +86,9 @@ struct DownstreamAddr { std::unique_ptr connect_blocker; std::unique_ptr live_check; + // Connection pool for this particular address if session affinity + // is enabled + std::unique_ptr dconn_pool; size_t fall; size_t rise; // Client side TLS session cache @@ -139,6 +142,8 @@ struct SharedDownstreamAddr { // HTTP/1.1. Otherwise, choose HTTP/2. WeightedPri http1_pri; WeightedPri http2_pri; + // Session affinity + shrpx_session_affinity affinity; }; struct DownstreamAddrGroup {