From 479e15469c71d36dd8f1cfffc2570d375f6346e1 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Thu, 26 Jun 2014 22:55:22 +0900 Subject: [PATCH] nghttpx: Add worker-frontend-connections option --- src/shrpx.cc | 15 +++++ src/shrpx_client_handler.cc | 9 ++- src/shrpx_client_handler.h | 5 +- src/shrpx_config.cc | 17 ++++++ src/shrpx_config.h | 2 + src/shrpx_listen_handler.cc | 27 ++++++++- src/shrpx_listen_handler.h | 4 ++ src/shrpx_ssl.cc | 94 ++++++++++++++++-------------- src/shrpx_ssl.h | 4 +- src/shrpx_thread_event_receiver.cc | 24 +++++++- src/shrpx_thread_event_receiver.h | 4 ++ src/shrpx_worker.h | 6 ++ 12 files changed, 159 insertions(+), 52 deletions(-) diff --git a/src/shrpx.cc b/src/shrpx.cc index b5fa7247..7303e57a 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -439,6 +439,7 @@ void fill_default_config() mod_config()->http2_no_cookie_crumbling = false; mod_config()->upstream_frame_debug = false; mod_config()->padding = 0; + mod_config()->worker_frontend_connections = 0; nghttp2_option_new(&mod_config()->http2_option); @@ -539,6 +540,10 @@ Performance: means write burst size is unlimited. Default: )" << get_config()->worker_write_burst << R"( + --worker-frontend-connections= + Set maximum number of simultaneous connections + frontend accepts. Setting 0 means unlimited. + Default: 0 Timeout: --frontend-http2-read-timeout= @@ -844,6 +849,7 @@ int main(int argc, char **argv) {"worker-write-burst", required_argument, &flag, 53}, {"altsvc", required_argument, &flag, 54}, {"add-response-header", required_argument, &flag, 55}, + {"worker-frontend-connections", required_argument, &flag, 56}, {nullptr, 0, nullptr, 0 } }; @@ -1092,6 +1098,10 @@ int main(int argc, char **argv) // --add-response-header cmdcfgs.emplace_back(SHRPX_OPT_ADD_RESPONSE_HEADER, optarg); break; + case 56: + // --worker-frontend-connections + cmdcfgs.emplace_back(SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS, optarg); + break; default: break; } @@ -1185,6 +1195,11 @@ int main(int argc, char **argv) exit(EXIT_FAILURE); } + if(get_config()->worker_frontend_connections == 0) { + mod_config()->worker_frontend_connections = + std::numeric_limits::max(); + } + if(get_config()->http2_proxy + get_config()->http2_bridge + get_config()->client_proxy + get_config()->client > 1) { LOG(FATAL) << "--http2-proxy, --http2-bridge, --client-proxy and --client " diff --git a/src/shrpx_client_handler.cc b/src/shrpx_client_handler.cc index 6f6f97ed..fe2c790e 100644 --- a/src/shrpx_client_handler.cc +++ b/src/shrpx_client_handler.cc @@ -35,6 +35,7 @@ #include "shrpx_http2_downstream_connection.h" #include "shrpx_accesslog.h" #include "shrpx_ssl.h" +#include "shrpx_worker.h" #ifdef HAVE_SPDYLAY #include "shrpx_spdy_upstream.h" #endif // HAVE_SPDYLAY @@ -219,12 +220,14 @@ void upstream_http1_connhd_readcb(bufferevent *bev, void *arg) ClientHandler::ClientHandler(bufferevent *bev, bufferevent_rate_limit_group *rate_limit_group, int fd, SSL *ssl, - const char *ipaddr) + const char *ipaddr, + WorkerStat *worker_stat) : ipaddr_(ipaddr), bev_(bev), http2session_(nullptr), ssl_(ssl), reneg_shutdown_timerev_(nullptr), + worker_stat_(worker_stat), left_connhd_len_(NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN), fd_(fd), should_close_after_write_(false), @@ -233,6 +236,8 @@ ClientHandler::ClientHandler(bufferevent *bev, { int rv; + ++worker_stat->num_connections; + rv = bufferevent_add_to_rate_limit_group(bev_, rate_limit_group); if(rv == -1) { CLOG(FATAL, this) << "bufferevent_add_to_rate_limit_group() failed"; @@ -260,6 +265,8 @@ ClientHandler::~ClientHandler() CLOG(INFO, this) << "Deleting"; } + --worker_stat_->num_connections; + if(reneg_shutdown_timerev_) { event_free(reneg_shutdown_timerev_); } diff --git a/src/shrpx_client_handler.h b/src/shrpx_client_handler.h index 92fde773..3fd9f490 100644 --- a/src/shrpx_client_handler.h +++ b/src/shrpx_client_handler.h @@ -41,12 +41,14 @@ class Upstream; class DownstreamConnection; class Http2Session; class HttpsUpstream; +struct WorkerStat; class ClientHandler { public: ClientHandler(bufferevent *bev, bufferevent_rate_limit_group *rate_limit_group, - int fd, SSL *ssl, const char *ipaddr); + int fd, SSL *ssl, const char *ipaddr, + WorkerStat *worker_stat); ~ClientHandler(); int on_read(); int on_event(); @@ -95,6 +97,7 @@ private: Http2Session *http2session_; SSL *ssl_; event *reneg_shutdown_timerev_; + WorkerStat *worker_stat_; // The number of bytes of HTTP/2 client connection header to read size_t left_connhd_len_; int fd_; diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index 9f915c8f..2b2a35c5 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -118,6 +118,8 @@ const char SHRPX_OPT_FRONTEND_FRAME_DEBUG[] = "frontend-frame-debug"; const char SHRPX_OPT_PADDING[] = "padding"; const char SHRPX_OPT_ALTSVC[] = "altsvc"; const char SHRPX_OPT_ADD_RESPONSE_HEADER[] = "add-response-header"; +const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[] = + "worker-frontend-connections"; namespace { Config *config = nullptr; @@ -793,6 +795,21 @@ int parse_config(const char *opt, const char *optarg) return 0; } + if(util::strieq(opt, SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS)) { + errno = 0; + auto n = strtoul(optarg, nullptr, 10); + + if(errno != 0) { + LOG(ERROR) << "worker-frontend-connections: invalid argument: " + << optarg; + return -1; + } + + mod_config()->worker_frontend_connections = n; + + return 0; + } + if(util::strieq(opt, "conf")) { LOG(WARNING) << "conf is ignored"; diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 8f3fd579..64657689 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -108,6 +108,7 @@ extern const char SHRPX_OPT_FRONTEND_FRAME_DEBUG[]; extern const char SHRPX_OPT_PADDING[]; extern const char SHRPX_OPT_ALTSVC[]; extern const char SHRPX_OPT_ADD_RESPONSE_HEADER[]; +extern const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[]; union sockaddr_union { sockaddr sa; @@ -212,6 +213,7 @@ struct Config { size_t worker_write_rate; size_t worker_write_burst; size_t padding; + size_t worker_frontend_connections; // Bit mask to disable SSL/TLS protocol versions. This will be // passed to SSL_CTX_set_options(). long int tls_proto_mask; diff --git a/src/shrpx_listen_handler.cc b/src/shrpx_listen_handler.cc index e2b70cfa..ef158c0c 100644 --- a/src/shrpx_listen_handler.cc +++ b/src/shrpx_listen_handler.cc @@ -38,6 +38,9 @@ #include "shrpx_worker.h" #include "shrpx_config.h" #include "shrpx_http2_session.h" +#include "util.h" + +using namespace nghttp2; namespace shrpx { @@ -50,6 +53,7 @@ ListenHandler::ListenHandler(event_base *evbase, SSL_CTX *sv_ssl_ctx, http2session_(nullptr), rate_limit_group_(bufferevent_rate_limit_group_new (evbase, get_config()->worker_rate_limit_cfg)), + worker_stat_(util::make_unique()), num_worker_(0), worker_round_robin_cnt_(0) {} @@ -110,12 +114,29 @@ int ListenHandler::accept_connection(evutil_socket_t fd, LLOG(INFO, this) << "Accepted connection. fd=" << fd; } if(num_worker_ == 0) { + + if(worker_stat_->num_connections >= + get_config()->worker_frontend_connections) { + + if(LOG_ENABLED(INFO)) { + TLOG(INFO, this) << "Too many connections >=" + << get_config()->worker_frontend_connections; + } + + close(fd); + return -1; + } + auto client = ssl::accept_connection(evbase_, rate_limit_group_, - sv_ssl_ctx_, fd, addr, addrlen); + sv_ssl_ctx_, fd, addr, addrlen, + worker_stat_.get()); if(!client) { LLOG(ERROR, this) << "ClientHandler creation failed"; - return 0; + + close(fd); + return -1; } + client->set_http2_session(http2session_); return 0; } @@ -129,8 +150,10 @@ int ListenHandler::accept_connection(evutil_socket_t fd, auto output = bufferevent_get_output(workers_[idx].bev); if(evbuffer_add(output, &wev, sizeof(wev)) != 0) { LLOG(FATAL, this) << "evbuffer_add() failed"; + close(fd); return -1; } + return 0; } diff --git a/src/shrpx_listen_handler.h b/src/shrpx_listen_handler.h index c58793c0..0d8e5a60 100644 --- a/src/shrpx_listen_handler.h +++ b/src/shrpx_listen_handler.h @@ -30,6 +30,8 @@ #include #include +#include + #include #include @@ -45,6 +47,7 @@ struct WorkerInfo { }; class Http2Session; +struct WorkerStat; class ListenHandler { public: @@ -65,6 +68,7 @@ private: // multi-threaded case, see shrpx_worker.cc. Http2Session *http2session_; bufferevent_rate_limit_group *rate_limit_group_; + std::unique_ptr worker_stat_; size_t num_worker_; unsigned int worker_round_robin_cnt_; }; diff --git a/src/shrpx_ssl.cc b/src/shrpx_ssl.cc index 74f5eae4..bcb80056 100644 --- a/src/shrpx_ssl.cc +++ b/src/shrpx_ssl.cc @@ -49,6 +49,7 @@ #include "shrpx_client_handler.h" #include "shrpx_config.h" #include "shrpx_accesslog.h" +#include "shrpx_worker.h" #include "util.h" using namespace nghttp2; @@ -467,59 +468,62 @@ ClientHandler* accept_connection bufferevent_rate_limit_group *rate_limit_group, SSL_CTX *ssl_ctx, evutil_socket_t fd, - sockaddr *addr, int addrlen) + sockaddr *addr, int addrlen, + WorkerStat *worker_stat) { char host[NI_MAXHOST]; int rv; rv = getnameinfo(addr, addrlen, host, sizeof(host), nullptr, 0, NI_NUMERICHOST); - if(rv == 0) { - if(get_config()->accesslog) { - upstream_connect(host); - } - - int val = 1; - rv = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, - reinterpret_cast(&val), sizeof(val)); - if(rv == -1) { - LOG(WARNING) << "Setting option TCP_NODELAY failed: errno=" - << errno; - } - SSL *ssl = nullptr; - bufferevent *bev; - if(ssl_ctx) { - ssl = SSL_new(ssl_ctx); - if(!ssl) { - LOG(ERROR) << "SSL_new() failed: " - << ERR_error_string(ERR_get_error(), nullptr); - return nullptr; - } - - if(SSL_set_fd(ssl, fd) == 0) { - LOG(ERROR) << "SSL_set_fd() failed: " - << ERR_error_string(ERR_get_error(), nullptr); - SSL_free(ssl); - return nullptr; - } - - bev = bufferevent_openssl_socket_new(evbase, fd, ssl, - BUFFEREVENT_SSL_ACCEPTING, - BEV_OPT_DEFER_CALLBACKS); - } else { - bev = bufferevent_socket_new(evbase, fd, BEV_OPT_DEFER_CALLBACKS); - } - if(!bev) { - LOG(ERROR) << "bufferevent_socket_new() failed"; - if(ssl) { - SSL_free(ssl); - } - return nullptr; - } - return new ClientHandler(bev, rate_limit_group, fd, ssl, host); - } else { + if(rv != 0) { LOG(ERROR) << "getnameinfo() failed: " << gai_strerror(rv); + return nullptr; } + + if(get_config()->accesslog) { + upstream_connect(host); + } + + int val = 1; + rv = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, + reinterpret_cast(&val), sizeof(val)); + if(rv == -1) { + LOG(WARNING) << "Setting option TCP_NODELAY failed: errno=" + << errno; + } + SSL *ssl = nullptr; + bufferevent *bev; + if(ssl_ctx) { + ssl = SSL_new(ssl_ctx); + if(!ssl) { + LOG(ERROR) << "SSL_new() failed: " + << ERR_error_string(ERR_get_error(), nullptr); + return nullptr; + } + + if(SSL_set_fd(ssl, fd) == 0) { + LOG(ERROR) << "SSL_set_fd() failed: " + << ERR_error_string(ERR_get_error(), nullptr); + SSL_free(ssl); + return nullptr; + } + + bev = bufferevent_openssl_socket_new(evbase, fd, ssl, + BUFFEREVENT_SSL_ACCEPTING, + BEV_OPT_DEFER_CALLBACKS); + } else { + bev = bufferevent_socket_new(evbase, fd, BEV_OPT_DEFER_CALLBACKS); + } + if(!bev) { + LOG(ERROR) << "bufferevent_socket_new() failed"; + if(ssl) { + SSL_free(ssl); + } + return nullptr; + } + + return new ClientHandler(bev, rate_limit_group, fd, ssl, host, worker_stat); } namespace { diff --git a/src/shrpx_ssl.h b/src/shrpx_ssl.h index 94b79850..23bd8caf 100644 --- a/src/shrpx_ssl.h +++ b/src/shrpx_ssl.h @@ -37,6 +37,7 @@ namespace shrpx { class ClientHandler; +struct WorkerStat; namespace ssl { @@ -50,7 +51,8 @@ ClientHandler* accept_connection bufferevent_rate_limit_group *rate_limit_group, SSL_CTX *ssl_ctx, evutil_socket_t fd, - sockaddr *addr, int addrlen); + sockaddr *addr, int addrlen, + WorkerStat *worker_stat); int check_cert(SSL *ssl); diff --git a/src/shrpx_thread_event_receiver.cc b/src/shrpx_thread_event_receiver.cc index b68ba701..9a86df05 100644 --- a/src/shrpx_thread_event_receiver.cc +++ b/src/shrpx_thread_event_receiver.cc @@ -30,6 +30,10 @@ #include "shrpx_log.h" #include "shrpx_client_handler.h" #include "shrpx_http2_session.h" +#include "shrpx_worker.h" +#include "util.h" + +using namespace nghttp2; namespace shrpx { @@ -40,7 +44,8 @@ ThreadEventReceiver::ThreadEventReceiver(event_base *evbase, ssl_ctx_(ssl_ctx), http2session_(http2session), rate_limit_group_(bufferevent_rate_limit_group_new - (evbase_, get_config()->worker_rate_limit_cfg)) + (evbase_, get_config()->worker_rate_limit_cfg)), + worker_stat_(util::make_unique()) {} ThreadEventReceiver::~ThreadEventReceiver() @@ -67,12 +72,27 @@ void ThreadEventReceiver::on_read(bufferevent *bev) TLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd << ", addrlen=" << wev.client_addrlen; } + + if(worker_stat_->num_connections >= + get_config()->worker_frontend_connections) { + + if(LOG_ENABLED(INFO)) { + TLOG(INFO, this) << "Too many connections >= " + << get_config()->worker_frontend_connections; + } + + close(wev.client_fd); + + continue; + } + auto evbase = bufferevent_get_base(bev); auto client_handler = ssl::accept_connection(evbase, rate_limit_group_, ssl_ctx_, wev.client_fd, &wev.client_addr.sa, - wev.client_addrlen); + wev.client_addrlen, + worker_stat_.get()); if(client_handler) { client_handler->set_http2_session(http2session_); diff --git a/src/shrpx_thread_event_receiver.h b/src/shrpx_thread_event_receiver.h index 21ae663d..8c361582 100644 --- a/src/shrpx_thread_event_receiver.h +++ b/src/shrpx_thread_event_receiver.h @@ -27,6 +27,8 @@ #include "shrpx.h" +#include + #include #include @@ -36,6 +38,7 @@ namespace shrpx { class Http2Session; +struct WorkerStat; struct WorkerEvent { sockaddr_union client_addr; @@ -56,6 +59,7 @@ private: // mode. Not deleted by this object. Http2Session *http2session_; bufferevent_rate_limit_group *rate_limit_group_; + std::unique_ptr worker_stat_; }; } // namespace shrpx diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h index f7b2ce50..38d863f1 100644 --- a/src/shrpx_worker.h +++ b/src/shrpx_worker.h @@ -34,6 +34,12 @@ namespace shrpx { +struct WorkerStat { + WorkerStat() : num_connections(0) {} + + size_t num_connections; +}; + class Worker { public: Worker(WorkerInfo *info);