// Listing metadata: 382 lines, 11 KiB, C++
/*
 * nghttp2 - HTTP/2 C Library
 *
 * Copyright (c) 2012 Tatsuhiro Tsujikawa
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */
|
|
#ifndef SHRPX_WORKER_H
|
|
#define SHRPX_WORKER_H
|
|
|
|
#include "shrpx.h"
|
|
|
|
#include <mutex>
|
|
#include <vector>
|
|
#include <random>
|
|
#include <unordered_map>
|
|
#include <deque>
|
|
#include <thread>
|
|
#include <queue>
|
|
#ifndef NOTHREADS
|
|
# include <future>
|
|
#endif // NOTHREADS
|
|
|
|
#include <openssl/ssl.h>
|
|
#include <openssl/err.h>
|
|
|
|
#include <ev.h>
|
|
|
|
#include "shrpx_config.h"
|
|
#include "shrpx_downstream_connection_pool.h"
|
|
#include "memchunk.h"
|
|
#include "shrpx_tls.h"
|
|
#include "shrpx_live_check.h"
|
|
#include "shrpx_connect_blocker.h"
|
|
#include "shrpx_dns_tracker.h"
|
|
#include "allocator.h"
|
|
|
|
using namespace nghttp2;
|
|
|
|
namespace shrpx {
|
|
|
|
// Forward declarations of types that are referenced by the
// declarations in this header.
class Http2Session;
class ConnectBlocker;
class MemcachedDispatcher;
struct UpstreamAddr;
class ConnectionHandler;

#ifdef HAVE_MRUBY
namespace mruby {

class MRubyContext;

} // namespace mruby
#endif // HAVE_MRUBY

namespace tls {
class CertLookupTree;
} // namespace tls

struct WeightGroup;
|
|
|
|
struct DownstreamAddr {
|
|
Address addr;
|
|
// backend address. If |host_unix| is true, this is UNIX domain
|
|
// socket path.
|
|
StringRef host;
|
|
StringRef hostport;
|
|
// backend port. 0 if |host_unix| is true.
|
|
uint16_t port;
|
|
// true if |host| contains UNIX domain socket path.
|
|
bool host_unix;
|
|
|
|
// sni field to send remote server if TLS is enabled.
|
|
StringRef sni;
|
|
|
|
std::unique_ptr<ConnectBlocker> connect_blocker;
|
|
std::unique_ptr<LiveCheck> live_check;
|
|
// Connection pool for this particular address if session affinity
|
|
// is enabled
|
|
std::unique_ptr<DownstreamConnectionPool> dconn_pool;
|
|
size_t fall;
|
|
size_t rise;
|
|
// Client side TLS session cache
|
|
tls::TLSSessionCache tls_session_cache;
|
|
// List of Http2Session which is not fully utilized (i.e., the
|
|
// server advertised maximum concurrency is not reached). We will
|
|
// coalesce as much stream as possible in one Http2Session to fully
|
|
// utilize TCP connection.
|
|
DList<Http2Session> http2_extra_freelist;
|
|
WeightGroup *wg;
|
|
// total number of streams created in HTTP/2 connections for this
|
|
// address.
|
|
size_t num_dconn;
|
|
// the sequence number of this address to randomize the order access
|
|
// threads.
|
|
size_t seq;
|
|
// Application protocol used in this backend
|
|
Proto proto;
|
|
// cycle is used to prioritize this address. Lower value takes
|
|
// higher priority.
|
|
uint32_t cycle;
|
|
// penalty which is applied to the next cycle calculation.
|
|
uint32_t pending_penalty;
|
|
// Weight of this address inside a weight group. Its range is [1,
|
|
// 256], inclusive.
|
|
uint32_t weight;
|
|
// name of group which this address belongs to.
|
|
StringRef group;
|
|
// Weight of the weight group which this address belongs to. Its
|
|
// range is [1, 256], inclusive.
|
|
uint32_t group_weight;
|
|
// true if TLS is used in this backend
|
|
bool tls;
|
|
// true if dynamic DNS is enabled
|
|
bool dns;
|
|
// true if :scheme pseudo header field should be upgraded to secure
|
|
// variant (e.g., "https") when forwarding request to a backend
|
|
// connected by TLS connection.
|
|
bool upgrade_scheme;
|
|
// true if this address is queued.
|
|
bool queued;
|
|
};
|
|
|
|
// Upper bound of a weight value.  The comparators for the weighted
// priority queues derive their wrap-around window from this value.
constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT{256};
|
|
|
|
struct DownstreamAddrEntry {
|
|
DownstreamAddr *addr;
|
|
size_t seq;
|
|
uint32_t cycle;
|
|
};
|
|
|
|
struct DownstreamAddrEntryGreater {
|
|
bool operator()(const DownstreamAddrEntry &lhs,
|
|
const DownstreamAddrEntry &rhs) const {
|
|
auto d = lhs.cycle - rhs.cycle;
|
|
if (d == 0) {
|
|
return rhs.seq < lhs.seq;
|
|
}
|
|
return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1;
|
|
}
|
|
};
|
|
|
|
struct WeightGroup {
|
|
std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
|
|
DownstreamAddrEntryGreater>
|
|
pq;
|
|
size_t seq;
|
|
uint32_t weight;
|
|
uint32_t cycle;
|
|
uint32_t pending_penalty;
|
|
// true if this object is queued.
|
|
bool queued;
|
|
};
|
|
|
|
struct WeightGroupEntry {
|
|
WeightGroup *wg;
|
|
size_t seq;
|
|
uint32_t cycle;
|
|
};
|
|
|
|
struct WeightGroupEntryGreater {
|
|
bool operator()(const WeightGroupEntry &lhs,
|
|
const WeightGroupEntry &rhs) const {
|
|
auto d = lhs.cycle - rhs.cycle;
|
|
if (d == 0) {
|
|
return rhs.seq < lhs.seq;
|
|
}
|
|
return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1;
|
|
}
|
|
};
|
|
|
|
struct SharedDownstreamAddr {
|
|
SharedDownstreamAddr()
|
|
: balloc(1024, 1024),
|
|
affinity{SessionAffinity::NONE},
|
|
redirect_if_not_tls{false},
|
|
timeout{} {}
|
|
|
|
SharedDownstreamAddr(const SharedDownstreamAddr &) = delete;
|
|
SharedDownstreamAddr(SharedDownstreamAddr &&) = delete;
|
|
SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete;
|
|
SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete;
|
|
|
|
BlockAllocator balloc;
|
|
std::vector<DownstreamAddr> addrs;
|
|
std::vector<WeightGroup> wgs;
|
|
std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
|
|
WeightGroupEntryGreater>
|
|
pq;
|
|
// Bunch of session affinity hash. Only used if affinity ==
|
|
// SessionAffinity::IP.
|
|
std::vector<AffinityHash> affinity_hash;
|
|
#ifdef HAVE_MRUBY
|
|
std::shared_ptr<mruby::MRubyContext> mruby_ctx;
|
|
#endif // HAVE_MRUBY
|
|
// Configuration for session affinity
|
|
AffinityConfig affinity;
|
|
// Session affinity
|
|
// true if this group requires that client connection must be TLS,
|
|
// and the request must be redirected to https URI.
|
|
bool redirect_if_not_tls;
|
|
// Timeouts for backend connection.
|
|
struct {
|
|
ev_tstamp read;
|
|
ev_tstamp write;
|
|
} timeout;
|
|
};
|
|
|
|
struct DownstreamAddrGroup {
|
|
DownstreamAddrGroup();
|
|
~DownstreamAddrGroup();
|
|
|
|
DownstreamAddrGroup(const DownstreamAddrGroup &) = delete;
|
|
DownstreamAddrGroup(DownstreamAddrGroup &&) = delete;
|
|
DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete;
|
|
DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete;
|
|
|
|
ImmutableString pattern;
|
|
std::shared_ptr<SharedDownstreamAddr> shared_addr;
|
|
// true if this group is no longer used for new request. If this is
|
|
// true, the connection made using one of address in shared_addr
|
|
// must not be pooled.
|
|
bool retired;
|
|
};
|
|
|
|
// Statistics kept per Worker.
struct WorkerStat {
  // Connection counter.
  size_t num_connections;
};
|
|
|
|
// Kind of event carried by WorkerEvent.
enum class WorkerEventType {
  NEW_CONNECTION = 0x01,
  REOPEN_LOG = 0x02,
  GRACEFUL_SHUTDOWN = 0x03,
  REPLACE_DOWNSTREAM = 0x04,
};
|
|
|
|
struct WorkerEvent {
|
|
WorkerEventType type;
|
|
struct {
|
|
sockaddr_union client_addr;
|
|
size_t client_addrlen;
|
|
int client_fd;
|
|
const UpstreamAddr *faddr;
|
|
};
|
|
std::shared_ptr<TicketKeys> ticket_keys;
|
|
std::shared_ptr<DownstreamConfig> downstreamconf;
|
|
};
|
|
|
|
class Worker {
|
|
public:
|
|
Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
|
|
SSL_CTX *tls_session_cache_memcached_ssl_ctx,
|
|
tls::CertLookupTree *cert_tree,
|
|
const std::shared_ptr<TicketKeys> &ticket_keys,
|
|
ConnectionHandler *conn_handler,
|
|
std::shared_ptr<DownstreamConfig> downstreamconf);
|
|
~Worker();
|
|
void run_async();
|
|
void wait();
|
|
void process_events();
|
|
void send(const WorkerEvent &event);
|
|
|
|
tls::CertLookupTree *get_cert_lookup_tree() const;
|
|
|
|
// These 2 functions make a lock m_ to get/set ticket keys
|
|
// atomically.
|
|
std::shared_ptr<TicketKeys> get_ticket_keys();
|
|
void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);
|
|
|
|
WorkerStat *get_worker_stat();
|
|
struct ev_loop *get_loop() const;
|
|
SSL_CTX *get_sv_ssl_ctx() const;
|
|
SSL_CTX *get_cl_ssl_ctx() const;
|
|
|
|
void set_graceful_shutdown(bool f);
|
|
bool get_graceful_shutdown() const;
|
|
|
|
MemchunkPool *get_mcpool();
|
|
void schedule_clear_mcpool();
|
|
|
|
MemcachedDispatcher *get_session_cache_memcached_dispatcher();
|
|
|
|
std::mt19937 &get_randgen();
|
|
|
|
#ifdef HAVE_MRUBY
|
|
int create_mruby_context();
|
|
|
|
mruby::MRubyContext *get_mruby_context() const;
|
|
#endif // HAVE_MRUBY
|
|
|
|
std::vector<std::shared_ptr<DownstreamAddrGroup>> &
|
|
get_downstream_addr_groups();
|
|
|
|
ConnectBlocker *get_connect_blocker() const;
|
|
|
|
const DownstreamConfig *get_downstream_config() const;
|
|
|
|
void
|
|
replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf);
|
|
|
|
ConnectionHandler *get_connection_handler() const;
|
|
|
|
DNSTracker *get_dns_tracker();
|
|
|
|
private:
|
|
#ifndef NOTHREADS
|
|
std::future<void> fut_;
|
|
#endif // NOTHREADS
|
|
std::mutex m_;
|
|
std::deque<WorkerEvent> q_;
|
|
std::mt19937 randgen_;
|
|
ev_async w_;
|
|
ev_timer mcpool_clear_timer_;
|
|
ev_timer proc_wev_timer_;
|
|
MemchunkPool mcpool_;
|
|
WorkerStat worker_stat_;
|
|
DNSTracker dns_tracker_;
|
|
|
|
std::shared_ptr<DownstreamConfig> downstreamconf_;
|
|
std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;
|
|
#ifdef HAVE_MRUBY
|
|
std::unique_ptr<mruby::MRubyContext> mruby_ctx_;
|
|
#endif // HAVE_MRUBY
|
|
struct ev_loop *loop_;
|
|
|
|
// Following fields are shared across threads if
|
|
// get_config()->tls_ctx_per_worker == true.
|
|
SSL_CTX *sv_ssl_ctx_;
|
|
SSL_CTX *cl_ssl_ctx_;
|
|
tls::CertLookupTree *cert_tree_;
|
|
ConnectionHandler *conn_handler_;
|
|
|
|
#ifndef HAVE_ATOMIC_STD_SHARED_PTR
|
|
std::mutex ticket_keys_m_;
|
|
#endif // !HAVE_ATOMIC_STD_SHARED_PTR
|
|
std::shared_ptr<TicketKeys> ticket_keys_;
|
|
std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_;
|
|
// Worker level blocker for downstream connection. For example,
|
|
// this is used when file decriptor is exhausted.
|
|
std::unique_ptr<ConnectBlocker> connect_blocker_;
|
|
|
|
bool graceful_shutdown_;
|
|
};
|
|
|
|
// Selects group based on request's |hostport| and |path|. |hostport|
|
|
// is the value taken from :authority or host header field, and may
|
|
// contain port. The |path| may contain query part. We require the
|
|
// catch-all pattern in place, so this function always selects one
|
|
// group. The catch-all group index is given in |catch_all|. All
|
|
// patterns are given in |groups|.
|
|
size_t match_downstream_addr_group(
|
|
const RouterConfig &routerconfig, const StringRef &hostport,
|
|
const StringRef &path,
|
|
const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
|
|
size_t catch_all, BlockAllocator &balloc);
|
|
|
|
// Calls this function if connecting to backend failed. |raddr| is
|
|
// the actual address used to connect to backend, and it could be
|
|
// nullptr. This function may schedule live check.
|
|
void downstream_failure(DownstreamAddr *addr, const Address *raddr);
|
|
|
|
} // namespace shrpx
|
|
|
|
#endif // SHRPX_WORKER_H
|