From 0ca7c4cb387ff41759527b31d97386b71e87cf69 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sat, 4 Jun 2016 01:02:57 +0900 Subject: [PATCH] nghttpx: Send notice to replace downstream via ConnectionHandler --- src/shrpx_api_downstream_connection.cc | 5 +- src/shrpx_connection_handler.cc | 80 ++++++++++++++++++++++++-- src/shrpx_connection_handler.h | 32 +++++++++++ src/shrpx_worker.cc | 19 +++++- src/shrpx_worker.h | 14 ++++- 5 files changed, 137 insertions(+), 13 deletions(-) diff --git a/src/shrpx_api_downstream_connection.cc b/src/shrpx_api_downstream_connection.cc index e67a7f49..b6eaca14 100644 --- a/src/shrpx_api_downstream_connection.cc +++ b/src/shrpx_api_downstream_connection.cc @@ -28,6 +28,7 @@ #include "shrpx_upstream.h" #include "shrpx_downstream.h" #include "shrpx_worker.h" +#include "shrpx_connection_handler.h" namespace shrpx { @@ -176,7 +177,9 @@ int APIDownstreamConnection::end_upload_data() { return 0; } - worker_->replace_downstream_config(downstreamconf); + auto conn_handler = worker_->get_connection_handler(); + + conn_handler->send_replace_downstream(downstreamconf); send_reply(200, body); diff --git a/src/shrpx_connection_handler.cc b/src/shrpx_connection_handler.cc index dc3223a9..7b862a47 100644 --- a/src/shrpx_connection_handler.cc +++ b/src/shrpx_connection_handler.cc @@ -102,6 +102,14 @@ void thread_join_async_cb(struct ev_loop *loop, ev_async *w, int revent) { } } // namespace +namespace { +void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) { + auto h = static_cast(w->data); + + h->handle_serial_event(); +} +} // namespace + namespace { std::random_device rd; } // namespace @@ -125,6 +133,11 @@ ConnectionHandler::ConnectionHandler(struct ev_loop *loop) ev_async_init(&thread_join_asyncev_, thread_join_async_cb); + ev_async_init(&serial_event_asyncev_, serial_event_async_cb); + serial_event_asyncev_.data = this; + + ev_async_start(loop_, &serial_event_asyncev_); + ev_child_init(&ocsp_.chldev, ocsp_chld_cb, 0, 0); ocsp_.chldev.data = this; @@ -136,6 +149,7 @@ ConnectionHandler::ConnectionHandler(struct ev_loop *loop) ConnectionHandler::~ConnectionHandler() { ev_child_stop(loop_, &ocsp_.chldev); + ev_async_stop(loop_, &serial_event_asyncev_); ev_async_stop(loop_, &thread_join_asyncev_); ev_io_stop(loop_, &ocsp_.rev); ev_timer_stop(loop_, &ocsp_timer_); @@ -175,6 +189,18 @@ void ConnectionHandler::worker_reopen_log_files() { } } +void ConnectionHandler::worker_replace_downstream( + std::shared_ptr downstreamconf) { + WorkerEvent wev{}; + + wev.type = REPLACE_DOWNSTREAM; + wev.downstreamconf = std::move(downstreamconf); + + for (auto &worker : workers_) { + worker->send(wev); + } +} + int ConnectionHandler::create_single_worker() { auto cert_tree = ssl::create_cert_lookup_tree(); auto sv_ssl_ctx = ssl::setup_server_ssl_context(all_ssl_ctx_, cert_tree @@ -207,9 +233,9 @@ int ConnectionHandler::create_single_worker() { all_ssl_ctx_.push_back(session_cache_ssl_ctx); } - single_worker_ = - make_unique(loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, - cert_tree, ticket_keys_); + single_worker_ = make_unique( + loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree, + ticket_keys_, this, get_config()->conn.downstream); #ifdef HAVE_MRUBY if (single_worker_->create_mruby_context() != 0) { return -1; @@ -256,9 +282,9 @@ int ConnectionHandler::create_worker_thread(size_t num) { StringRef{memcachedconf.private_key_file}, nullptr); all_ssl_ctx_.push_back(session_cache_ssl_ctx); } - auto worker = - make_unique(loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, - cert_tree, ticket_keys_); + auto worker = make_unique( + loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree, + ticket_keys_, this, get_config()->conn.downstream); #ifdef HAVE_MRUBY if (worker->create_mruby_context() != 0) { return -1; @@ -782,4 +808,46 @@ neverbleed_t *ConnectionHandler::get_neverbleed() const { return nb_.get(); } #endif // HAVE_NEVERBLEED +void ConnectionHandler::handle_serial_event() { + std::vector q; + { + std::lock_guard g(serial_event_mu_); + q.swap(serial_events_); + } + + for (auto &sev : q) { + switch (sev.type) { + case SEV_REPLACE_DOWNSTREAM: + // TODO make sure that none of worker uses + // get_config()->conn.downstream + mod_config()->conn.downstream = sev.downstreamconf; + + if (single_worker_) { + single_worker_->replace_downstream_config(sev.downstreamconf); + + break; + } + + worker_replace_downstream(sev.downstreamconf); + + break; + } + } +} + +void ConnectionHandler::send_replace_downstream( + const std::shared_ptr &downstreamconf) { + send_serial_event(SerialEvent(SEV_REPLACE_DOWNSTREAM, downstreamconf)); +} + +void ConnectionHandler::send_serial_event(SerialEvent ev) { + { + std::lock_guard g(serial_event_mu_); + + serial_events_.push_back(std::move(ev)); + } + + ev_async_send(loop_, &serial_event_asyncev_); +} + } // namespace shrpx diff --git a/src/shrpx_connection_handler.h b/src/shrpx_connection_handler.h index 8602b359..cd805f57 100644 --- a/src/shrpx_connection_handler.h +++ b/src/shrpx_connection_handler.h @@ -48,6 +48,7 @@ #endif // HAVE_NEVERBLEED #include "shrpx_downstream_connection_pool.h" +#include "shrpx_config.h" namespace shrpx { @@ -76,6 +77,21 @@ struct OCSPUpdateContext { pid_t pid; }; +// SerialEvent is an event sent from Worker thread. +enum SerialEventType { + SEV_NONE, + SEV_REPLACE_DOWNSTREAM, +}; + +struct SerialEvent { + // ctor for event uses DownstreamConfig + SerialEvent(int type, const std::shared_ptr &downstreamconf) + : type(type), downstreamconf(downstreamconf) {} + + int type; + std::shared_ptr downstreamconf; +}; + class ConnectionHandler { public: ConnectionHandler(struct ev_loop *loop); @@ -136,6 +152,17 @@ public: neverbleed_t *get_neverbleed() const; #endif // HAVE_NEVERBLEED + // Send SerialEvent SEV_REPLACE_DOWNSTREAM to this object. + void send_replace_downstream( + const std::shared_ptr &downstreamconf); + // Internal function to send |ev| to this object. + void send_serial_event(SerialEvent ev); + // Handles SerialEvents received. + void handle_serial_event(); + // Sends WorkerEvent to make them replace downstream. + void + worker_replace_downstream(std::shared_ptr downstreamconf); + private: // Stores all SSL_CTX objects. std::vector all_ssl_ctx_; @@ -145,6 +172,10 @@ private: std::vector worker_loops_; // Worker instances when multi threaded mode (-nN, N >= 2) is used. std::vector> workers_; + // mutex for serial event resive buffer handling + std::mutex serial_event_mu_; + // SerialEvent receive buffer + std::vector serial_events_; // Worker instance used when single threaded mode (-n1) is used. // Otherwise, nullptr and workers_ has instances of Worker instead. std::unique_ptr single_worker_; @@ -161,6 +192,7 @@ private: ev_timer disable_acceptor_timer_; ev_timer ocsp_timer_; ev_async thread_join_asyncev_; + ev_async serial_event_asyncev_; #ifndef NOTHREADS std::future thread_join_fut_; #endif // NOTHREADS diff --git a/src/shrpx_worker.cc b/src/shrpx_worker.cc index 660588c8..64e9d0e8 100644 --- a/src/shrpx_worker.cc +++ b/src/shrpx_worker.cc @@ -106,13 +106,16 @@ std::random_device rd; Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, SSL_CTX *tls_session_cache_memcached_ssl_ctx, ssl::CertLookupTree *cert_tree, - const std::shared_ptr &ticket_keys) + const std::shared_ptr &ticket_keys, + ConnectionHandler *conn_handler, + std::shared_ptr downstreamconf) : randgen_(rd()), worker_stat_{}, loop_(loop), sv_ssl_ctx_(sv_ssl_ctx), cl_ssl_ctx_(cl_ssl_ctx), cert_tree_(cert_tree), + conn_handler_(conn_handler), ticket_keys_(ticket_keys), connect_blocker_( make_unique(randgen_, loop_, []() {}, []() {})), @@ -133,11 +136,11 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, StringRef{session_cacheconf.memcached.host}, &mcpool_); } - replace_downstream_config(get_config()->conn.downstream); + replace_downstream_config(std::move(downstreamconf)); } void Worker::replace_downstream_config( - const std::shared_ptr &downstreamconf) { + std::shared_ptr downstreamconf) { downstreamconf_ = downstreamconf; downstream_addr_groups_ = std::vector>( @@ -345,6 +348,12 @@ void Worker::process_events() { return; } + break; + case REPLACE_DOWNSTREAM: + WLOG(NOTICE, this) << "Replace downstream"; + + replace_downstream_config(wev.downstreamconf); + break; default: if (LOG_ENABLED(INFO)) { @@ -416,6 +425,10 @@ const DownstreamConfig *Worker::get_downstream_config() const { return downstreamconf_.get(); } +ConnectionHandler *Worker::get_connection_handler() const { + return conn_handler_; +} + namespace { size_t match_downstream_addr_group_host( const Router &router, const std::vector &wildcard_patterns, diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index 240d80e3..c3e3e04a 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -56,6 +56,7 @@ class ConnectBlocker; class LiveCheck; class MemcachedDispatcher; struct UpstreamAddr; +class ConnectionHandler; #ifdef HAVE_MRUBY namespace mruby { @@ -153,6 +154,7 @@ enum WorkerEventType { NEW_CONNECTION = 0x01, REOPEN_LOG = 0x02, GRACEFUL_SHUTDOWN = 0x03, + REPLACE_DOWNSTREAM = 0x04, }; struct WorkerEvent { @@ -164,6 +166,7 @@ struct WorkerEvent { const UpstreamAddr *faddr; }; std::shared_ptr ticket_keys; + std::shared_ptr downstreamconf; }; class Worker { @@ -171,7 +174,9 @@ public: Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, SSL_CTX *tls_session_cache_memcached_ssl_ctx, ssl::CertLookupTree *cert_tree, - const std::shared_ptr &ticket_keys); + const std::shared_ptr &ticket_keys, + ConnectionHandler *conn_handler, + std::shared_ptr downstreamconf); ~Worker(); void run_async(); void wait(); @@ -213,8 +218,10 @@ public: const DownstreamConfig *get_downstream_config() const; - void replace_downstream_config( - const std::shared_ptr &downstreamconf); + void + replace_downstream_config(std::shared_ptr downstreamconf); + + ConnectionHandler *get_connection_handler() const; private: #ifndef NOTHREADS @@ -240,6 +247,7 @@ private: SSL_CTX *sv_ssl_ctx_; SSL_CTX *cl_ssl_ctx_; ssl::CertLookupTree *cert_tree_; + ConnectionHandler *conn_handler_; std::shared_ptr ticket_keys_; std::vector> downstream_addr_groups_;