nghttpx: Use std::priority_queue

This commit is contained in:
Tatsuhiro Tsujikawa 2019-01-22 00:01:17 +09:00
parent 8d842701b5
commit 8dc2b263ba
9 changed files with 60 additions and 314 deletions

View File

@ -163,7 +163,6 @@ if(ENABLE_APP)
memchunk_test.cc
template_test.cc
base64_test.cc
priority_queue_test.cc
)
add_executable(nghttpx-unittest EXCLUDE_FROM_ALL
${NGHTTPX_UNITTEST_SOURCES}

View File

@ -140,8 +140,7 @@ NGHTTPX_SRCS = \
shrpx_dual_dns_resolver.cc shrpx_dual_dns_resolver.h \
shrpx_dns_tracker.cc shrpx_dns_tracker.h \
buffer.h memchunk.h template.h allocator.h \
xsi_strerror.c xsi_strerror.h \
priority_queue.h
xsi_strerror.c xsi_strerror.h
if HAVE_MRUBY
NGHTTPX_SRCS += \
@ -187,8 +186,7 @@ nghttpx_unittest_SOURCES = shrpx-unittest.cc \
buffer_test.cc buffer_test.h \
memchunk_test.cc memchunk_test.h \
template_test.cc template_test.h \
base64_test.cc base64_test.h \
priority_queue_test.cc priority_queue_test.h
base64_test.cc base64_test.h
nghttpx_unittest_CPPFLAGS = ${AM_CPPFLAGS} \
-DNGHTTP2_SRC_DIR=\"$(top_srcdir)/src\"
nghttpx_unittest_LDADD = libnghttpx.a ${LDADD} @CUNIT_LIBS@ @TESTLDADD@

View File

@ -1,145 +0,0 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2019 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef PRIORITY_QUEUE_H
#define PRIORITY_QUEUE_H
#include "nghttp2_config.h"
#include <cassert>
#include <functional>
#include <utility>
#include <vector>
namespace nghttp2 {
template <typename KeyType, typename ValueType, typename Compare = std::less<>>
class PriorityQueue {
public:
const ValueType &top() const;
const KeyType &key_top() const;
void push(const KeyType &key, const ValueType &value);
void push(KeyType &&key, ValueType &&value);
template <typename... Args> void emplace(Args &&... args);
void pop();
bool empty() const;
size_t size() const;
private:
void bubble_up(size_t idx);
void bubble_down(size_t idx);
std::vector<std::pair<KeyType, ValueType>> c_;
Compare comp_;
};
template <typename KeyType, typename ValueType, typename Compare>
const ValueType &PriorityQueue<KeyType, ValueType, Compare>::top() const {
assert(!c_.empty());
return c_[0].second;
}
template <typename KeyType, typename ValueType, typename Compare>
const KeyType &PriorityQueue<KeyType, ValueType, Compare>::key_top() const {
assert(!c_.empty());
return c_[0].first;
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::push(const KeyType &key,
const ValueType &value) {
c_.push_back(std::pair<KeyType, ValueType>{key, value});
bubble_up(c_.size() - 1);
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::push(KeyType &&key,
ValueType &&value) {
c_.push_back(std::pair<KeyType, ValueType>{std::move(key), std::move(value)});
bubble_up(c_.size() - 1);
}
template <typename KeyType, typename ValueType, typename Compare>
template <typename... Args>
void PriorityQueue<KeyType, ValueType, Compare>::emplace(Args &&... args) {
c_.emplace_back(std::forward<Args>(args)...);
bubble_up(c_.size() - 1);
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::pop() {
assert(!c_.empty());
c_[0] = std::move(c_.back());
c_.resize(c_.size() - 1);
bubble_down(0);
}
template <typename KeyType, typename ValueType, typename Compare>
bool PriorityQueue<KeyType, ValueType, Compare>::empty() const {
return c_.empty();
}
template <typename KeyType, typename ValueType, typename Compare>
size_t PriorityQueue<KeyType, ValueType, Compare>::size() const {
return c_.size();
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::bubble_up(size_t idx) {
using std::swap;
while (idx != 0) {
auto parent = (idx - 1) / 2;
if (!comp_(c_[idx].first, c_[parent].first)) {
return;
}
swap(c_[idx], c_[parent]);
idx = parent;
}
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::bubble_down(size_t idx) {
using std::swap;
for (;;) {
auto j = idx * 2 + 1;
auto minidx = idx;
for (auto i = 0; i < 2; ++i, ++j) {
if (j >= c_.size()) {
break;
}
if (comp_(c_[j].first, c_[minidx].first)) {
minidx = j;
}
}
if (minidx == idx) {
return;
}
swap(c_[idx], c_[minidx]);
idx = minidx;
}
}
} // namespace nghttp2
#endif // PRIORITY_QUEUE_H

View File

@ -1,93 +0,0 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2019 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "priority_queue_test.h"
#include <string>
#include <CUnit/CUnit.h>
#include "priority_queue.h"
namespace nghttp2 {
void test_priority_queue_push(void) {
PriorityQueue<std::string, int64_t> pq;
CU_ASSERT(pq.empty());
CU_ASSERT(0 == pq.size());
pq.push("foo", 1);
CU_ASSERT(!pq.empty());
CU_ASSERT(1 == pq.size());
auto top = pq.top();
CU_ASSERT(1 == top);
pq.emplace("bar", 2);
top = pq.top();
CU_ASSERT(2 == top);
pq.push("baz", 3);
top = pq.top();
CU_ASSERT(2 == top);
pq.push("C", 4);
CU_ASSERT(4 == pq.size());
top = pq.top();
CU_ASSERT(4 == top);
pq.pop();
CU_ASSERT(3 == pq.size());
top = pq.top();
CU_ASSERT(2 == top);
pq.pop();
top = pq.top();
CU_ASSERT(3 == top);
pq.pop();
top = pq.top();
CU_ASSERT(1 == top);
pq.pop();
CU_ASSERT(pq.empty());
CU_ASSERT(0 == pq.size());
}
} // namespace nghttp2

View File

@ -1,38 +0,0 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2019 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef PRIORITY_QUEUE_TEST_H
#define PRIORITY_QUEUE_TEST_H
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif /* HAVE_CONFIG_H */
namespace nghttp2 {
void test_priority_queue_push(void);
} // namespace nghttp2
#endif // PRIORITY_QUEUE_TEST_H

View File

@ -45,7 +45,6 @@
#include "shrpx_config.h"
#include "tls.h"
#include "shrpx_router_test.h"
#include "priority_queue_test.h"
#include "shrpx_log.h"
static int init_suite1(void) { return 0; }
@ -218,9 +217,7 @@ int main(int argc, char *argv[]) {
!CU_add_test(pSuite, "template_string_ref",
nghttp2::test_template_string_ref) ||
!CU_add_test(pSuite, "base64_encode", nghttp2::test_base64_encode) ||
!CU_add_test(pSuite, "base64_decode", nghttp2::test_base64_decode) ||
!CU_add_test(pSuite, "priority_queue_push",
nghttp2::test_priority_queue_push)) {
!CU_add_test(pSuite, "base64_decode", nghttp2::test_base64_decode)) {
CU_cleanup_registry();
return CU_get_error();
}

View File

@ -754,27 +754,29 @@ uint32_t ClientHandler::get_affinity_cookie(Downstream *downstream,
}
namespace {
void reschedule_addr(PriorityQueue<DownstreamAddrKey, DownstreamAddr *,
DownstreamAddrKeyLess> &pq,
DownstreamAddr *addr) {
void reschedule_addr(
std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
DownstreamAddrEntryGreater> &pq,
DownstreamAddr *addr) {
auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + addr->pending_penalty;
addr->cycle += penalty / addr->weight;
addr->pending_penalty = penalty % addr->weight;
pq.emplace(DownstreamAddrKey{addr->cycle, addr->seq}, addr);
pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
addr->queued = true;
}
} // namespace
namespace {
void reschedule_wg(
PriorityQueue<DownstreamAddrKey, WeightGroup *, DownstreamAddrKeyLess> &pq,
std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
WeightGroupEntryGreater> &pq,
WeightGroup *wg) {
auto penalty = MAX_DOWNSTREAM_ADDR_WEIGHT + wg->pending_penalty;
wg->cycle += penalty / wg->weight;
wg->pending_penalty = penalty % wg->weight;
pq.emplace(DownstreamAddrKey{wg->cycle, wg->seq}, wg);
pq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
wg->queued = true;
}
} // namespace
@ -857,7 +859,7 @@ DownstreamAddr *ClientHandler::get_downstream_addr(int &err,
return nullptr;
}
auto wg = wgpq.top();
auto wg = wgpq.top().wg;
wgpq.pop();
wg->queued = false;
@ -866,7 +868,7 @@ DownstreamAddr *ClientHandler::get_downstream_addr(int &err,
break;
}
auto addr = wg->pq.top();
auto addr = wg->pq.top().addr;
wg->pq.pop();
addr->queued = false;

View File

@ -163,33 +163,34 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
}
namespace {
void ensure_enqueue_addr(PriorityQueue<DownstreamAddrKey, WeightGroup *,
DownstreamAddrKeyLess> &wgpq,
WeightGroup *wg, DownstreamAddr *addr) {
void ensure_enqueue_addr(
std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
WeightGroupEntryGreater> &wgpq,
WeightGroup *wg, DownstreamAddr *addr) {
uint32_t cycle;
if (!wg->pq.empty()) {
auto key = wg->pq.key_top();
cycle = key.first;
auto &top = wg->pq.top();
cycle = top.cycle;
} else {
cycle = 0;
}
addr->cycle = cycle;
addr->pending_penalty = 0;
wg->pq.emplace(DownstreamAddrKey{addr->cycle, addr->seq}, addr);
wg->pq.push(DownstreamAddrEntry{addr, addr->seq, addr->cycle});
addr->queued = true;
if (!wg->queued) {
if (!wgpq.empty()) {
auto key = wgpq.key_top();
cycle = key.first;
auto &top = wgpq.top();
cycle = top.cycle;
} else {
cycle = 0;
}
wg->cycle = cycle;
wg->pending_penalty = 0;
wgpq.emplace(DownstreamAddrKey{wg->cycle, wg->seq}, wg);
wgpq.push(WeightGroupEntry{wg, wg->seq, wg->cycle});
wg->queued = true;
}
}
@ -332,7 +333,7 @@ void Worker::replace_downstream_config(
}
wg->weight = addr.group_weight;
wg->pq.emplace(DownstreamAddrKey{0, addr.seq}, &addr);
wg->pq.push(DownstreamAddrEntry{&addr, addr.seq, addr.cycle});
addr.queued = true;
addr.wg = wg;
}
@ -340,8 +341,8 @@ void Worker::replace_downstream_config(
assert(num_wgs == 0);
for (auto &kv : wgs) {
shared_addr->pq.emplace(DownstreamAddrKey{0, kv.second->seq},
kv.second);
shared_addr->pq.push(
WeightGroupEntry{kv.second, kv.second->seq, kv.second->cycle});
kv.second->queued = true;
}
}

View File

@ -33,6 +33,7 @@
#include <unordered_map>
#include <deque>
#include <thread>
#include <queue>
#ifndef NOTHREADS
# include <future>
#endif // NOTHREADS
@ -50,7 +51,6 @@
#include "shrpx_connect_blocker.h"
#include "shrpx_dns_tracker.h"
#include "allocator.h"
#include "priority_queue.h"
using namespace nghttp2;
@ -140,21 +140,27 @@ struct DownstreamAddr {
constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256;
using DownstreamAddrKey = std::pair<uint32_t, size_t>;
struct DownstreamAddrEntry {
DownstreamAddr *addr;
size_t seq;
uint32_t cycle;
};
struct DownstreamAddrKeyLess {
bool operator()(const DownstreamAddrKey &lhs,
const DownstreamAddrKey &rhs) const {
auto d = rhs.first - lhs.first;
struct DownstreamAddrEntryGreater {
bool operator()(const DownstreamAddrEntry &lhs,
const DownstreamAddrEntry &rhs) const {
auto d = lhs.cycle - rhs.cycle;
if (d == 0) {
return lhs.second < rhs.second;
return rhs.seq < lhs.seq;
}
return d <= MAX_DOWNSTREAM_ADDR_WEIGHT;
}
};
struct WeightGroup {
PriorityQueue<DownstreamAddrKey, DownstreamAddr *, DownstreamAddrKeyLess> pq;
std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
DownstreamAddrEntryGreater>
pq;
size_t seq;
uint32_t weight;
uint32_t cycle;
@ -163,6 +169,23 @@ struct WeightGroup {
bool queued;
};
struct WeightGroupEntry {
WeightGroup *wg;
size_t seq;
uint32_t cycle;
};
struct WeightGroupEntryGreater {
bool operator()(const WeightGroupEntry &lhs,
const WeightGroupEntry &rhs) const {
auto d = lhs.cycle - rhs.cycle;
if (d == 0) {
return rhs.seq < lhs.seq;
}
return d <= MAX_DOWNSTREAM_ADDR_WEIGHT;
}
};
struct SharedDownstreamAddr {
SharedDownstreamAddr()
: balloc(1024, 1024),
@ -177,7 +200,9 @@ struct SharedDownstreamAddr {
BlockAllocator balloc;
std::vector<DownstreamAddr> addrs;
std::vector<WeightGroup> wgs;
PriorityQueue<DownstreamAddrKey, WeightGroup *, DownstreamAddrKeyLess> pq;
std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
WeightGroupEntryGreater>
pq;
// Bunch of session affinity hash. Only used if affinity ==
// SessionAffinity::IP.
std::vector<AffinityHash> affinity_hash;