diff --git a/src/HttpServer.cc b/src/HttpServer.cc index a4e08bb8..7649d873 100644 --- a/src/HttpServer.cc +++ b/src/HttpServer.cc @@ -35,6 +35,7 @@ #include #include #include +#include #include @@ -67,6 +68,7 @@ Config::Config() : data_ptr(nullptr), output_upper_thres(1024*1024), padding(0), + num_worker(1), header_table_size(-1), port(0), verbose(false), @@ -92,14 +94,14 @@ public: Sessions(event_base *evbase, const Config *config, SSL_CTX *ssl_ctx) : evbase_(evbase), config_(config), - ssl_ctx_(ssl_ctx) + ssl_ctx_(ssl_ctx), + next_session_id_(1) {} ~Sessions() { for(auto handler : handlers_) { delete handler; } - SSL_CTX_free(ssl_ctx_); } void add_handler(Http2Handler *handler) { @@ -135,11 +137,43 @@ public: { return evbase_; } + int64_t get_next_session_id() + { + auto session_id = next_session_id_; + if(next_session_id_ == INT64_MAX) { + next_session_id_ = 1; + } + return session_id; + } + void accept_connection(int fd) + { + int val = 1; + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, + reinterpret_cast(&val), sizeof(val)); + SSL *ssl = nullptr; + if(ssl_ctx_) { + ssl = ssl_session_new(fd); + if(!ssl) { + close(fd); + return; + } + } + auto handler = util::make_unique(this, fd, ssl, + get_next_session_id()); + handler->setup_bev(); + if(!ssl) { + if(handler->on_connect() != 0) { + return; + } + } + add_handler(handler.release()); + } private: std::set handlers_; event_base *evbase_; const Config *config_; SSL_CTX *ssl_ctx_; + int64_t next_session_id_; }; namespace { @@ -971,43 +1005,99 @@ void fill_callback(nghttp2_session_callbacks& callbacks, const Config *config) } } // namespace +struct ClientInfo { + int fd; +}; + +namespace { +void worker_readcb(bufferevent *bev, void *arg) +{ + auto sessions = static_cast(arg); + auto input = bufferevent_get_input(bev); + while(evbuffer_get_length(input) >= sizeof(ClientInfo)) { + ClientInfo client; + evbuffer_remove(input, &client, sizeof(client)); + sessions->accept_connection(client.fd); + } +} +} // namespace + +namespace { +void run_worker(int thread_id, int fd, SSL_CTX *ssl_ctx, const Config *config) +{ + auto evbase = event_base_new(); + auto bev = bufferevent_socket_new(evbase, fd, + BEV_OPT_DEFER_CALLBACKS | + BEV_OPT_CLOSE_ON_FREE); + auto sessions = Sessions(evbase, config, ssl_ctx); + + bufferevent_enable(bev, EV_READ); + bufferevent_setcb(bev, worker_readcb, nullptr, nullptr, &sessions); + event_base_loop(evbase, 0); +} +} // namespace + class ListenEventHandler { public: - ListenEventHandler(Sessions *sessions, int64_t *session_id_seed_ptr) + ListenEventHandler(Sessions *sessions, const Config *config) : sessions_(sessions), - session_id_seed_ptr_(session_id_seed_ptr) - {} - void accept_connection(int fd, sockaddr *addr, int addrlen) + config_(config), + next_worker_(0) { int rv; - int val = 1; - SSL *ssl = nullptr; - rv = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, - reinterpret_cast(&val), sizeof(val)); - if(rv == -1) { - std::cerr << "Setting option TCP_NODELAY failed: errno=" - << errno << std::endl; + if(config_->num_worker == 1) { + return; } - if(sessions_->get_ssl_ctx()) { - ssl = sessions_->ssl_session_new(fd); - if(!ssl) { - return; + for(size_t i = 0; i < config_->num_worker; ++i) { + if(config_->verbose) { + std::cerr << "spawning thread #" << i << std::endl; } - } - int64_t session_id = ++(*session_id_seed_ptr_); - auto handler = util::make_unique(sessions_, fd, ssl, - session_id); - handler->setup_bev(); - if(!ssl) { - if(handler->on_connect() != 0) { - return; + int socks[2]; + rv = socketpair(AF_UNIX, SOCK_STREAM, 0, socks); + if(rv == -1) { + std::cerr << "socketpair() failed: errno=" << errno << std::endl; + assert(0); } + evutil_make_socket_nonblocking(socks[0]); + evutil_make_socket_nonblocking(socks[1]); + auto bev = bufferevent_socket_new(sessions_->get_evbase(), socks[0], + BEV_OPT_DEFER_CALLBACKS | + BEV_OPT_CLOSE_ON_FREE); + if(!bev) { + std::cerr << "bufferevent_socket_new() failed" << std::endl; + assert(0); + } + workers_.push_back(bev); + auto t = std::thread(run_worker, i, socks[1], sessions_->get_ssl_ctx(), + config_); + t.detach(); + } + } + void accept_connection(int fd, sockaddr *addr, int addrlen) + { + if(config_->num_worker == 1) { + sessions_->accept_connection(fd); + return; + } + // Dispatch client to the one of the worker threads, in a round + // robin manner. + auto client = ClientInfo{fd}; + bufferevent_write(workers_[next_worker_], &client, sizeof(client)); + if(next_worker_ == config_->num_worker - 1) { + next_worker_ = 0; + } else { + ++next_worker_; } - sessions_->add_handler(handler.release()); } private: + // In multi threading mode, this includes bufferevent to dispatch + // client to the worker threads. + std::vector workers_; Sessions *sessions_; - int64_t *session_id_seed_ptr_; + const Config *config_; + // In multi threading mode, this points to the next thread that + // client will be dispatched. + size_t next_worker_; }; HttpServer::HttpServer(const Config *config) @@ -1051,13 +1141,12 @@ void evlistener_errorcb(evconnlistener *listener, void *ptr) } // namespace namespace { -int start_listen(event_base *evbase, Sessions *sessions, - int64_t *session_id_seed_ptr) +int start_listen(event_base *evbase, Sessions *sessions, const Config *config) { addrinfo hints; int r; char service[10]; - snprintf(service, sizeof(service), "%u", sessions->get_config()->port); + snprintf(service, sizeof(service), "%u", config->port); memset(&hints, 0, sizeof(addrinfo)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; @@ -1066,6 +1155,8 @@ int start_listen(event_base *evbase, Sessions *sessions, hints.ai_flags |= AI_ADDRCONFIG; #endif // AI_ADDRCONFIG + auto listen_handler = new ListenEventHandler(sessions, config); + addrinfo *res, *rp; r = getaddrinfo(nullptr, service, &hints, &res); if(r != 0) { @@ -1095,17 +1186,14 @@ int start_listen(event_base *evbase, Sessions *sessions, #endif // IPV6_V6ONLY if(bind(fd, rp->ai_addr, rp->ai_addrlen) == 0) { auto evlistener = evconnlistener_new - (evbase, - evlistener_acceptcb, - new ListenEventHandler(sessions, session_id_seed_ptr), - LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, - -1, fd); + (evbase, evlistener_acceptcb, listen_handler, + LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1, fd); evconnlistener_set_error_cb(evlistener, evlistener_errorcb); - if(sessions->get_config()->verbose) { + if(config->verbose) { std::cout << (rp->ai_family == AF_INET ? "IPv4" : "IPv6") << ": listen on port " - << sessions->get_config()->port << std::endl; + << config->port << std::endl; } continue; } else { @@ -1226,13 +1314,11 @@ int HttpServer::run() } auto evbase = event_base_new(); - int64_t session_id_seed = 0; Sessions sessions(evbase, config_, ssl_ctx); - if(start_listen(evbase, &sessions, &session_id_seed) != 0) { + if(start_listen(evbase, &sessions, config_) != 0) { std::cerr << "Could not listen" << std::endl; return -1; } - event_base_loop(evbase, 0); return 0; } diff --git a/src/HttpServer.h b/src/HttpServer.h index f89a1ee2..c7624693 100644 --- a/src/HttpServer.h +++ b/src/HttpServer.h @@ -57,6 +57,7 @@ struct Config { void *data_ptr; size_t output_upper_thres; size_t padding; + size_t num_worker; ssize_t header_table_size; uint16_t port; bool verbose; @@ -66,8 +67,6 @@ struct Config { Config(); }; -class Sessions; - struct Request { Headers headers; std::pair response_body; diff --git a/src/Makefile.am b/src/Makefile.am index 369e892e..25f0ce74 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -67,6 +67,7 @@ nghttp_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} nghttp.cc \ ${HTML_PARSER_OBJECTS} ${HTML_PARSER_HFILES} nghttpd_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} nghttpd.cc \ + ssl.cc ssl.h \ HttpServer.cc HttpServer.h h2load_SOURCES = util.cc util.h http2.cc http2.h h2load.cc h2load.h \ diff --git a/src/nghttpd.cc b/src/nghttpd.cc index dac39f0d..56434bf9 100644 --- a/src/nghttpd.cc +++ b/src/nghttpd.cc @@ -42,6 +42,7 @@ #include "app_helper.h" #include "HttpServer.h" #include "util.h" +#include "ssl.h" namespace nghttp2 { @@ -130,6 +131,9 @@ void print_help(std::ostream& out) << " root. See --htdocs option.\n" << " -b, --padding= Add at most bytes to a frame payload as\n" << " padding. Specify 0 to disable padding.\n" + << " -n, --workers=\n" + << " Set the number of worker threads.\n" + << " Default: 1\n" << " --version Display version information and exit.\n" << " -h, --help Display this help and exit.\n" << std::endl; @@ -151,13 +155,15 @@ int main(int argc, char **argv) {"header-table-size", required_argument, nullptr, 'c'}, {"push", required_argument, nullptr, 'p'}, {"padding", required_argument, nullptr, 'b'}, + {"workers", required_argument, nullptr, 'n'}, {"no-tls", no_argument, &flag, 1}, {"color", no_argument, &flag, 2}, {"version", no_argument, &flag, 3}, {nullptr, 0, nullptr, 0} }; int option_index = 0; - int c = getopt_long(argc, argv, "DVb:c:d:hp:v", long_options, &option_index); + int c = getopt_long(argc, argv, "DVb:c:d:hn:p:v", long_options, + &option_index); char *end; if(c == -1) { break; @@ -175,6 +181,14 @@ int main(int argc, char **argv) case 'd': config.htdocs = optarg; break; + case 'n': + errno = 0; + config.num_worker = strtoul(optarg, &end, 10); + if(errno == ERANGE || *end != '\0' || config.num_worker == 0) { + std::cerr << "-n: Bad option value: " << optarg << std::endl; + exit(EXIT_FAILURE); + } + break; case 'h': print_help(std::cout); exit(EXIT_SUCCESS); @@ -254,6 +268,8 @@ int main(int argc, char **argv) OpenSSL_add_all_algorithms(); SSL_load_error_strings(); SSL_library_init(); + ssl::LibsslGlobalLock(); + reset_timer(); HttpServer server(&config);