nghttpx: Send notice to replace downstream via ConnectionHandler

This commit is contained in:
Tatsuhiro Tsujikawa 2016-06-04 01:02:57 +09:00
parent 43913838b4
commit 0ca7c4cb38
5 changed files with 137 additions and 13 deletions

View File

@ -28,6 +28,7 @@
#include "shrpx_upstream.h" #include "shrpx_upstream.h"
#include "shrpx_downstream.h" #include "shrpx_downstream.h"
#include "shrpx_worker.h" #include "shrpx_worker.h"
#include "shrpx_connection_handler.h"
namespace shrpx { namespace shrpx {
@ -176,7 +177,9 @@ int APIDownstreamConnection::end_upload_data() {
return 0; return 0;
} }
worker_->replace_downstream_config(downstreamconf); auto conn_handler = worker_->get_connection_handler();
conn_handler->send_replace_downstream(downstreamconf);
send_reply(200, body); send_reply(200, body);

View File

@ -102,6 +102,14 @@ void thread_join_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
} }
} // namespace } // namespace
namespace {
void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
auto h = static_cast<ConnectionHandler *>(w->data);
h->handle_serial_event();
}
} // namespace
namespace { namespace {
std::random_device rd; std::random_device rd;
} // namespace } // namespace
@ -125,6 +133,11 @@ ConnectionHandler::ConnectionHandler(struct ev_loop *loop)
ev_async_init(&thread_join_asyncev_, thread_join_async_cb); ev_async_init(&thread_join_asyncev_, thread_join_async_cb);
ev_async_init(&serial_event_asyncev_, serial_event_async_cb);
serial_event_asyncev_.data = this;
ev_async_start(loop_, &serial_event_asyncev_);
ev_child_init(&ocsp_.chldev, ocsp_chld_cb, 0, 0); ev_child_init(&ocsp_.chldev, ocsp_chld_cb, 0, 0);
ocsp_.chldev.data = this; ocsp_.chldev.data = this;
@ -136,6 +149,7 @@ ConnectionHandler::ConnectionHandler(struct ev_loop *loop)
ConnectionHandler::~ConnectionHandler() { ConnectionHandler::~ConnectionHandler() {
ev_child_stop(loop_, &ocsp_.chldev); ev_child_stop(loop_, &ocsp_.chldev);
ev_async_stop(loop_, &serial_event_asyncev_);
ev_async_stop(loop_, &thread_join_asyncev_); ev_async_stop(loop_, &thread_join_asyncev_);
ev_io_stop(loop_, &ocsp_.rev); ev_io_stop(loop_, &ocsp_.rev);
ev_timer_stop(loop_, &ocsp_timer_); ev_timer_stop(loop_, &ocsp_timer_);
@ -175,6 +189,18 @@ void ConnectionHandler::worker_reopen_log_files() {
} }
} }
void ConnectionHandler::worker_replace_downstream(
std::shared_ptr<DownstreamConfig> downstreamconf) {
WorkerEvent wev{};
wev.type = REPLACE_DOWNSTREAM;
wev.downstreamconf = std::move(downstreamconf);
for (auto &worker : workers_) {
worker->send(wev);
}
}
int ConnectionHandler::create_single_worker() { int ConnectionHandler::create_single_worker() {
auto cert_tree = ssl::create_cert_lookup_tree(); auto cert_tree = ssl::create_cert_lookup_tree();
auto sv_ssl_ctx = ssl::setup_server_ssl_context(all_ssl_ctx_, cert_tree auto sv_ssl_ctx = ssl::setup_server_ssl_context(all_ssl_ctx_, cert_tree
@ -207,9 +233,9 @@ int ConnectionHandler::create_single_worker() {
all_ssl_ctx_.push_back(session_cache_ssl_ctx); all_ssl_ctx_.push_back(session_cache_ssl_ctx);
} }
single_worker_ = single_worker_ = make_unique<Worker>(
make_unique<Worker>(loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree,
cert_tree, ticket_keys_); ticket_keys_, this, get_config()->conn.downstream);
#ifdef HAVE_MRUBY #ifdef HAVE_MRUBY
if (single_worker_->create_mruby_context() != 0) { if (single_worker_->create_mruby_context() != 0) {
return -1; return -1;
@ -256,9 +282,9 @@ int ConnectionHandler::create_worker_thread(size_t num) {
StringRef{memcachedconf.private_key_file}, nullptr); StringRef{memcachedconf.private_key_file}, nullptr);
all_ssl_ctx_.push_back(session_cache_ssl_ctx); all_ssl_ctx_.push_back(session_cache_ssl_ctx);
} }
auto worker = auto worker = make_unique<Worker>(
make_unique<Worker>(loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree,
cert_tree, ticket_keys_); ticket_keys_, this, get_config()->conn.downstream);
#ifdef HAVE_MRUBY #ifdef HAVE_MRUBY
if (worker->create_mruby_context() != 0) { if (worker->create_mruby_context() != 0) {
return -1; return -1;
@ -782,4 +808,46 @@ neverbleed_t *ConnectionHandler::get_neverbleed() const { return nb_.get(); }
#endif // HAVE_NEVERBLEED #endif // HAVE_NEVERBLEED
void ConnectionHandler::handle_serial_event() {
std::vector<SerialEvent> q;
{
std::lock_guard<std::mutex> g(serial_event_mu_);
q.swap(serial_events_);
}
for (auto &sev : q) {
switch (sev.type) {
case SEV_REPLACE_DOWNSTREAM:
// TODO make sure that none of worker uses
// get_config()->conn.downstream
mod_config()->conn.downstream = sev.downstreamconf;
if (single_worker_) {
single_worker_->replace_downstream_config(sev.downstreamconf);
break;
}
worker_replace_downstream(sev.downstreamconf);
break;
}
}
}
void ConnectionHandler::send_replace_downstream(
const std::shared_ptr<DownstreamConfig> &downstreamconf) {
send_serial_event(SerialEvent(SEV_REPLACE_DOWNSTREAM, downstreamconf));
}
void ConnectionHandler::send_serial_event(SerialEvent ev) {
{
std::lock_guard<std::mutex> g(serial_event_mu_);
serial_events_.push_back(std::move(ev));
}
ev_async_send(loop_, &serial_event_asyncev_);
}
} // namespace shrpx } // namespace shrpx

View File

@ -48,6 +48,7 @@
#endif // HAVE_NEVERBLEED #endif // HAVE_NEVERBLEED
#include "shrpx_downstream_connection_pool.h" #include "shrpx_downstream_connection_pool.h"
#include "shrpx_config.h"
namespace shrpx { namespace shrpx {
@ -76,6 +77,21 @@ struct OCSPUpdateContext {
pid_t pid; pid_t pid;
}; };
// SerialEvent is an event sent from Worker thread.
enum SerialEventType {
SEV_NONE,
SEV_REPLACE_DOWNSTREAM,
};
struct SerialEvent {
// ctor for event uses DownstreamConfig
SerialEvent(int type, const std::shared_ptr<DownstreamConfig> &downstreamconf)
: type(type), downstreamconf(downstreamconf) {}
int type;
std::shared_ptr<DownstreamConfig> downstreamconf;
};
class ConnectionHandler { class ConnectionHandler {
public: public:
ConnectionHandler(struct ev_loop *loop); ConnectionHandler(struct ev_loop *loop);
@ -136,6 +152,17 @@ public:
neverbleed_t *get_neverbleed() const; neverbleed_t *get_neverbleed() const;
#endif // HAVE_NEVERBLEED #endif // HAVE_NEVERBLEED
// Send SerialEvent SEV_REPLACE_DOWNSTREAM to this object.
void send_replace_downstream(
const std::shared_ptr<DownstreamConfig> &downstreamconf);
// Internal function to send |ev| to this object.
void send_serial_event(SerialEvent ev);
// Handles SerialEvents received.
void handle_serial_event();
// Sends WorkerEvent to make them replace downstream.
void
worker_replace_downstream(std::shared_ptr<DownstreamConfig> downstreamconf);
private: private:
// Stores all SSL_CTX objects. // Stores all SSL_CTX objects.
std::vector<SSL_CTX *> all_ssl_ctx_; std::vector<SSL_CTX *> all_ssl_ctx_;
@ -145,6 +172,10 @@ private:
std::vector<struct ev_loop *> worker_loops_; std::vector<struct ev_loop *> worker_loops_;
// Worker instances when multi threaded mode (-nN, N >= 2) is used. // Worker instances when multi threaded mode (-nN, N >= 2) is used.
std::vector<std::unique_ptr<Worker>> workers_; std::vector<std::unique_ptr<Worker>> workers_;
// mutex for serial event resive buffer handling
std::mutex serial_event_mu_;
// SerialEvent receive buffer
std::vector<SerialEvent> serial_events_;
// Worker instance used when single threaded mode (-n1) is used. // Worker instance used when single threaded mode (-n1) is used.
// Otherwise, nullptr and workers_ has instances of Worker instead. // Otherwise, nullptr and workers_ has instances of Worker instead.
std::unique_ptr<Worker> single_worker_; std::unique_ptr<Worker> single_worker_;
@ -161,6 +192,7 @@ private:
ev_timer disable_acceptor_timer_; ev_timer disable_acceptor_timer_;
ev_timer ocsp_timer_; ev_timer ocsp_timer_;
ev_async thread_join_asyncev_; ev_async thread_join_asyncev_;
ev_async serial_event_asyncev_;
#ifndef NOTHREADS #ifndef NOTHREADS
std::future<void> thread_join_fut_; std::future<void> thread_join_fut_;
#endif // NOTHREADS #endif // NOTHREADS

View File

@ -106,13 +106,16 @@ std::random_device rd;
Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
SSL_CTX *tls_session_cache_memcached_ssl_ctx, SSL_CTX *tls_session_cache_memcached_ssl_ctx,
ssl::CertLookupTree *cert_tree, ssl::CertLookupTree *cert_tree,
const std::shared_ptr<TicketKeys> &ticket_keys) const std::shared_ptr<TicketKeys> &ticket_keys,
ConnectionHandler *conn_handler,
std::shared_ptr<DownstreamConfig> downstreamconf)
: randgen_(rd()), : randgen_(rd()),
worker_stat_{}, worker_stat_{},
loop_(loop), loop_(loop),
sv_ssl_ctx_(sv_ssl_ctx), sv_ssl_ctx_(sv_ssl_ctx),
cl_ssl_ctx_(cl_ssl_ctx), cl_ssl_ctx_(cl_ssl_ctx),
cert_tree_(cert_tree), cert_tree_(cert_tree),
conn_handler_(conn_handler),
ticket_keys_(ticket_keys), ticket_keys_(ticket_keys),
connect_blocker_( connect_blocker_(
make_unique<ConnectBlocker>(randgen_, loop_, []() {}, []() {})), make_unique<ConnectBlocker>(randgen_, loop_, []() {}, []() {})),
@ -133,11 +136,11 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
StringRef{session_cacheconf.memcached.host}, &mcpool_); StringRef{session_cacheconf.memcached.host}, &mcpool_);
} }
replace_downstream_config(get_config()->conn.downstream); replace_downstream_config(std::move(downstreamconf));
} }
void Worker::replace_downstream_config( void Worker::replace_downstream_config(
const std::shared_ptr<DownstreamConfig> &downstreamconf) { std::shared_ptr<DownstreamConfig> downstreamconf) {
downstreamconf_ = downstreamconf; downstreamconf_ = downstreamconf;
downstream_addr_groups_ = std::vector<std::shared_ptr<DownstreamAddrGroup>>( downstream_addr_groups_ = std::vector<std::shared_ptr<DownstreamAddrGroup>>(
@ -345,6 +348,12 @@ void Worker::process_events() {
return; return;
} }
break;
case REPLACE_DOWNSTREAM:
WLOG(NOTICE, this) << "Replace downstream";
replace_downstream_config(wev.downstreamconf);
break; break;
default: default:
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
@ -416,6 +425,10 @@ const DownstreamConfig *Worker::get_downstream_config() const {
return downstreamconf_.get(); return downstreamconf_.get();
} }
ConnectionHandler *Worker::get_connection_handler() const {
return conn_handler_;
}
namespace { namespace {
size_t match_downstream_addr_group_host( size_t match_downstream_addr_group_host(
const Router &router, const std::vector<WildcardPattern> &wildcard_patterns, const Router &router, const std::vector<WildcardPattern> &wildcard_patterns,

View File

@ -56,6 +56,7 @@ class ConnectBlocker;
class LiveCheck; class LiveCheck;
class MemcachedDispatcher; class MemcachedDispatcher;
struct UpstreamAddr; struct UpstreamAddr;
class ConnectionHandler;
#ifdef HAVE_MRUBY #ifdef HAVE_MRUBY
namespace mruby { namespace mruby {
@ -153,6 +154,7 @@ enum WorkerEventType {
NEW_CONNECTION = 0x01, NEW_CONNECTION = 0x01,
REOPEN_LOG = 0x02, REOPEN_LOG = 0x02,
GRACEFUL_SHUTDOWN = 0x03, GRACEFUL_SHUTDOWN = 0x03,
REPLACE_DOWNSTREAM = 0x04,
}; };
struct WorkerEvent { struct WorkerEvent {
@ -164,6 +166,7 @@ struct WorkerEvent {
const UpstreamAddr *faddr; const UpstreamAddr *faddr;
}; };
std::shared_ptr<TicketKeys> ticket_keys; std::shared_ptr<TicketKeys> ticket_keys;
std::shared_ptr<DownstreamConfig> downstreamconf;
}; };
class Worker { class Worker {
@ -171,7 +174,9 @@ public:
Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
SSL_CTX *tls_session_cache_memcached_ssl_ctx, SSL_CTX *tls_session_cache_memcached_ssl_ctx,
ssl::CertLookupTree *cert_tree, ssl::CertLookupTree *cert_tree,
const std::shared_ptr<TicketKeys> &ticket_keys); const std::shared_ptr<TicketKeys> &ticket_keys,
ConnectionHandler *conn_handler,
std::shared_ptr<DownstreamConfig> downstreamconf);
~Worker(); ~Worker();
void run_async(); void run_async();
void wait(); void wait();
@ -213,8 +218,10 @@ public:
const DownstreamConfig *get_downstream_config() const; const DownstreamConfig *get_downstream_config() const;
void replace_downstream_config( void
const std::shared_ptr<DownstreamConfig> &downstreamconf); replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf);
ConnectionHandler *get_connection_handler() const;
private: private:
#ifndef NOTHREADS #ifndef NOTHREADS
@ -240,6 +247,7 @@ private:
SSL_CTX *sv_ssl_ctx_; SSL_CTX *sv_ssl_ctx_;
SSL_CTX *cl_ssl_ctx_; SSL_CTX *cl_ssl_ctx_;
ssl::CertLookupTree *cert_tree_; ssl::CertLookupTree *cert_tree_;
ConnectionHandler *conn_handler_;
std::shared_ptr<TicketKeys> ticket_keys_; std::shared_ptr<TicketKeys> ticket_keys_;
std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_; std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_;