diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f33f4bff..d5c67a3b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -163,6 +163,7 @@ if(ENABLE_APP) memchunk_test.cc template_test.cc base64_test.cc + priority_queue_test.cc ) add_executable(nghttpx-unittest EXCLUDE_FROM_ALL ${NGHTTPX_UNITTEST_SOURCES} diff --git a/src/Makefile.am b/src/Makefile.am index 440f6e76..df464aad 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -140,7 +140,8 @@ NGHTTPX_SRCS = \ shrpx_dual_dns_resolver.cc shrpx_dual_dns_resolver.h \ shrpx_dns_tracker.cc shrpx_dns_tracker.h \ buffer.h memchunk.h template.h allocator.h \ - xsi_strerror.c xsi_strerror.h + xsi_strerror.c xsi_strerror.h \ + priority_queue.h if HAVE_MRUBY NGHTTPX_SRCS += \ @@ -186,7 +187,8 @@ nghttpx_unittest_SOURCES = shrpx-unittest.cc \ buffer_test.cc buffer_test.h \ memchunk_test.cc memchunk_test.h \ template_test.cc template_test.h \ - base64_test.cc base64_test.h + base64_test.cc base64_test.h \ + priority_queue_test.cc priority_queue_test.h nghttpx_unittest_CPPFLAGS = ${AM_CPPFLAGS} \ -DNGHTTP2_SRC_DIR=\"$(top_srcdir)/src\" nghttpx_unittest_LDADD = libnghttpx.a ${LDADD} @CUNIT_LIBS@ @TESTLDADD@ diff --git a/src/memchunk.h b/src/memchunk.h index ba8d0e18..f0e24b85 100644 --- a/src/memchunk.h +++ b/src/memchunk.h @@ -187,6 +187,14 @@ template struct Memchunks { size_t append(const ImmutableString &s) { return append(s.c_str(), s.size()); } + size_t copy(Memchunks &dest) { + auto m = head; + while (m) { + dest.append(m->pos, m->len()); + m = m->next; + } + return len; + } size_t remove(void *dest, size_t count) { if (!tail || count == 0) { return 0; diff --git a/src/priority_queue.h b/src/priority_queue.h new file mode 100644 index 00000000..b7e0ae48 --- /dev/null +++ b/src/priority_queue.h @@ -0,0 +1,145 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2019 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef PRIORITY_QUEUE_H +#define PRIORITY_QUEUE_H + +#include "nghttp2_config.h" + +#include +#include +#include +#include + +namespace nghttp2 { + +template > +class PriorityQueue { +public: + const ValueType &top() const; + const KeyType &key_top() const; + void push(const KeyType &key, const ValueType &value); + void push(KeyType &&key, ValueType &&value); + template void emplace(Args &&... args); + void pop(); + bool empty() const; + size_t size() const; + +private: + void bubble_up(size_t idx); + void bubble_down(size_t idx); + + std::vector> c_; + Compare comp_; +}; + +template +const ValueType &PriorityQueue::top() const { + assert(!c_.empty()); + return c_[0].second; +} + +template +const KeyType &PriorityQueue::key_top() const { + assert(!c_.empty()); + return c_[0].first; +} + +template +void PriorityQueue::push(const KeyType &key, + const ValueType &value) { + c_.push_back(std::pair{key, value}); + bubble_up(c_.size() - 1); +} + +template +void PriorityQueue::push(KeyType &&key, + ValueType &&value) { + c_.push_back(std::pair{std::move(key), std::move(value)}); + bubble_up(c_.size() - 1); +} + +template +template +void PriorityQueue::emplace(Args &&... args) { + c_.emplace_back(std::forward(args)...); + bubble_up(c_.size() - 1); +} + +template +void PriorityQueue::pop() { + assert(!c_.empty()); + c_[0] = std::move(c_.back()); + c_.resize(c_.size() - 1); + bubble_down(0); +} + +template +bool PriorityQueue::empty() const { + return c_.empty(); +} + +template +size_t PriorityQueue::size() const { + return c_.size(); +} + +template +void PriorityQueue::bubble_up(size_t idx) { + using std::swap; + while (idx != 0) { + auto parent = (idx - 1) / 2; + if (!comp_(c_[idx].first, c_[parent].first)) { + return; + } + swap(c_[idx], c_[parent]); + idx = parent; + } +} + +template +void PriorityQueue::bubble_down(size_t idx) { + using std::swap; + for (;;) { + auto j = idx * 2 + 1; + auto minidx = idx; + for (auto i = 0; i < 2; ++i, ++j) { + if (j >= c_.size()) { + break; + } + if (comp_(c_[j].first, c_[minidx].first)) { + minidx = j; + } + } + if (minidx == idx) { + return; + } + swap(c_[idx], c_[minidx]); + idx = minidx; + } +} + +} // namespace nghttp2 + +#endif // PRIORITY_QUEUE_H diff --git a/src/priority_queue_test.cc b/src/priority_queue_test.cc new file mode 100644 index 00000000..046cb0f6 --- /dev/null +++ b/src/priority_queue_test.cc @@ -0,0 +1,93 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2019 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "priority_queue_test.h" + +#include + +#include + +#include "priority_queue.h" + +namespace nghttp2 { + +void test_priority_queue_push(void) { + PriorityQueue pq; + + CU_ASSERT(pq.empty()); + CU_ASSERT(0 == pq.size()); + + pq.push("foo", 1); + + CU_ASSERT(!pq.empty()); + CU_ASSERT(1 == pq.size()); + + auto top = pq.top(); + + CU_ASSERT(1 == top); + + pq.emplace("bar", 2); + top = pq.top(); + + CU_ASSERT(2 == top); + + pq.push("baz", 3); + top = pq.top(); + + CU_ASSERT(2 == top); + + pq.push("C", 4); + + CU_ASSERT(4 == pq.size()); + + top = pq.top(); + + CU_ASSERT(4 == top); + + pq.pop(); + + CU_ASSERT(3 == pq.size()); + + top = pq.top(); + + CU_ASSERT(2 == top); + + pq.pop(); + + top = pq.top(); + + CU_ASSERT(3 == top); + + pq.pop(); + top = pq.top(); + + CU_ASSERT(1 == top); + + pq.pop(); + + CU_ASSERT(pq.empty()); + CU_ASSERT(0 == pq.size()); +} + +} // namespace nghttp2 diff --git a/src/priority_queue_test.h b/src/priority_queue_test.h new file mode 100644 index 00000000..eb013865 --- /dev/null +++ b/src/priority_queue_test.h @@ -0,0 +1,38 @@ +/* + * nghttp2 - HTTP/2 C Library + * + * Copyright (c) 2019 Tatsuhiro Tsujikawa + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef PRIORITY_QUEUE_TEST_H +#define PRIORITY_QUEUE_TEST_H + +#ifdef HAVE_CONFIG_H +# include +#endif /* HAVE_CONFIG_H */ + +namespace nghttp2 { + +void test_priority_queue_push(void); + +} // namespace nghttp2 + +#endif // PRIORITY_QUEUE_TEST_H diff --git a/src/shrpx-unittest.cc b/src/shrpx-unittest.cc index dd4fadf1..3483df0b 100644 --- a/src/shrpx-unittest.cc +++ b/src/shrpx-unittest.cc @@ -45,6 +45,7 @@ #include "shrpx_config.h" #include "tls.h" #include "shrpx_router_test.h" +#include "priority_queue_test.h" #include "shrpx_log.h" static int init_suite1(void) { return 0; } @@ -217,7 +218,9 @@ int main(int argc, char *argv[]) { !CU_add_test(pSuite, "template_string_ref", nghttp2::test_template_string_ref) || !CU_add_test(pSuite, "base64_encode", nghttp2::test_base64_encode) || - !CU_add_test(pSuite, "base64_decode", nghttp2::test_base64_decode)) { + !CU_add_test(pSuite, "base64_decode", nghttp2::test_base64_decode) || + !CU_add_test(pSuite, "priority_queue_push", + nghttp2::test_priority_queue_push)) { CU_cleanup_registry(); return CU_get_error(); } diff --git a/src/shrpx.cc b/src/shrpx.cc index 2c0d0bba..85b69dcb 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -1738,13 +1738,13 @@ Connections: "sni=", "fall=", "rise=", "affinity=", "dns", "redirect-if-not-tls", "upgrade-scheme", "mruby=", - "read-timeout=", and - "write-timeout=". The parameter consists of - keyword, and optionally followed by "=" and value. For - example, the parameter "proto=h2" consists of the - keyword "proto" and value "h2". The parameter "tls" - consists of the keyword "tls" without value. Each - parameter is described as follows. + "read-timeout=", "write-timeout=", + "group=", "group-weight=", and "weight=". + The parameter consists of keyword, and optionally + followed by "=" and value. For example, the parameter + "proto=h2" consists of the keyword "proto" and value + "h2". The parameter "tls" consists of the keyword "tls" + without value. Each parameter is described as follows. The backend application protocol can be specified using optional "proto" parameter, and in the form of @@ -1850,6 +1850,31 @@ Connections: pattern, --backend-read-timeout and --backend-write-timeout are used. + "group=" parameter specifies the name of group + this backend address belongs to. By default, it belongs + to the unnamed default group. The name of group is + unique per pattern. "group-weight=" parameter + specifies the weight of the group. The higher weight + gets more frequently selected by the load balancing + algorithm. must be [1, 256] inclusive. The weight + 8 has 4 times more weight than 2. must be the same + for all addresses which share the same . If + "group-weight" is omitted in an address, but the other + address which belongs to the same group specifies + "group-weight", its weight is used. If no + "group-weight" is specified for all addresses, the + weight of a group becomes 1. "group" and "group-weight" + are ignored if session affinity is enabled. + + "weight=" parameter specifies the weight of the + backend address inside a group which this address + belongs to. The higher weight gets more frequently + selected by the load balancing algorithm. must be + [1, 256] inclusive. The weight 8 has 4 times more + weight than weight 2. If this parameter is omitted, + weight becomes 1. "weight" is ignored if session + affinity is enabled. + Since ";" and ":" are used as delimiter, must not contain these characters. Since ";" has special meaning in shell, the option value must be quoted. diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 71e522a0..7b377d38 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -682,7 +682,7 @@ uint32_t compute_affinity_from_ip(const StringRef &ip) { } } // namespace -Http2Session *ClientHandler::select_http2_session_with_affinity( +Http2Session *ClientHandler::get_http2_session( const std::shared_ptr &group, DownstreamAddr *addr) { auto &shared_addr = group->shared_addr; @@ -735,171 +735,6 @@ Http2Session *ClientHandler::select_http2_session_with_affinity( return session; } -namespace { -// Returns true if load of |lhs| is lighter than that of |rhs|. -// Currently, we assume that lesser streams means lesser load. -bool load_lighter(const DownstreamAddr *lhs, const DownstreamAddr *rhs) { - return lhs->num_dconn < rhs->num_dconn; -} -} // namespace - -Http2Session *ClientHandler::select_http2_session( - const std::shared_ptr &group) { - auto &shared_addr = group->shared_addr; - - // First count the working backend addresses. - size_t min = 0; - for (const auto &addr : shared_addr->addrs) { - if (addr.proto != Proto::HTTP2 || addr.connect_blocker->blocked()) { - continue; - } - - ++min; - } - - if (min == 0) { - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "No working backend address found"; - } - - return nullptr; - } - - auto &http2_avail_freelist = shared_addr->http2_avail_freelist; - - if (http2_avail_freelist.size() >= min) { - for (auto session = http2_avail_freelist.head; session;) { - auto next = session->dlnext; - - session->remove_from_freelist(); - - // session may be in graceful shutdown period now. - if (session->max_concurrency_reached(0)) { - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) - << "Maximum streams have been reached for Http2Session(" - << session << "). Skip it"; - } - - session = next; - - continue; - } - - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Use Http2Session " << session - << " from http2_avail_freelist"; - } - - if (session->max_concurrency_reached(1)) { - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Maximum streams are reached for Http2Session(" - << session << ")."; - } - } else { - session->add_to_avail_freelist(); - } - return session; - } - } - - DownstreamAddr *selected_addr = nullptr; - - for (auto &addr : shared_addr->addrs) { - if (addr.in_avail || addr.proto != Proto::HTTP2 || - (addr.http2_extra_freelist.size() == 0 && - addr.connect_blocker->blocked())) { - continue; - } - - for (auto session = addr.http2_extra_freelist.head; session;) { - auto next = session->dlnext; - - // session may be in graceful shutdown period now. - if (session->max_concurrency_reached(0)) { - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) - << "Maximum streams have been reached for Http2Session(" - << session << "). Skip it"; - } - - session->remove_from_freelist(); - - session = next; - - continue; - } - - break; - } - - if (selected_addr == nullptr || load_lighter(&addr, selected_addr)) { - selected_addr = &addr; - } - } - - assert(selected_addr); - - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Selected DownstreamAddr=" << selected_addr - << ", index=" - << (selected_addr - shared_addr->addrs.data()); - } - - if (selected_addr->http2_extra_freelist.size()) { - auto session = selected_addr->http2_extra_freelist.head; - session->remove_from_freelist(); - - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Use Http2Session " << session - << " from http2_extra_freelist"; - } - - if (session->max_concurrency_reached(1)) { - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Maximum streams are reached for Http2Session(" - << session << ")."; - } - } else { - session->add_to_avail_freelist(); - } - return session; - } - - auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(), - worker_, group, selected_addr); - - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Create new Http2Session " << session; - } - - session->add_to_avail_freelist(); - - return session; -} - -namespace { -// The chosen value is small enough for uint32_t, and large enough for -// the number of backend. -constexpr uint32_t WEIGHT_MAX = 65536; -} // namespace - -namespace { -bool pri_less(const WeightedPri &lhs, const WeightedPri &rhs) { - if (lhs.cycle < rhs.cycle) { - return rhs.cycle - lhs.cycle <= WEIGHT_MAX; - } - - return lhs.cycle - rhs.cycle > WEIGHT_MAX; -} -} // namespace - -namespace { -uint32_t next_cycle(const WeightedPri &pri) { - return pri.cycle + WEIGHT_MAX / std::min(WEIGHT_MAX, pri.weight); -} -} // namespace - uint32_t ClientHandler::get_affinity_cookie(Downstream *downstream, const StringRef &cookie_name) { auto h = downstream->find_affinity_cookie(cookie_name); @@ -918,9 +753,137 @@ uint32_t ClientHandler::get_affinity_cookie(Downstream *downstream, return h; } +namespace { +void reschedule_addr(PriorityQueue &pq, + DownstreamAddr *addr) { + auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + addr->pending_penalty; + addr->cycle += penalty / addr->weight; + addr->pending_penalty = penalty % addr->weight; + + pq.emplace(DownstreamAddrKey{addr->cycle, addr->seq}, addr); + addr->queued = true; +} +} // namespace + +namespace { +void reschedule_wg( + PriorityQueue &pq, + WeightGroup *wg) { + auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + wg->pending_penalty; + wg->cycle += penalty / wg->weight; + wg->pending_penalty = penalty % wg->weight; + + pq.emplace(DownstreamAddrKey{wg->cycle, wg->seq}, wg); + wg->queued = true; +} +} // namespace + +DownstreamAddr *ClientHandler::get_downstream_addr(int &err, + DownstreamAddrGroup *group, + Downstream *downstream) { + err = 0; + + switch (faddr_->alt_mode) { + case UpstreamAltMode::API: + case UpstreamAltMode::HEALTHMON: + assert(0); + default: + break; + } + + auto &shared_addr = group->shared_addr; + + if (shared_addr->affinity.type != SessionAffinity::NONE) { + uint32_t hash; + switch (shared_addr->affinity.type) { + case SessionAffinity::IP: + if (!affinity_hash_computed_) { + affinity_hash_ = compute_affinity_from_ip(ipaddr_); + affinity_hash_computed_ = true; + } + hash = affinity_hash_; + break; + case SessionAffinity::COOKIE: + hash = get_affinity_cookie(downstream, shared_addr->affinity.cookie.name); + break; + default: + assert(0); + } + + const auto &affinity_hash = shared_addr->affinity_hash; + + auto it = std::lower_bound( + std::begin(affinity_hash), std::end(affinity_hash), hash, + [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; }); + + if (it == std::end(affinity_hash)) { + it = std::begin(affinity_hash); + } + + auto aff_idx = + static_cast(std::distance(std::begin(affinity_hash), it)); + auto idx = (*it).idx; + auto addr = &shared_addr->addrs[idx]; + + if (addr->connect_blocker->blocked()) { + size_t i; + for (i = aff_idx + 1; i != aff_idx; ++i) { + if (i == shared_addr->affinity_hash.size()) { + i = 0; + } + addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx]; + if (addr->connect_blocker->blocked()) { + continue; + } + break; + } + if (i == aff_idx) { + err = -1; + return nullptr; + } + aff_idx = i; + } + + return addr; + } + + auto &wgpq = shared_addr->pq; + + for (;;) { + if (wgpq.empty()) { + CLOG(INFO, this) << "No working downstream address found"; + err = -1; + return nullptr; + } + + auto wg = wgpq.top(); + wgpq.pop(); + wg->queued = false; + + for (;;) { + if (wg->pq.empty()) { + break; + } + + auto addr = wg->pq.top(); + wg->pq.pop(); + addr->queued = false; + + if (addr->connect_blocker->blocked()) { + continue; + } + + reschedule_addr(wg->pq, addr); + reschedule_wg(wgpq, wg); + + return addr; + } + } +} + std::unique_ptr -ClientHandler::get_downstream_connection(int &err, Downstream *downstream, - Proto pref_proto) { +ClientHandler::get_downstream_connection(int &err, Downstream *downstream) { size_t group_idx; auto &downstreamconf = *worker_->get_downstream_config(); auto &routerconf = downstreamconf.router; @@ -984,173 +947,27 @@ ClientHandler::get_downstream_connection(int &err, Downstream *downstream, } auto &group = groups[group_idx]; - auto &shared_addr = group->shared_addr; - - if (shared_addr->affinity.type != SessionAffinity::NONE) { - uint32_t hash; - switch (shared_addr->affinity.type) { - case SessionAffinity::IP: - if (!affinity_hash_computed_) { - affinity_hash_ = compute_affinity_from_ip(ipaddr_); - affinity_hash_computed_ = true; - } - hash = affinity_hash_; - break; - case SessionAffinity::COOKIE: - hash = get_affinity_cookie(downstream, shared_addr->affinity.cookie.name); - break; - default: - assert(0); - } - - const auto &affinity_hash = shared_addr->affinity_hash; - - auto it = std::lower_bound( - std::begin(affinity_hash), std::end(affinity_hash), hash, - [](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; }); - - if (it == std::end(affinity_hash)) { - it = std::begin(affinity_hash); - } - - auto aff_idx = - static_cast(std::distance(std::begin(affinity_hash), it)); - auto idx = (*it).idx; - auto addr = &shared_addr->addrs[idx]; - - if (addr->connect_blocker->blocked()) { - size_t i; - for (i = aff_idx + 1; i != aff_idx; ++i) { - if (i == shared_addr->affinity_hash.size()) { - i = 0; - } - addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx]; - if (addr->connect_blocker->blocked() || - (pref_proto != Proto::NONE && pref_proto != addr->proto)) { - continue; - } - break; - } - if (i == aff_idx) { - err = -1; - return nullptr; - } - aff_idx = i; - } - - if (addr->proto == Proto::HTTP2) { - auto http2session = select_http2_session_with_affinity(group, addr); - - auto dconn = std::make_unique(http2session); - - dconn->set_client_handler(this); - - return std::move(dconn); - } - - auto &dconn_pool = addr->dconn_pool; - auto dconn = dconn_pool->pop_downstream_connection(); - - if (!dconn) { - dconn = std::make_unique(group, aff_idx, - conn_.loop, worker_); - } - - dconn->set_client_handler(this); - - return dconn; - } - - auto http1_weight = shared_addr->http1_pri.weight; - auto http2_weight = shared_addr->http2_pri.weight; - - auto proto = Proto::NONE; - - if (pref_proto == Proto::HTTP1) { - if (http1_weight > 0) { - proto = Proto::HTTP1; - } - } else if (pref_proto == Proto::HTTP2) { - if (http2_weight > 0) { - proto = Proto::HTTP2; - } - } else if (http1_weight > 0 && http2_weight > 0) { - // We only advance cycle if both weight has nonzero to keep its - // distance under WEIGHT_MAX. - if (pri_less(shared_addr->http1_pri, shared_addr->http2_pri)) { - proto = Proto::HTTP1; - shared_addr->http1_pri.cycle = next_cycle(shared_addr->http1_pri); - } else { - proto = Proto::HTTP2; - shared_addr->http2_pri.cycle = next_cycle(shared_addr->http2_pri); - } - } else if (http1_weight > 0) { - proto = Proto::HTTP1; - } else if (http2_weight > 0) { - proto = Proto::HTTP2; - } - - if (proto == Proto::NONE) { - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "No working downstream address found"; - } - - err = -1; + auto addr = get_downstream_addr(err, group.get(), downstream); + if (addr == nullptr) { return nullptr; } - if (proto == Proto::HTTP2) { + if (addr->proto == Proto::HTTP1) { + auto dconn = addr->dconn_pool->pop_downstream_connection(); + if (dconn) { + dconn->set_client_handler(this); + return dconn; + } + if (LOG_ENABLED(INFO)) { CLOG(INFO, this) << "Downstream connection pool is empty." << " Create new one"; } - auto http2session = select_http2_session(group); - - if (http2session == nullptr) { - err = -1; - return nullptr; - } - - auto dconn = std::make_unique(http2session); - + dconn = std::make_unique(group, addr, conn_.loop, + worker_); dconn->set_client_handler(this); - - return std::move(dconn); - } - - auto end = shared_addr->next; - for (;;) { - auto addr = &shared_addr->addrs[shared_addr->next]; - - if (addr->proto != Proto::HTTP1) { - if (++shared_addr->next >= shared_addr->addrs.size()) { - shared_addr->next = 0; - } - - assert(end != shared_addr->next); - - continue; - } - - // pool connection must be HTTP/1.1 connection - auto dconn = addr->dconn_pool->pop_downstream_connection(); - if (dconn) { - if (++shared_addr->next >= shared_addr->addrs.size()) { - shared_addr->next = 0; - } - - if (LOG_ENABLED(INFO)) { - CLOG(INFO, this) << "Reuse downstream connection DCONN:" << dconn.get() - << " from pool"; - } - - dconn->set_client_handler(this); - - return dconn; - } - - break; + return dconn; } if (LOG_ENABLED(INFO)) { @@ -1158,11 +975,9 @@ ClientHandler::get_downstream_connection(int &err, Downstream *downstream, << " Create new one"; } - auto dconn = - std::make_unique(group, 0, conn_.loop, worker_); - + auto http2session = get_http2_session(group, addr); + auto dconn = std::make_unique(http2session); dconn->set_client_handler(this); - return dconn; } diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 37999f26..c31b1ee4 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -99,14 +99,14 @@ public: void pool_downstream_connection(std::unique_ptr dconn); void remove_downstream_connection(DownstreamConnection *dconn); + DownstreamAddr *get_downstream_addr(int &err, DownstreamAddrGroup *group, + Downstream *downstream); // Returns DownstreamConnection object based on request path. This // function returns non-null DownstreamConnection, and assigns 0 to // |err| if it succeeds, or returns nullptr, and assigns negative - // error code to |err|. If |pref_proto| is not PROTO_NONE, choose - // backend whose protocol is |pref_proto|. + // error code to |err|. std::unique_ptr - get_downstream_connection(int &err, Downstream *downstream, - Proto pref_proto = Proto::NONE); + get_downstream_connection(int &err, Downstream *downstream); MemchunkPool *get_mcpool(); SSL *get_ssl() const; // Call this function when HTTP/2 connection header is received at @@ -150,10 +150,8 @@ public: StringRef get_forwarded_for() const; Http2Session * - select_http2_session(const std::shared_ptr &group); - - Http2Session *select_http2_session_with_affinity( - const std::shared_ptr &group, DownstreamAddr *addr); + get_http2_session(const std::shared_ptr &group, + DownstreamAddr *addr); // Returns an affinity cookie value for |downstream|. |cookie_name| // is used to inspect cookie header field in request header fields. diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index f1cdb121..bc02b148 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -47,6 +47,7 @@ #include #include #include +#include #include @@ -813,11 +814,14 @@ int parse_upstream_params(UpstreamParams &out, const StringRef &src_params) { struct DownstreamParams { StringRef sni; StringRef mruby; + StringRef group; AffinityConfig affinity; ev_tstamp read_timeout; ev_tstamp write_timeout; size_t fall; size_t rise; + uint32_t weight; + uint32_t group_weight; Proto proto; bool tls; bool dns; @@ -960,6 +964,43 @@ int parse_downstream_params(DownstreamParams &out, StringRef{first + str_size("write-timeout="), end}) == -1) { return -1; } + } else if (util::istarts_with_l(param, "weight=")) { + auto valstr = StringRef{first + str_size("weight="), end}; + if (valstr.empty()) { + LOG(ERROR) + << "backend: weight: non-negative integer [1, 256] is expected"; + return -1; + } + + auto n = util::parse_uint(valstr); + if (n < 1 || n > 256) { + LOG(ERROR) + << "backend: weight: non-negative integer [1, 256] is expected"; + return -1; + } + out.weight = n; + } else if (util::istarts_with_l(param, "group=")) { + auto valstr = StringRef{first + str_size("group="), end}; + if (valstr.empty()) { + LOG(ERROR) << "backend: group: empty string is not allowed"; + return -1; + } + out.group = valstr; + } else if (util::istarts_with_l(param, "group-weight=")) { + auto valstr = StringRef{first + str_size("group-weight="), end}; + if (valstr.empty()) { + LOG(ERROR) << "backend: group-weight: non-negative integer [1, 256] is " + "expected"; + return -1; + } + + auto n = util::parse_uint(valstr); + if (n < 1 || n > 256) { + LOG(ERROR) << "backend: group-weight: non-negative integer [1, 256] is " + "expected"; + return -1; + } + out.group_weight = n; } else if (!param.empty()) { LOG(ERROR) << "backend: " << param << ": unknown keyword"; return -1; @@ -996,6 +1037,7 @@ int parse_mapping(Config *config, DownstreamAddrConfig &addr, DownstreamParams params{}; params.proto = Proto::HTTP1; + params.weight = 1; if (parse_downstream_params(params, src_params) != 0) { return -1; @@ -1015,6 +1057,9 @@ int parse_mapping(Config *config, DownstreamAddrConfig &addr, addr.fall = params.fall; addr.rise = params.rise; + addr.weight = params.weight; + addr.group = make_string_ref(downstreamconf.balloc, params.group); + addr.group_weight = params.group_weight; addr.proto = params.proto; addr.tls = params.tls; addr.sni = make_string_ref(downstreamconf.balloc, params.sni); @@ -4025,7 +4070,17 @@ int configure_downstream_group(Config *config, bool http2_proxy, auto resolve_flags = numeric_addr_only ? AI_NUMERICHOST | AI_NUMERICSERV : 0; for (auto &g : addr_groups) { + std::unordered_map wgchk; for (auto &addr : g.addrs) { + if (addr.group_weight) { + auto it = wgchk.find(addr.group); + if (it == std::end(wgchk)) { + wgchk.emplace(addr.group, addr.group_weight); + } else if ((*it).second != addr.group_weight) { + LOG(FATAL) << "backend: inconsistent group-weight for a single group"; + return -1; + } + } if (addr.host_unix) { // for AF_UNIX socket, we use "localhost" as host for backend @@ -4078,6 +4133,17 @@ int configure_downstream_group(Config *config, bool http2_proxy, } } + for (auto &addr : g.addrs) { + if (addr.group_weight == 0) { + auto it = wgchk.find(addr.group); + if (it == std::end(wgchk)) { + addr.group_weight = 1; + } else { + addr.group_weight = (*it).second; + } + } + } + if (g.affinity.type != SessionAffinity::NONE) { size_t idx = 0; for (auto &addr : g.addrs) { diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 8ea9c55d..c638191f 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -468,8 +468,15 @@ struct DownstreamAddrConfig { StringRef hostport; // hostname sent as SNI field StringRef sni; + // name of group which this address belongs to. + StringRef group; size_t fall; size_t rise; + // weight of this address inside a weight group. Its range is [1, + // 256], inclusive. + uint32_t weight; + // weight of the weight group. Its range is [1, 256], inclusive. + uint32_t group_weight; // Application protocol used in this group Proto proto; // backend port. 0 if |host_unix| is true. diff --git a/src/shrpx_connect_blocker.cc b/src/shrpx_connect_blocker.cc index e261b815..e272d07b 100644 --- a/src/shrpx_connect_blocker.cc +++ b/src/shrpx_connect_blocker.cc @@ -128,8 +128,16 @@ void ConnectBlocker::online() { bool ConnectBlocker::in_offline() const { return offline_; } -void ConnectBlocker::call_block_func() { block_func_(); } +void ConnectBlocker::call_block_func() { + if (block_func_) { + block_func_(); + } +} -void ConnectBlocker::call_unblock_func() { unblock_func_(); } +void ConnectBlocker::call_unblock_func() { + if (unblock_func_) { + unblock_func_(); + } +} } // namespace shrpx diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index 4663b9c9..fd25236c 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -2318,22 +2318,6 @@ Http2Session::get_downstream_addr_group() const { return group_; } -void Http2Session::add_to_avail_freelist() { - if (freelist_zone_ != FreelistZone::NONE) { - return; - } - - if (LOG_ENABLED(INFO)) { - SSLOG(INFO, this) << "Append to http2_avail_freelist, group=" - << group_.get() << ", freelist.size=" - << group_->shared_addr->http2_avail_freelist.size(); - } - - freelist_zone_ = FreelistZone::AVAIL; - group_->shared_addr->http2_avail_freelist.append(this); - addr_->in_avail = true; -} - void Http2Session::add_to_extra_freelist() { if (freelist_zone_ != FreelistZone::NONE) { return; @@ -2353,15 +2337,6 @@ void Http2Session::remove_from_freelist() { switch (freelist_zone_) { case FreelistZone::NONE: return; - case FreelistZone::AVAIL: - if (LOG_ENABLED(INFO)) { - SSLOG(INFO, this) << "Remove from http2_avail_freelist, group=" << group_ - << ", freelist.size=" - << group_->shared_addr->http2_avail_freelist.size(); - } - group_->shared_addr->http2_avail_freelist.remove(this); - addr_->in_avail = false; - break; case FreelistZone::EXTRA: if (LOG_ENABLED(INFO)) { SSLOG(INFO, this) << "Remove from http2_extra_freelist, addr=" << addr_ diff --git a/src/shrpx_http2_session.h b/src/shrpx_http2_session.h index c5743b83..6bfd457d 100644 --- a/src/shrpx_http2_session.h +++ b/src/shrpx_http2_session.h @@ -61,9 +61,6 @@ struct StreamData { enum class FreelistZone { // Http2Session object is not linked in any freelist. NONE, - // Http2Session object is linked in group scope - // http2_avail_freelist. - AVAIL, // Http2Session object is linked in address scope // http2_extra_freelist. EXTRA, diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index 94ff99a5..cdf2d60f 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -456,38 +456,33 @@ void Http2Upstream::start_downstream(Downstream *downstream) { void Http2Upstream::initiate_downstream(Downstream *downstream) { int rv; - auto dconn = handler_->get_downstream_connection(rv, downstream); - if (!dconn) { - if (rv == SHRPX_ERR_TLS_REQUIRED) { - rv = redirect_to_https(downstream); - } else { - rv = error_reply(downstream, 502); - } - if (rv != 0) { - rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); - } + DownstreamConnection *dconn_ptr; - downstream->set_request_state(DownstreamState::CONNECT_FAIL); - downstream_queue_.mark_failure(downstream); + for (;;) { + auto dconn = handler_->get_downstream_connection(rv, downstream); + if (!dconn) { + if (rv == SHRPX_ERR_TLS_REQUIRED) { + rv = redirect_to_https(downstream); + } else { + rv = error_reply(downstream, 502); + } + if (rv != 0) { + rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); + } - return; - } + downstream->set_request_state(DownstreamState::CONNECT_FAIL); + downstream_queue_.mark_failure(downstream); + + return; + } #ifdef HAVE_MRUBY - auto dconn_ptr = dconn.get(); + dconn_ptr = dconn.get(); #endif // HAVE_MRUBY - rv = downstream->attach_downstream_connection(std::move(dconn)); - if (rv != 0) { - // downstream connection fails, send error page - if (error_reply(downstream, 502) != 0) { - rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); + rv = downstream->attach_downstream_connection(std::move(dconn)); + if (rv == 0) { + break; } - - downstream->set_request_state(DownstreamState::CONNECT_FAIL); - - downstream_queue_.mark_failure(downstream); - - return; } #ifdef HAVE_MRUBY @@ -2076,14 +2071,16 @@ int Http2Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) { // downstream connection is clean; we can retry with new // downstream connection. - dconn = handler_->get_downstream_connection(rv, downstream); - if (!dconn) { - goto fail; - } + for (;;) { + auto dconn = handler_->get_downstream_connection(rv, downstream); + if (!dconn) { + goto fail; + } - rv = downstream->attach_downstream_connection(std::move(dconn)); - if (rv != 0) { - goto fail; + rv = downstream->attach_downstream_connection(std::move(dconn)); + if (rv == 0) { + break; + } } rv = downstream->push_request_headers(); diff --git a/src/shrpx_http_downstream_connection.cc b/src/shrpx_http_downstream_connection.cc index 1fea9b14..63c45bb1 100644 --- a/src/shrpx_http_downstream_connection.cc +++ b/src/shrpx_http_downstream_connection.cc @@ -78,6 +78,8 @@ void retry_downstream_connection(Downstream *downstream, auto upstream = downstream->get_upstream(); auto handler = upstream->get_client_handler(); + assert(!downstream->get_request_header_sent()); + downstream->add_retry(); if (downstream->no_more_retry()) { @@ -86,16 +88,20 @@ void retry_downstream_connection(Downstream *downstream, } downstream->pop_downstream_connection(); + auto buf = downstream->get_request_buf(); + buf->reset(); int rv; - // We have to use h1 backend for retry if we have already written h1 - // request in request buffer. - auto ndconn = handler->get_downstream_connection( - rv, downstream, - downstream->get_request_header_sent() ? Proto::HTTP1 : Proto::NONE); - if (ndconn) { - if (downstream->attach_downstream_connection(std::move(ndconn)) == 0 && - downstream->push_request_headers() == 0) { + + for (;;) { + auto ndconn = handler->get_downstream_connection(rv, downstream); + if (!ndconn) { + break; + } + if (downstream->attach_downstream_connection(std::move(ndconn)) != 0) { + continue; + } + if (downstream->push_request_headers() == 0) { return; } } @@ -187,7 +193,7 @@ void connectcb(struct ev_loop *loop, ev_io *w, int revents) { } // namespace HttpDownstreamConnection::HttpDownstreamConnection( - const std::shared_ptr &group, size_t initial_addr_idx, + const std::shared_ptr &group, DownstreamAddr *addr, struct ev_loop *loop, Worker *worker) : conn_(loop, -1, nullptr, worker->get_mcpool(), group->shared_addr->timeout.write, group->shared_addr->timeout.read, @@ -200,13 +206,13 @@ HttpDownstreamConnection::HttpDownstreamConnection( worker_(worker), ssl_ctx_(worker->get_cl_ssl_ctx()), group_(group), - addr_(nullptr), + addr_(addr), raddr_(nullptr), ioctrl_(&conn_.rlimit), response_htp_{0}, - initial_addr_idx_(initial_addr_idx), - reuse_first_write_done_(true), - reusable_(true) {} + first_write_done_(false), + reusable_(true), + request_header_written_(false) {} HttpDownstreamConnection::~HttpDownstreamConnection() { if (LOG_ENABLED(INFO)) { @@ -252,207 +258,145 @@ int HttpDownstreamConnection::initiate_connection() { auto &downstreamconf = *worker_->get_downstream_config(); if (conn_.fd == -1) { - auto &shared_addr = group_->shared_addr; - auto &addrs = shared_addr->addrs; + auto check_dns_result = dns_query_.get() != nullptr; - // If session affinity is enabled, we always start with address at - // initial_addr_idx_. - size_t temp_idx = initial_addr_idx_; - - auto &next_downstream = shared_addr->affinity.type == SessionAffinity::NONE - ? shared_addr->next - : temp_idx; - auto end = next_downstream; - for (;;) { - auto check_dns_result = dns_query_.get() != nullptr; - - DownstreamAddr *addr; - if (check_dns_result) { - addr = addr_; - addr_ = nullptr; - assert(addr); - assert(addr->dns); - } else { - assert(addr_ == nullptr); - if (shared_addr->affinity.type == SessionAffinity::NONE) { - addr = &addrs[next_downstream]; - if (++next_downstream >= addrs.size()) { - next_downstream = 0; - } - } else { - addr = &addrs[shared_addr->affinity_hash[next_downstream].idx]; - if (++next_downstream >= shared_addr->affinity_hash.size()) { - next_downstream = 0; - } - } - - if (addr->proto != Proto::HTTP1) { - if (end == next_downstream) { - return SHRPX_ERR_NETWORK; - } - - continue; - } - } - - auto &connect_blocker = addr->connect_blocker; - - if (connect_blocker->blocked()) { - if (LOG_ENABLED(INFO)) { - DCLOG(INFO, this) << "Backend server " << addr->host << ":" - << addr->port << " was not available temporarily"; - } - - if (check_dns_result) { - dns_query_.reset(); - } else if (end == next_downstream) { - return SHRPX_ERR_NETWORK; - } - - continue; - } - - Address *raddr; - - if (addr->dns) { - if (!check_dns_result) { - auto dns_query = std::make_unique( - addr->host, - [this](DNSResolverStatus status, const Address *result) { - int rv; - - if (status == DNSResolverStatus::OK) { - *this->resolved_addr_ = *result; - } - - rv = this->initiate_connection(); - if (rv != 0) { - // This callback destroys |this|. - auto downstream = this->downstream_; - backend_retry(downstream); - } - }); - - auto dns_tracker = worker_->get_dns_tracker(); - - if (!resolved_addr_) { - resolved_addr_ = std::make_unique
(); - } - switch (dns_tracker->resolve(resolved_addr_.get(), dns_query.get())) { - case DNSResolverStatus::ERROR: - downstream_failure(addr, nullptr); - if (end == next_downstream) { - return SHRPX_ERR_NETWORK; - } - continue; - case DNSResolverStatus::RUNNING: - dns_query_ = std::move(dns_query); - // Remember current addr - addr_ = addr; - return 0; - case DNSResolverStatus::OK: - break; - default: - assert(0); - } - } else { - switch (dns_query_->status) { - case DNSResolverStatus::ERROR: - dns_query_.reset(); - downstream_failure(addr, nullptr); - continue; - case DNSResolverStatus::OK: - dns_query_.reset(); - break; - default: - assert(0); - } - } - - raddr = resolved_addr_.get(); - util::set_port(*resolved_addr_, addr->port); - } else { - raddr = &addr->addr; - } - - conn_.fd = util::create_nonblock_socket(raddr->su.storage.ss_family); - - if (conn_.fd == -1) { - auto error = errno; - DCLOG(WARN, this) << "socket() failed; addr=" - << util::to_numeric_addr(raddr) - << ", errno=" << error; - - worker_blocker->on_failure(); - - return SHRPX_ERR_NETWORK; - } - - worker_blocker->on_success(); - - rv = connect(conn_.fd, &raddr->su.sa, raddr->len); - if (rv != 0 && errno != EINPROGRESS) { - auto error = errno; - DCLOG(WARN, this) << "connect() failed; addr=" - << util::to_numeric_addr(raddr) - << ", errno=" << error; - - downstream_failure(addr, raddr); - - close(conn_.fd); - conn_.fd = -1; - - if (!check_dns_result && end == next_downstream) { - return SHRPX_ERR_NETWORK; - } - - // Try again with the next downstream server - continue; - } - - if (LOG_ENABLED(INFO)) { - DCLOG(INFO, this) << "Connecting to downstream server"; - } - - addr_ = addr; - raddr_ = raddr; - - if (addr_->tls) { - assert(ssl_ctx_); - - auto ssl = tls::create_ssl(ssl_ctx_); - if (!ssl) { - return -1; - } - - tls::setup_downstream_http1_alpn(ssl); - - conn_.set_ssl(ssl); - conn_.tls.client_session_cache = &addr_->tls_session_cache; - - auto sni_name = - addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni}; - if (!util::numeric_host(sni_name.c_str())) { - SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name.c_str()); - } - - auto session = tls::reuse_tls_session(addr_->tls_session_cache); - if (session) { - SSL_set_session(conn_.tls.ssl, session); - SSL_SESSION_free(session); - } - - conn_.prepare_client_handshake(); - } - - ev_io_set(&conn_.wev, conn_.fd, EV_WRITE); - ev_io_set(&conn_.rev, conn_.fd, EV_READ); - - conn_.wlimit.startw(); - - break; + if (check_dns_result) { + assert(addr_->dns); } + auto &connect_blocker = addr_->connect_blocker; + + if (connect_blocker->blocked()) { + if (LOG_ENABLED(INFO)) { + DCLOG(INFO, this) << "Backend server " << addr_->host << ":" + << addr_->port << " was not available temporarily"; + } + + return SHRPX_ERR_NETWORK; + } + + Address *raddr; + + if (addr_->dns) { + if (!check_dns_result) { + auto dns_query = std::make_unique( + addr_->host, + [this](DNSResolverStatus status, const Address *result) { + int rv; + + if (status == DNSResolverStatus::OK) { + *this->resolved_addr_ = *result; + } + + rv = this->initiate_connection(); + if (rv != 0) { + // This callback destroys |this|. + auto downstream = this->downstream_; + backend_retry(downstream); + } + }); + + auto dns_tracker = worker_->get_dns_tracker(); + + if (!resolved_addr_) { + resolved_addr_ = std::make_unique
(); + } + switch (dns_tracker->resolve(resolved_addr_.get(), dns_query.get())) { + case DNSResolverStatus::ERROR: + downstream_failure(addr_, nullptr); + return SHRPX_ERR_NETWORK; + case DNSResolverStatus::RUNNING: + dns_query_ = std::move(dns_query); + return 0; + case DNSResolverStatus::OK: + break; + default: + assert(0); + } + } else { + switch (dns_query_->status) { + case DNSResolverStatus::ERROR: + dns_query_.reset(); + downstream_failure(addr_, nullptr); + return SHRPX_ERR_NETWORK; + case DNSResolverStatus::OK: + dns_query_.reset(); + break; + default: + assert(0); + } + } + + raddr = resolved_addr_.get(); + util::set_port(*resolved_addr_, addr_->port); + } else { + raddr = &addr_->addr; + } + + conn_.fd = util::create_nonblock_socket(raddr->su.storage.ss_family); + + if (conn_.fd == -1) { + auto error = errno; + DCLOG(WARN, this) << "socket() failed; addr=" + << util::to_numeric_addr(raddr) << ", errno=" << error; + + worker_blocker->on_failure(); + + return SHRPX_ERR_NETWORK; + } + + worker_blocker->on_success(); + + rv = connect(conn_.fd, &raddr->su.sa, raddr->len); + if (rv != 0 && errno != EINPROGRESS) { + auto error = errno; + DCLOG(WARN, this) << "connect() failed; addr=" + << util::to_numeric_addr(raddr) << ", errno=" << error; + + downstream_failure(addr_, raddr); + + return SHRPX_ERR_NETWORK; + } + + if (LOG_ENABLED(INFO)) { + DCLOG(INFO, this) << "Connecting to downstream server"; + } + + raddr_ = raddr; + + if (addr_->tls) { + assert(ssl_ctx_); + + auto ssl = tls::create_ssl(ssl_ctx_); + if (!ssl) { + return -1; + } + + tls::setup_downstream_http1_alpn(ssl); + + conn_.set_ssl(ssl); + conn_.tls.client_session_cache = &addr_->tls_session_cache; + + auto sni_name = + addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni}; + if (!util::numeric_host(sni_name.c_str())) { + SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name.c_str()); + } + + auto session = tls::reuse_tls_session(addr_->tls_session_cache); + if (session) { + SSL_set_session(conn_.tls.ssl, session); + SSL_SESSION_free(session); + } + + conn_.prepare_client_handshake(); + } + + ev_io_set(&conn_.wev, conn_.fd, EV_WRITE); + ev_io_set(&conn_.rev, conn_.fd, EV_READ); + + conn_.wlimit.startw(); + conn_.wt.repeat = downstreamconf.timeout.connect; ev_timer_again(conn_.loop, &conn_.wt); } else { @@ -467,8 +411,9 @@ int HttpDownstreamConnection::initiate_connection() { ev_set_cb(&conn_.rev, readcb); - on_write_ = &HttpDownstreamConnection::write_reuse_first; - reuse_first_write_done_ = false; + on_write_ = &HttpDownstreamConnection::write_first; + first_write_done_ = false; + request_header_written_ = false; } http_parser_init(&response_htp_, HTTP_RESPONSE); @@ -478,7 +423,7 @@ int HttpDownstreamConnection::initiate_connection() { } int HttpDownstreamConnection::push_request_headers() { - if (downstream_->get_request_header_sent()) { + if (request_header_written_) { signal_write(); return 0; } @@ -493,9 +438,7 @@ int HttpDownstreamConnection::push_request_headers() { auto config = get_config(); auto &httpconf = config->http; - // Set request_sent to true because we write request into buffer - // here. - downstream_->set_request_header_sent(true); + request_header_written_ = true; // For HTTP/1.0 request, there is no authority in request. In that // case, we use backend server's host nonetheless. @@ -731,7 +674,7 @@ int HttpDownstreamConnection::push_request_headers() { signal_write(); } - return process_blocked_request_buf(); + return 0; } int HttpDownstreamConnection::process_blocked_request_buf() { @@ -746,7 +689,7 @@ int HttpDownstreamConnection::process_blocked_request_buf() { dest->append("\r\n"); } - src->remove(*dest); + src->copy(*dest); if (chunked) { dest->append("\r\n"); @@ -1175,9 +1118,11 @@ constexpr http_parser_settings htp_hooks = { }; } // namespace -int HttpDownstreamConnection::write_reuse_first() { +int HttpDownstreamConnection::write_first() { int rv; + process_blocked_request_buf(); + if (conn_.tls.ssl) { rv = write_tls(); } else { @@ -1194,7 +1139,11 @@ int HttpDownstreamConnection::write_reuse_first() { on_write_ = &HttpDownstreamConnection::write_clear; } - reuse_first_write_done_ = true; + first_write_done_ = true; + downstream_->set_request_header_sent(true); + + auto buf = downstream_->get_blocked_request_buf(); + buf->reset(); return 0; } @@ -1244,7 +1193,7 @@ int HttpDownstreamConnection::write_clear() { } if (nwrite < 0) { - if (!reuse_first_write_done_) { + if (!first_write_done_) { return nwrite; } // We may have pending data in receive buffer which may contain @@ -1309,7 +1258,7 @@ int HttpDownstreamConnection::tls_handshake() { ev_set_cb(&conn_.wt, timeoutcb); on_read_ = &HttpDownstreamConnection::read_tls; - on_write_ = &HttpDownstreamConnection::write_tls; + on_write_ = &HttpDownstreamConnection::write_first; // TODO Check negotiated ALPN @@ -1368,7 +1317,7 @@ int HttpDownstreamConnection::write_tls() { } if (nwrite < 0) { - if (!reuse_first_write_done_) { + if (!first_write_done_) { return nwrite; } // We may have pending data in receive buffer which may contain @@ -1507,7 +1456,7 @@ int HttpDownstreamConnection::connected() { ev_set_cb(&conn_.wt, timeoutcb); on_read_ = &HttpDownstreamConnection::read_clear; - on_write_ = &HttpDownstreamConnection::write_clear; + on_write_ = &HttpDownstreamConnection::write_first; return 0; } diff --git a/src/shrpx_http_downstream_connection.h b/src/shrpx_http_downstream_connection.h index 554e9b94..8b33ddd1 100644 --- a/src/shrpx_http_downstream_connection.h +++ b/src/shrpx_http_downstream_connection.h @@ -44,7 +44,7 @@ struct DNSQuery; class HttpDownstreamConnection : public DownstreamConnection { public: HttpDownstreamConnection(const std::shared_ptr &group, - size_t initial_addr_idx, struct ev_loop *loop, + DownstreamAddr *addr, struct ev_loop *loop, Worker *worker); virtual ~HttpDownstreamConnection(); virtual int attach_downstream(Downstream *downstream); @@ -71,7 +71,7 @@ public: int initiate_connection(); - int write_reuse_first(); + int write_first(); int read_clear(); int write_clear(); int read_tls(); @@ -110,14 +110,12 @@ private: std::unique_ptr dns_query_; IOControl ioctrl_; http_parser response_htp_; - // Index to backend address. If client affinity is enabled, it is - // the index to affinity_hash. Otherwise, it is 0, and not used. - size_t initial_addr_idx_; - // true if first write of reused connection succeeded. For - // convenience, this is initialized as true. - bool reuse_first_write_done_; + // true if first write succeeded. + bool first_write_done_; // true if this object can be reused bool reusable_; + // true if request header is written to request buffer. + bool request_header_written_; }; } // namespace shrpx diff --git a/src/shrpx_https_upstream.cc b/src/shrpx_https_upstream.cc index a738707e..abc91619 100644 --- a/src/shrpx_https_upstream.cc +++ b/src/shrpx_https_upstream.cc @@ -426,24 +426,26 @@ int htp_hdrs_completecb(http_parser *htp) { return 0; } - auto dconn = handler->get_downstream_connection(rv, downstream); + DownstreamConnection *dconn_ptr; - if (!dconn) { - if (rv == SHRPX_ERR_TLS_REQUIRED) { - upstream->redirect_to_https(downstream); + for (;;) { + auto dconn = handler->get_downstream_connection(rv, downstream); + + if (!dconn) { + if (rv == SHRPX_ERR_TLS_REQUIRED) { + upstream->redirect_to_https(downstream); + } + downstream->set_request_state(DownstreamState::CONNECT_FAIL); + + return -1; } - downstream->set_request_state(DownstreamState::CONNECT_FAIL); - - return -1; - } #ifdef HAVE_MRUBY - auto dconn_ptr = dconn.get(); + dconn_ptr = dconn.get(); #endif // HAVE_MRUBY - if (downstream->attach_downstream_connection(std::move(dconn)) != 0) { - downstream->set_request_state(DownstreamState::CONNECT_FAIL); - - return -1; + if (downstream->attach_downstream_connection(std::move(dconn)) == 0) { + break; + } } #ifdef HAVE_MRUBY @@ -1427,14 +1429,16 @@ int HttpsUpstream::on_downstream_reset(Downstream *downstream, bool no_retry) { goto fail; } - dconn = handler_->get_downstream_connection(rv, downstream_.get()); - if (!dconn) { - goto fail; - } + for (;;) { + auto dconn = handler_->get_downstream_connection(rv, downstream_.get()); + if (!dconn) { + goto fail; + } - rv = downstream_->attach_downstream_connection(std::move(dconn)); - if (rv != 0) { - goto fail; + rv = downstream_->attach_downstream_connection(std::move(dconn)); + if (rv == 0) { + break; + } } rv = downstream_->push_request_headers(); diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 83b6fef2..1724bcfb 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -75,8 +75,9 @@ DownstreamAddrGroup::~DownstreamAddrGroup() {} // DownstreamKey is used to index SharedDownstreamAddr in order to // find the same configuration. using DownstreamKey = - std::tuple>, + std::tuple>, bool, SessionAffinity, StringRef, StringRef, SessionAffinityCookieSecure, int64_t, int64_t>; @@ -91,14 +92,17 @@ DownstreamKey create_downstream_key( for (auto &a : shared_addr->addrs) { std::get<0>(*p) = a.host; std::get<1>(*p) = a.sni; - std::get<2>(*p) = a.fall; - std::get<3>(*p) = a.rise; - std::get<4>(*p) = a.proto; - std::get<5>(*p) = a.port; - std::get<6>(*p) = a.host_unix; - std::get<7>(*p) = a.tls; - std::get<8>(*p) = a.dns; - std::get<9>(*p) = a.upgrade_scheme; + std::get<2>(*p) = a.group; + std::get<3>(*p) = a.fall; + std::get<4>(*p) = a.rise; + std::get<5>(*p) = a.proto; + std::get<6>(*p) = a.port; + std::get<7>(*p) = a.weight; + std::get<8>(*p) = a.group_weight; + std::get<9>(*p) = a.host_unix; + std::get<10>(*p) = a.tls; + std::get<11>(*p) = a.dns; + std::get<12>(*p) = a.upgrade_scheme; ++p; } std::sort(std::begin(addrs), std::end(addrs)); @@ -134,7 +138,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, conn_handler_(conn_handler), ticket_keys_(ticket_keys), connect_blocker_( - std::make_unique(randgen_, loop_, []() {}, []() {})), + std::make_unique(randgen_, loop_, nullptr, nullptr)), graceful_shutdown_(false) { ev_async_init(&w_, eventcb); w_.data = this; @@ -158,6 +162,39 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, replace_downstream_config(std::move(downstreamconf)); } +namespace { +void ensure_enqueue_addr(PriorityQueue &wgpq, + WeightGroup *wg, DownstreamAddr *addr) { + uint32_t cycle; + if (!wg->pq.empty()) { + auto key = wg->pq.key_top(); + cycle = key.first; + } else { + cycle = 0; + } + + addr->cycle = cycle; + addr->pending_penalty = 0; + wg->pq.emplace(DownstreamAddrKey{addr->cycle, addr->seq}, addr); + addr->queued = true; + + if (!wg->queued) { + if (!wgpq.empty()) { + auto key = wgpq.key_top(); + cycle = key.first; + } else { + cycle = 0; + } + + wg->cycle = cycle; + wg->pending_penalty = 0; + wgpq.emplace(DownstreamAddrKey{wg->cycle, wg->seq}, wg); + wg->queued = true; + } +} +} // namespace + void Worker::replace_downstream_config( std::shared_ptr downstreamconf) { for (auto &g : downstream_addr_groups_) { @@ -222,9 +259,6 @@ void Worker::replace_downstream_config( shared_addr->timeout.read = src.timeout.read; shared_addr->timeout.write = src.timeout.write; - size_t num_http1 = 0; - size_t num_http2 = 0; - for (size_t j = 0; j < src.addrs.size(); ++j) { auto &src_addr = src.addrs[j]; auto &dst_addr = shared_addr->addrs[j]; @@ -235,6 +269,9 @@ void Worker::replace_downstream_config( make_string_ref(shared_addr->balloc, src_addr.hostport); dst_addr.port = src_addr.port; dst_addr.host_unix = src_addr.host_unix; + dst_addr.weight = src_addr.weight; + dst_addr.group = make_string_ref(shared_addr->balloc, src_addr.group); + dst_addr.group_weight = src_addr.group_weight; dst_addr.proto = src_addr.proto; dst_addr.tls = src_addr.tls; dst_addr.sni = make_string_ref(shared_addr->balloc, src_addr.sni); @@ -246,41 +283,17 @@ void Worker::replace_downstream_config( auto shared_addr_ptr = shared_addr.get(); dst_addr.connect_blocker = std::make_unique( - randgen_, loop_, - [shared_addr_ptr, &dst_addr]() { - switch (dst_addr.proto) { - case Proto::HTTP1: - --shared_addr_ptr->http1_pri.weight; - break; - case Proto::HTTP2: - --shared_addr_ptr->http2_pri.weight; - break; - default: - assert(0); - } - }, - [shared_addr_ptr, &dst_addr]() { - switch (dst_addr.proto) { - case Proto::HTTP1: - ++shared_addr_ptr->http1_pri.weight; - break; - case Proto::HTTP2: - ++shared_addr_ptr->http2_pri.weight; - break; - default: - assert(0); + randgen_, loop_, nullptr, [shared_addr_ptr, &dst_addr]() { + if (!dst_addr.queued) { + if (!dst_addr.wg) { + return; + } + ensure_enqueue_addr(shared_addr_ptr->pq, dst_addr.wg, &dst_addr); } }); dst_addr.live_check = std::make_unique( loop_, cl_ssl_ctx_, this, &dst_addr, randgen_); - - if (dst_addr.proto == Proto::HTTP2) { - ++num_http2; - } else { - assert(dst_addr.proto == Proto::HTTP1); - ++num_http1; - } } // share the connection if patterns have the same set of backend @@ -290,19 +303,47 @@ void Worker::replace_downstream_config( auto it = addr_groups_indexer.find(dkey); if (it == std::end(addr_groups_indexer)) { - if (LOG_ENABLED(INFO)) { - LOG(INFO) << "number of http/1.1 backend: " << num_http1 - << ", number of h2 backend: " << num_http2; - } - - shared_addr->http1_pri.weight = num_http1; - shared_addr->http2_pri.weight = num_http2; - std::shuffle(std::begin(shared_addr->addrs), std::end(shared_addr->addrs), randgen_); + size_t seq = 0; for (auto &addr : shared_addr->addrs) { addr.dconn_pool = std::make_unique(); + addr.seq = seq++; + } + + if (shared_addr->affinity.type == SessionAffinity::NONE) { + std::map wgs; + size_t num_wgs = 0; + for (auto &addr : shared_addr->addrs) { + if (wgs.find(addr.group) == std::end(wgs)) { + ++num_wgs; + wgs.emplace(addr.group, nullptr); + } + } + + shared_addr->wgs = std::vector(num_wgs); + + for (auto &addr : shared_addr->addrs) { + auto &wg = wgs[addr.group]; + if (wg == nullptr) { + wg = &shared_addr->wgs[--num_wgs]; + wg->seq = num_wgs; + } + + wg->weight = addr.group_weight; + wg->pq.emplace(DownstreamAddrKey{0, addr.seq}, &addr); + addr.queued = true; + addr.wg = wg; + } + + assert(num_wgs == 0); + + for (auto &kv : wgs) { + shared_addr->pq.emplace(DownstreamAddrKey{0, kv.second->seq}, + kv.second); + kv.second->queued = true; + } } dst->shared_addr = shared_addr; diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index 14e2e2de..ea2c71e4 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -50,6 +50,7 @@ #include "shrpx_connect_blocker.h" #include "shrpx_dns_tracker.h" #include "allocator.h" +#include "priority_queue.h" using namespace nghttp2; @@ -73,6 +74,8 @@ namespace tls { class CertLookupTree; } // namespace tls +struct WeightGroup; + struct DownstreamAddr { Address addr; // backend address. If |host_unix| is true, this is UNIX domain @@ -96,21 +99,33 @@ struct DownstreamAddr { size_t rise; // Client side TLS session cache tls::TLSSessionCache tls_session_cache; - // Http2Session object created for this address. This list chains - // all Http2Session objects that is not in group scope - // http2_avail_freelist, and is not reached in maximum concurrency. - // - // If session affinity is enabled, http2_avail_freelist is not used, - // and this list is solely used. + // List of Http2Session which is not fully utilized (i.e., the + // server advertised maximum concurrency is not reached). We will + // coalesce as much stream as possible in one Http2Session to fully + // utilize TCP connection. DList http2_extra_freelist; - // true if Http2Session for this address is in group scope - // SharedDownstreamAddr.http2_avail_freelist - bool in_avail; + WeightGroup *wg; // total number of streams created in HTTP/2 connections for this // address. size_t num_dconn; + // the sequence number of this address to randomize the order access + // threads. + size_t seq; // Application protocol used in this backend Proto proto; + // cycle is used to prioritize this address. Lower value takes + // higher priority. + uint32_t cycle; + // penalty which is applied to the next cycle calculation. + uint32_t pending_penalty; + // Weight of this address inside a weight group. Its range is [1, + // 256], inclusive. + uint32_t weight; + // name of group which this address belongs to. + StringRef group; + // Weight of the weight group which this address belongs to. Its + // range is [1, 256], inclusive. + uint32_t group_weight; // true if TLS is used in this backend bool tls; // true if dynamic DNS is enabled @@ -119,28 +134,39 @@ struct DownstreamAddr { // variant (e.g., "https") when forwarding request to a backend // connected by TLS connection. bool upgrade_scheme; + // true if this address is queued. + bool queued; }; -// Simplified weighted fair queuing. Actually we don't use queue here -// since we have just 2 items. This is the same algorithm used in -// stream priority, but ignores remainder. -struct WeightedPri { - // current cycle of this item. The lesser cycle has higher - // priority. This is unsigned 32 bit integer, so it may overflow. - // But with the same theory described in stream priority, it is no - // problem. - uint32_t cycle; - // weight, larger weight means more frequent use. +constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256; + +using DownstreamAddrKey = std::pair; + +struct DownstreamAddrKeyLess { + bool operator()(const DownstreamAddrKey &lhs, + const DownstreamAddrKey &rhs) const { + auto d = rhs.first - lhs.first; + if (d == 0) { + return lhs.second < rhs.second; + } + return d <= MAX_DOWNSTREAM_ADDR_WEIGHT; + } +}; + +struct WeightGroup { + PriorityQueue pq; + size_t seq; uint32_t weight; + uint32_t cycle; + uint32_t pending_penalty; + // true if this object is queued. + bool queued; }; struct SharedDownstreamAddr { SharedDownstreamAddr() : balloc(1024, 1024), affinity{SessionAffinity::NONE}, - next{0}, - http1_pri{}, - http2_pri{}, redirect_if_not_tls{false} {} SharedDownstreamAddr(const SharedDownstreamAddr &) = delete; @@ -150,31 +176,13 @@ struct SharedDownstreamAddr { BlockAllocator balloc; std::vector addrs; + std::vector wgs; + PriorityQueue pq; // Bunch of session affinity hash. Only used if affinity == // SessionAffinity::IP. std::vector affinity_hash; - // List of Http2Session which is not fully utilized (i.e., the - // server advertised maximum concurrency is not reached). We will - // coalesce as much stream as possible in one Http2Session to fully - // utilize TCP connection. - // - // If session affinity is enabled, this list is not used. Per - // address http2_extra_freelist is used instead. - // - // TODO Verify that this approach performs better in performance - // wise. - DList http2_avail_freelist; // Configuration for session affinity AffinityConfig affinity; - // Next http/1.1 downstream address index in addrs. - size_t next; - // http1_pri and http2_pri are used to which protocols are used - // between HTTP/1.1 or HTTP/2 if they both are available in - // backends. They are choosed proportional to the number available - // backend. Usually, if http1_pri.cycle < http2_pri.cycle, choose - // HTTP/1.1. Otherwise, choose HTTP/2. - WeightedPri http1_pri; - WeightedPri http2_pri; // Session affinity // true if this group requires that client connection must be TLS, // and the request must be redirected to https URI. diff --git a/src/template.h b/src/template.h index 4a240b3e..b2f850d6 100644 --- a/src/template.h +++ b/src/template.h @@ -530,4 +530,19 @@ inline int run_app(std::function app, int argc, } // namespace nghttp2 +namespace std { +template <> struct hash { + std::size_t operator()(const nghttp2::StringRef &s) const noexcept { + // 32 bit FNV-1a: + // https://tools.ietf.org/html/draft-eastlake-fnv-16#section-6.1.1 + uint32_t h = 2166136261u; + for (auto c : s) { + h ^= static_cast(c); + h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24); + } + return h; + } +}; +} // namespace std + #endif // TEMPLATE_H