Merge branch 'nghttpx-backend-weight'

This commit is contained in:
Tatsuhiro Tsujikawa 2019-01-21 22:59:39 +09:00
commit 5d6beed590
22 changed files with 951 additions and 758 deletions

View File

@ -163,6 +163,7 @@ if(ENABLE_APP)
memchunk_test.cc
template_test.cc
base64_test.cc
priority_queue_test.cc
)
add_executable(nghttpx-unittest EXCLUDE_FROM_ALL
${NGHTTPX_UNITTEST_SOURCES}

View File

@ -140,7 +140,8 @@ NGHTTPX_SRCS = \
shrpx_dual_dns_resolver.cc shrpx_dual_dns_resolver.h \
shrpx_dns_tracker.cc shrpx_dns_tracker.h \
buffer.h memchunk.h template.h allocator.h \
xsi_strerror.c xsi_strerror.h
xsi_strerror.c xsi_strerror.h \
priority_queue.h
if HAVE_MRUBY
NGHTTPX_SRCS += \
@ -186,7 +187,8 @@ nghttpx_unittest_SOURCES = shrpx-unittest.cc \
buffer_test.cc buffer_test.h \
memchunk_test.cc memchunk_test.h \
template_test.cc template_test.h \
base64_test.cc base64_test.h
base64_test.cc base64_test.h \
priority_queue_test.cc priority_queue_test.h
nghttpx_unittest_CPPFLAGS = ${AM_CPPFLAGS} \
-DNGHTTP2_SRC_DIR=\"$(top_srcdir)/src\"
nghttpx_unittest_LDADD = libnghttpx.a ${LDADD} @CUNIT_LIBS@ @TESTLDADD@

View File

@ -187,6 +187,14 @@ template <typename Memchunk> struct Memchunks {
size_t append(const ImmutableString &s) {
return append(s.c_str(), s.size());
}
size_t copy(Memchunks &dest) {
auto m = head;
while (m) {
dest.append(m->pos, m->len());
m = m->next;
}
return len;
}
size_t remove(void *dest, size_t count) {
if (!tail || count == 0) {
return 0;

145
src/priority_queue.h Normal file
View File

@ -0,0 +1,145 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2019 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef PRIORITY_QUEUE_H
#define PRIORITY_QUEUE_H
#include "nghttp2_config.h"
#include <cassert>
#include <functional>
#include <utility>
#include <vector>
namespace nghttp2 {
template <typename KeyType, typename ValueType, typename Compare = std::less<>>
class PriorityQueue {
public:
const ValueType &top() const;
const KeyType &key_top() const;
void push(const KeyType &key, const ValueType &value);
void push(KeyType &&key, ValueType &&value);
template <typename... Args> void emplace(Args &&... args);
void pop();
bool empty() const;
size_t size() const;
private:
void bubble_up(size_t idx);
void bubble_down(size_t idx);
std::vector<std::pair<KeyType, ValueType>> c_;
Compare comp_;
};
template <typename KeyType, typename ValueType, typename Compare>
const ValueType &PriorityQueue<KeyType, ValueType, Compare>::top() const {
assert(!c_.empty());
return c_[0].second;
}
template <typename KeyType, typename ValueType, typename Compare>
const KeyType &PriorityQueue<KeyType, ValueType, Compare>::key_top() const {
assert(!c_.empty());
return c_[0].first;
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::push(const KeyType &key,
const ValueType &value) {
c_.push_back(std::pair<KeyType, ValueType>{key, value});
bubble_up(c_.size() - 1);
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::push(KeyType &&key,
ValueType &&value) {
c_.push_back(std::pair<KeyType, ValueType>{std::move(key), std::move(value)});
bubble_up(c_.size() - 1);
}
template <typename KeyType, typename ValueType, typename Compare>
template <typename... Args>
void PriorityQueue<KeyType, ValueType, Compare>::emplace(Args &&... args) {
c_.emplace_back(std::forward<Args>(args)...);
bubble_up(c_.size() - 1);
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::pop() {
assert(!c_.empty());
c_[0] = std::move(c_.back());
c_.resize(c_.size() - 1);
bubble_down(0);
}
template <typename KeyType, typename ValueType, typename Compare>
bool PriorityQueue<KeyType, ValueType, Compare>::empty() const {
return c_.empty();
}
template <typename KeyType, typename ValueType, typename Compare>
size_t PriorityQueue<KeyType, ValueType, Compare>::size() const {
return c_.size();
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::bubble_up(size_t idx) {
using std::swap;
while (idx != 0) {
auto parent = (idx - 1) / 2;
if (!comp_(c_[idx].first, c_[parent].first)) {
return;
}
swap(c_[idx], c_[parent]);
idx = parent;
}
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::bubble_down(size_t idx) {
using std::swap;
for (;;) {
auto j = idx * 2 + 1;
auto minidx = idx;
for (auto i = 0; i < 2; ++i, ++j) {
if (j >= c_.size()) {
break;
}
if (comp_(c_[j].first, c_[minidx].first)) {
minidx = j;
}
}
if (minidx == idx) {
return;
}
swap(c_[idx], c_[minidx]);
idx = minidx;
}
}
} // namespace nghttp2
#endif // PRIORITY_QUEUE_H

View File

@ -0,0 +1,93 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2019 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "priority_queue_test.h"
#include <string>
#include <CUnit/CUnit.h>
#include "priority_queue.h"
namespace nghttp2 {
void test_priority_queue_push(void) {
PriorityQueue<std::string, int64_t> pq;
CU_ASSERT(pq.empty());
CU_ASSERT(0 == pq.size());
pq.push("foo", 1);
CU_ASSERT(!pq.empty());
CU_ASSERT(1 == pq.size());
auto top = pq.top();
CU_ASSERT(1 == top);
pq.emplace("bar", 2);
top = pq.top();
CU_ASSERT(2 == top);
pq.push("baz", 3);
top = pq.top();
CU_ASSERT(2 == top);
pq.push("C", 4);
CU_ASSERT(4 == pq.size());
top = pq.top();
CU_ASSERT(4 == top);
pq.pop();
CU_ASSERT(3 == pq.size());
top = pq.top();
CU_ASSERT(2 == top);
pq.pop();
top = pq.top();
CU_ASSERT(3 == top);
pq.pop();
top = pq.top();
CU_ASSERT(1 == top);
pq.pop();
CU_ASSERT(pq.empty());
CU_ASSERT(0 == pq.size());
}
} // namespace nghttp2

38
src/priority_queue_test.h Normal file
View File

@ -0,0 +1,38 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2019 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef PRIORITY_QUEUE_TEST_H
#define PRIORITY_QUEUE_TEST_H
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif /* HAVE_CONFIG_H */
namespace nghttp2 {
void test_priority_queue_push(void);
} // namespace nghttp2
#endif // PRIORITY_QUEUE_TEST_H

View File

@ -45,6 +45,7 @@
#include "shrpx_config.h"
#include "tls.h"
#include "shrpx_router_test.h"
#include "priority_queue_test.h"
#include "shrpx_log.h"
static int init_suite1(void) { return 0; }
@ -217,7 +218,9 @@ int main(int argc, char *argv[]) {
!CU_add_test(pSuite, "template_string_ref",
nghttp2::test_template_string_ref) ||
!CU_add_test(pSuite, "base64_encode", nghttp2::test_base64_encode) ||
!CU_add_test(pSuite, "base64_decode", nghttp2::test_base64_decode)) {
!CU_add_test(pSuite, "base64_decode", nghttp2::test_base64_decode) ||
!CU_add_test(pSuite, "priority_queue_push",
nghttp2::test_priority_queue_push)) {
CU_cleanup_registry();
return CU_get_error();
}

View File

@ -1738,13 +1738,13 @@ Connections:
"sni=<SNI_HOST>", "fall=<N>", "rise=<N>",
"affinity=<METHOD>", "dns", "redirect-if-not-tls",
"upgrade-scheme", "mruby=<PATH>",
"read-timeout=<DURATION>", and
"write-timeout=<DURATION>". The parameter consists of
keyword, and optionally followed by "=" and value. For
example, the parameter "proto=h2" consists of the
keyword "proto" and value "h2". The parameter "tls"
consists of the keyword "tls" without value. Each
parameter is described as follows.
"read-timeout=<DURATION>", "write-timeout=<DURATION>",
"group=<GROUP>", "group-weight=<N>", and "weight=<N>".
The parameter consists of keyword, and optionally
followed by "=" and value. For example, the parameter
"proto=h2" consists of the keyword "proto" and value
"h2". The parameter "tls" consists of the keyword "tls"
without value. Each parameter is described as follows.
The backend application protocol can be specified using
optional "proto" parameter, and in the form of
@ -1850,6 +1850,31 @@ Connections:
pattern, --backend-read-timeout and
--backend-write-timeout are used.
"group=<GROUP>" parameter specifies the name of group
this backend address belongs to. By default, it belongs
to the unnamed default group. The name of group is
unique per pattern. "group-weight=<N>" parameter
specifies the weight of the group. The higher weight
gets more frequently selected by the load balancing
algorithm. <N> must be [1, 256] inclusive. The weight
8 has 4 times more weight than 2. <N> must be the same
for all addresses which share the same <GROUP>. If
"group-weight" is omitted in an address, but the other
address which belongs to the same group specifies
"group-weight", its weight is used. If no
"group-weight" is specified for all addresses, the
weight of a group becomes 1. "group" and "group-weight"
are ignored if session affinity is enabled.
"weight=<N>" parameter specifies the weight of the
backend address inside a group which this address
belongs to. The higher weight gets more frequently
selected by the load balancing algorithm. <N> must be
[1, 256] inclusive. The weight 8 has 4 times more
weight than weight 2. If this parameter is omitted,
weight becomes 1. "weight" is ignored if session
affinity is enabled.
Since ";" and ":" are used as delimiter, <PATTERN> must
not contain these characters. Since ";" has special
meaning in shell, the option value must be quoted.

View File

@ -682,7 +682,7 @@ uint32_t compute_affinity_from_ip(const StringRef &ip) {
}
} // namespace
Http2Session *ClientHandler::select_http2_session_with_affinity(
Http2Session *ClientHandler::get_http2_session(
const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr) {
auto &shared_addr = group->shared_addr;
@ -735,171 +735,6 @@ Http2Session *ClientHandler::select_http2_session_with_affinity(
return session;
}
namespace {
// Returns true if load of |lhs| is lighter than that of |rhs|.
// Currently, we assume that lesser streams means lesser load.
bool load_lighter(const DownstreamAddr *lhs, const DownstreamAddr *rhs) {
return lhs->num_dconn < rhs->num_dconn;
}
} // namespace
Http2Session *ClientHandler::select_http2_session(
const std::shared_ptr<DownstreamAddrGroup> &group) {
auto &shared_addr = group->shared_addr;
// First count the working backend addresses.
size_t min = 0;
for (const auto &addr : shared_addr->addrs) {
if (addr.proto != Proto::HTTP2 || addr.connect_blocker->blocked()) {
continue;
}
++min;
}
if (min == 0) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "No working backend address found";
}
return nullptr;
}
auto &http2_avail_freelist = shared_addr->http2_avail_freelist;
if (http2_avail_freelist.size() >= min) {
for (auto session = http2_avail_freelist.head; session;) {
auto next = session->dlnext;
session->remove_from_freelist();
// session may be in graceful shutdown period now.
if (session->max_concurrency_reached(0)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this)
<< "Maximum streams have been reached for Http2Session("
<< session << "). Skip it";
}
session = next;
continue;
}
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Use Http2Session " << session
<< " from http2_avail_freelist";
}
if (session->max_concurrency_reached(1)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
<< session << ").";
}
} else {
session->add_to_avail_freelist();
}
return session;
}
}
DownstreamAddr *selected_addr = nullptr;
for (auto &addr : shared_addr->addrs) {
if (addr.in_avail || addr.proto != Proto::HTTP2 ||
(addr.http2_extra_freelist.size() == 0 &&
addr.connect_blocker->blocked())) {
continue;
}
for (auto session = addr.http2_extra_freelist.head; session;) {
auto next = session->dlnext;
// session may be in graceful shutdown period now.
if (session->max_concurrency_reached(0)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this)
<< "Maximum streams have been reached for Http2Session("
<< session << "). Skip it";
}
session->remove_from_freelist();
session = next;
continue;
}
break;
}
if (selected_addr == nullptr || load_lighter(&addr, selected_addr)) {
selected_addr = &addr;
}
}
assert(selected_addr);
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Selected DownstreamAddr=" << selected_addr
<< ", index="
<< (selected_addr - shared_addr->addrs.data());
}
if (selected_addr->http2_extra_freelist.size()) {
auto session = selected_addr->http2_extra_freelist.head;
session->remove_from_freelist();
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Use Http2Session " << session
<< " from http2_extra_freelist";
}
if (session->max_concurrency_reached(1)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Maximum streams are reached for Http2Session("
<< session << ").";
}
} else {
session->add_to_avail_freelist();
}
return session;
}
auto session = new Http2Session(conn_.loop, worker_->get_cl_ssl_ctx(),
worker_, group, selected_addr);
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Create new Http2Session " << session;
}
session->add_to_avail_freelist();
return session;
}
namespace {
// The chosen value is small enough for uint32_t, and large enough for
// the number of backend.
constexpr uint32_t WEIGHT_MAX = 65536;
} // namespace
namespace {
bool pri_less(const WeightedPri &lhs, const WeightedPri &rhs) {
if (lhs.cycle < rhs.cycle) {
return rhs.cycle - lhs.cycle <= WEIGHT_MAX;
}
return lhs.cycle - rhs.cycle > WEIGHT_MAX;
}
} // namespace
namespace {
uint32_t next_cycle(const WeightedPri &pri) {
return pri.cycle + WEIGHT_MAX / std::min(WEIGHT_MAX, pri.weight);
}
} // namespace
uint32_t ClientHandler::get_affinity_cookie(Downstream *downstream,
const StringRef &cookie_name) {
auto h = downstream->find_affinity_cookie(cookie_name);
@ -918,9 +753,137 @@ uint32_t ClientHandler::get_affinity_cookie(Downstream *downstream,
return h;
}
namespace {
void reschedule_addr(PriorityQueue<DownstreamAddrKey, DownstreamAddr *,
DownstreamAddrKeyLess> &pq,
DownstreamAddr *addr) {
auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + addr->pending_penalty;
addr->cycle += penalty / addr->weight;
addr->pending_penalty = penalty % addr->weight;
pq.emplace(DownstreamAddrKey{addr->cycle, addr->seq}, addr);
addr->queued = true;
}
} // namespace
namespace {
void reschedule_wg(
PriorityQueue<DownstreamAddrKey, WeightGroup *, DownstreamAddrKeyLess> &pq,
WeightGroup *wg) {
auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + wg->pending_penalty;
wg->cycle += penalty / wg->weight;
wg->pending_penalty = penalty % wg->weight;
pq.emplace(DownstreamAddrKey{wg->cycle, wg->seq}, wg);
wg->queued = true;
}
} // namespace
DownstreamAddr *ClientHandler::get_downstream_addr(int &err,
DownstreamAddrGroup *group,
Downstream *downstream) {
err = 0;
switch (faddr_->alt_mode) {
case UpstreamAltMode::API:
case UpstreamAltMode::HEALTHMON:
assert(0);
default:
break;
}
auto &shared_addr = group->shared_addr;
if (shared_addr->affinity.type != SessionAffinity::NONE) {
uint32_t hash;
switch (shared_addr->affinity.type) {
case SessionAffinity::IP:
if (!affinity_hash_computed_) {
affinity_hash_ = compute_affinity_from_ip(ipaddr_);
affinity_hash_computed_ = true;
}
hash = affinity_hash_;
break;
case SessionAffinity::COOKIE:
hash = get_affinity_cookie(downstream, shared_addr->affinity.cookie.name);
break;
default:
assert(0);
}
const auto &affinity_hash = shared_addr->affinity_hash;
auto it = std::lower_bound(
std::begin(affinity_hash), std::end(affinity_hash), hash,
[](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; });
if (it == std::end(affinity_hash)) {
it = std::begin(affinity_hash);
}
auto aff_idx =
static_cast<size_t>(std::distance(std::begin(affinity_hash), it));
auto idx = (*it).idx;
auto addr = &shared_addr->addrs[idx];
if (addr->connect_blocker->blocked()) {
size_t i;
for (i = aff_idx + 1; i != aff_idx; ++i) {
if (i == shared_addr->affinity_hash.size()) {
i = 0;
}
addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx];
if (addr->connect_blocker->blocked()) {
continue;
}
break;
}
if (i == aff_idx) {
err = -1;
return nullptr;
}
aff_idx = i;
}
return addr;
}
auto &wgpq = shared_addr->pq;
for (;;) {
if (wgpq.empty()) {
CLOG(INFO, this) << "No working downstream address found";
err = -1;
return nullptr;
}
auto wg = wgpq.top();
wgpq.pop();
wg->queued = false;
for (;;) {
if (wg->pq.empty()) {
break;
}
auto addr = wg->pq.top();
wg->pq.pop();
addr->queued = false;
if (addr->connect_blocker->blocked()) {
continue;
}
reschedule_addr(wg->pq, addr);
reschedule_wg(wgpq, wg);
return addr;
}
}
}
std::unique_ptr<DownstreamConnection>
ClientHandler::get_downstream_connection(int &err, Downstream *downstream,
Proto pref_proto) {
ClientHandler::get_downstream_connection(int &err, Downstream *downstream) {
size_t group_idx;
auto &downstreamconf = *worker_->get_downstream_config();
auto &routerconf = downstreamconf.router;
@ -984,185 +947,37 @@ ClientHandler::get_downstream_connection(int &err, Downstream *downstream,
}
auto &group = groups[group_idx];
auto &shared_addr = group->shared_addr;
if (shared_addr->affinity.type != SessionAffinity::NONE) {
uint32_t hash;
switch (shared_addr->affinity.type) {
case SessionAffinity::IP:
if (!affinity_hash_computed_) {
affinity_hash_ = compute_affinity_from_ip(ipaddr_);
affinity_hash_computed_ = true;
}
hash = affinity_hash_;
break;
case SessionAffinity::COOKIE:
hash = get_affinity_cookie(downstream, shared_addr->affinity.cookie.name);
break;
default:
assert(0);
}
const auto &affinity_hash = shared_addr->affinity_hash;
auto it = std::lower_bound(
std::begin(affinity_hash), std::end(affinity_hash), hash,
[](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; });
if (it == std::end(affinity_hash)) {
it = std::begin(affinity_hash);
}
auto aff_idx =
static_cast<size_t>(std::distance(std::begin(affinity_hash), it));
auto idx = (*it).idx;
auto addr = &shared_addr->addrs[idx];
if (addr->connect_blocker->blocked()) {
size_t i;
for (i = aff_idx + 1; i != aff_idx; ++i) {
if (i == shared_addr->affinity_hash.size()) {
i = 0;
}
addr = &shared_addr->addrs[shared_addr->affinity_hash[i].idx];
if (addr->connect_blocker->blocked() ||
(pref_proto != Proto::NONE && pref_proto != addr->proto)) {
continue;
}
break;
}
if (i == aff_idx) {
err = -1;
return nullptr;
}
aff_idx = i;
}
if (addr->proto == Proto::HTTP2) {
auto http2session = select_http2_session_with_affinity(group, addr);
auto dconn = std::make_unique<Http2DownstreamConnection>(http2session);
dconn->set_client_handler(this);
return std::move(dconn);
}
auto &dconn_pool = addr->dconn_pool;
auto dconn = dconn_pool->pop_downstream_connection();
if (!dconn) {
dconn = std::make_unique<HttpDownstreamConnection>(group, aff_idx,
conn_.loop, worker_);
}
dconn->set_client_handler(this);
return dconn;
}
auto http1_weight = shared_addr->http1_pri.weight;
auto http2_weight = shared_addr->http2_pri.weight;
auto proto = Proto::NONE;
if (pref_proto == Proto::HTTP1) {
if (http1_weight > 0) {
proto = Proto::HTTP1;
}
} else if (pref_proto == Proto::HTTP2) {
if (http2_weight > 0) {
proto = Proto::HTTP2;
}
} else if (http1_weight > 0 && http2_weight > 0) {
// We only advance cycle if both weight has nonzero to keep its
// distance under WEIGHT_MAX.
if (pri_less(shared_addr->http1_pri, shared_addr->http2_pri)) {
proto = Proto::HTTP1;
shared_addr->http1_pri.cycle = next_cycle(shared_addr->http1_pri);
} else {
proto = Proto::HTTP2;
shared_addr->http2_pri.cycle = next_cycle(shared_addr->http2_pri);
}
} else if (http1_weight > 0) {
proto = Proto::HTTP1;
} else if (http2_weight > 0) {
proto = Proto::HTTP2;
}
if (proto == Proto::NONE) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "No working downstream address found";
}
err = -1;
auto addr = get_downstream_addr(err, group.get(), downstream);
if (addr == nullptr) {
return nullptr;
}
if (proto == Proto::HTTP2) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Downstream connection pool is empty."
<< " Create new one";
}
auto http2session = select_http2_session(group);
if (http2session == nullptr) {
err = -1;
return nullptr;
}
auto dconn = std::make_unique<Http2DownstreamConnection>(http2session);
dconn->set_client_handler(this);
return std::move(dconn);
}
auto end = shared_addr->next;
for (;;) {
auto addr = &shared_addr->addrs[shared_addr->next];
if (addr->proto != Proto::HTTP1) {
if (++shared_addr->next >= shared_addr->addrs.size()) {
shared_addr->next = 0;
}
assert(end != shared_addr->next);
continue;
}
// pool connection must be HTTP/1.1 connection
if (addr->proto == Proto::HTTP1) {
auto dconn = addr->dconn_pool->pop_downstream_connection();
if (dconn) {
if (++shared_addr->next >= shared_addr->addrs.size()) {
shared_addr->next = 0;
}
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Reuse downstream connection DCONN:" << dconn.get()
<< " from pool";
}
dconn->set_client_handler(this);
return dconn;
}
break;
}
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Downstream connection pool is empty."
<< " Create new one";
}
auto dconn =
std::make_unique<HttpDownstreamConnection>(group, 0, conn_.loop, worker_);
dconn = std::make_unique<HttpDownstreamConnection>(group, addr, conn_.loop,
worker_);
dconn->set_client_handler(this);
return dconn;
}
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Downstream connection pool is empty."
<< " Create new one";
}
auto http2session = get_http2_session(group, addr);
auto dconn = std::make_unique<Http2DownstreamConnection>(http2session);
dconn->set_client_handler(this);
return dconn;
}

View File

@ -99,14 +99,14 @@ public:
void pool_downstream_connection(std::unique_ptr<DownstreamConnection> dconn);
void remove_downstream_connection(DownstreamConnection *dconn);
DownstreamAddr *get_downstream_addr(int &err, DownstreamAddrGroup *group,
Downstream *downstream);
// Returns DownstreamConnection object based on request path. This
// function returns non-null DownstreamConnection, and assigns 0 to
// |err| if it succeeds, or returns nullptr, and assigns negative
// error code to |err|. If |pref_proto| is not PROTO_NONE, choose
// backend whose protocol is |pref_proto|.
// error code to |err|.
std::unique_ptr<DownstreamConnection>
get_downstream_connection(int &err, Downstream *downstream,
Proto pref_proto = Proto::NONE);
get_downstream_connection(int &err, Downstream *downstream);
MemchunkPool *get_mcpool();
SSL *get_ssl() const;
// Call this function when HTTP/2 connection header is received at
@ -150,10 +150,8 @@ public:
StringRef get_forwarded_for() const;
Http2Session *
select_http2_session(const std::shared_ptr<DownstreamAddrGroup> &group);
Http2Session *select_http2_session_with_affinity(
const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr);
get_http2_session(const std::shared_ptr<DownstreamAddrGroup> &group,
DownstreamAddr *addr);
// Returns an affinity cookie value for |downstream|. |cookie_name|
// is used to inspect cookie header field in request header fields.

View File

@ -47,6 +47,7 @@
#include <cerrno>
#include <limits>
#include <fstream>
#include <unordered_map>
#include <nghttp2/nghttp2.h>
@ -813,11 +814,14 @@ int parse_upstream_params(UpstreamParams &out, const StringRef &src_params) {
struct DownstreamParams {
StringRef sni;
StringRef mruby;
StringRef group;
AffinityConfig affinity;
ev_tstamp read_timeout;
ev_tstamp write_timeout;
size_t fall;
size_t rise;
uint32_t weight;
uint32_t group_weight;
Proto proto;
bool tls;
bool dns;
@ -960,6 +964,43 @@ int parse_downstream_params(DownstreamParams &out,
StringRef{first + str_size("write-timeout="), end}) == -1) {
return -1;
}
} else if (util::istarts_with_l(param, "weight=")) {
auto valstr = StringRef{first + str_size("weight="), end};
if (valstr.empty()) {
LOG(ERROR)
<< "backend: weight: non-negative integer [1, 256] is expected";
return -1;
}
auto n = util::parse_uint(valstr);
if (n < 1 || n > 256) {
LOG(ERROR)
<< "backend: weight: non-negative integer [1, 256] is expected";
return -1;
}
out.weight = n;
} else if (util::istarts_with_l(param, "group=")) {
auto valstr = StringRef{first + str_size("group="), end};
if (valstr.empty()) {
LOG(ERROR) << "backend: group: empty string is not allowed";
return -1;
}
out.group = valstr;
} else if (util::istarts_with_l(param, "group-weight=")) {
auto valstr = StringRef{first + str_size("group-weight="), end};
if (valstr.empty()) {
LOG(ERROR) << "backend: group-weight: non-negative integer [1, 256] is "
"expected";
return -1;
}
auto n = util::parse_uint(valstr);
if (n < 1 || n > 256) {
LOG(ERROR) << "backend: group-weight: non-negative integer [1, 256] is "
"expected";
return -1;
}
out.group_weight = n;
} else if (!param.empty()) {
LOG(ERROR) << "backend: " << param << ": unknown keyword";
return -1;
@ -996,6 +1037,7 @@ int parse_mapping(Config *config, DownstreamAddrConfig &addr,
DownstreamParams params{};
params.proto = Proto::HTTP1;
params.weight = 1;
if (parse_downstream_params(params, src_params) != 0) {
return -1;
@ -1015,6 +1057,9 @@ int parse_mapping(Config *config, DownstreamAddrConfig &addr,
addr.fall = params.fall;
addr.rise = params.rise;
addr.weight = params.weight;
addr.group = make_string_ref(downstreamconf.balloc, params.group);
addr.group_weight = params.group_weight;
addr.proto = params.proto;
addr.tls = params.tls;
addr.sni = make_string_ref(downstreamconf.balloc, params.sni);
@ -4025,7 +4070,17 @@ int configure_downstream_group(Config *config, bool http2_proxy,
auto resolve_flags = numeric_addr_only ? AI_NUMERICHOST | AI_NUMERICSERV : 0;
for (auto &g : addr_groups) {
std::unordered_map<StringRef, uint32_t> wgchk;
for (auto &addr : g.addrs) {
if (addr.group_weight) {
auto it = wgchk.find(addr.group);
if (it == std::end(wgchk)) {
wgchk.emplace(addr.group, addr.group_weight);
} else if ((*it).second != addr.group_weight) {
LOG(FATAL) << "backend: inconsistent group-weight for a single group";
return -1;
}
}
if (addr.host_unix) {
// for AF_UNIX socket, we use "localhost" as host for backend
@ -4078,6 +4133,17 @@ int configure_downstream_group(Config *config, bool http2_proxy,
}
}
for (auto &addr : g.addrs) {
if (addr.group_weight == 0) {
auto it = wgchk.find(addr.group);
if (it == std::end(wgchk)) {
addr.group_weight = 1;
} else {
addr.group_weight = (*it).second;
}
}
}
if (g.affinity.type != SessionAffinity::NONE) {
size_t idx = 0;
for (auto &addr : g.addrs) {

View File

@ -468,8 +468,15 @@ struct DownstreamAddrConfig {
StringRef hostport;
// hostname sent as SNI field
StringRef sni;
// name of group which this address belongs to.
StringRef group;
size_t fall;
size_t rise;
// weight of this address inside a weight group. Its range is [1,
// 256], inclusive.
uint32_t weight;
// weight of the weight group. Its range is [1, 256], inclusive.
uint32_t group_weight;
// Application protocol used in this group
Proto proto;
// backend port. 0 if |host_unix| is true.

View File

@ -128,8 +128,16 @@ void ConnectBlocker::online() {
bool ConnectBlocker::in_offline() const { return offline_; }
void ConnectBlocker::call_block_func() { block_func_(); }
void ConnectBlocker::call_block_func() {
if (block_func_) {
block_func_();
}
}
void ConnectBlocker::call_unblock_func() { unblock_func_(); }
void ConnectBlocker::call_unblock_func() {
if (unblock_func_) {
unblock_func_();
}
}
} // namespace shrpx

View File

@ -2318,22 +2318,6 @@ Http2Session::get_downstream_addr_group() const {
return group_;
}
void Http2Session::add_to_avail_freelist() {
if (freelist_zone_ != FreelistZone::NONE) {
return;
}
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Append to http2_avail_freelist, group="
<< group_.get() << ", freelist.size="
<< group_->shared_addr->http2_avail_freelist.size();
}
freelist_zone_ = FreelistZone::AVAIL;
group_->shared_addr->http2_avail_freelist.append(this);
addr_->in_avail = true;
}
void Http2Session::add_to_extra_freelist() {
if (freelist_zone_ != FreelistZone::NONE) {
return;
@ -2353,15 +2337,6 @@ void Http2Session::remove_from_freelist() {
switch (freelist_zone_) {
case FreelistZone::NONE:
return;
case FreelistZone::AVAIL:
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Remove from http2_avail_freelist, group=" << group_
<< ", freelist.size="
<< group_->shared_addr->http2_avail_freelist.size();
}
group_->shared_addr->http2_avail_freelist.remove(this);
addr_->in_avail = false;
break;
case FreelistZone::EXTRA:
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Remove from http2_extra_freelist, addr=" << addr_

View File

@ -61,9 +61,6 @@ struct StreamData {
enum class FreelistZone {
// Http2Session object is not linked in any freelist.
NONE,
// Http2Session object is linked in group scope
// http2_avail_freelist.
AVAIL,
// Http2Session object is linked in address scope
// http2_extra_freelist.
EXTRA,

View File

@ -456,6 +456,9 @@ void Http2Upstream::start_downstream(Downstream *downstream) {
void Http2Upstream::initiate_downstream(Downstream *downstream) {
int rv;
DownstreamConnection *dconn_ptr;
for (;;) {
auto dconn = handler_->get_downstream_connection(rv, downstream);
if (!dconn) {
if (rv == SHRPX_ERR_TLS_REQUIRED) {
@ -474,20 +477,12 @@ void Http2Upstream::initiate_downstream(Downstream *downstream) {
}
#ifdef HAVE_MRUBY
auto dconn_ptr = dconn.get();
dconn_ptr = dconn.get();
#endif // HAVE_MRUBY
rv = downstream->attach_downstream_connection(std::move(dconn));
if (rv != 0) {
// downstream connection fails, send error page
if (error_reply(downstream, 502) != 0) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
if (rv == 0) {
break;
}
downstream->set_request_state(DownstreamState::CONNECT_FAIL);
downstream_queue_.mark_failure(downstream);
return;
}
#ifdef HAVE_MRUBY
@ -2076,14 +2071,16 @@ int Http2Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
// downstream connection is clean; we can retry with new
// downstream connection.
dconn = handler_->get_downstream_connection(rv, downstream);
for (;;) {
auto dconn = handler_->get_downstream_connection(rv, downstream);
if (!dconn) {
goto fail;
}
rv = downstream->attach_downstream_connection(std::move(dconn));
if (rv != 0) {
goto fail;
if (rv == 0) {
break;
}
}
rv = downstream->push_request_headers();

View File

@ -78,6 +78,8 @@ void retry_downstream_connection(Downstream *downstream,
auto upstream = downstream->get_upstream();
auto handler = upstream->get_client_handler();
assert(!downstream->get_request_header_sent());
downstream->add_retry();
if (downstream->no_more_retry()) {
@ -86,16 +88,20 @@ void retry_downstream_connection(Downstream *downstream,
}
downstream->pop_downstream_connection();
auto buf = downstream->get_request_buf();
buf->reset();
int rv;
// We have to use h1 backend for retry if we have already written h1
// request in request buffer.
auto ndconn = handler->get_downstream_connection(
rv, downstream,
downstream->get_request_header_sent() ? Proto::HTTP1 : Proto::NONE);
if (ndconn) {
if (downstream->attach_downstream_connection(std::move(ndconn)) == 0 &&
downstream->push_request_headers() == 0) {
for (;;) {
auto ndconn = handler->get_downstream_connection(rv, downstream);
if (!ndconn) {
break;
}
if (downstream->attach_downstream_connection(std::move(ndconn)) != 0) {
continue;
}
if (downstream->push_request_headers() == 0) {
return;
}
}
@ -187,7 +193,7 @@ void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
} // namespace
HttpDownstreamConnection::HttpDownstreamConnection(
const std::shared_ptr<DownstreamAddrGroup> &group, size_t initial_addr_idx,
const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr,
struct ev_loop *loop, Worker *worker)
: conn_(loop, -1, nullptr, worker->get_mcpool(),
group->shared_addr->timeout.write, group->shared_addr->timeout.read,
@ -200,13 +206,13 @@ HttpDownstreamConnection::HttpDownstreamConnection(
worker_(worker),
ssl_ctx_(worker->get_cl_ssl_ctx()),
group_(group),
addr_(nullptr),
addr_(addr),
raddr_(nullptr),
ioctrl_(&conn_.rlimit),
response_htp_{0},
initial_addr_idx_(initial_addr_idx),
reuse_first_write_done_(true),
reusable_(true) {}
first_write_done_(false),
reusable_(true),
request_header_written_(false) {}
HttpDownstreamConnection::~HttpDownstreamConnection() {
if (LOG_ENABLED(INFO)) {
@ -252,72 +258,29 @@ int HttpDownstreamConnection::initiate_connection() {
auto &downstreamconf = *worker_->get_downstream_config();
if (conn_.fd == -1) {
auto &shared_addr = group_->shared_addr;
auto &addrs = shared_addr->addrs;
// If session affinity is enabled, we always start with address at
// initial_addr_idx_.
size_t temp_idx = initial_addr_idx_;
auto &next_downstream = shared_addr->affinity.type == SessionAffinity::NONE
? shared_addr->next
: temp_idx;
auto end = next_downstream;
for (;;) {
auto check_dns_result = dns_query_.get() != nullptr;
DownstreamAddr *addr;
if (check_dns_result) {
addr = addr_;
addr_ = nullptr;
assert(addr);
assert(addr->dns);
} else {
assert(addr_ == nullptr);
if (shared_addr->affinity.type == SessionAffinity::NONE) {
addr = &addrs[next_downstream];
if (++next_downstream >= addrs.size()) {
next_downstream = 0;
}
} else {
addr = &addrs[shared_addr->affinity_hash[next_downstream].idx];
if (++next_downstream >= shared_addr->affinity_hash.size()) {
next_downstream = 0;
}
assert(addr_->dns);
}
if (addr->proto != Proto::HTTP1) {
if (end == next_downstream) {
return SHRPX_ERR_NETWORK;
}
continue;
}
}
auto &connect_blocker = addr->connect_blocker;
auto &connect_blocker = addr_->connect_blocker;
if (connect_blocker->blocked()) {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "Backend server " << addr->host << ":"
<< addr->port << " was not available temporarily";
DCLOG(INFO, this) << "Backend server " << addr_->host << ":"
<< addr_->port << " was not available temporarily";
}
if (check_dns_result) {
dns_query_.reset();
} else if (end == next_downstream) {
return SHRPX_ERR_NETWORK;
}
continue;
}
Address *raddr;
if (addr->dns) {
if (addr_->dns) {
if (!check_dns_result) {
auto dns_query = std::make_unique<DNSQuery>(
addr->host,
addr_->host,
[this](DNSResolverStatus status, const Address *result) {
int rv;
@ -340,15 +303,10 @@ int HttpDownstreamConnection::initiate_connection() {
}
switch (dns_tracker->resolve(resolved_addr_.get(), dns_query.get())) {
case DNSResolverStatus::ERROR:
downstream_failure(addr, nullptr);
if (end == next_downstream) {
downstream_failure(addr_, nullptr);
return SHRPX_ERR_NETWORK;
}
continue;
case DNSResolverStatus::RUNNING:
dns_query_ = std::move(dns_query);
// Remember current addr
addr_ = addr;
return 0;
case DNSResolverStatus::OK:
break;
@ -359,8 +317,8 @@ int HttpDownstreamConnection::initiate_connection() {
switch (dns_query_->status) {
case DNSResolverStatus::ERROR:
dns_query_.reset();
downstream_failure(addr, nullptr);
continue;
downstream_failure(addr_, nullptr);
return SHRPX_ERR_NETWORK;
case DNSResolverStatus::OK:
dns_query_.reset();
break;
@ -370,9 +328,9 @@ int HttpDownstreamConnection::initiate_connection() {
}
raddr = resolved_addr_.get();
util::set_port(*resolved_addr_, addr->port);
util::set_port(*resolved_addr_, addr_->port);
} else {
raddr = &addr->addr;
raddr = &addr_->addr;
}
conn_.fd = util::create_nonblock_socket(raddr->su.storage.ss_family);
@ -380,8 +338,7 @@ int HttpDownstreamConnection::initiate_connection() {
if (conn_.fd == -1) {
auto error = errno;
DCLOG(WARN, this) << "socket() failed; addr="
<< util::to_numeric_addr(raddr)
<< ", errno=" << error;
<< util::to_numeric_addr(raddr) << ", errno=" << error;
worker_blocker->on_failure();
@ -394,27 +351,17 @@ int HttpDownstreamConnection::initiate_connection() {
if (rv != 0 && errno != EINPROGRESS) {
auto error = errno;
DCLOG(WARN, this) << "connect() failed; addr="
<< util::to_numeric_addr(raddr)
<< ", errno=" << error;
<< util::to_numeric_addr(raddr) << ", errno=" << error;
downstream_failure(addr, raddr);
downstream_failure(addr_, raddr);
close(conn_.fd);
conn_.fd = -1;
if (!check_dns_result && end == next_downstream) {
return SHRPX_ERR_NETWORK;
}
// Try again with the next downstream server
continue;
}
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "Connecting to downstream server";
}
addr_ = addr;
raddr_ = raddr;
if (addr_->tls) {
@ -450,9 +397,6 @@ int HttpDownstreamConnection::initiate_connection() {
conn_.wlimit.startw();
break;
}
conn_.wt.repeat = downstreamconf.timeout.connect;
ev_timer_again(conn_.loop, &conn_.wt);
} else {
@ -467,8 +411,9 @@ int HttpDownstreamConnection::initiate_connection() {
ev_set_cb(&conn_.rev, readcb);
on_write_ = &HttpDownstreamConnection::write_reuse_first;
reuse_first_write_done_ = false;
on_write_ = &HttpDownstreamConnection::write_first;
first_write_done_ = false;
request_header_written_ = false;
}
http_parser_init(&response_htp_, HTTP_RESPONSE);
@ -478,7 +423,7 @@ int HttpDownstreamConnection::initiate_connection() {
}
int HttpDownstreamConnection::push_request_headers() {
if (downstream_->get_request_header_sent()) {
if (request_header_written_) {
signal_write();
return 0;
}
@ -493,9 +438,7 @@ int HttpDownstreamConnection::push_request_headers() {
auto config = get_config();
auto &httpconf = config->http;
// Set request_sent to true because we write request into buffer
// here.
downstream_->set_request_header_sent(true);
request_header_written_ = true;
// For HTTP/1.0 request, there is no authority in request. In that
// case, we use backend server's host nonetheless.
@ -731,7 +674,7 @@ int HttpDownstreamConnection::push_request_headers() {
signal_write();
}
return process_blocked_request_buf();
return 0;
}
int HttpDownstreamConnection::process_blocked_request_buf() {
@ -746,7 +689,7 @@ int HttpDownstreamConnection::process_blocked_request_buf() {
dest->append("\r\n");
}
src->remove(*dest);
src->copy(*dest);
if (chunked) {
dest->append("\r\n");
@ -1175,9 +1118,11 @@ constexpr http_parser_settings htp_hooks = {
};
} // namespace
int HttpDownstreamConnection::write_reuse_first() {
int HttpDownstreamConnection::write_first() {
int rv;
process_blocked_request_buf();
if (conn_.tls.ssl) {
rv = write_tls();
} else {
@ -1194,7 +1139,11 @@ int HttpDownstreamConnection::write_reuse_first() {
on_write_ = &HttpDownstreamConnection::write_clear;
}
reuse_first_write_done_ = true;
first_write_done_ = true;
downstream_->set_request_header_sent(true);
auto buf = downstream_->get_blocked_request_buf();
buf->reset();
return 0;
}
@ -1244,7 +1193,7 @@ int HttpDownstreamConnection::write_clear() {
}
if (nwrite < 0) {
if (!reuse_first_write_done_) {
if (!first_write_done_) {
return nwrite;
}
// We may have pending data in receive buffer which may contain
@ -1309,7 +1258,7 @@ int HttpDownstreamConnection::tls_handshake() {
ev_set_cb(&conn_.wt, timeoutcb);
on_read_ = &HttpDownstreamConnection::read_tls;
on_write_ = &HttpDownstreamConnection::write_tls;
on_write_ = &HttpDownstreamConnection::write_first;
// TODO Check negotiated ALPN
@ -1368,7 +1317,7 @@ int HttpDownstreamConnection::write_tls() {
}
if (nwrite < 0) {
if (!reuse_first_write_done_) {
if (!first_write_done_) {
return nwrite;
}
// We may have pending data in receive buffer which may contain
@ -1507,7 +1456,7 @@ int HttpDownstreamConnection::connected() {
ev_set_cb(&conn_.wt, timeoutcb);
on_read_ = &HttpDownstreamConnection::read_clear;
on_write_ = &HttpDownstreamConnection::write_clear;
on_write_ = &HttpDownstreamConnection::write_first;
return 0;
}

View File

@ -44,7 +44,7 @@ struct DNSQuery;
class HttpDownstreamConnection : public DownstreamConnection {
public:
HttpDownstreamConnection(const std::shared_ptr<DownstreamAddrGroup> &group,
size_t initial_addr_idx, struct ev_loop *loop,
DownstreamAddr *addr, struct ev_loop *loop,
Worker *worker);
virtual ~HttpDownstreamConnection();
virtual int attach_downstream(Downstream *downstream);
@ -71,7 +71,7 @@ public:
int initiate_connection();
int write_reuse_first();
int write_first();
int read_clear();
int write_clear();
int read_tls();
@ -110,14 +110,12 @@ private:
std::unique_ptr<DNSQuery> dns_query_;
IOControl ioctrl_;
http_parser response_htp_;
// Index to backend address. If client affinity is enabled, it is
// the index to affinity_hash. Otherwise, it is 0, and not used.
size_t initial_addr_idx_;
// true if first write of reused connection succeeded. For
// convenience, this is initialized as true.
bool reuse_first_write_done_;
// true if first write succeeded.
bool first_write_done_;
// true if this object can be reused
bool reusable_;
// true if request header is written to request buffer.
bool request_header_written_;
};
} // namespace shrpx

View File

@ -426,6 +426,9 @@ int htp_hdrs_completecb(http_parser *htp) {
return 0;
}
DownstreamConnection *dconn_ptr;
for (;;) {
auto dconn = handler->get_downstream_connection(rv, downstream);
if (!dconn) {
@ -438,12 +441,11 @@ int htp_hdrs_completecb(http_parser *htp) {
}
#ifdef HAVE_MRUBY
auto dconn_ptr = dconn.get();
dconn_ptr = dconn.get();
#endif // HAVE_MRUBY
if (downstream->attach_downstream_connection(std::move(dconn)) != 0) {
downstream->set_request_state(DownstreamState::CONNECT_FAIL);
return -1;
if (downstream->attach_downstream_connection(std::move(dconn)) == 0) {
break;
}
}
#ifdef HAVE_MRUBY
@ -1427,14 +1429,16 @@ int HttpsUpstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
goto fail;
}
dconn = handler_->get_downstream_connection(rv, downstream_.get());
for (;;) {
auto dconn = handler_->get_downstream_connection(rv, downstream_.get());
if (!dconn) {
goto fail;
}
rv = downstream_->attach_downstream_connection(std::move(dconn));
if (rv != 0) {
goto fail;
if (rv == 0) {
break;
}
}
rv = downstream_->push_request_headers();

View File

@ -75,8 +75,9 @@ DownstreamAddrGroup::~DownstreamAddrGroup() {}
// DownstreamKey is used to index SharedDownstreamAddr in order to
// find the same configuration.
using DownstreamKey =
std::tuple<std::vector<std::tuple<StringRef, StringRef, size_t, size_t,
Proto, uint16_t, bool, bool, bool, bool>>,
std::tuple<std::vector<std::tuple<StringRef, StringRef, StringRef, size_t,
size_t, Proto, uint32_t, uint32_t,
uint32_t, bool, bool, bool, bool>>,
bool, SessionAffinity, StringRef, StringRef,
SessionAffinityCookieSecure, int64_t, int64_t>;
@ -91,14 +92,17 @@ DownstreamKey create_downstream_key(
for (auto &a : shared_addr->addrs) {
std::get<0>(*p) = a.host;
std::get<1>(*p) = a.sni;
std::get<2>(*p) = a.fall;
std::get<3>(*p) = a.rise;
std::get<4>(*p) = a.proto;
std::get<5>(*p) = a.port;
std::get<6>(*p) = a.host_unix;
std::get<7>(*p) = a.tls;
std::get<8>(*p) = a.dns;
std::get<9>(*p) = a.upgrade_scheme;
std::get<2>(*p) = a.group;
std::get<3>(*p) = a.fall;
std::get<4>(*p) = a.rise;
std::get<5>(*p) = a.proto;
std::get<6>(*p) = a.port;
std::get<7>(*p) = a.weight;
std::get<8>(*p) = a.group_weight;
std::get<9>(*p) = a.host_unix;
std::get<10>(*p) = a.tls;
std::get<11>(*p) = a.dns;
std::get<12>(*p) = a.upgrade_scheme;
++p;
}
std::sort(std::begin(addrs), std::end(addrs));
@ -134,7 +138,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
conn_handler_(conn_handler),
ticket_keys_(ticket_keys),
connect_blocker_(
std::make_unique<ConnectBlocker>(randgen_, loop_, []() {}, []() {})),
std::make_unique<ConnectBlocker>(randgen_, loop_, nullptr, nullptr)),
graceful_shutdown_(false) {
ev_async_init(&w_, eventcb);
w_.data = this;
@ -158,6 +162,39 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
replace_downstream_config(std::move(downstreamconf));
}
namespace {
void ensure_enqueue_addr(PriorityQueue<DownstreamAddrKey, WeightGroup *,
DownstreamAddrKeyLess> &wgpq,
WeightGroup *wg, DownstreamAddr *addr) {
uint32_t cycle;
if (!wg->pq.empty()) {
auto key = wg->pq.key_top();
cycle = key.first;
} else {
cycle = 0;
}
addr->cycle = cycle;
addr->pending_penalty = 0;
wg->pq.emplace(DownstreamAddrKey{addr->cycle, addr->seq}, addr);
addr->queued = true;
if (!wg->queued) {
if (!wgpq.empty()) {
auto key = wgpq.key_top();
cycle = key.first;
} else {
cycle = 0;
}
wg->cycle = cycle;
wg->pending_penalty = 0;
wgpq.emplace(DownstreamAddrKey{wg->cycle, wg->seq}, wg);
wg->queued = true;
}
}
} // namespace
void Worker::replace_downstream_config(
std::shared_ptr<DownstreamConfig> downstreamconf) {
for (auto &g : downstream_addr_groups_) {
@ -222,9 +259,6 @@ void Worker::replace_downstream_config(
shared_addr->timeout.read = src.timeout.read;
shared_addr->timeout.write = src.timeout.write;
size_t num_http1 = 0;
size_t num_http2 = 0;
for (size_t j = 0; j < src.addrs.size(); ++j) {
auto &src_addr = src.addrs[j];
auto &dst_addr = shared_addr->addrs[j];
@ -235,6 +269,9 @@ void Worker::replace_downstream_config(
make_string_ref(shared_addr->balloc, src_addr.hostport);
dst_addr.port = src_addr.port;
dst_addr.host_unix = src_addr.host_unix;
dst_addr.weight = src_addr.weight;
dst_addr.group = make_string_ref(shared_addr->balloc, src_addr.group);
dst_addr.group_weight = src_addr.group_weight;
dst_addr.proto = src_addr.proto;
dst_addr.tls = src_addr.tls;
dst_addr.sni = make_string_ref(shared_addr->balloc, src_addr.sni);
@ -246,41 +283,17 @@ void Worker::replace_downstream_config(
auto shared_addr_ptr = shared_addr.get();
dst_addr.connect_blocker = std::make_unique<ConnectBlocker>(
randgen_, loop_,
[shared_addr_ptr, &dst_addr]() {
switch (dst_addr.proto) {
case Proto::HTTP1:
--shared_addr_ptr->http1_pri.weight;
break;
case Proto::HTTP2:
--shared_addr_ptr->http2_pri.weight;
break;
default:
assert(0);
randgen_, loop_, nullptr, [shared_addr_ptr, &dst_addr]() {
if (!dst_addr.queued) {
if (!dst_addr.wg) {
return;
}
},
[shared_addr_ptr, &dst_addr]() {
switch (dst_addr.proto) {
case Proto::HTTP1:
++shared_addr_ptr->http1_pri.weight;
break;
case Proto::HTTP2:
++shared_addr_ptr->http2_pri.weight;
break;
default:
assert(0);
ensure_enqueue_addr(shared_addr_ptr->pq, dst_addr.wg, &dst_addr);
}
});
dst_addr.live_check = std::make_unique<LiveCheck>(
loop_, cl_ssl_ctx_, this, &dst_addr, randgen_);
if (dst_addr.proto == Proto::HTTP2) {
++num_http2;
} else {
assert(dst_addr.proto == Proto::HTTP1);
++num_http1;
}
}
// share the connection if patterns have the same set of backend
@ -290,19 +303,47 @@ void Worker::replace_downstream_config(
auto it = addr_groups_indexer.find(dkey);
if (it == std::end(addr_groups_indexer)) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "number of http/1.1 backend: " << num_http1
<< ", number of h2 backend: " << num_http2;
}
shared_addr->http1_pri.weight = num_http1;
shared_addr->http2_pri.weight = num_http2;
std::shuffle(std::begin(shared_addr->addrs), std::end(shared_addr->addrs),
randgen_);
size_t seq = 0;
for (auto &addr : shared_addr->addrs) {
addr.dconn_pool = std::make_unique<DownstreamConnectionPool>();
addr.seq = seq++;
}
if (shared_addr->affinity.type == SessionAffinity::NONE) {
std::map<StringRef, WeightGroup *> wgs;
size_t num_wgs = 0;
for (auto &addr : shared_addr->addrs) {
if (wgs.find(addr.group) == std::end(wgs)) {
++num_wgs;
wgs.emplace(addr.group, nullptr);
}
}
shared_addr->wgs = std::vector<WeightGroup>(num_wgs);
for (auto &addr : shared_addr->addrs) {
auto &wg = wgs[addr.group];
if (wg == nullptr) {
wg = &shared_addr->wgs[--num_wgs];
wg->seq = num_wgs;
}
wg->weight = addr.group_weight;
wg->pq.emplace(DownstreamAddrKey{0, addr.seq}, &addr);
addr.queued = true;
addr.wg = wg;
}
assert(num_wgs == 0);
for (auto &kv : wgs) {
shared_addr->pq.emplace(DownstreamAddrKey{0, kv.second->seq},
kv.second);
kv.second->queued = true;
}
}
dst->shared_addr = shared_addr;

View File

@ -50,6 +50,7 @@
#include "shrpx_connect_blocker.h"
#include "shrpx_dns_tracker.h"
#include "allocator.h"
#include "priority_queue.h"
using namespace nghttp2;
@ -73,6 +74,8 @@ namespace tls {
class CertLookupTree;
} // namespace tls
struct WeightGroup;
struct DownstreamAddr {
Address addr;
// backend address. If |host_unix| is true, this is UNIX domain
@ -96,21 +99,33 @@ struct DownstreamAddr {
size_t rise;
// Client side TLS session cache
tls::TLSSessionCache tls_session_cache;
// Http2Session object created for this address. This list chains
// all Http2Session objects that is not in group scope
// http2_avail_freelist, and is not reached in maximum concurrency.
//
// If session affinity is enabled, http2_avail_freelist is not used,
// and this list is solely used.
// List of Http2Session which is not fully utilized (i.e., the
// server advertised maximum concurrency is not reached). We will
// coalesce as much stream as possible in one Http2Session to fully
// utilize TCP connection.
DList<Http2Session> http2_extra_freelist;
// true if Http2Session for this address is in group scope
// SharedDownstreamAddr.http2_avail_freelist
bool in_avail;
WeightGroup *wg;
// total number of streams created in HTTP/2 connections for this
// address.
size_t num_dconn;
// the sequence number of this address to randomize the order access
// threads.
size_t seq;
// Application protocol used in this backend
Proto proto;
// cycle is used to prioritize this address. Lower value takes
// higher priority.
uint32_t cycle;
// penalty which is applied to the next cycle calculation.
uint32_t pending_penalty;
// Weight of this address inside a weight group. Its range is [1,
// 256], inclusive.
uint32_t weight;
// name of group which this address belongs to.
StringRef group;
// Weight of the weight group which this address belongs to. Its
// range is [1, 256], inclusive.
uint32_t group_weight;
// true if TLS is used in this backend
bool tls;
// true if dynamic DNS is enabled
@ -119,28 +134,39 @@ struct DownstreamAddr {
// variant (e.g., "https") when forwarding request to a backend
// connected by TLS connection.
bool upgrade_scheme;
// true if this address is queued.
bool queued;
};
// Simplified weighted fair queuing. Actually we don't use queue here
// since we have just 2 items. This is the same algorithm used in
// stream priority, but ignores remainder.
struct WeightedPri {
// current cycle of this item. The lesser cycle has higher
// priority. This is unsigned 32 bit integer, so it may overflow.
// But with the same theory described in stream priority, it is no
// problem.
uint32_t cycle;
// weight, larger weight means more frequent use.
constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256;
using DownstreamAddrKey = std::pair<uint32_t, size_t>;
struct DownstreamAddrKeyLess {
bool operator()(const DownstreamAddrKey &lhs,
const DownstreamAddrKey &rhs) const {
auto d = rhs.first - lhs.first;
if (d == 0) {
return lhs.second < rhs.second;
}
return d <= MAX_DOWNSTREAM_ADDR_WEIGHT;
}
};
struct WeightGroup {
PriorityQueue<DownstreamAddrKey, DownstreamAddr *, DownstreamAddrKeyLess> pq;
size_t seq;
uint32_t weight;
uint32_t cycle;
uint32_t pending_penalty;
// true if this object is queued.
bool queued;
};
struct SharedDownstreamAddr {
SharedDownstreamAddr()
: balloc(1024, 1024),
affinity{SessionAffinity::NONE},
next{0},
http1_pri{},
http2_pri{},
redirect_if_not_tls{false} {}
SharedDownstreamAddr(const SharedDownstreamAddr &) = delete;
@ -150,31 +176,13 @@ struct SharedDownstreamAddr {
BlockAllocator balloc;
std::vector<DownstreamAddr> addrs;
std::vector<WeightGroup> wgs;
PriorityQueue<DownstreamAddrKey, WeightGroup *, DownstreamAddrKeyLess> pq;
// Bunch of session affinity hash. Only used if affinity ==
// SessionAffinity::IP.
std::vector<AffinityHash> affinity_hash;
// List of Http2Session which is not fully utilized (i.e., the
// server advertised maximum concurrency is not reached). We will
// coalesce as much stream as possible in one Http2Session to fully
// utilize TCP connection.
//
// If session affinity is enabled, this list is not used. Per
// address http2_extra_freelist is used instead.
//
// TODO Verify that this approach performs better in performance
// wise.
DList<Http2Session> http2_avail_freelist;
// Configuration for session affinity
AffinityConfig affinity;
// Next http/1.1 downstream address index in addrs.
size_t next;
// http1_pri and http2_pri are used to which protocols are used
// between HTTP/1.1 or HTTP/2 if they both are available in
// backends. They are choosed proportional to the number available
// backend. Usually, if http1_pri.cycle < http2_pri.cycle, choose
// HTTP/1.1. Otherwise, choose HTTP/2.
WeightedPri http1_pri;
WeightedPri http2_pri;
// Session affinity
// true if this group requires that client connection must be TLS,
// and the request must be redirected to https URI.

View File

@ -530,4 +530,19 @@ inline int run_app(std::function<int(int, char **)> app, int argc,
} // namespace nghttp2
namespace std {
template <> struct hash<nghttp2::StringRef> {
std::size_t operator()(const nghttp2::StringRef &s) const noexcept {
// 32 bit FNV-1a:
// https://tools.ietf.org/html/draft-eastlake-fnv-16#section-6.1.1
uint32_t h = 2166136261u;
for (auto c : s) {
h ^= static_cast<uint8_t>(c);
h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24);
}
return h;
}
};
} // namespace std
#endif // TEMPLATE_H