nghttpx: Distribute session ticket keys to workers without mutex

This commit is contained in:
Tatsuhiro Tsujikawa 2015-01-08 21:15:45 +09:00
parent 5d3544185c
commit fcddb5c06c
8 changed files with 66 additions and 56 deletions

View File

@ -438,10 +438,8 @@ void refresh_cb(struct ev_loop *loop, ev_timer *w, int revents) {
namespace { namespace {
void renew_ticket_key_cb(struct ev_loop *loop, ev_timer *w, int revents) { void renew_ticket_key_cb(struct ev_loop *loop, ev_timer *w, int revents) {
#ifndef NOTHREADS auto listener_handler = static_cast<ListenHandler *>(w->data);
std::lock_guard<std::mutex> g(mod_config()->ticket_keys_lock); const auto &old_ticket_keys = worker_config->ticket_keys;
#endif // !NOTHREADS
auto old_ticket_keys = get_config()->ticket_keys;
auto ticket_keys = std::make_shared<TicketKeys>(); auto ticket_keys = std::make_shared<TicketKeys>();
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
@ -475,7 +473,9 @@ void renew_ticket_key_cb(struct ev_loop *loop, ev_timer *w, int revents) {
} }
} }
mod_config()->ticket_keys = ticket_keys; worker_config->ticket_keys = ticket_keys;
listener_handler->worker_renew_ticket_keys(ticket_keys);
} }
} // namespace } // namespace
@ -529,6 +529,17 @@ int event_loop() {
// After that, we drop the root privileges if needed. // After that, we drop the root privileges if needed.
drop_privileges(); drop_privileges();
ev_timer renew_ticket_key_timer;
if (sv_ssl_ctx && get_config()->auto_tls_ticket_key) {
// Renew ticket key every 12hrs
ev_timer_init(&renew_ticket_key_timer, renew_ticket_key_cb, 0., 12 * 3600.);
renew_ticket_key_timer.data = listener_handler.get();
ev_timer_again(loop, &renew_ticket_key_timer);
// Generate first session ticket key before running workers.
renew_ticket_key_cb(loop, &renew_ticket_key_timer, 0);
}
#ifndef NOTHREADS #ifndef NOTHREADS
int rv; int rv;
sigset_t signals; sigset_t signals;
@ -578,14 +589,6 @@ int event_loop() {
refresh_timer.data = listener_handler.get(); refresh_timer.data = listener_handler.get();
ev_timer_again(loop, &refresh_timer); ev_timer_again(loop, &refresh_timer);
ev_timer renew_ticket_key_timer;
if (sv_ssl_ctx && get_config()->auto_tls_ticket_key) {
// Renew ticket key every 12hrs
ev_timer_init(&renew_ticket_key_timer, renew_ticket_key_cb, 0., 12 * 3600.);
ev_timer_again(loop, &renew_ticket_key_timer);
renew_ticket_key_cb(loop, &renew_ticket_key_timer, 0);
}
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Entering event loop"; LOG(INFO) << "Entering event loop";
} }
@ -1755,7 +1758,7 @@ int main(int argc, char **argv) {
if (!ticket_keys) { if (!ticket_keys) {
LOG(WARN) << "Use internal session ticket key generator"; LOG(WARN) << "Use internal session ticket key generator";
} else { } else {
mod_config()->ticket_keys = std::move(ticket_keys); worker_config->ticket_keys = std::move(ticket_keys);
mod_config()->auto_tls_ticket_key = false; mod_config()->auto_tls_ticket_key = false;
} }
} }

View File

@ -35,8 +35,6 @@
#include <cstdio> #include <cstdio>
#include <vector> #include <vector>
#include <memory> #include <memory>
#include <atomic>
#include <mutex>
#include <openssl/ssl.h> #include <openssl/ssl.h>
@ -184,8 +182,6 @@ struct Config {
std::vector<LogFragment> accesslog_format; std::vector<LogFragment> accesslog_format;
std::vector<DownstreamAddr> downstream_addrs; std::vector<DownstreamAddr> downstream_addrs;
std::vector<std::string> tls_ticket_key_files; std::vector<std::string> tls_ticket_key_files;
std::mutex ticket_keys_lock;
std::shared_ptr<TicketKeys> ticket_keys;
// binary form of http proxy host and port // binary form of http proxy host and port
sockaddr_union downstream_http_proxy_addr; sockaddr_union downstream_http_proxy_addr;
ev_tstamp http2_upstream_read_timeout; ev_tstamp http2_upstream_read_timeout;

View File

@ -85,14 +85,26 @@ void ListenHandler::worker_reopen_log_files() {
} }
} }
void ListenHandler::worker_renew_ticket_keys(
const std::shared_ptr<TicketKeys> &ticket_keys) {
WorkerEvent wev;
memset(&wev, 0, sizeof(wev));
wev.type = RENEW_TICKET_KEYS;
wev.ticket_keys = ticket_keys;
for (auto &worker : workers_) {
worker->send(wev);
}
}
void ListenHandler::create_worker_thread(size_t num) { void ListenHandler::create_worker_thread(size_t num) {
#ifndef NOTHREADS #ifndef NOTHREADS
assert(workers_.size() == 0); assert(workers_.size() == 0);
for (size_t i = 0; i < num; ++i) { for (size_t i = 0; i < num; ++i) {
auto worker = util::make_unique<Worker>(sv_ssl_ctx_, cl_ssl_ctx_); workers_.push_back(util::make_unique<Worker>(sv_ssl_ctx_, cl_ssl_ctx_,
worker->run(); worker_config->ticket_keys));
workers_.push_back(std::move(worker));
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
LLOG(INFO, this) << "Created thread #" << workers_.size() - 1; LLOG(INFO, this) << "Created thread #" << workers_.size() - 1;

View File

@ -46,6 +46,7 @@ class ConnectBlocker;
class AcceptHandler; class AcceptHandler;
class Worker; class Worker;
struct WorkerStat; struct WorkerStat;
struct TicketKeys;
// TODO should be renamed as ConnectionHandler // TODO should be renamed as ConnectionHandler
class ListenHandler { class ListenHandler {
@ -55,6 +56,7 @@ public:
int handle_connection(int fd, sockaddr *addr, int addrlen); int handle_connection(int fd, sockaddr *addr, int addrlen);
void create_worker_thread(size_t num); void create_worker_thread(size_t num);
void worker_reopen_log_files(); void worker_reopen_log_files();
void worker_renew_ticket_keys(const std::shared_ptr<TicketKeys> &ticket_keys);
struct ev_loop *get_loop() const; struct ev_loop *get_loop() const;
void create_http2_session(); void create_http2_session();
void create_http1_connect_blocker(); void create_http1_connect_blocker();

View File

@ -47,6 +47,7 @@
#include "shrpx_client_handler.h" #include "shrpx_client_handler.h"
#include "shrpx_config.h" #include "shrpx_config.h"
#include "shrpx_worker.h" #include "shrpx_worker.h"
#include "shrpx_worker_config.h"
#include "shrpx_downstream_connection_pool.h" #include "shrpx_downstream_connection_pool.h"
#include "util.h" #include "util.h"
#include "ssl.h" #include "ssl.h"
@ -147,17 +148,7 @@ namespace {
int ticket_key_cb(SSL *ssl, unsigned char *key_name, unsigned char *iv, int ticket_key_cb(SSL *ssl, unsigned char *key_name, unsigned char *iv,
EVP_CIPHER_CTX *ctx, HMAC_CTX *hctx, int enc) { EVP_CIPHER_CTX *ctx, HMAC_CTX *hctx, int enc) {
auto handler = static_cast<ClientHandler *>(SSL_get_app_data(ssl)); auto handler = static_cast<ClientHandler *>(SSL_get_app_data(ssl));
#ifndef NOTHREADS auto ticket_keys = worker_config->ticket_keys;
std::shared_ptr<TicketKeys> ticket_keys;
if (get_config()->auto_tls_ticket_key) {
std::lock_guard<std::mutex> g(mod_config()->ticket_keys_lock);
ticket_keys = get_config()->ticket_keys;
} else {
ticket_keys = get_config()->ticket_keys;
}
#else // NOTHREADS
auto ticket_keys = get_config()->ticket_keys;
#endif // NOTHREADS
if (!ticket_keys) { if (!ticket_keys) {
// No ticket keys available. // No ticket keys available.

View File

@ -47,7 +47,8 @@ void eventcb(struct ev_loop *loop, ev_async *w, int revents) {
} }
} // namespace } // namespace
Worker::Worker(SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx) Worker::Worker(SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
const std::shared_ptr<TicketKeys> &ticket_keys)
: loop_(ev_loop_new(0)), sv_ssl_ctx_(sv_ssl_ctx), cl_ssl_ctx_(cl_ssl_ctx), : loop_(ev_loop_new(0)), sv_ssl_ctx_(sv_ssl_ctx), cl_ssl_ctx_(cl_ssl_ctx),
worker_stat_(util::make_unique<WorkerStat>()) { worker_stat_(util::make_unique<WorkerStat>()) {
ev_async_init(&w_, eventcb); ev_async_init(&w_, eventcb);
@ -59,22 +60,15 @@ Worker::Worker(SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx)
} else { } else {
http1_connect_blocker_ = util::make_unique<ConnectBlocker>(loop_); http1_connect_blocker_ = util::make_unique<ConnectBlocker>(loop_);
} }
fut_ = std::async(std::launch::async, [this, &ticket_keys] {
worker_config->ticket_keys = ticket_keys;
(void)reopen_log_files();
ev_run(loop_);
});
} }
Worker::~Worker() { Worker::~Worker() { ev_async_stop(loop_, &w_); }
ev_async_stop(loop_, &w_);
}
void Worker::run() {
#ifndef NOTHREADS
fut_ = std::async(std::launch::async, [this] { this->run_loop(); });
#endif // !NOTHREADS
}
void Worker::run_loop() {
(void)reopen_log_files();
ev_run(loop_);
}
void Worker::wait() { void Worker::wait() {
#ifndef NOTHREADS #ifndef NOTHREADS
@ -99,6 +93,16 @@ void Worker::process_events() {
q.swap(q_); q.swap(q_);
} }
for (auto &wev : q) { for (auto &wev : q) {
if (wev.type == RENEW_TICKET_KEYS) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Renew ticket keys: worker_info(" << worker_config << ")";
}
worker_config->ticket_keys = wev.ticket_keys;
continue;
}
if (wev.type == REOPEN_LOG) { if (wev.type == REOPEN_LOG) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Reopening log files: worker_info(" << worker_config LOG(INFO) << "Reopening log files: worker_info(" << worker_config

View File

@ -61,25 +61,24 @@ enum WorkerEventType {
NEW_CONNECTION = 0x01, NEW_CONNECTION = 0x01,
REOPEN_LOG = 0x02, REOPEN_LOG = 0x02,
GRACEFUL_SHUTDOWN = 0x03, GRACEFUL_SHUTDOWN = 0x03,
RENEW_TICKET_KEYS = 0x04,
}; };
struct WorkerEvent { struct WorkerEvent {
WorkerEventType type; WorkerEventType type;
union { struct {
struct { sockaddr_union client_addr;
sockaddr_union client_addr; size_t client_addrlen;
size_t client_addrlen; int client_fd;
int client_fd;
};
}; };
std::shared_ptr<TicketKeys> ticket_keys;
}; };
class Worker { class Worker {
public: public:
Worker(SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx); Worker(SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
const std::shared_ptr<TicketKeys> &ticket_keys);
~Worker(); ~Worker();
void run();
void run_loop();
void wait(); void wait();
void process_events(); void process_events();
void send(const WorkerEvent &event); void send(const WorkerEvent &event);

View File

@ -29,7 +29,10 @@
namespace shrpx { namespace shrpx {
struct TicketKeys;
struct WorkerConfig { struct WorkerConfig {
std::shared_ptr<TicketKeys> ticket_keys;
int accesslog_fd; int accesslog_fd;
int errorlog_fd; int errorlog_fd;
// true if errorlog_fd is referring to a terminal. // true if errorlog_fd is referring to a terminal.