Merge branch 'master' into cache-digest

This commit is contained in:
Tatsuhiro Tsujikawa 2016-07-09 11:59:28 +09:00
commit 4bfd9b182e
19 changed files with 323 additions and 58 deletions

View File

@ -176,6 +176,23 @@ To compile the source code, gcc >= 4.8.3 or clang >= 3.4 is required.
applications were not built, then using ``--enable-app`` may find applications were not built, then using ``--enable-app`` may find
that cause, such as the missing dependency. that cause, such as the missing dependency.
Notes for building on Windows (MSVC)
------------------------------------
The easiest way to build native Windows nghttp2 dll is use
[cmake](https://cmake.org/). The free version of [Visual C++ Build
Tools](http://landinghub.visualstudio.com/visual-cpp-build-tools)
works fine.
1. Install cmake for windows
2. Open "Visual C++ ... Native Build Tool Command Prompt", and inside
nghttp2 directly, run ``cmake``.
3. Then run ``cmake --build`` to build library.
4. nghttp2.dll, nghttp2.lib, nghttp2.exp are placed under lib directory.
Note that the above steps most likely produce nghttp2 library only.
No bundled applications are compiled.
Notes for building on Windows (Mingw/Cygwin) Notes for building on Windows (Mingw/Cygwin)
-------------------------------------------- --------------------------------------------

View File

@ -695,9 +695,6 @@ int main(int argc, char **argv) {
act.sa_handler = SIG_IGN; act.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &act, 0); sigaction(SIGPIPE, &act, 0);
#ifndef OPENSSL_IS_BORINGSSL
OPENSSL_config(NULL);
#endif /* OPENSSL_IS_BORINGSSL */
SSL_load_error_strings(); SSL_load_error_strings();
SSL_library_init(); SSL_library_init();

View File

@ -594,9 +594,6 @@ int main(int argc, char **argv) {
act.sa_handler = SIG_IGN; act.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &act, NULL); sigaction(SIGPIPE, &act, NULL);
#ifndef OPENSSL_IS_BORINGSSL
OPENSSL_config(NULL);
#endif /* OPENSSL_IS_BORINGSSL */
SSL_load_error_strings(); SSL_load_error_strings();
SSL_library_init(); SSL_library_init();

View File

@ -781,9 +781,6 @@ int main(int argc, char **argv) {
act.sa_handler = SIG_IGN; act.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &act, NULL); sigaction(SIGPIPE, &act, NULL);
#ifndef OPENSSL_IS_BORINGSSL
OPENSSL_config(NULL);
#endif /* OPENSSL_IS_BORINGSSL */
SSL_load_error_strings(); SSL_load_error_strings();
SSL_library_init(); SSL_library_init();

View File

@ -386,8 +386,9 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
faddr_(faddr), faddr_(faddr),
worker_(worker), worker_(worker),
left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN), left_connhd_len_(NGHTTP2_CLIENT_MAGIC_LEN),
affinity_hash_(-1), affinity_hash_(0),
should_close_after_write_(false) { should_close_after_write_(false),
affinity_hash_computed_(false) {
++worker_->get_worker_stat()->num_connections; ++worker_->get_worker_stat()->num_connections;
@ -674,23 +675,21 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) {
} }
namespace { namespace {
uint32_t hash32(const StringRef &s) { // Computes 32bits hash for session affinity for IP address |ip|.
/* 32 bit FNV-1a: http://isthe.com/chongo/tech/comp/fnv/ */ uint32_t compute_affinity_from_ip(const StringRef &ip) {
uint32_t h = 2166136261u; int rv;
size_t i; std::array<uint8_t, 32> buf;
for (i = 0; i < s.size(); ++i) { rv = util::sha256(buf.data(), ip);
h ^= s[i]; if (rv != 0) {
h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24); // Not sure when sha256 failed. Just fall back to another
// function.
return util::hash32(ip);
} }
return h; return (static_cast<uint32_t>(buf[0]) << 24) |
} (static_cast<uint32_t>(buf[1]) << 16) |
} // namespace (static_cast<uint32_t>(buf[2]) << 8) | static_cast<uint32_t>(buf[3]);
namespace {
int32_t calculate_affinity_from_ip(const StringRef &ip) {
return hash32(ip) & 0x7fffffff;
} }
} // namespace } // namespace
@ -918,11 +917,22 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
auto &shared_addr = group->shared_addr; auto &shared_addr = group->shared_addr;
if (shared_addr->affinity == AFFINITY_IP) { if (shared_addr->affinity == AFFINITY_IP) {
if (affinity_hash_ == -1) { if (!affinity_hash_computed_) {
affinity_hash_ = calculate_affinity_from_ip(StringRef{ipaddr_}); affinity_hash_ = compute_affinity_from_ip(StringRef{ipaddr_});
affinity_hash_computed_ = true;
} }
auto idx = affinity_hash_ % shared_addr->addrs.size(); const auto &affinity_hash = shared_addr->affinity_hash;
auto it = std::lower_bound(
std::begin(affinity_hash), std::end(affinity_hash), affinity_hash_,
[](const AffinityHash &lhs, uint32_t rhs) { return lhs.hash < rhs; });
if (it == std::end(affinity_hash)) {
it = std::begin(affinity_hash);
}
auto idx = (*it).idx;
auto &addr = shared_addr->addrs[idx]; auto &addr = shared_addr->addrs[idx];
if (addr.proto == PROTO_HTTP2) { if (addr.proto == PROTO_HTTP2) {

View File

@ -174,8 +174,11 @@ private:
Worker *worker_; Worker *worker_;
// The number of bytes of HTTP/2 client connection header to read // The number of bytes of HTTP/2 client connection header to read
size_t left_connhd_len_; size_t left_connhd_len_;
int32_t affinity_hash_; // hash for session affinity using client IP
uint32_t affinity_hash_;
bool should_close_after_write_; bool should_close_after_write_;
// true if affinity_hash_ is computed
bool affinity_hash_computed_;
ReadBuf rb_; ReadBuf rb_;
}; };

View File

@ -2830,12 +2830,51 @@ StringRef strproto(shrpx_proto proto) {
assert(0); assert(0);
} }
namespace {
// Consistent hashing method described in
// https://github.com/RJ/ketama. Generate 160 32-bit hashes per |s|,
// which is usually backend address. The each hash is associated to
// index of backend address. When all hashes for every backend
// address are calculated, sort it in ascending order of hash. To
// choose the index, compute 32-bit hash based on client IP address,
// and do lower bound search in the array. The returned index is the
// backend to use.
int compute_affinity_hash(std::vector<AffinityHash> &res, size_t idx,
const StringRef &s) {
int rv;
std::array<uint8_t, 32> buf;
for (auto i = 0; i < 20; ++i) {
auto t = s.str();
t += i;
rv = util::sha256(buf.data(), StringRef{t});
if (rv != 0) {
return -1;
}
for (int i = 0; i < 8; ++i) {
auto h = (static_cast<uint32_t>(buf[4 * i]) << 24) |
(static_cast<uint32_t>(buf[4 * i + 1]) << 16) |
(static_cast<uint32_t>(buf[4 * i + 2]) << 8) |
static_cast<uint32_t>(buf[4 * i + 3]);
res.emplace_back(idx, h);
}
}
return 0;
}
} // namespace
// Configures the following member in |config|: // Configures the following member in |config|:
// conn.downstream_router, conn.downstream.addr_groups, // conn.downstream_router, conn.downstream.addr_groups,
// conn.downstream.addr_group_catch_all. // conn.downstream.addr_group_catch_all.
int configure_downstream_group(Config *config, bool http2_proxy, int configure_downstream_group(Config *config, bool http2_proxy,
bool numeric_addr_only, bool numeric_addr_only,
const TLSConfig &tlsconf) { const TLSConfig &tlsconf) {
int rv;
auto &downstreamconf = *config->conn.downstream; auto &downstreamconf = *config->conn.downstream;
auto &addr_groups = downstreamconf.addr_groups; auto &addr_groups = downstreamconf.addr_groups;
auto &routerconf = downstreamconf.router; auto &routerconf = downstreamconf.router;
@ -2959,6 +2998,25 @@ int configure_downstream_group(Config *config, bool http2_proxy,
<< util::to_numeric_addr(&addr.addr); << util::to_numeric_addr(&addr.addr);
} }
} }
if (g.affinity == AFFINITY_IP) {
size_t idx = 0;
for (auto &addr : g.addrs) {
auto p = reinterpret_cast<uint8_t *>(&addr.addr.su);
rv = compute_affinity_hash(g.affinity_hash, idx,
StringRef{p, addr.addr.len});
if (rv != 0) {
return -1;
}
++idx;
}
std::sort(std::begin(g.affinity_hash), std::end(g.affinity_hash),
[](const AffinityHash &lhs, const AffinityHash &rhs) {
return lhs.hash < rhs.hash;
});
}
} }
return 0; return 0;

View File

@ -370,12 +370,24 @@ struct DownstreamAddrConfig {
bool tls; bool tls;
}; };
// Mapping hash to idx which is an index into
// DownstreamAddrGroupConfig::addrs.
struct AffinityHash {
AffinityHash(size_t idx, uint32_t hash) : idx(idx), hash(hash) {}
size_t idx;
uint32_t hash;
};
struct DownstreamAddrGroupConfig { struct DownstreamAddrGroupConfig {
DownstreamAddrGroupConfig(const StringRef &pattern) DownstreamAddrGroupConfig(const StringRef &pattern)
: pattern(pattern.c_str(), pattern.size()), affinity(AFFINITY_NONE) {} : pattern(pattern.c_str(), pattern.size()), affinity(AFFINITY_NONE) {}
ImmutableString pattern; ImmutableString pattern;
std::vector<DownstreamAddrConfig> addrs; std::vector<DownstreamAddrConfig> addrs;
// Bunch of session affinity hash. Only used if affinity ==
// AFFINITY_IP.
std::vector<AffinityHash> affinity_hash;
// Session affinity // Session affinity
shrpx_session_affinity affinity; shrpx_session_affinity affinity;
}; };

View File

@ -110,12 +110,8 @@ void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
} }
} // namespace } // namespace
namespace { ConnectionHandler::ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen)
std::random_device rd; : gen_(gen),
} // namespace
ConnectionHandler::ConnectionHandler(struct ev_loop *loop)
: gen_(rd()),
single_worker_(nullptr), single_worker_(nullptr),
loop_(loop), loop_(loop),
tls_ticket_key_memcached_get_retry_count_(0), tls_ticket_key_memcached_get_retry_count_(0),
@ -736,6 +732,14 @@ ConnectionHandler::get_tls_ticket_key_memcached_dispatcher() const {
return tls_ticket_key_memcached_dispatcher_.get(); return tls_ticket_key_memcached_dispatcher_.get();
} }
// Use the similar backoff algorithm described in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
namespace {
constexpr size_t MAX_BACKOFF_EXP = 10;
constexpr auto MULTIPLIER = 3.2;
constexpr auto JITTER = 0.2;
} // namespace
void ConnectionHandler::on_tls_ticket_key_network_error(ev_timer *w) { void ConnectionHandler::on_tls_ticket_key_network_error(ev_timer *w) {
if (++tls_ticket_key_memcached_get_retry_count_ >= if (++tls_ticket_key_memcached_get_retry_count_ >=
get_config()->tls.ticket.memcached.max_retry) { get_config()->tls.ticket.memcached.max_retry) {
@ -746,15 +750,19 @@ void ConnectionHandler::on_tls_ticket_key_network_error(ev_timer *w) {
return; return;
} }
auto dist = std::uniform_int_distribution<int>( auto base_backoff = util::int_pow(
1, std::min(60, 1 << tls_ticket_key_memcached_get_retry_count_)); MULTIPLIER,
auto t = dist(gen_); std::min(MAX_BACKOFF_EXP, tls_ticket_key_memcached_get_retry_count_));
auto dist = std::uniform_real_distribution<>(-JITTER * base_backoff,
JITTER * base_backoff);
auto backoff = base_backoff + dist(gen_);
LOG(WARN) LOG(WARN)
<< "Memcached: tls ticket get failed due to network error, retrying in " << "Memcached: tls ticket get failed due to network error, retrying in "
<< t << " seconds"; << backoff << " seconds";
ev_timer_set(w, t, 0.); ev_timer_set(w, backoff, 0.);
ev_timer_start(loop_, w); ev_timer_start(loop_, w);
} }

View File

@ -32,6 +32,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#endif // HAVE_SYS_SOCKET_H #endif // HAVE_SYS_SOCKET_H
#include <mutex>
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <random> #include <random>
@ -100,7 +101,7 @@ struct SerialEvent {
class ConnectionHandler { class ConnectionHandler {
public: public:
ConnectionHandler(struct ev_loop *loop); ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen);
~ConnectionHandler(); ~ConnectionHandler();
int handle_connection(int fd, sockaddr *addr, int addrlen, int handle_connection(int fd, sockaddr *addr, int addrlen,
const UpstreamAddr *faddr); const UpstreamAddr *faddr);

View File

@ -56,7 +56,7 @@ void readcb(struct ev_loop *loop, ev_io *w, int revents) {
auto mconn = static_cast<MemcachedConnection *>(conn->data); auto mconn = static_cast<MemcachedConnection *>(conn->data);
if (mconn->on_read() != 0) { if (mconn->on_read() != 0) {
mconn->disconnect(); mconn->reconnect_or_fail();
return; return;
} }
} }
@ -68,7 +68,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
auto mconn = static_cast<MemcachedConnection *>(conn->data); auto mconn = static_cast<MemcachedConnection *>(conn->data);
if (mconn->on_write() != 0) { if (mconn->on_write() != 0) {
mconn->disconnect(); mconn->reconnect_or_fail();
return; return;
} }
} }
@ -88,22 +88,25 @@ void connectcb(struct ev_loop *loop, ev_io *w, int revents) {
} }
} // namespace } // namespace
constexpr ev_tstamp write_timeout = 10.; constexpr auto write_timeout = 10_s;
constexpr ev_tstamp read_timeout = 10.; constexpr auto read_timeout = 10_s;
MemcachedConnection::MemcachedConnection(const Address *addr, MemcachedConnection::MemcachedConnection(const Address *addr,
struct ev_loop *loop, SSL_CTX *ssl_ctx, struct ev_loop *loop, SSL_CTX *ssl_ctx,
const StringRef &sni_name, const StringRef &sni_name,
MemchunkPool *mcpool) MemchunkPool *mcpool,
std::mt19937 &gen)
: conn_(loop, -1, nullptr, mcpool, write_timeout, read_timeout, {}, {}, : conn_(loop, -1, nullptr, mcpool, write_timeout, read_timeout, {}, {},
connectcb, readcb, timeoutcb, this, 0, 0., PROTO_MEMCACHED), connectcb, readcb, timeoutcb, this, 0, 0., PROTO_MEMCACHED),
do_read_(&MemcachedConnection::noop), do_read_(&MemcachedConnection::noop),
do_write_(&MemcachedConnection::noop), do_write_(&MemcachedConnection::noop),
sni_name_(sni_name.str()), sni_name_(sni_name.str()),
connect_blocker_(gen, loop, [] {}, [] {}),
parse_state_{}, parse_state_{},
addr_(addr), addr_(addr),
ssl_ctx_(ssl_ctx), ssl_ctx_(ssl_ctx),
sendsum_(0), sendsum_(0),
try_count_(0),
connected_(false) {} connected_(false) {}
MemcachedConnection::~MemcachedConnection() { disconnect(); } MemcachedConnection::~MemcachedConnection() { disconnect(); }
@ -201,6 +204,8 @@ int MemcachedConnection::initiate_connection() {
int MemcachedConnection::connected() { int MemcachedConnection::connected() {
if (!util::check_socket_connected(conn_.fd)) { if (!util::check_socket_connected(conn_.fd)) {
connect_blocker_.on_failure();
conn_.wlimit.stopw(); conn_.wlimit.stopw();
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
@ -214,10 +219,7 @@ int MemcachedConnection::connected() {
MCLOG(INFO, this) << "connected to memcached server"; MCLOG(INFO, this) << "connected to memcached server";
} }
connected_ = true;
conn_.rlimit.startw(); conn_.rlimit.startw();
ev_timer_again(conn_.loop, &conn_.rt);
ev_set_cb(&conn_.wev, writecb); ev_set_cb(&conn_.wev, writecb);
@ -228,6 +230,12 @@ int MemcachedConnection::connected() {
return 0; return 0;
} }
ev_timer_stop(conn_.loop, &conn_.wt);
connected_ = true;
connect_blocker_.on_success();
do_read_ = &MemcachedConnection::read_clear; do_read_ = &MemcachedConnection::read_clear;
do_write_ = &MemcachedConnection::write_clear; do_write_ = &MemcachedConnection::write_clear;
@ -248,6 +256,7 @@ int MemcachedConnection::tls_handshake() {
} }
if (rv < 0) { if (rv < 0) {
connect_blocker_.on_failure();
return rv; return rv;
} }
@ -259,6 +268,7 @@ int MemcachedConnection::tls_handshake() {
if (!tlsconf.insecure && if (!tlsconf.insecure &&
ssl::check_cert(conn_.tls.ssl, addr_, StringRef(sni_name_)) != 0) { ssl::check_cert(conn_.tls.ssl, addr_, StringRef(sni_name_)) != 0) {
connect_blocker_.on_failure();
return -1; return -1;
} }
@ -270,6 +280,13 @@ int MemcachedConnection::tls_handshake() {
} }
} }
ev_timer_stop(conn_.loop, &conn_.rt);
ev_timer_stop(conn_.loop, &conn_.wt);
connected_ = true;
connect_blocker_.on_success();
do_read_ = &MemcachedConnection::read_tls; do_read_ = &MemcachedConnection::read_tls;
do_write_ = &MemcachedConnection::write_tls; do_write_ = &MemcachedConnection::write_tls;
@ -325,7 +342,9 @@ int MemcachedConnection::read_tls() {
return 0; return 0;
} }
ev_timer_again(conn_.loop, &conn_.rt); if (ev_is_active(&conn_.rt)) {
ev_timer_again(conn_.loop, &conn_.rt);
}
for (;;) { for (;;) {
auto nread = conn_.read_tls(recvbuf_.last, recvbuf_.wleft()); auto nread = conn_.read_tls(recvbuf_.last, recvbuf_.wleft());
@ -386,7 +405,9 @@ int MemcachedConnection::read_clear() {
return 0; return 0;
} }
ev_timer_again(conn_.loop, &conn_.rt); if (ev_is_active(&conn_.rt)) {
ev_timer_again(conn_.loop, &conn_.rt);
}
for (;;) { for (;;) {
auto nread = conn_.read_clear(recvbuf_.last, recvbuf_.wleft()); auto nread = conn_.read_clear(recvbuf_.last, recvbuf_.wleft());
@ -535,9 +556,16 @@ int MemcachedConnection::parse_packet() {
} }
} }
// We require at least one complete response to clear try count.
try_count_ = 0;
auto req = std::move(recvq_.front()); auto req = std::move(recvq_.front());
recvq_.pop_front(); recvq_.pop_front();
if (sendq_.empty() && recvq_.empty()) {
ev_timer_stop(conn_.loop, &conn_.rt);
}
if (!req->canceled && req->cb) { if (!req->canceled && req->cb) {
req->cb(req.get(), MemcachedResult(parse_state_.status_code, req->cb(req.get(), MemcachedResult(parse_state_.status_code,
std::move(parse_state_.value))); std::move(parse_state_.value)));
@ -635,6 +663,13 @@ void MemcachedConnection::drain_send_queue(size_t nwrite) {
recvq_.push_back(std::move(sendq_.front())); recvq_.push_back(std::move(sendq_.front()));
sendq_.pop_front(); sendq_.pop_front();
} }
// start read timer only when we wait for responses.
if (recvq_.empty()) {
ev_timer_stop(conn_.loop, &conn_.rt);
} else if (!ev_is_active(&conn_.rt)) {
ev_timer_again(conn_.loop, &conn_.rt);
}
} }
size_t MemcachedConnection::serialized_size(MemcachedRequest *req) { size_t MemcachedConnection::serialized_size(MemcachedRequest *req) {
@ -676,6 +711,10 @@ void MemcachedConnection::make_request(MemcachedSendbuf *sendbuf,
} }
int MemcachedConnection::add_request(std::unique_ptr<MemcachedRequest> req) { int MemcachedConnection::add_request(std::unique_ptr<MemcachedRequest> req) {
if (connect_blocker_.blocked()) {
return -1;
}
sendq_.push_back(std::move(req)); sendq_.push_back(std::move(req));
if (connected_) { if (connected_) {
@ -684,6 +723,7 @@ int MemcachedConnection::add_request(std::unique_ptr<MemcachedRequest> req) {
} }
if (conn_.fd == -1 && initiate_connection() != 0) { if (conn_.fd == -1 && initiate_connection() != 0) {
connect_blocker_.on_failure();
disconnect(); disconnect();
return -1; return -1;
} }
@ -696,4 +736,50 @@ void MemcachedConnection::signal_write() { conn_.wlimit.startw(); }
int MemcachedConnection::noop() { return 0; } int MemcachedConnection::noop() { return 0; }
void MemcachedConnection::reconnect_or_fail() {
if (!connected_ || (recvq_.empty() && sendq_.empty())) {
disconnect();
return;
}
constexpr size_t MAX_TRY_COUNT = 3;
if (++try_count_ >= MAX_TRY_COUNT) {
if (LOG_ENABLED(INFO)) {
MCLOG(INFO, this) << "Tried " << MAX_TRY_COUNT
<< " times, and all failed. Aborting";
}
try_count_ = 0;
disconnect();
return;
}
std::vector<std::unique_ptr<MemcachedRequest>> q;
q.reserve(recvq_.size() + sendq_.size());
if (LOG_ENABLED(INFO)) {
MCLOG(INFO, this) << "Retry connection, enqueue "
<< recvq_.size() + sendq_.size() << " request(s) again";
}
q.insert(std::end(q), std::make_move_iterator(std::begin(recvq_)),
std::make_move_iterator(std::end(recvq_)));
q.insert(std::end(q), std::make_move_iterator(std::begin(sendq_)),
std::make_move_iterator(std::end(sendq_)));
recvq_.clear();
sendq_.clear();
disconnect();
sendq_.insert(std::end(sendq_), std::make_move_iterator(std::begin(q)),
std::make_move_iterator(std::end(q)));
if (initiate_connection() != 0) {
connect_blocker_.on_failure();
disconnect();
return;
}
}
} // namespace shrpx } // namespace shrpx

View File

@ -34,6 +34,7 @@
#include "shrpx_connection.h" #include "shrpx_connection.h"
#include "shrpx_ssl.h" #include "shrpx_ssl.h"
#include "shrpx_connect_blocker.h"
#include "buffer.h" #include "buffer.h"
#include "network.h" #include "network.h"
@ -96,7 +97,7 @@ class MemcachedConnection {
public: public:
MemcachedConnection(const Address *addr, struct ev_loop *loop, MemcachedConnection(const Address *addr, struct ev_loop *loop,
SSL_CTX *ssl_ctx, const StringRef &sni_name, SSL_CTX *ssl_ctx, const StringRef &sni_name,
MemchunkPool *mcpool); MemchunkPool *mcpool, std::mt19937 &gen);
~MemcachedConnection(); ~MemcachedConnection();
void disconnect(); void disconnect();
@ -126,6 +127,8 @@ public:
int noop(); int noop();
void reconnect_or_fail();
private: private:
Connection conn_; Connection conn_;
std::deque<std::unique_ptr<MemcachedRequest>> recvq_; std::deque<std::unique_ptr<MemcachedRequest>> recvq_;
@ -134,11 +137,13 @@ private:
std::function<int(MemcachedConnection &)> do_read_, do_write_; std::function<int(MemcachedConnection &)> do_read_, do_write_;
std::string sni_name_; std::string sni_name_;
ssl::TLSSessionCache tls_session_cache_; ssl::TLSSessionCache tls_session_cache_;
ConnectBlocker connect_blocker_;
MemcachedParseState parse_state_; MemcachedParseState parse_state_;
const Address *addr_; const Address *addr_;
SSL_CTX *ssl_ctx_; SSL_CTX *ssl_ctx_;
// Sum of the bytes to be transmitted in sendbufv_. // Sum of the bytes to be transmitted in sendbufv_.
size_t sendsum_; size_t sendsum_;
size_t try_count_;
bool connected_; bool connected_;
Buffer<8_k> recvbuf_; Buffer<8_k> recvbuf_;
}; };

View File

@ -33,10 +33,11 @@ namespace shrpx {
MemcachedDispatcher::MemcachedDispatcher(const Address *addr, MemcachedDispatcher::MemcachedDispatcher(const Address *addr,
struct ev_loop *loop, SSL_CTX *ssl_ctx, struct ev_loop *loop, SSL_CTX *ssl_ctx,
const StringRef &sni_name, const StringRef &sni_name,
MemchunkPool *mcpool) MemchunkPool *mcpool,
std::mt19937 &gen)
: loop_(loop), : loop_(loop),
mconn_(make_unique<MemcachedConnection>(addr, loop_, ssl_ctx, sni_name, mconn_(make_unique<MemcachedConnection>(addr, loop_, ssl_ctx, sni_name,
mcpool)) {} mcpool, gen)) {}
MemcachedDispatcher::~MemcachedDispatcher() {} MemcachedDispatcher::~MemcachedDispatcher() {}

View File

@ -28,6 +28,7 @@
#include "shrpx.h" #include "shrpx.h"
#include <memory> #include <memory>
#include <random>
#include <ev.h> #include <ev.h>
@ -45,7 +46,7 @@ class MemcachedDispatcher {
public: public:
MemcachedDispatcher(const Address *addr, struct ev_loop *loop, MemcachedDispatcher(const Address *addr, struct ev_loop *loop,
SSL_CTX *ssl_ctx, const StringRef &sni_name, SSL_CTX *ssl_ctx, const StringRef &sni_name,
MemchunkPool *mcpool); MemchunkPool *mcpool, std::mt19937 &gen);
~MemcachedDispatcher(); ~MemcachedDispatcher();
int add_request(std::unique_ptr<MemcachedRequest> req); int add_request(std::unique_ptr<MemcachedRequest> req);

View File

@ -147,7 +147,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
session_cache_memcached_dispatcher_ = make_unique<MemcachedDispatcher>( session_cache_memcached_dispatcher_ = make_unique<MemcachedDispatcher>(
&session_cacheconf.memcached.addr, loop, &session_cacheconf.memcached.addr, loop,
tls_session_cache_memcached_ssl_ctx, tls_session_cache_memcached_ssl_ctx,
StringRef{session_cacheconf.memcached.host}, &mcpool_); StringRef{session_cacheconf.memcached.host}, &mcpool_, randgen_);
} }
replace_downstream_config(std::move(downstreamconf)); replace_downstream_config(std::move(downstreamconf));
@ -192,6 +192,7 @@ void Worker::replace_downstream_config(
shared_addr->addrs.resize(src.addrs.size()); shared_addr->addrs.resize(src.addrs.size());
shared_addr->affinity = src.affinity; shared_addr->affinity = src.affinity;
shared_addr->affinity_hash = src.affinity_hash;
size_t num_http1 = 0; size_t num_http1 = 0;
size_t num_http2 = 0; size_t num_http2 = 0;

View File

@ -127,6 +127,9 @@ struct WeightedPri {
struct SharedDownstreamAddr { struct SharedDownstreamAddr {
std::vector<DownstreamAddr> addrs; std::vector<DownstreamAddr> addrs;
// Bunch of session affinity hash. Only used if affinity ==
// AFFINITY_IP.
std::vector<AffinityHash> affinity_hash;
// List of Http2Session which is not fully utilized (i.e., the // List of Http2Session which is not fully utilized (i.e., the
// server advertized maximum concurrency is not reached). We will // server advertized maximum concurrency is not reached). We will
// coalesce as much stream as possible in one Http2Session to fully // coalesce as much stream as possible in one Http2Session to fully

View File

@ -379,6 +379,10 @@ void nb_child_cb(struct ev_loop *loop, ev_child *w, int revents) {
} // namespace } // namespace
#endif // HAVE_NEVERBLEED #endif // HAVE_NEVERBLEED
namespace {
std::random_device rd;
} // namespace
int worker_process_event_loop(WorkerProcessConfig *wpconf) { int worker_process_event_loop(WorkerProcessConfig *wpconf) {
if (reopen_log_files() != 0) { if (reopen_log_files() != 0) {
LOG(FATAL) << "Failed to open log file"; LOG(FATAL) << "Failed to open log file";
@ -387,7 +391,9 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) {
auto loop = EV_DEFAULT; auto loop = EV_DEFAULT;
ConnectionHandler conn_handler(loop); auto gen = std::mt19937(rd());
ConnectionHandler conn_handler(loop, gen);
for (auto &addr : get_config()->conn.listener.addrs) { for (auto &addr : get_config()->conn.listener.addrs) {
conn_handler.add_acceptor(make_unique<AcceptHandler>(&addr, &conn_handler)); conn_handler.add_acceptor(make_unique<AcceptHandler>(&addr, &conn_handler));
@ -435,7 +441,7 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) {
conn_handler.set_tls_ticket_key_memcached_dispatcher( conn_handler.set_tls_ticket_key_memcached_dispatcher(
make_unique<MemcachedDispatcher>( make_unique<MemcachedDispatcher>(
&ticketconf.memcached.addr, loop, ssl_ctx, &ticketconf.memcached.addr, loop, ssl_ctx,
StringRef{memcachedconf.host}, &mcpool)); StringRef{memcachedconf.host}, &mcpool, gen));
ev_timer_init(&renew_ticket_key_timer, memcached_get_ticket_key_cb, 0., ev_timer_init(&renew_ticket_key_timer, memcached_get_ticket_key_cb, 0.,
0.); 0.);

View File

@ -54,8 +54,11 @@
#include <iostream> #include <iostream>
#include <fstream> #include <fstream>
#include <openssl/evp.h>
#include <nghttp2/nghttp2.h> #include <nghttp2/nghttp2.h>
#include "ssl_compat.h"
#include "timegm.h" #include "timegm.h"
namespace nghttp2 { namespace nghttp2 {
@ -1325,6 +1328,59 @@ double int_pow(double x, size_t y) {
return res; return res;
} }
uint32_t hash32(const StringRef &s) {
/* 32 bit FNV-1a: http://isthe.com/chongo/tech/comp/fnv/ */
uint32_t h = 2166136261u;
size_t i;
for (i = 0; i < s.size(); ++i) {
h ^= s[i];
h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24);
}
return h;
}
#if !OPENSSL_101_API
namespace {
EVP_MD_CTX *EVP_MD_CTX_new(void) { return EVP_MD_CTX_create(); }
} // namespace
namespace {
void EVP_MD_CTX_free(EVP_MD_CTX *ctx) { EVP_MD_CTX_destroy(ctx); }
} // namespace
#endif
int sha256(uint8_t *res, const StringRef &s) {
int rv;
auto ctx = EVP_MD_CTX_new();
if (ctx == nullptr) {
return -1;
}
auto ctx_deleter = defer(EVP_MD_CTX_free, ctx);
rv = EVP_DigestInit_ex(ctx, EVP_sha256(), nullptr);
if (rv != 1) {
return -1;
}
rv = EVP_DigestUpdate(ctx, s.c_str(), s.size());
if (rv != 1) {
return -1;
}
unsigned int mdlen = 32;
rv = EVP_DigestFinal_ex(ctx, res, &mdlen);
if (rv != 1) {
return -1;
}
return 0;
}
} // namespace util } // namespace util
} // namespace nghttp2 } // namespace nghttp2

View File

@ -680,6 +680,12 @@ OutputIterator copy_lit(OutputIterator it, CharT(&s)[N]) {
// Returns x**y // Returns x**y
double int_pow(double x, size_t y); double int_pow(double x, size_t y);
uint32_t hash32(const StringRef &s);
// Computes SHA-256 of |s|, and stores it in |buf|. This function
// returns 0 if it succeeds, or -1.
int sha256(uint8_t *buf, const StringRef &s);
} // namespace util } // namespace util
} // namespace nghttp2 } // namespace nghttp2