nghttpd: Add multi threading support

This commit is contained in:
Tatsuhiro Tsujikawa 2014-03-04 23:14:26 +09:00
parent d4ea2418d8
commit caeeba681f
4 changed files with 145 additions and 43 deletions

View File

@ -35,6 +35,7 @@
#include <cassert> #include <cassert>
#include <set> #include <set>
#include <iostream> #include <iostream>
#include <thread>
#include <openssl/err.h> #include <openssl/err.h>
@ -67,6 +68,7 @@ Config::Config()
: data_ptr(nullptr), : data_ptr(nullptr),
output_upper_thres(1024*1024), output_upper_thres(1024*1024),
padding(0), padding(0),
num_worker(1),
header_table_size(-1), header_table_size(-1),
port(0), port(0),
verbose(false), verbose(false),
@ -92,14 +94,14 @@ public:
Sessions(event_base *evbase, const Config *config, SSL_CTX *ssl_ctx) Sessions(event_base *evbase, const Config *config, SSL_CTX *ssl_ctx)
: evbase_(evbase), : evbase_(evbase),
config_(config), config_(config),
ssl_ctx_(ssl_ctx) ssl_ctx_(ssl_ctx),
next_session_id_(1)
{} {}
~Sessions() ~Sessions()
{ {
for(auto handler : handlers_) { for(auto handler : handlers_) {
delete handler; delete handler;
} }
SSL_CTX_free(ssl_ctx_);
} }
void add_handler(Http2Handler *handler) void add_handler(Http2Handler *handler)
{ {
@ -135,11 +137,43 @@ public:
{ {
return evbase_; return evbase_;
} }
int64_t get_next_session_id()
{
auto session_id = next_session_id_;
if(next_session_id_ == INT64_MAX) {
next_session_id_ = 1;
}
return session_id;
}
void accept_connection(int fd)
{
int val = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<char *>(&val), sizeof(val));
SSL *ssl = nullptr;
if(ssl_ctx_) {
ssl = ssl_session_new(fd);
if(!ssl) {
close(fd);
return;
}
}
auto handler = util::make_unique<Http2Handler>(this, fd, ssl,
get_next_session_id());
handler->setup_bev();
if(!ssl) {
if(handler->on_connect() != 0) {
return;
}
}
add_handler(handler.release());
}
private: private:
std::set<Http2Handler*> handlers_; std::set<Http2Handler*> handlers_;
event_base *evbase_; event_base *evbase_;
const Config *config_; const Config *config_;
SSL_CTX *ssl_ctx_; SSL_CTX *ssl_ctx_;
int64_t next_session_id_;
}; };
namespace { namespace {
@ -971,43 +1005,99 @@ void fill_callback(nghttp2_session_callbacks& callbacks, const Config *config)
} }
} // namespace } // namespace
struct ClientInfo {
int fd;
};
namespace {
void worker_readcb(bufferevent *bev, void *arg)
{
auto sessions = static_cast<Sessions*>(arg);
auto input = bufferevent_get_input(bev);
while(evbuffer_get_length(input) >= sizeof(ClientInfo)) {
ClientInfo client;
evbuffer_remove(input, &client, sizeof(client));
sessions->accept_connection(client.fd);
}
}
} // namespace
namespace {
void run_worker(int thread_id, int fd, SSL_CTX *ssl_ctx, const Config *config)
{
auto evbase = event_base_new();
auto bev = bufferevent_socket_new(evbase, fd,
BEV_OPT_DEFER_CALLBACKS |
BEV_OPT_CLOSE_ON_FREE);
auto sessions = Sessions(evbase, config, ssl_ctx);
bufferevent_enable(bev, EV_READ);
bufferevent_setcb(bev, worker_readcb, nullptr, nullptr, &sessions);
event_base_loop(evbase, 0);
}
} // namespace
class ListenEventHandler { class ListenEventHandler {
public: public:
ListenEventHandler(Sessions *sessions, int64_t *session_id_seed_ptr) ListenEventHandler(Sessions *sessions, const Config *config)
: sessions_(sessions), : sessions_(sessions),
session_id_seed_ptr_(session_id_seed_ptr) config_(config),
{} next_worker_(0)
void accept_connection(int fd, sockaddr *addr, int addrlen)
{ {
int rv; int rv;
int val = 1; if(config_->num_worker == 1) {
SSL *ssl = nullptr; return;
rv = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<char *>(&val), sizeof(val));
if(rv == -1) {
std::cerr << "Setting option TCP_NODELAY failed: errno="
<< errno << std::endl;
} }
if(sessions_->get_ssl_ctx()) { for(size_t i = 0; i < config_->num_worker; ++i) {
ssl = sessions_->ssl_session_new(fd); if(config_->verbose) {
if(!ssl) { std::cerr << "spawning thread #" << i << std::endl;
return;
} }
} int socks[2];
int64_t session_id = ++(*session_id_seed_ptr_); rv = socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
auto handler = util::make_unique<Http2Handler>(sessions_, fd, ssl, if(rv == -1) {
session_id); std::cerr << "socketpair() failed: errno=" << errno << std::endl;
handler->setup_bev(); assert(0);
if(!ssl) {
if(handler->on_connect() != 0) {
return;
} }
evutil_make_socket_nonblocking(socks[0]);
evutil_make_socket_nonblocking(socks[1]);
auto bev = bufferevent_socket_new(sessions_->get_evbase(), socks[0],
BEV_OPT_DEFER_CALLBACKS |
BEV_OPT_CLOSE_ON_FREE);
if(!bev) {
std::cerr << "bufferevent_socket_new() failed" << std::endl;
assert(0);
}
workers_.push_back(bev);
auto t = std::thread(run_worker, i, socks[1], sessions_->get_ssl_ctx(),
config_);
t.detach();
}
}
void accept_connection(int fd, sockaddr *addr, int addrlen)
{
if(config_->num_worker == 1) {
sessions_->accept_connection(fd);
return;
}
// Dispatch client to the one of the worker threads, in a round
// robin manner.
auto client = ClientInfo{fd};
bufferevent_write(workers_[next_worker_], &client, sizeof(client));
if(next_worker_ == config_->num_worker - 1) {
next_worker_ = 0;
} else {
++next_worker_;
} }
sessions_->add_handler(handler.release());
} }
private: private:
// In multi threading mode, this includes bufferevent to dispatch
// client to the worker threads.
std::vector<bufferevent*> workers_;
Sessions *sessions_; Sessions *sessions_;
int64_t *session_id_seed_ptr_; const Config *config_;
// In multi threading mode, this points to the next thread that
// client will be dispatched.
size_t next_worker_;
}; };
HttpServer::HttpServer(const Config *config) HttpServer::HttpServer(const Config *config)
@ -1051,13 +1141,12 @@ void evlistener_errorcb(evconnlistener *listener, void *ptr)
} // namespace } // namespace
namespace { namespace {
int start_listen(event_base *evbase, Sessions *sessions, int start_listen(event_base *evbase, Sessions *sessions, const Config *config)
int64_t *session_id_seed_ptr)
{ {
addrinfo hints; addrinfo hints;
int r; int r;
char service[10]; char service[10];
snprintf(service, sizeof(service), "%u", sessions->get_config()->port); snprintf(service, sizeof(service), "%u", config->port);
memset(&hints, 0, sizeof(addrinfo)); memset(&hints, 0, sizeof(addrinfo));
hints.ai_family = AF_UNSPEC; hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM; hints.ai_socktype = SOCK_STREAM;
@ -1066,6 +1155,8 @@ int start_listen(event_base *evbase, Sessions *sessions,
hints.ai_flags |= AI_ADDRCONFIG; hints.ai_flags |= AI_ADDRCONFIG;
#endif // AI_ADDRCONFIG #endif // AI_ADDRCONFIG
auto listen_handler = new ListenEventHandler(sessions, config);
addrinfo *res, *rp; addrinfo *res, *rp;
r = getaddrinfo(nullptr, service, &hints, &res); r = getaddrinfo(nullptr, service, &hints, &res);
if(r != 0) { if(r != 0) {
@ -1095,17 +1186,14 @@ int start_listen(event_base *evbase, Sessions *sessions,
#endif // IPV6_V6ONLY #endif // IPV6_V6ONLY
if(bind(fd, rp->ai_addr, rp->ai_addrlen) == 0) { if(bind(fd, rp->ai_addr, rp->ai_addrlen) == 0) {
auto evlistener = evconnlistener_new auto evlistener = evconnlistener_new
(evbase, (evbase, evlistener_acceptcb, listen_handler,
evlistener_acceptcb, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1, fd);
new ListenEventHandler(sessions, session_id_seed_ptr),
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
-1, fd);
evconnlistener_set_error_cb(evlistener, evlistener_errorcb); evconnlistener_set_error_cb(evlistener, evlistener_errorcb);
if(sessions->get_config()->verbose) { if(config->verbose) {
std::cout << (rp->ai_family == AF_INET ? "IPv4" : "IPv6") std::cout << (rp->ai_family == AF_INET ? "IPv4" : "IPv6")
<< ": listen on port " << ": listen on port "
<< sessions->get_config()->port << std::endl; << config->port << std::endl;
} }
continue; continue;
} else { } else {
@ -1226,13 +1314,11 @@ int HttpServer::run()
} }
auto evbase = event_base_new(); auto evbase = event_base_new();
int64_t session_id_seed = 0;
Sessions sessions(evbase, config_, ssl_ctx); Sessions sessions(evbase, config_, ssl_ctx);
if(start_listen(evbase, &sessions, &session_id_seed) != 0) { if(start_listen(evbase, &sessions, config_) != 0) {
std::cerr << "Could not listen" << std::endl; std::cerr << "Could not listen" << std::endl;
return -1; return -1;
} }
event_base_loop(evbase, 0); event_base_loop(evbase, 0);
return 0; return 0;
} }

View File

@ -57,6 +57,7 @@ struct Config {
void *data_ptr; void *data_ptr;
size_t output_upper_thres; size_t output_upper_thres;
size_t padding; size_t padding;
size_t num_worker;
ssize_t header_table_size; ssize_t header_table_size;
uint16_t port; uint16_t port;
bool verbose; bool verbose;
@ -66,8 +67,6 @@ struct Config {
Config(); Config();
}; };
class Sessions;
struct Request { struct Request {
Headers headers; Headers headers;
std::pair<std::string, size_t> response_body; std::pair<std::string, size_t> response_body;

View File

@ -67,6 +67,7 @@ nghttp_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} nghttp.cc \
${HTML_PARSER_OBJECTS} ${HTML_PARSER_HFILES} ${HTML_PARSER_OBJECTS} ${HTML_PARSER_HFILES}
nghttpd_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} nghttpd.cc \ nghttpd_SOURCES = ${HELPER_OBJECTS} ${HELPER_HFILES} nghttpd.cc \
ssl.cc ssl.h \
HttpServer.cc HttpServer.h HttpServer.cc HttpServer.h
h2load_SOURCES = util.cc util.h http2.cc http2.h h2load.cc h2load.h \ h2load_SOURCES = util.cc util.h http2.cc http2.h h2load.cc h2load.h \

View File

@ -42,6 +42,7 @@
#include "app_helper.h" #include "app_helper.h"
#include "HttpServer.h" #include "HttpServer.h"
#include "util.h" #include "util.h"
#include "ssl.h"
namespace nghttp2 { namespace nghttp2 {
@ -130,6 +131,9 @@ void print_help(std::ostream& out)
<< " root. See --htdocs option.\n" << " root. See --htdocs option.\n"
<< " -b, --padding=<N> Add at most <N> bytes to a frame payload as\n" << " -b, --padding=<N> Add at most <N> bytes to a frame payload as\n"
<< " padding. Specify 0 to disable padding.\n" << " padding. Specify 0 to disable padding.\n"
<< " -n, --workers=<CORE>\n"
<< " Set the number of worker threads.\n"
<< " Default: 1\n"
<< " --version Display version information and exit.\n" << " --version Display version information and exit.\n"
<< " -h, --help Display this help and exit.\n" << " -h, --help Display this help and exit.\n"
<< std::endl; << std::endl;
@ -151,13 +155,15 @@ int main(int argc, char **argv)
{"header-table-size", required_argument, nullptr, 'c'}, {"header-table-size", required_argument, nullptr, 'c'},
{"push", required_argument, nullptr, 'p'}, {"push", required_argument, nullptr, 'p'},
{"padding", required_argument, nullptr, 'b'}, {"padding", required_argument, nullptr, 'b'},
{"workers", required_argument, nullptr, 'n'},
{"no-tls", no_argument, &flag, 1}, {"no-tls", no_argument, &flag, 1},
{"color", no_argument, &flag, 2}, {"color", no_argument, &flag, 2},
{"version", no_argument, &flag, 3}, {"version", no_argument, &flag, 3},
{nullptr, 0, nullptr, 0} {nullptr, 0, nullptr, 0}
}; };
int option_index = 0; int option_index = 0;
int c = getopt_long(argc, argv, "DVb:c:d:hp:v", long_options, &option_index); int c = getopt_long(argc, argv, "DVb:c:d:hn:p:v", long_options,
&option_index);
char *end; char *end;
if(c == -1) { if(c == -1) {
break; break;
@ -175,6 +181,14 @@ int main(int argc, char **argv)
case 'd': case 'd':
config.htdocs = optarg; config.htdocs = optarg;
break; break;
case 'n':
errno = 0;
config.num_worker = strtoul(optarg, &end, 10);
if(errno == ERANGE || *end != '\0' || config.num_worker == 0) {
std::cerr << "-n: Bad option value: " << optarg << std::endl;
exit(EXIT_FAILURE);
}
break;
case 'h': case 'h':
print_help(std::cout); print_help(std::cout);
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
@ -254,6 +268,8 @@ int main(int argc, char **argv)
OpenSSL_add_all_algorithms(); OpenSSL_add_all_algorithms();
SSL_load_error_strings(); SSL_load_error_strings();
SSL_library_init(); SSL_library_init();
ssl::LibsslGlobalLock();
reset_timer(); reset_timer();
HttpServer server(&config); HttpServer server(&config);