/* * nghttp2 - HTTP/2 C Library * * Copyright (c) 2012 Tatsuhiro Tsujikawa * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #ifndef SHRPX_WORKER_H #define SHRPX_WORKER_H #include "shrpx.h" #include #include #include #include #include #include #ifndef NOTHREADS # include #endif // NOTHREADS #include #include #include #include "shrpx_config.h" #include "shrpx_downstream_connection_pool.h" #include "memchunk.h" #include "shrpx_tls.h" #include "shrpx_live_check.h" #include "shrpx_connect_blocker.h" #include "shrpx_dns_tracker.h" #include "allocator.h" using namespace nghttp2; namespace shrpx { class Http2Session; class ConnectBlocker; class MemcachedDispatcher; struct UpstreamAddr; class ConnectionHandler; #ifdef HAVE_MRUBY namespace mruby { class MRubyContext; } // namespace mruby #endif // HAVE_MRUBY namespace tls { class CertLookupTree; } // namespace tls struct DownstreamAddr { Address addr; // backend address. If |host_unix| is true, this is UNIX domain // socket path. StringRef host; StringRef hostport; // backend port. 0 if |host_unix| is true. uint16_t port; // true if |host| contains UNIX domain socket path. bool host_unix; // sni field to send remote server if TLS is enabled. StringRef sni; std::unique_ptr connect_blocker; std::unique_ptr live_check; // Connection pool for this particular address if session affinity // is enabled std::unique_ptr dconn_pool; size_t fall; size_t rise; // Client side TLS session cache tls::TLSSessionCache tls_session_cache; // Http2Session object created for this address. This list chains // all Http2Session objects that is not in group scope // http2_avail_freelist, and is not reached in maximum concurrency. // // If session affinity is enabled, http2_avail_freelist is not used, // and this list is solely used. DList http2_extra_freelist; // true if Http2Session for this address is in group scope // SharedDownstreamAddr.http2_avail_freelist bool in_avail; // total number of streams created in HTTP/2 connections for this // address. size_t num_dconn; // Application protocol used in this backend shrpx_proto proto; // true if TLS is used in this backend bool tls; // true if dynamic DNS is enabled bool dns; // true if :scheme pseudo header field should be upgraded to secure // variant (e.g., "https") when forwarding request to a backend // connected by TLS connection. bool upgrade_scheme; }; // Simplified weighted fair queuing. Actually we don't use queue here // since we have just 2 items. This is the same algorithm used in // stream priority, but ignores remainder. struct WeightedPri { // current cycle of this item. The lesser cycle has higher // priority. This is unsigned 32 bit integer, so it may overflow. // But with the same theory described in stream priority, it is no // problem. uint32_t cycle; // weight, larger weight means more frequent use. uint32_t weight; }; struct SharedDownstreamAddr { SharedDownstreamAddr() : balloc(1024, 1024), affinity{AFFINITY_NONE}, next{0}, http1_pri{}, http2_pri{}, redirect_if_not_tls{false} {} SharedDownstreamAddr(const SharedDownstreamAddr &) = delete; SharedDownstreamAddr(SharedDownstreamAddr &&) = delete; SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete; SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete; BlockAllocator balloc; std::vector addrs; // Bunch of session affinity hash. Only used if affinity == // AFFINITY_IP. std::vector affinity_hash; // List of Http2Session which is not fully utilized (i.e., the // server advertised maximum concurrency is not reached). We will // coalesce as much stream as possible in one Http2Session to fully // utilize TCP connection. // // If session affinity is enabled, this list is not used. Per // address http2_extra_freelist is used instead. // // TODO Verify that this approach performs better in performance // wise. DList http2_avail_freelist; DownstreamConnectionPool dconn_pool; // Configuration for session affinity AffinityConfig affinity; // Next http/1.1 downstream address index in addrs. size_t next; // http1_pri and http2_pri are used to which protocols are used // between HTTP/1.1 or HTTP/2 if they both are available in // backends. They are choosed proportional to the number available // backend. Usually, if http1_pri.cycle < http2_pri.cycle, choose // HTTP/1.1. Otherwise, choose HTTP/2. WeightedPri http1_pri; WeightedPri http2_pri; // Session affinity // true if this group requires that client connection must be TLS, // and the request must be redirected to https URI. bool redirect_if_not_tls; }; struct DownstreamAddrGroup { DownstreamAddrGroup(); ~DownstreamAddrGroup(); DownstreamAddrGroup(const DownstreamAddrGroup &) = delete; DownstreamAddrGroup(DownstreamAddrGroup &&) = delete; DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete; DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete; ImmutableString pattern; std::shared_ptr shared_addr; #ifdef HAVE_MRUBY std::unique_ptr mruby_ctx; #endif // HAVE_MRUBY // true if this group is no longer used for new request. If this is // true, the connection made using one of address in shared_addr // must not be pooled. bool retired; }; struct WorkerStat { size_t num_connections; }; enum WorkerEventType { NEW_CONNECTION = 0x01, REOPEN_LOG = 0x02, GRACEFUL_SHUTDOWN = 0x03, REPLACE_DOWNSTREAM = 0x04, }; struct WorkerEvent { WorkerEventType type; struct { sockaddr_union client_addr; size_t client_addrlen; int client_fd; const UpstreamAddr *faddr; }; std::shared_ptr ticket_keys; std::shared_ptr downstreamconf; }; class Worker { public: Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, SSL_CTX *tls_session_cache_memcached_ssl_ctx, tls::CertLookupTree *cert_tree, const std::shared_ptr &ticket_keys, ConnectionHandler *conn_handler, std::shared_ptr downstreamconf); ~Worker(); void run_async(); void wait(); void process_events(); void send(const WorkerEvent &event); tls::CertLookupTree *get_cert_lookup_tree() const; // These 2 functions make a lock m_ to get/set ticket keys // atomically. std::shared_ptr get_ticket_keys(); void set_ticket_keys(std::shared_ptr ticket_keys); WorkerStat *get_worker_stat(); struct ev_loop *get_loop() const; SSL_CTX *get_sv_ssl_ctx() const; SSL_CTX *get_cl_ssl_ctx() const; void set_graceful_shutdown(bool f); bool get_graceful_shutdown() const; MemchunkPool *get_mcpool(); void schedule_clear_mcpool(); MemcachedDispatcher *get_session_cache_memcached_dispatcher(); std::mt19937 &get_randgen(); #ifdef HAVE_MRUBY int create_mruby_context(); mruby::MRubyContext *get_mruby_context() const; #endif // HAVE_MRUBY std::vector> & get_downstream_addr_groups(); ConnectBlocker *get_connect_blocker() const; const DownstreamConfig *get_downstream_config() const; void replace_downstream_config(std::shared_ptr downstreamconf); ConnectionHandler *get_connection_handler() const; DNSTracker *get_dns_tracker(); private: #ifndef NOTHREADS std::future fut_; #endif // NOTHREADS std::mutex m_; std::deque q_; std::mt19937 randgen_; ev_async w_; ev_timer mcpool_clear_timer_; ev_timer proc_wev_timer_; MemchunkPool mcpool_; WorkerStat worker_stat_; DNSTracker dns_tracker_; std::shared_ptr downstreamconf_; std::unique_ptr session_cache_memcached_dispatcher_; #ifdef HAVE_MRUBY std::unique_ptr mruby_ctx_; #endif // HAVE_MRUBY struct ev_loop *loop_; // Following fields are shared across threads if // get_config()->tls_ctx_per_worker == true. SSL_CTX *sv_ssl_ctx_; SSL_CTX *cl_ssl_ctx_; tls::CertLookupTree *cert_tree_; ConnectionHandler *conn_handler_; #ifndef HAVE_ATOMIC_STD_SHARED_PTR std::mutex ticket_keys_m_; #endif // !HAVE_ATOMIC_STD_SHARED_PTR std::shared_ptr ticket_keys_; std::vector> downstream_addr_groups_; // Worker level blocker for downstream connection. For example, // this is used when file decriptor is exhausted. std::unique_ptr connect_blocker_; bool graceful_shutdown_; }; // Selects group based on request's |hostport| and |path|. |hostport| // is the value taken from :authority or host header field, and may // contain port. The |path| may contain query part. We require the // catch-all pattern in place, so this function always selects one // group. The catch-all group index is given in |catch_all|. All // patterns are given in |groups|. size_t match_downstream_addr_group( const RouterConfig &routerconfig, const StringRef &hostport, const StringRef &path, const std::vector> &groups, size_t catch_all, BlockAllocator &balloc); // Calls this function if connecting to backend failed. |raddr| is // the actual address used to connect to backend, and it could be // nullptr. This function may schedule live check. void downstream_failure(DownstreamAddr *addr, const Address *raddr); } // namespace shrpx #endif // SHRPX_WORKER_H