/* * nghttp2 - HTTP/2 C Library * * Copyright (c) 2012 Tatsuhiro Tsujikawa * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #ifndef SHRPX_WORKER_H #define SHRPX_WORKER_H #include "shrpx.h" #include #include #include #include #include #include #include #ifndef NOTHREADS # include #endif // NOTHREADS #include #include #include #include "shrpx_config.h" #include "shrpx_downstream_connection_pool.h" #include "memchunk.h" #include "shrpx_tls.h" #include "shrpx_live_check.h" #include "shrpx_connect_blocker.h" #include "shrpx_dns_tracker.h" #include "allocator.h" using namespace nghttp2; namespace shrpx { class Http2Session; class ConnectBlocker; class MemcachedDispatcher; struct UpstreamAddr; class ConnectionHandler; #ifdef HAVE_MRUBY namespace mruby { class MRubyContext; } // namespace mruby #endif // HAVE_MRUBY namespace tls { class CertLookupTree; } // namespace tls struct WeightGroup; struct DownstreamAddr { Address addr; // backend address. If |host_unix| is true, this is UNIX domain // socket path. StringRef host; StringRef hostport; // backend port. 0 if |host_unix| is true. uint16_t port; // true if |host| contains UNIX domain socket path. bool host_unix; // sni field to send remote server if TLS is enabled. StringRef sni; std::unique_ptr connect_blocker; std::unique_ptr live_check; // Connection pool for this particular address if session affinity // is enabled std::unique_ptr dconn_pool; size_t fall; size_t rise; // Client side TLS session cache tls::TLSSessionCache tls_session_cache; // List of Http2Session which is not fully utilized (i.e., the // server advertised maximum concurrency is not reached). We will // coalesce as much stream as possible in one Http2Session to fully // utilize TCP connection. DList http2_extra_freelist; WeightGroup *wg; // total number of streams created in HTTP/2 connections for this // address. size_t num_dconn; // the sequence number of this address to randomize the order access // threads. size_t seq; // Application protocol used in this backend Proto proto; // cycle is used to prioritize this address. Lower value takes // higher priority. uint32_t cycle; // penalty which is applied to the next cycle calculation. uint32_t pending_penalty; // Weight of this address inside a weight group. Its range is [1, // 256], inclusive. uint32_t weight; // name of group which this address belongs to. StringRef group; // Weight of the weight group which this address belongs to. Its // range is [1, 256], inclusive. uint32_t group_weight; // true if TLS is used in this backend bool tls; // true if dynamic DNS is enabled bool dns; // true if :scheme pseudo header field should be upgraded to secure // variant (e.g., "https") when forwarding request to a backend // connected by TLS connection. bool upgrade_scheme; // true if this address is queued. bool queued; }; constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256; struct DownstreamAddrEntry { DownstreamAddr *addr; size_t seq; uint32_t cycle; }; struct DownstreamAddrEntryGreater { bool operator()(const DownstreamAddrEntry &lhs, const DownstreamAddrEntry &rhs) const { auto d = lhs.cycle - rhs.cycle; if (d == 0) { return rhs.seq < lhs.seq; } return d <= MAX_DOWNSTREAM_ADDR_WEIGHT; } }; struct WeightGroup { std::priority_queue, DownstreamAddrEntryGreater> pq; size_t seq; uint32_t weight; uint32_t cycle; uint32_t pending_penalty; // true if this object is queued. bool queued; }; struct WeightGroupEntry { WeightGroup *wg; size_t seq; uint32_t cycle; }; struct WeightGroupEntryGreater { bool operator()(const WeightGroupEntry &lhs, const WeightGroupEntry &rhs) const { auto d = lhs.cycle - rhs.cycle; if (d == 0) { return rhs.seq < lhs.seq; } return d <= MAX_DOWNSTREAM_ADDR_WEIGHT; } }; struct SharedDownstreamAddr { SharedDownstreamAddr() : balloc(1024, 1024), affinity{SessionAffinity::NONE}, redirect_if_not_tls{false} {} SharedDownstreamAddr(const SharedDownstreamAddr &) = delete; SharedDownstreamAddr(SharedDownstreamAddr &&) = delete; SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete; SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete; BlockAllocator balloc; std::vector addrs; std::vector wgs; std::priority_queue, WeightGroupEntryGreater> pq; // Bunch of session affinity hash. Only used if affinity == // SessionAffinity::IP. std::vector affinity_hash; // Configuration for session affinity AffinityConfig affinity; // Session affinity // true if this group requires that client connection must be TLS, // and the request must be redirected to https URI. bool redirect_if_not_tls; // Timeouts for backend connection. struct { ev_tstamp read; ev_tstamp write; } timeout; }; struct DownstreamAddrGroup { DownstreamAddrGroup(); ~DownstreamAddrGroup(); DownstreamAddrGroup(const DownstreamAddrGroup &) = delete; DownstreamAddrGroup(DownstreamAddrGroup &&) = delete; DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete; DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete; ImmutableString pattern; std::shared_ptr shared_addr; #ifdef HAVE_MRUBY std::shared_ptr mruby_ctx; #endif // HAVE_MRUBY // true if this group is no longer used for new request. If this is // true, the connection made using one of address in shared_addr // must not be pooled. bool retired; }; struct WorkerStat { size_t num_connections; }; enum class WorkerEventType { NEW_CONNECTION = 0x01, REOPEN_LOG = 0x02, GRACEFUL_SHUTDOWN = 0x03, REPLACE_DOWNSTREAM = 0x04, }; struct WorkerEvent { WorkerEventType type; struct { sockaddr_union client_addr; size_t client_addrlen; int client_fd; const UpstreamAddr *faddr; }; std::shared_ptr ticket_keys; std::shared_ptr downstreamconf; }; class Worker { public: Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, SSL_CTX *tls_session_cache_memcached_ssl_ctx, tls::CertLookupTree *cert_tree, const std::shared_ptr &ticket_keys, ConnectionHandler *conn_handler, std::shared_ptr downstreamconf); ~Worker(); void run_async(); void wait(); void process_events(); void send(const WorkerEvent &event); tls::CertLookupTree *get_cert_lookup_tree() const; // These 2 functions make a lock m_ to get/set ticket keys // atomically. std::shared_ptr get_ticket_keys(); void set_ticket_keys(std::shared_ptr ticket_keys); WorkerStat *get_worker_stat(); struct ev_loop *get_loop() const; SSL_CTX *get_sv_ssl_ctx() const; SSL_CTX *get_cl_ssl_ctx() const; void set_graceful_shutdown(bool f); bool get_graceful_shutdown() const; MemchunkPool *get_mcpool(); void schedule_clear_mcpool(); MemcachedDispatcher *get_session_cache_memcached_dispatcher(); std::mt19937 &get_randgen(); #ifdef HAVE_MRUBY int create_mruby_context(); mruby::MRubyContext *get_mruby_context() const; #endif // HAVE_MRUBY std::vector> & get_downstream_addr_groups(); ConnectBlocker *get_connect_blocker() const; const DownstreamConfig *get_downstream_config() const; void replace_downstream_config(std::shared_ptr downstreamconf); ConnectionHandler *get_connection_handler() const; DNSTracker *get_dns_tracker(); private: #ifndef NOTHREADS std::future fut_; #endif // NOTHREADS std::mutex m_; std::deque q_; std::mt19937 randgen_; ev_async w_; ev_timer mcpool_clear_timer_; ev_timer proc_wev_timer_; MemchunkPool mcpool_; WorkerStat worker_stat_; DNSTracker dns_tracker_; std::shared_ptr downstreamconf_; std::unique_ptr session_cache_memcached_dispatcher_; #ifdef HAVE_MRUBY std::unique_ptr mruby_ctx_; #endif // HAVE_MRUBY struct ev_loop *loop_; // Following fields are shared across threads if // get_config()->tls_ctx_per_worker == true. SSL_CTX *sv_ssl_ctx_; SSL_CTX *cl_ssl_ctx_; tls::CertLookupTree *cert_tree_; ConnectionHandler *conn_handler_; #ifndef HAVE_ATOMIC_STD_SHARED_PTR std::mutex ticket_keys_m_; #endif // !HAVE_ATOMIC_STD_SHARED_PTR std::shared_ptr ticket_keys_; std::vector> downstream_addr_groups_; // Worker level blocker for downstream connection. For example, // this is used when file decriptor is exhausted. std::unique_ptr connect_blocker_; bool graceful_shutdown_; }; // Selects group based on request's |hostport| and |path|. |hostport| // is the value taken from :authority or host header field, and may // contain port. The |path| may contain query part. We require the // catch-all pattern in place, so this function always selects one // group. The catch-all group index is given in |catch_all|. All // patterns are given in |groups|. size_t match_downstream_addr_group( const RouterConfig &routerconfig, const StringRef &hostport, const StringRef &path, const std::vector> &groups, size_t catch_all, BlockAllocator &balloc); // Calls this function if connecting to backend failed. |raddr| is // the actual address used to connect to backend, and it could be // nullptr. This function may schedule live check. void downstream_failure(DownstreamAddr *addr, const Address *raddr); } // namespace shrpx #endif // SHRPX_WORKER_H