nghttpx: Refactor worker interface

This commit is contained in:
Tatsuhiro Tsujikawa 2015-02-11 19:18:41 +09:00
parent 756e2b3e9b
commit ae0100a9ab
13 changed files with 237 additions and 192 deletions

View File

@ -454,12 +454,12 @@ void graceful_shutdown_signal_cb(struct ev_loop *loop, ev_signal *w,
namespace { namespace {
void refresh_cb(struct ev_loop *loop, ev_timer *w, int revents) { void refresh_cb(struct ev_loop *loop, ev_timer *w, int revents) {
auto conn_handler = static_cast<ConnectionHandler *>(w->data); auto conn_handler = static_cast<ConnectionHandler *>(w->data);
auto worker_stat = conn_handler->get_worker_stat(); auto worker = conn_handler->get_single_worker();
// In multi threaded mode (get_config()->num_worker > 1), we have to // In multi threaded mode (get_config()->num_worker > 1), we have to
// wait for event notification to workers to finish. // wait for event notification to workers to finish.
if (get_config()->num_worker == 1 && worker_config->graceful_shutdown && if (get_config()->num_worker == 1 && worker_config->graceful_shutdown &&
(!worker_stat || worker_stat->num_connections == 0)) { (!worker || worker->get_worker_stat()->num_connections == 0)) {
ev_break(loop); ev_break(loop);
} }
} }
@ -468,7 +468,7 @@ void refresh_cb(struct ev_loop *loop, ev_timer *w, int revents) {
namespace { namespace {
void renew_ticket_key_cb(struct ev_loop *loop, ev_timer *w, int revents) { void renew_ticket_key_cb(struct ev_loop *loop, ev_timer *w, int revents) {
auto conn_handler = static_cast<ConnectionHandler *>(w->data); auto conn_handler = static_cast<ConnectionHandler *>(w->data);
const auto &old_ticket_keys = worker_config->ticket_keys; const auto &old_ticket_keys = conn_handler->get_ticket_keys();
auto ticket_keys = std::make_shared<TicketKeys>(); auto ticket_keys = std::make_shared<TicketKeys>();
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
@ -502,8 +502,7 @@ void renew_ticket_key_cb(struct ev_loop *loop, ev_timer *w, int revents) {
} }
} }
worker_config->ticket_keys = ticket_keys; conn_handler->set_ticket_keys(ticket_keys);
conn_handler->worker_renew_ticket_keys(ticket_keys); conn_handler->worker_renew_ticket_keys(ticket_keys);
} }
} // namespace } // namespace
@ -539,22 +538,35 @@ int event_loop() {
conn_handler->set_acceptor4(std::move(acceptor4)); conn_handler->set_acceptor4(std::move(acceptor4));
conn_handler->set_acceptor6(std::move(acceptor6)); conn_handler->set_acceptor6(std::move(acceptor6));
ev_timer renew_ticket_key_timer;
if (!get_config()->upstream_no_tls) {
bool auto_tls_ticket_key = true;
if (!get_config()->tls_ticket_key_files.empty()) {
auto ticket_keys =
read_tls_ticket_key_file(get_config()->tls_ticket_key_files);
if (!ticket_keys) {
LOG(WARN) << "Use internal session ticket key generator";
} else {
conn_handler->set_ticket_keys(std::move(ticket_keys));
auto_tls_ticket_key = false;
}
}
if (auto_tls_ticket_key) {
// Renew ticket key every 12hrs
ev_timer_init(&renew_ticket_key_timer, renew_ticket_key_cb, 0.,
12 * 3600.);
renew_ticket_key_timer.data = conn_handler.get();
ev_timer_again(loop, &renew_ticket_key_timer);
// Generate first session ticket key before running workers.
renew_ticket_key_cb(loop, &renew_ticket_key_timer, 0);
}
}
// ListenHandler loads private key, and we listen on a priveleged port. // ListenHandler loads private key, and we listen on a priveleged port.
// After that, we drop the root privileges if needed. // After that, we drop the root privileges if needed.
drop_privileges(); drop_privileges();
ev_timer renew_ticket_key_timer;
if (!get_config()->client_mode && !get_config()->upstream_no_tls &&
get_config()->auto_tls_ticket_key) {
// Renew ticket key every 12hrs
ev_timer_init(&renew_ticket_key_timer, renew_ticket_key_cb, 0., 12 * 3600.);
renew_ticket_key_timer.data = conn_handler.get();
ev_timer_again(loop, &renew_ticket_key_timer);
// Generate first session ticket key before running workers.
renew_ticket_key_cb(loop, &renew_ticket_key_timer, 0);
}
#ifndef NOTHREADS #ifndef NOTHREADS
int rv; int rv;
sigset_t signals; sigset_t signals;
@ -568,18 +580,10 @@ int event_loop() {
} }
#endif // !NOTHREADS #endif // !NOTHREADS
if (get_config()->num_worker > 1) { if (get_config()->num_worker == 1) {
if (!get_config()->tls_ctx_per_worker) { conn_handler->create_single_worker();
conn_handler->create_ssl_context();
}
conn_handler->create_worker_thread(get_config()->num_worker);
} else { } else {
conn_handler->create_ssl_context(); conn_handler->create_worker_thread(get_config()->num_worker);
if (get_config()->downstream_proto == PROTO_HTTP2) {
conn_handler->create_http2_session();
} else {
conn_handler->create_http1_connect_blocker();
}
} }
#ifndef NOTHREADS #ifndef NOTHREADS
@ -778,7 +782,6 @@ void fill_default_config() {
mod_config()->downstream_connections_per_host = 8; mod_config()->downstream_connections_per_host = 8;
mod_config()->downstream_connections_per_frontend = 0; mod_config()->downstream_connections_per_frontend = 0;
mod_config()->listener_disable_timeout = 0.; mod_config()->listener_disable_timeout = 0.;
mod_config()->auto_tls_ticket_key = true;
mod_config()->tls_ctx_per_worker = false; mod_config()->tls_ctx_per_worker = false;
mod_config()->downstream_request_buffer_size = 16 * 1024; mod_config()->downstream_request_buffer_size = 16 * 1024;
mod_config()->downstream_response_buffer_size = 16 * 1024; mod_config()->downstream_response_buffer_size = 16 * 1024;
@ -1817,17 +1820,6 @@ int main(int argc, char **argv) {
mod_config()->alpn_prefs = ssl::set_alpn_prefs(get_config()->npn_list); mod_config()->alpn_prefs = ssl::set_alpn_prefs(get_config()->npn_list);
if (!get_config()->tls_ticket_key_files.empty()) {
auto ticket_keys =
read_tls_ticket_key_file(get_config()->tls_ticket_key_files);
if (!ticket_keys) {
LOG(WARN) << "Use internal session ticket key generator";
} else {
worker_config->ticket_keys = std::move(ticket_keys);
mod_config()->auto_tls_ticket_key = false;
}
}
if (get_config()->backend_ipv4 && get_config()->backend_ipv6) { if (get_config()->backend_ipv4 && get_config()->backend_ipv6) {
LOG(FATAL) << "--backend-ipv4 and --backend-ipv6 cannot be used at the " LOG(FATAL) << "--backend-ipv4 and --backend-ipv6 cannot be used at the "
<< "same time."; << "same time.";
@ -1849,6 +1841,7 @@ int main(int argc, char **argv) {
if (get_config()->client || get_config()->client_proxy) { if (get_config()->client || get_config()->client_proxy) {
mod_config()->client_mode = true; mod_config()->client_mode = true;
mod_config()->upstream_no_tls = true;
} }
if (get_config()->client_mode || get_config()->http2_bridge) { if (get_config()->client_mode || get_config()->http2_bridge) {
@ -1857,12 +1850,11 @@ int main(int argc, char **argv) {
mod_config()->downstream_proto = PROTO_HTTP; mod_config()->downstream_proto = PROTO_HTTP;
} }
if (!get_config()->client_mode && !get_config()->upstream_no_tls) { if (!get_config()->upstream_no_tls &&
if (!get_config()->private_key_file || !get_config()->cert_file) { (!get_config()->private_key_file || !get_config()->cert_file)) {
print_usage(std::cerr); print_usage(std::cerr);
LOG(FATAL) << "Too few arguments"; LOG(FATAL) << "Too few arguments";
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
}
} }
if (get_config()->downstream_addrs.empty()) { if (get_config()->downstream_addrs.empty()) {

View File

@ -352,21 +352,17 @@ int ClientHandler::upstream_http1_connhd_read() {
return 0; return 0;
} }
ClientHandler::ClientHandler(struct ev_loop *loop, int fd, SSL *ssl, ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
const char *ipaddr, const char *port, const char *ipaddr, const char *port)
WorkerStat *worker_stat, : conn_(worker->get_loop(), fd, ssl, get_config()->upstream_write_timeout,
DownstreamConnectionPool *dconn_pool)
: conn_(loop, fd, ssl, get_config()->upstream_write_timeout,
get_config()->upstream_read_timeout, get_config()->write_rate, get_config()->upstream_read_timeout, get_config()->write_rate,
get_config()->write_burst, get_config()->read_rate, get_config()->write_burst, get_config()->read_rate,
get_config()->read_burst, writecb, readcb, timeoutcb, this), get_config()->read_burst, writecb, readcb, timeoutcb, this),
ipaddr_(ipaddr), port_(port), dconn_pool_(dconn_pool), ipaddr_(ipaddr), port_(port), worker_(worker),
http2session_(nullptr), http1_connect_blocker_(nullptr),
worker_stat_(worker_stat),
left_connhd_len_(NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN), left_connhd_len_(NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN),
should_close_after_write_(false) { should_close_after_write_(false) {
++worker_stat->num_connections; ++worker_->get_worker_stat()->num_connections;
ev_timer_init(&reneg_shutdown_timer_, shutdowncb, 0., 0.); ev_timer_init(&reneg_shutdown_timer_, shutdowncb, 0., 0.);
@ -402,13 +398,14 @@ ClientHandler::~ClientHandler() {
upstream_->on_handler_delete(); upstream_->on_handler_delete();
} }
--worker_stat_->num_connections; auto worker_stat = worker_->get_worker_stat();
--worker_stat->num_connections;
ev_timer_stop(conn_.loop, &reneg_shutdown_timer_); ev_timer_stop(conn_.loop, &reneg_shutdown_timer_);
// TODO If backend is http/2, and it is in CONNECTED state, signal // TODO If backend is http/2, and it is in CONNECTED state, signal
// it and make it loopbreak when output is zero. // it and make it loopbreak when output is zero.
if (worker_config->graceful_shutdown && worker_stat_->num_connections == 0) { if (worker_config->graceful_shutdown && worker_stat->num_connections == 0) {
ev_break(conn_.loop); ev_break(conn_.loop);
} }
@ -579,7 +576,8 @@ void ClientHandler::pool_downstream_connection(
CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get(); CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get();
} }
dconn->set_client_handler(nullptr); dconn->set_client_handler(nullptr);
dconn_pool_->add_downstream_connection(std::move(dconn)); auto dconn_pool = worker_->get_dconn_pool();
dconn_pool->add_downstream_connection(std::move(dconn));
} }
void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) { void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) {
@ -587,12 +585,14 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) {
CLOG(INFO, this) << "Removing downstream connection DCONN:" << dconn CLOG(INFO, this) << "Removing downstream connection DCONN:" << dconn
<< " from pool"; << " from pool";
} }
dconn_pool_->remove_downstream_connection(dconn); auto dconn_pool = worker_->get_dconn_pool();
dconn_pool->remove_downstream_connection(dconn);
} }
std::unique_ptr<DownstreamConnection> std::unique_ptr<DownstreamConnection>
ClientHandler::get_downstream_connection() { ClientHandler::get_downstream_connection() {
auto dconn = dconn_pool_->pop_downstream_connection(); auto dconn_pool = worker_->get_dconn_pool();
auto dconn = dconn_pool->pop_downstream_connection();
if (!dconn) { if (!dconn) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
@ -600,11 +600,13 @@ ClientHandler::get_downstream_connection() {
<< " Create new one"; << " Create new one";
} }
if (http2session_) { auto dconn_pool = worker_->get_dconn_pool();
dconn = auto http2session = worker_->get_http2_session();
make_unique<Http2DownstreamConnection>(dconn_pool_, http2session_);
if (http2session) {
dconn = make_unique<Http2DownstreamConnection>(dconn_pool, http2session);
} else { } else {
dconn = make_unique<HttpDownstreamConnection>(dconn_pool_, conn_.loop); dconn = make_unique<HttpDownstreamConnection>(dconn_pool, conn_.loop);
} }
dconn->set_client_handler(this); dconn->set_client_handler(this);
return dconn; return dconn;
@ -622,19 +624,12 @@ ClientHandler::get_downstream_connection() {
SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; } SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; }
void ClientHandler::set_http2_session(Http2Session *http2session) { Http2Session *ClientHandler::get_http2_session() const {
http2session_ = http2session; return worker_->get_http2_session();
}
Http2Session *ClientHandler::get_http2_session() const { return http2session_; }
void ClientHandler::set_http1_connect_blocker(
ConnectBlocker *http1_connect_blocker) {
http1_connect_blocker_ = http1_connect_blocker;
} }
ConnectBlocker *ClientHandler::get_http1_connect_blocker() const { ConnectBlocker *ClientHandler::get_http1_connect_blocker() const {
return http1_connect_blocker_; return worker_->get_http1_connect_blocker();
} }
void ClientHandler::direct_http2_upgrade() { void ClientHandler::direct_http2_upgrade() {
@ -724,8 +719,6 @@ void ClientHandler::write_accesslog(int major, int minor, unsigned int status,
upstream_accesslog(get_config()->accesslog_format, &lgsp); upstream_accesslog(get_config()->accesslog_format, &lgsp);
} }
WorkerStat *ClientHandler::get_worker_stat() const { return worker_stat_; }
ClientHandler::WriteBuf *ClientHandler::get_wb() { return &wb_; } ClientHandler::WriteBuf *ClientHandler::get_wb() { return &wb_; }
ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; } ClientHandler::ReadBuf *ClientHandler::get_rb() { return &rb_; }
@ -737,4 +730,6 @@ RateLimit *ClientHandler::get_wlimit() { return &conn_.wlimit; }
ev_io *ClientHandler::get_wev() { return &conn_.wev; } ev_io *ClientHandler::get_wev() { return &conn_.wev; }
Worker *ClientHandler::get_worker() const { return worker_; }
} // namespace shrpx } // namespace shrpx

View File

@ -47,13 +47,13 @@ class Http2Session;
class HttpsUpstream; class HttpsUpstream;
class ConnectBlocker; class ConnectBlocker;
class DownstreamConnectionPool; class DownstreamConnectionPool;
class Worker;
struct WorkerStat; struct WorkerStat;
class ClientHandler { class ClientHandler {
public: public:
ClientHandler(struct ev_loop *loop, int fd, SSL *ssl, const char *ipaddr, ClientHandler(Worker *worker, int fd, SSL *ssl, const char *ipaddr,
const char *port, WorkerStat *worker_stat, const char *port);
DownstreamConnectionPool *dconn_pool);
~ClientHandler(); ~ClientHandler();
// Performs clear text I/O // Performs clear text I/O
@ -93,9 +93,7 @@ public:
void remove_downstream_connection(DownstreamConnection *dconn); void remove_downstream_connection(DownstreamConnection *dconn);
std::unique_ptr<DownstreamConnection> get_downstream_connection(); std::unique_ptr<DownstreamConnection> get_downstream_connection();
SSL *get_ssl() const; SSL *get_ssl() const;
void set_http2_session(Http2Session *http2session);
Http2Session *get_http2_session() const; Http2Session *get_http2_session() const;
void set_http1_connect_blocker(ConnectBlocker *http1_connect_blocker);
ConnectBlocker *get_http1_connect_blocker() const; ConnectBlocker *get_http1_connect_blocker() const;
// Call this function when HTTP/2 connection header is received at // Call this function when HTTP/2 connection header is received at
// the start of the connection. // the start of the connection.
@ -117,7 +115,7 @@ public:
// corresponding Downstream object is not available. // corresponding Downstream object is not available.
void write_accesslog(int major, int minor, unsigned int status, void write_accesslog(int major, int minor, unsigned int status,
int64_t body_bytes_sent); int64_t body_bytes_sent);
WorkerStat *get_worker_stat() const; Worker *get_worker() const;
using WriteBuf = Buffer<32768>; using WriteBuf = Buffer<32768>;
using ReadBuf = Buffer<8192>; using ReadBuf = Buffer<8192>;
@ -141,12 +139,7 @@ private:
std::string alpn_; std::string alpn_;
std::function<int(ClientHandler &)> read_, write_; std::function<int(ClientHandler &)> read_, write_;
std::function<int(ClientHandler &)> on_read_, on_write_; std::function<int(ClientHandler &)> on_read_, on_write_;
DownstreamConnectionPool *dconn_pool_; Worker *worker_;
// Shared HTTP2 session for each thread. NULL if backend is not
// HTTP2. Not deleted by this object.
Http2Session *http2session_;
ConnectBlocker *http1_connect_blocker_;
WorkerStat *worker_stat_;
// The number of bytes of HTTP/2 client connection header to read // The number of bytes of HTTP/2 client connection header to read
size_t left_connhd_len_; size_t left_connhd_len_;
bool should_close_after_write_; bool should_close_after_write_;

View File

@ -303,7 +303,6 @@ struct Config {
bool upstream_frame_debug; bool upstream_frame_debug;
bool no_location_rewrite; bool no_location_rewrite;
bool no_host_rewrite; bool no_host_rewrite;
bool auto_tls_ticket_key;
bool tls_ctx_per_worker; bool tls_ctx_per_worker;
bool no_server_push; bool no_server_push;
}; };

View File

@ -60,24 +60,15 @@ void acceptor_disable_cb(struct ev_loop *loop, ev_timer *w, int revent) {
} // namespace } // namespace
ConnectionHandler::ConnectionHandler(struct ev_loop *loop) ConnectionHandler::ConnectionHandler(struct ev_loop *loop)
: loop_(loop), sv_ssl_ctx_(nullptr), cl_ssl_ctx_(nullptr), : single_worker_(nullptr), loop_(loop), worker_round_robin_cnt_(0) {
// rate_limit_group_(bufferevent_rate_limit_group_new(
// evbase, get_config()->worker_rate_limit_cfg)),
worker_stat_(make_unique<WorkerStat>()), worker_round_robin_cnt_(0) {
ev_timer_init(&disable_acceptor_timer_, acceptor_disable_cb, 0., 0.); ev_timer_init(&disable_acceptor_timer_, acceptor_disable_cb, 0., 0.);
disable_acceptor_timer_.data = this; disable_acceptor_timer_.data = this;
} }
ConnectionHandler::~ConnectionHandler() { ConnectionHandler::~ConnectionHandler() {
// bufferevent_rate_limit_group_free(rate_limit_group_);
ev_timer_stop(loop_, &disable_acceptor_timer_); ev_timer_stop(loop_, &disable_acceptor_timer_);
} }
void ConnectionHandler::create_ssl_context() {
sv_ssl_ctx_ = ssl::setup_server_ssl_context();
cl_ssl_ctx_ = ssl::setup_client_ssl_context();
}
void ConnectionHandler::worker_reopen_log_files() { void ConnectionHandler::worker_reopen_log_files() {
WorkerEvent wev; WorkerEvent wev;
@ -102,14 +93,41 @@ void ConnectionHandler::worker_renew_ticket_keys(
} }
} }
void ConnectionHandler::create_single_worker() {
auto cert_tree = ssl::create_cert_lookup_tree();
auto sv_ssl_ctx = ssl::setup_server_ssl_context(cert_tree);
auto cl_ssl_ctx = ssl::setup_client_ssl_context();
single_worker_ = make_unique<Worker>(loop_, sv_ssl_ctx, cl_ssl_ctx, cert_tree,
ticket_keys_);
}
void ConnectionHandler::create_worker_thread(size_t num) { void ConnectionHandler::create_worker_thread(size_t num) {
#ifndef NOTHREADS #ifndef NOTHREADS
assert(workers_.size() == 0); assert(workers_.size() == 0);
SSL_CTX *sv_ssl_ctx = nullptr, *cl_ssl_ctx = nullptr;
ssl::CertLookupTree *cert_tree = nullptr;
if (!get_config()->tls_ctx_per_worker) {
cert_tree = ssl::create_cert_lookup_tree();
sv_ssl_ctx = ssl::setup_server_ssl_context(cert_tree);
cl_ssl_ctx = ssl::setup_client_ssl_context();
}
for (size_t i = 0; i < num; ++i) { for (size_t i = 0; i < num; ++i) {
workers_.push_back(make_unique<Worker>(sv_ssl_ctx_, cl_ssl_ctx_, auto loop = ev_loop_new(0);
worker_config->cert_tree,
worker_config->ticket_keys)); if (get_config()->tls_ctx_per_worker) {
cert_tree = ssl::create_cert_lookup_tree();
sv_ssl_ctx = ssl::setup_server_ssl_context(cert_tree);
cl_ssl_ctx = ssl::setup_client_ssl_context();
}
auto worker = make_unique<Worker>(loop, sv_ssl_ctx, cl_ssl_ctx, cert_tree,
ticket_keys_);
worker->run_async();
workers_.push_back(std::move(worker));
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
LLOG(INFO, this) << "Created thread #" << workers_.size() - 1; LLOG(INFO, this) << "Created thread #" << workers_.size() - 1;
@ -163,7 +181,7 @@ int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen) {
if (get_config()->num_worker == 1) { if (get_config()->num_worker == 1) {
if (worker_stat_->num_connections >= if (single_worker_->get_worker_stat()->num_connections >=
get_config()->worker_frontend_connections) { get_config()->worker_frontend_connections) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
@ -175,8 +193,8 @@ int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen) {
return -1; return -1;
} }
auto client = ssl::accept_connection(loop_, sv_ssl_ctx_, fd, addr, addrlen, auto client =
worker_stat_.get(), &dconn_pool_); ssl::accept_connection(single_worker_.get(), fd, addr, addrlen);
if (!client) { if (!client) {
LLOG(ERROR, this) << "ClientHandler creation failed"; LLOG(ERROR, this) << "ClientHandler creation failed";
@ -184,13 +202,13 @@ int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen) {
return -1; return -1;
} }
client->set_http2_session(http2session_.get());
client->set_http1_connect_blocker(http1_connect_blocker_.get());
return 0; return 0;
} }
size_t idx = worker_round_robin_cnt_ % workers_.size(); size_t idx = worker_round_robin_cnt_ % workers_.size();
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "Dispatch connection to worker #" << idx;
}
++worker_round_robin_cnt_; ++worker_round_robin_cnt_;
WorkerEvent wev; WorkerEvent wev;
memset(&wev, 0, sizeof(wev)); memset(&wev, 0, sizeof(wev));
@ -208,16 +226,8 @@ struct ev_loop *ConnectionHandler::get_loop() const {
return loop_; return loop_;
} }
void ConnectionHandler::create_http2_session() { Worker *ConnectionHandler::get_single_worker() const {
http2session_ = make_unique<Http2Session>(loop_, cl_ssl_ctx_); return single_worker_.get();
}
void ConnectionHandler::create_http1_connect_blocker() {
http1_connect_blocker_ = make_unique<ConnectBlocker>(loop_);
}
const WorkerStat *ConnectionHandler::get_worker_stat() const {
return worker_stat_.get();
} }
void ConnectionHandler::set_acceptor4(std::unique_ptr<AcceptHandler> h) { void ConnectionHandler::set_acceptor4(std::unique_ptr<AcceptHandler> h) {
@ -276,4 +286,16 @@ void ConnectionHandler::accept_pending_connection() {
} }
} }
void
ConnectionHandler::set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys) {
ticket_keys_ = std::move(ticket_keys);
if (single_worker_) {
single_worker_->set_ticket_keys(ticket_keys_);
}
}
const std::shared_ptr<TicketKeys> &ConnectionHandler::get_ticket_keys() const {
return ticket_keys_;
}
} // namespace shrpx } // namespace shrpx

View File

@ -54,14 +54,17 @@ public:
ConnectionHandler(struct ev_loop *loop); ConnectionHandler(struct ev_loop *loop);
~ConnectionHandler(); ~ConnectionHandler();
int handle_connection(int fd, sockaddr *addr, int addrlen); int handle_connection(int fd, sockaddr *addr, int addrlen);
void create_ssl_context(); // Creates Worker object for single threaded configuration.
void create_single_worker();
// Creates |num| Worker objects for multi threaded configuration.
// The |num| must be strictly more than 1.
void create_worker_thread(size_t num); void create_worker_thread(size_t num);
void worker_reopen_log_files(); void worker_reopen_log_files();
void worker_renew_ticket_keys(const std::shared_ptr<TicketKeys> &ticket_keys); void worker_renew_ticket_keys(const std::shared_ptr<TicketKeys> &ticket_keys);
void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);
const std::shared_ptr<TicketKeys> &get_ticket_keys() const;
struct ev_loop *get_loop() const; struct ev_loop *get_loop() const;
void create_http2_session(); Worker *get_single_worker() const;
void create_http1_connect_blocker();
const WorkerStat *get_worker_stat() const;
void set_acceptor4(std::unique_ptr<AcceptHandler> h); void set_acceptor4(std::unique_ptr<AcceptHandler> h);
AcceptHandler *get_acceptor4() const; AcceptHandler *get_acceptor4() const;
void set_acceptor6(std::unique_ptr<AcceptHandler> h); void set_acceptor6(std::unique_ptr<AcceptHandler> h);
@ -74,22 +77,19 @@ public:
void join_worker(); void join_worker();
private: private:
DownstreamConnectionPool dconn_pool_; // Worker instances when multi threaded mode (-nN, N >= 2) is used.
std::vector<std::unique_ptr<Worker>> workers_; std::vector<std::unique_ptr<Worker>> workers_;
// Worker instance used when single threaded mode (-n1) is used.
// Otherwise, nullptr and workers_ has instances of Worker instead.
std::unique_ptr<Worker> single_worker_;
// Current TLS session ticket keys. Note that TLS connection does
// not refer to this field directly. They use TicketKeys object in
// Worker object.
std::shared_ptr<TicketKeys> ticket_keys_;
struct ev_loop *loop_; struct ev_loop *loop_;
// The frontend server SSL_CTX
SSL_CTX *sv_ssl_ctx_;
// The backend server SSL_CTX
SSL_CTX *cl_ssl_ctx_;
// Shared backend HTTP2 session. NULL if multi-threaded. In
// multi-threaded case, see shrpx_worker.cc.
std::unique_ptr<Http2Session> http2session_;
std::unique_ptr<ConnectBlocker> http1_connect_blocker_;
// bufferevent_rate_limit_group *rate_limit_group_;
std::unique_ptr<AcceptHandler> acceptor4_; std::unique_ptr<AcceptHandler> acceptor4_;
std::unique_ptr<AcceptHandler> acceptor6_; std::unique_ptr<AcceptHandler> acceptor6_;
ev_timer disable_acceptor_timer_; ev_timer disable_acceptor_timer_;
std::unique_ptr<WorkerStat> worker_stat_;
unsigned int worker_round_robin_cnt_; unsigned int worker_round_robin_cnt_;
}; };

View File

@ -140,7 +140,8 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
return -1; return -1;
} }
auto worker_stat = client_handler_->get_worker_stat(); auto worker = client_handler_->get_worker();
auto worker_stat = worker->get_worker_stat();
auto end = worker_stat->next_downstream; auto end = worker_stat->next_downstream;
for (;;) { for (;;) {
auto i = worker_stat->next_downstream; auto i = worker_stat->next_downstream;

View File

@ -47,7 +47,6 @@
#include "shrpx_client_handler.h" #include "shrpx_client_handler.h"
#include "shrpx_config.h" #include "shrpx_config.h"
#include "shrpx_worker.h" #include "shrpx_worker.h"
#include "shrpx_worker_config.h"
#include "shrpx_downstream_connection_pool.h" #include "shrpx_downstream_connection_pool.h"
#include "util.h" #include "util.h"
#include "ssl.h" #include "ssl.h"
@ -130,7 +129,9 @@ int ssl_pem_passwd_cb(char *buf, int size, int rwflag, void *user_data) {
namespace { namespace {
int servername_callback(SSL *ssl, int *al, void *arg) { int servername_callback(SSL *ssl, int *al, void *arg) {
auto cert_tree = worker_config->cert_tree; auto handler = static_cast<ClientHandler *>(SSL_get_app_data(ssl));
auto worker = handler->get_worker();
auto cert_tree = worker->get_cert_lookup_tree();
if (cert_tree) { if (cert_tree) {
const char *hostname = SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name); const char *hostname = SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name);
if (hostname) { if (hostname) {
@ -149,7 +150,8 @@ namespace {
int ticket_key_cb(SSL *ssl, unsigned char *key_name, unsigned char *iv, int ticket_key_cb(SSL *ssl, unsigned char *key_name, unsigned char *iv,
EVP_CIPHER_CTX *ctx, HMAC_CTX *hctx, int enc) { EVP_CIPHER_CTX *ctx, HMAC_CTX *hctx, int enc) {
auto handler = static_cast<ClientHandler *>(SSL_get_app_data(ssl)); auto handler = static_cast<ClientHandler *>(SSL_get_app_data(ssl));
const auto &ticket_keys = worker_config->ticket_keys; auto worker = handler->get_worker();
const auto &ticket_keys = worker->get_ticket_keys();
if (!ticket_keys) { if (!ticket_keys) {
// No ticket keys available. // No ticket keys available.
@ -515,10 +517,8 @@ SSL_CTX *create_ssl_client_context() {
return ssl_ctx; return ssl_ctx;
} }
ClientHandler *accept_connection(struct ev_loop *loop, SSL_CTX *ssl_ctx, int fd, ClientHandler *accept_connection(Worker *worker, int fd, sockaddr *addr,
sockaddr *addr, int addrlen, int addrlen) {
WorkerStat *worker_stat,
DownstreamConnectionPool *dconn_pool) {
char host[NI_MAXHOST]; char host[NI_MAXHOST];
char service[NI_MAXSERV]; char service[NI_MAXSERV];
int rv; int rv;
@ -537,6 +537,7 @@ ClientHandler *accept_connection(struct ev_loop *loop, SSL_CTX *ssl_ctx, int fd,
LOG(WARN) << "Setting option TCP_NODELAY failed: errno=" << errno; LOG(WARN) << "Setting option TCP_NODELAY failed: errno=" << errno;
} }
SSL *ssl = nullptr; SSL *ssl = nullptr;
auto ssl_ctx = worker->get_sv_ssl_ctx();
if (ssl_ctx) { if (ssl_ctx) {
ssl = SSL_new(ssl_ctx); ssl = SSL_new(ssl_ctx);
if (!ssl) { if (!ssl) {
@ -555,8 +556,7 @@ ClientHandler *accept_connection(struct ev_loop *loop, SSL_CTX *ssl_ctx, int fd,
SSL_set_accept_state(ssl); SSL_set_accept_state(ssl);
} }
return new ClientHandler(loop, fd, ssl, host, service, worker_stat, return new ClientHandler(worker, fd, ssl, host, service);
dconn_pool);
} }
namespace { namespace {
@ -927,7 +927,7 @@ bool check_http2_requirement(SSL *ssl) {
return true; return true;
} }
SSL_CTX *setup_server_ssl_context() { SSL_CTX *setup_server_ssl_context(CertLookupTree *cert_tree) {
if (get_config()->upstream_no_tls) { if (get_config()->upstream_no_tls) {
return nullptr; return nullptr;
} }
@ -939,9 +939,11 @@ SSL_CTX *setup_server_ssl_context() {
return ssl_ctx; return ssl_ctx;
} }
auto cert_tree = new CertLookupTree(); if (!cert_tree) {
LOG(WARN) << "We have multiple additional certificates (--subcert), but "
worker_config->cert_tree = cert_tree; "cert_tree is not given. SNI may not work.";
return ssl_ctx;
}
for (auto &keycert : get_config()->subcerts) { for (auto &keycert : get_config()->subcerts) {
auto ssl_ctx = auto ssl_ctx =
@ -973,6 +975,13 @@ SSL_CTX *setup_client_ssl_context() {
: nullptr; : nullptr;
} }
CertLookupTree *create_cert_lookup_tree() {
if (get_config()->upstream_no_tls || get_config()->subcerts.empty()) {
return nullptr;
}
return new ssl::CertLookupTree();
}
} // namespace ssl } // namespace ssl
} // namespace shrpx } // namespace shrpx

View File

@ -37,7 +37,7 @@
namespace shrpx { namespace shrpx {
class ClientHandler; class ClientHandler;
struct WorkerStat; class Worker;
class DownstreamConnectionPool; class DownstreamConnectionPool;
namespace ssl { namespace ssl {
@ -49,10 +49,8 @@ SSL_CTX *create_ssl_context(const char *private_key_file,
// Create client side SSL_CTX // Create client side SSL_CTX
SSL_CTX *create_ssl_client_context(); SSL_CTX *create_ssl_client_context();
ClientHandler *accept_connection(struct ev_loop *loop, SSL_CTX *ssl_ctx, int fd, ClientHandler *accept_connection(Worker *worker, int fd, sockaddr *addr,
sockaddr *addr, int addrlen, int addrlen);
WorkerStat *worker_stat,
DownstreamConnectionPool *dconn_pool);
// Check peer's certificate against first downstream address in // Check peer's certificate against first downstream address in
// Config::downstream_addrs. We only consider first downstream since // Config::downstream_addrs. We only consider first downstream since
@ -143,15 +141,20 @@ std::vector<unsigned char> set_alpn_prefs(const std::vector<char *> &protos);
// Setups server side SSL_CTX. This function inspects get_config() // Setups server side SSL_CTX. This function inspects get_config()
// and if upstream_no_tls is true, returns nullptr. Otherwise // and if upstream_no_tls is true, returns nullptr. Otherwise
// construct default SSL_CTX. If subcerts are not empty, create // construct default SSL_CTX. If subcerts are available
// SSL_CTX for them. All created SSL_CTX are added to CertLookupTree. // (get_config()->subcerts), caller should provide CertLookupTree
SSL_CTX *setup_server_ssl_context(); // object as |cert_tree| parameter, otherwise SNI does not work.
SSL_CTX *setup_server_ssl_context(CertLookupTree *cert_tree);
// Setups client side SSL_CTX. This function inspects get_config() // Setups client side SSL_CTX. This function inspects get_config()
// and if downstream_no_tls is true, returns nullptr. Otherwise, only // and if downstream_no_tls is true, returns nullptr. Otherwise, only
// construct SSL_CTX if either client_mode or http2_bridge is true. // construct SSL_CTX if either client_mode or http2_bridge is true.
SSL_CTX *setup_client_ssl_context(); SSL_CTX *setup_client_ssl_context();
// Creates CertLookupTree. If frontend is configured not to use TLS,
// this function returns nullptr.
CertLookupTree *create_cert_lookup_tree();
} // namespace ssl } // namespace ssl
} // namespace shrpx } // namespace shrpx

View File

@ -48,35 +48,20 @@ void eventcb(struct ev_loop *loop, ev_async *w, int revents) {
} }
} // namespace } // namespace
Worker::Worker(SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
ssl::CertLookupTree *cert_tree, ssl::CertLookupTree *cert_tree,
const std::shared_ptr<TicketKeys> &ticket_keys) const std::shared_ptr<TicketKeys> &ticket_keys)
: loop_(ev_loop_new(0)), sv_ssl_ctx_(sv_ssl_ctx), cl_ssl_ctx_(cl_ssl_ctx), : loop_(loop), sv_ssl_ctx_(sv_ssl_ctx), cl_ssl_ctx_(cl_ssl_ctx),
worker_stat_(make_unique<WorkerStat>()) { cert_tree_(cert_tree), ticket_keys_(ticket_keys) {
ev_async_init(&w_, eventcb); ev_async_init(&w_, eventcb);
w_.data = this; w_.data = this;
ev_async_start(loop_, &w_); ev_async_start(loop_, &w_);
#ifndef NOTHREADS if (get_config()->downstream_proto == PROTO_HTTP2) {
fut_ = std::async(std::launch::async, [this, cert_tree, &ticket_keys] { http2session_ = make_unique<Http2Session>(loop_, cl_ssl_ctx_);
if (get_config()->tls_ctx_per_worker) { } else {
sv_ssl_ctx_ = ssl::setup_server_ssl_context(); http1_connect_blocker_ = make_unique<ConnectBlocker>(loop_);
cl_ssl_ctx_ = ssl::setup_client_ssl_context(); }
} else {
worker_config->cert_tree = cert_tree;
}
if (get_config()->downstream_proto == PROTO_HTTP2) {
http2session_ = make_unique<Http2Session>(loop_, cl_ssl_ctx_);
} else {
http1_connect_blocker_ = make_unique<ConnectBlocker>(loop_);
}
worker_config->ticket_keys = ticket_keys;
(void)reopen_log_files();
ev_run(loop_);
});
#endif // !NOTHREADS
} }
Worker::~Worker() { ev_async_stop(loop_, &w_); } Worker::~Worker() { ev_async_stop(loop_, &w_); }
@ -87,6 +72,15 @@ void Worker::wait() {
#endif // !NOTHREADS #endif // !NOTHREADS
} }
void Worker::run_async() {
#ifndef NOTHREADS
fut_ = std::async(std::launch::async, [this] {
(void)reopen_log_files();
ev_run(loop_);
});
#endif // !NOTHREADS
}
void Worker::send(const WorkerEvent &event) { void Worker::send(const WorkerEvent &event) {
{ {
std::lock_guard<std::mutex> g(m_); std::lock_guard<std::mutex> g(m_);
@ -111,7 +105,7 @@ void Worker::process_events() {
<< ", addrlen=" << wev.client_addrlen; << ", addrlen=" << wev.client_addrlen;
} }
if (worker_stat_->num_connections >= if (worker_stat_.num_connections >=
get_config()->worker_frontend_connections) { get_config()->worker_frontend_connections) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
@ -125,8 +119,7 @@ void Worker::process_events() {
} }
auto client_handler = ssl::accept_connection( auto client_handler = ssl::accept_connection(
loop_, sv_ssl_ctx_, wev.client_fd, &wev.client_addr.sa, this, wev.client_fd, &wev.client_addr.sa, wev.client_addrlen);
wev.client_addrlen, worker_stat_.get(), &dconn_pool_);
if (!client_handler) { if (!client_handler) {
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
WLOG(ERROR, this) << "ClientHandler creation failed"; WLOG(ERROR, this) << "ClientHandler creation failed";
@ -135,9 +128,6 @@ void Worker::process_events() {
break; break;
} }
client_handler->set_http2_session(http2session_.get());
client_handler->set_http1_connect_blocker(http1_connect_blocker_.get());
if (LOG_ENABLED(INFO)) { if (LOG_ENABLED(INFO)) {
WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created "; WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created ";
} }
@ -150,7 +140,7 @@ void Worker::process_events() {
<< ")"; << ")";
} }
worker_config->ticket_keys = wev.ticket_keys; ticket_keys_ = wev.ticket_keys;
break; break;
case REOPEN_LOG: case REOPEN_LOG:
@ -167,7 +157,7 @@ void Worker::process_events() {
worker_config->graceful_shutdown = true; worker_config->graceful_shutdown = true;
if (worker_stat_->num_connections == 0) { if (worker_stat_.num_connections == 0) {
ev_break(loop_); ev_break(loop_);
return; return;
@ -182,4 +172,30 @@ void Worker::process_events() {
} }
} }
ssl::CertLookupTree *Worker::get_cert_lookup_tree() const { return cert_tree_; }
const std::shared_ptr<TicketKeys> &Worker::get_ticket_keys() const {
return ticket_keys_;
}
void Worker::set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys) {
ticket_keys_ = std::move(ticket_keys);
}
WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }
DownstreamConnectionPool *Worker::get_dconn_pool() { return &dconn_pool_; }
Http2Session *Worker::get_http2_session() const { return http2session_.get(); }
ConnectBlocker *Worker::get_http1_connect_blocker() const {
return http1_connect_blocker_.get();
}
struct ev_loop *Worker::get_loop() const {
return loop_;
}
SSL_CTX *Worker::get_sv_ssl_ctx() const { return sv_ssl_ctx_; }
} // namespace shrpx } // namespace shrpx

View File

@ -80,14 +80,25 @@ struct WorkerEvent {
class Worker { class Worker {
public: public:
Worker(SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx, Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
ssl::CertLookupTree *cert_tree, ssl::CertLookupTree *cert_tree,
const std::shared_ptr<TicketKeys> &ticket_keys); const std::shared_ptr<TicketKeys> &ticket_keys);
~Worker(); ~Worker();
void run_async();
void wait(); void wait();
void process_events(); void process_events();
void send(const WorkerEvent &event); void send(const WorkerEvent &event);
ssl::CertLookupTree *get_cert_lookup_tree() const;
const std::shared_ptr<TicketKeys> &get_ticket_keys() const;
void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);
WorkerStat *get_worker_stat();
DownstreamConnectionPool *get_dconn_pool();
Http2Session *get_http2_session() const;
ConnectBlocker *get_http1_connect_blocker() const;
struct ev_loop *get_loop() const;
SSL_CTX *get_sv_ssl_ctx() const;
private: private:
#ifndef NOTHREADS #ifndef NOTHREADS
std::future<void> fut_; std::future<void> fut_;
@ -96,12 +107,18 @@ private:
std::deque<WorkerEvent> q_; std::deque<WorkerEvent> q_;
ev_async w_; ev_async w_;
DownstreamConnectionPool dconn_pool_; DownstreamConnectionPool dconn_pool_;
WorkerStat worker_stat_;
struct ev_loop *loop_; struct ev_loop *loop_;
// Following fields are shared across threads if
// get_config()->tls_ctx_per_worker == true.
SSL_CTX *sv_ssl_ctx_; SSL_CTX *sv_ssl_ctx_;
SSL_CTX *cl_ssl_ctx_; SSL_CTX *cl_ssl_ctx_;
ssl::CertLookupTree *cert_tree_;
std::shared_ptr<TicketKeys> ticket_keys_;
std::unique_ptr<Http2Session> http2session_; std::unique_ptr<Http2Session> http2session_;
std::unique_ptr<ConnectBlocker> http1_connect_blocker_; std::unique_ptr<ConnectBlocker> http1_connect_blocker_;
std::unique_ptr<WorkerStat> worker_stat_;
}; };
} // namespace shrpx } // namespace shrpx

View File

@ -30,8 +30,8 @@ using namespace nghttp2;
namespace shrpx { namespace shrpx {
WorkerConfig::WorkerConfig() WorkerConfig::WorkerConfig()
: cert_tree(nullptr), accesslog_fd(-1), errorlog_fd(-1), : accesslog_fd(-1), errorlog_fd(-1), errorlog_tty(false),
errorlog_tty(false), graceful_shutdown(false) {} graceful_shutdown(false) {}
#ifndef NOTHREADS #ifndef NOTHREADS
thread_local thread_local

View File

@ -38,11 +38,9 @@ class CertLookupTree;
struct TicketKeys; struct TicketKeys;
struct WorkerConfig { struct WorkerConfig {
std::shared_ptr<TicketKeys> ticket_keys;
std::chrono::system_clock::time_point time_str_updated_; std::chrono::system_clock::time_point time_str_updated_;
std::string time_local_str; std::string time_local_str;
std::string time_iso8601_str; std::string time_iso8601_str;
ssl::CertLookupTree *cert_tree;
int accesslog_fd; int accesslog_fd;
int errorlog_fd; int errorlog_fd;
// true if errorlog_fd is referring to a terminal. // true if errorlog_fd is referring to a terminal.