/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2012 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_WORKER_H
#define SHRPX_WORKER_H
#include "shrpx.h"
#include <mutex>
#include <vector>
#include <random>
#include <unordered_map>
#include <deque>
#include <thread>
#ifndef NOTHREADS
#include <future>
#endif // NOTHREADS
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <ev.h>
#include "shrpx_config.h"
#include "shrpx_downstream_connection_pool.h"
#include "memchunk.h"
#include "shrpx_ssl.h"
#include "shrpx_live_check.h"
#include "shrpx_connect_blocker.h"
#include "shrpx_dns_tracker.h"
#include "allocator.h"
using namespace nghttp2;
namespace shrpx {
class Http2Session;
class ConnectBlocker;
class MemcachedDispatcher;
struct UpstreamAddr;
class ConnectionHandler;
#ifdef HAVE_MRUBY
namespace mruby {
class MRubyContext;
} // namespace mruby
#endif // HAVE_MRUBY
namespace ssl {
class CertLookupTree;
} // namespace ssl
struct DownstreamAddr {
Address addr;
// backend address. If |host_unix| is true, this is the UNIX domain
// socket path.
StringRef host;
StringRef hostport;
// backend port. 0 if |host_unix| is true.
uint16_t port;
// true if |host| contains UNIX domain socket path.
bool host_unix;
// sni field to send to the remote server if TLS is enabled.
StringRef sni;
std::unique_ptr<ConnectBlocker> connect_blocker;
std::unique_ptr<LiveCheck> live_check;
// Connection pool for this particular address if session affinity
// is enabled
std::unique_ptr<DownstreamConnectionPool> dconn_pool;
size_t fall;
size_t rise;
// Client side TLS session cache
ssl::TLSSessionCache tls_session_cache;
// Http2Session objects created for this address. This list chains
// all Http2Session objects that are not in the group scope
// http2_avail_freelist and have not reached their maximum
// concurrency.
//
// If session affinity is enabled, http2_avail_freelist is not used,
// and this list is used exclusively.
DList<Http2Session> http2_extra_freelist;
// true if an Http2Session for this address is in the group scope
// SharedDownstreamAddr.http2_avail_freelist.
bool in_avail;
// total number of streams created in HTTP/2 connections for this
// address.
size_t num_dconn;
// Application protocol used in this backend
shrpx_proto proto;
// true if TLS is used in this backend
bool tls;
// true if dynamic DNS is enabled
bool dns;
};
// Simplified weighted fair queuing. We don't actually use a queue
// here since we have just 2 items. This is the same algorithm used
// in stream priority, but it ignores the remainder.
struct WeightedPri {
// current cycle of this item. The lesser cycle has the higher
// priority. This is an unsigned 32-bit integer, so it may overflow,
// but by the same reasoning described in stream priority, that is
// not a problem.
uint32_t cycle;
// weight; a larger weight means more frequent use.
uint32_t weight;
};
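// Illustrative sketch only: the helper below is hypothetical and not
// part of shrpx; the real selection logic lives in the .cc files. It
// shows how the two WeightedPri entries in SharedDownstreamAddr could
// be compared and advanced: the entry with the lesser cycle wins, and
// after winning its cycle moves forward in inverse proportion to its
// weight, so a larger weight is selected more often.
inline WeightedPri &example_pick_and_advance(WeightedPri &http1_pri,
                                             WeightedPri &http2_pri) {
  auto &winner = http1_pri.cycle <= http2_pri.cycle ? http1_pri : http2_pri;
  // The increment constant is arbitrary for illustration; unsigned
  // wrap-around is tolerated as described above.
  winner.cycle += 65536 / (winner.weight == 0 ? 1 : winner.weight);
  return winner;
}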
struct SharedDownstreamAddr {
SharedDownstreamAddr()
: balloc(1024, 1024),
next{0},
http1_pri{},
http2_pri{},
affinity{AFFINITY_NONE} {}
SharedDownstreamAddr(const SharedDownstreamAddr &) = delete;
SharedDownstreamAddr(SharedDownstreamAddr &&) = delete;
SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete;
SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete;
BlockAllocator balloc;
std::vector<DownstreamAddr> addrs;
// A set of session affinity hash entries. Only used if affinity ==
// AFFINITY_IP.
std::vector<AffinityHash> affinity_hash;
// List of Http2Session objects that are not fully utilized (i.e.,
// the server-advertised maximum concurrency has not been reached).
// We coalesce as many streams as possible into one Http2Session to
// fully utilize the TCP connection.
//
// If session affinity is enabled, this list is not used; the per
// address http2_extra_freelist is used instead.
//
// TODO Verify that this approach actually performs better.
DList<Http2Session> http2_avail_freelist;
DownstreamConnectionPool dconn_pool;
// Next http/1.1 downstream address index in addrs.
size_t next;
// http1_pri and http2_pri are used to decide which protocol,
// HTTP/1.1 or HTTP/2, is used when both are available in the
// backends. They are chosen in proportion to the number of
// available backends for each. Usually, if http1_pri.cycle <
// http2_pri.cycle, choose HTTP/1.1; otherwise, choose HTTP/2.
WeightedPri http1_pri;
WeightedPri http2_pri;
// Session affinity
shrpx_session_affinity affinity;
};
struct DownstreamAddrGroup {
DownstreamAddrGroup() : retired{false} {};
DownstreamAddrGroup(const DownstreamAddrGroup &) = delete;
DownstreamAddrGroup(DownstreamAddrGroup &&) = delete;
DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete;
DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete;
ImmutableString pattern;
std::shared_ptr<SharedDownstreamAddr> shared_addr;
// true if this group is no longer used for new requests. If this is
// true, a connection made to one of the addresses in shared_addr
// must not be pooled.
bool retired;
};
struct WorkerStat {
size_t num_connections;
};
enum WorkerEventType {
NEW_CONNECTION = 0x01,
REOPEN_LOG = 0x02,
GRACEFUL_SHUTDOWN = 0x03,
REPLACE_DOWNSTREAM = 0x04,
};
struct WorkerEvent {
WorkerEventType type;
struct {
sockaddr_union client_addr;
size_t client_addrlen;
int client_fd;
const UpstreamAddr *faddr;
};
std::shared_ptr<TicketKeys> ticket_keys;
std::shared_ptr<DownstreamConfig> downstreamconf;
};
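// Illustrative sketch only (hypothetical helper, not part of shrpx):
// filling in a NEW_CONNECTION event. The anonymous struct members
// carry the accepted client socket and its address; the event is then
// handed to a worker thread via Worker::send(), declared below.
inline WorkerEvent example_make_new_connection_event(
    int client_fd, const sockaddr_union &client_addr, size_t client_addrlen,
    const UpstreamAddr *faddr) {
  WorkerEvent wev{};
  wev.type = NEW_CONNECTION;
  wev.client_fd = client_fd;
  wev.client_addr = client_addr;
  wev.client_addrlen = client_addrlen;
  wev.faddr = faddr;
  return wev;
}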
class Worker {
public:
Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
SSL_CTX *tls_session_cache_memcached_ssl_ctx,
ssl::CertLookupTree *cert_tree,
const std::shared_ptr<TicketKeys> &ticket_keys,
ConnectionHandler *conn_handler,
std::shared_ptr<DownstreamConfig> downstreamconf);
~Worker();
void run_async();
void wait();
void process_events();
void send(const WorkerEvent &event);
ssl::CertLookupTree *get_cert_lookup_tree() const;
// These 2 functions take a lock to get/set the ticket keys
// atomically.
std::shared_ptr<TicketKeys> get_ticket_keys();
void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);
WorkerStat *get_worker_stat();
struct ev_loop *get_loop() const;
SSL_CTX *get_sv_ssl_ctx() const;
SSL_CTX *get_cl_ssl_ctx() const;
void set_graceful_shutdown(bool f);
bool get_graceful_shutdown() const;
MemchunkPool *get_mcpool();
void schedule_clear_mcpool();
MemcachedDispatcher *get_session_cache_memcached_dispatcher();
std::mt19937 &get_randgen();
#ifdef HAVE_MRUBY
int create_mruby_context();
mruby::MRubyContext *get_mruby_context() const;
#endif // HAVE_MRUBY
std::vector<std::shared_ptr<DownstreamAddrGroup>> &
get_downstream_addr_groups();
ConnectBlocker *get_connect_blocker() const;
const DownstreamConfig *get_downstream_config() const;
void
replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf);
ConnectionHandler *get_connection_handler() const;
DNSTracker *get_dns_tracker();
private:
#ifndef NOTHREADS
std::future<void> fut_;
#endif // NOTHREADS
std::mutex m_;
std::deque<WorkerEvent> q_;
std::mt19937 randgen_;
ev_async w_;
ev_timer mcpool_clear_timer_;
ev_timer proc_wev_timer_;
MemchunkPool mcpool_;
WorkerStat worker_stat_;
DNSTracker dns_tracker_;
std::shared_ptr<DownstreamConfig> downstreamconf_;
std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;
#ifdef HAVE_MRUBY
std::unique_ptr<mruby::MRubyContext> mruby_ctx_;
#endif // HAVE_MRUBY
struct ev_loop *loop_;
// The following fields are shared across threads if
// get_config()->tls_ctx_per_worker == true.
SSL_CTX *sv_ssl_ctx_;
SSL_CTX *cl_ssl_ctx_;
ssl::CertLookupTree *cert_tree_;
ConnectionHandler *conn_handler_;
#ifndef HAVE_ATOMIC_STD_SHARED_PTR
std::mutex ticket_keys_m_;
#endif // !HAVE_ATOMIC_STD_SHARED_PTR
std::shared_ptr<TicketKeys> ticket_keys_;
std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_;
// Worker level blocker for downstream connections. For example,
// this is used when file descriptors are exhausted.
std::unique_ptr<ConnectBlocker> connect_blocker_;
bool graceful_shutdown_;
};
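// Illustrative sketch only (hypothetical function, not part of shrpx):
// a typical Worker lifecycle as suggested by the API above. Each
// worker runs its own ev_loop on a dedicated thread started by
// run_async(); events are queued with send() and drained by the
// worker in process_events(). A GRACEFUL_SHUTDOWN event asks the
// worker to wind down, and wait() joins the thread once the loop
// exits.
inline void example_worker_lifecycle(Worker &worker) {
  worker.run_async();
  // ... hand accepted connections to the worker with worker.send() ...
  WorkerEvent wev{};
  wev.type = GRACEFUL_SHUTDOWN;
  worker.send(wev);
  worker.wait();
}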
// Selects a group based on the request's |hostport| and |path|.
// |hostport| is the value taken from the :authority or host header
// field, and may contain a port. The |path| may contain a query
// part. We require that the catch-all pattern is in place, so this
// function always selects one group. The catch-all group index is
// given in |catch_all|. All patterns are given in |groups|.
size_t match_downstream_addr_group(
const RouterConfig &routerconfig, const StringRef &hostport,
const StringRef &path,
const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
size_t catch_all, BlockAllocator &balloc);
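// Illustrative sketch only (hypothetical helper, not part of shrpx):
// routing a request with the function above. |hostport| comes from
// :authority or host, |path| may contain a query part, and since a
// catch-all pattern is required, the returned index always refers to
// a valid group in |groups|.
inline DownstreamAddrGroup &example_route_request(
    Worker &worker, const RouterConfig &routerconf, const StringRef &hostport,
    const StringRef &path, size_t catch_all, BlockAllocator &balloc) {
  auto &groups = worker.get_downstream_addr_groups();
  auto idx = match_downstream_addr_group(routerconf, hostport, path, groups,
                                         catch_all, balloc);
  return *groups[idx];
}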
// Call this function if connecting to the backend failed. |raddr| is
// the actual address used to connect to the backend, and it may be
// nullptr. This function may schedule a live check.
void downstream_failure(DownstreamAddr *addr, const Address *raddr);
} // namespace shrpx
#endif // SHRPX_WORKER_H