nghttpx: Add worker-frontend-connections option

This commit is contained in:
Tatsuhiro Tsujikawa 2014-06-26 22:55:22 +09:00
parent 16fef227e8
commit 479e15469c
12 changed files with 159 additions and 52 deletions

View File

@ -439,6 +439,7 @@ void fill_default_config()
mod_config()->http2_no_cookie_crumbling = false; mod_config()->http2_no_cookie_crumbling = false;
mod_config()->upstream_frame_debug = false; mod_config()->upstream_frame_debug = false;
mod_config()->padding = 0; mod_config()->padding = 0;
mod_config()->worker_frontend_connections = 0;
nghttp2_option_new(&mod_config()->http2_option); nghttp2_option_new(&mod_config()->http2_option);
@ -539,6 +540,10 @@ Performance:
means write burst size is unlimited. means write burst size is unlimited.
Default: )" Default: )"
<< get_config()->worker_write_burst << R"( << get_config()->worker_write_burst << R"(
--worker-frontend-connections=<NUM>
Set maximum number of simultaneous connections
frontend accepts. Setting 0 means unlimited.
Default: 0
Timeout: Timeout:
--frontend-http2-read-timeout=<SEC> --frontend-http2-read-timeout=<SEC>
@ -844,6 +849,7 @@ int main(int argc, char **argv)
{"worker-write-burst", required_argument, &flag, 53}, {"worker-write-burst", required_argument, &flag, 53},
{"altsvc", required_argument, &flag, 54}, {"altsvc", required_argument, &flag, 54},
{"add-response-header", required_argument, &flag, 55}, {"add-response-header", required_argument, &flag, 55},
{"worker-frontend-connections", required_argument, &flag, 56},
{nullptr, 0, nullptr, 0 } {nullptr, 0, nullptr, 0 }
}; };
@ -1092,6 +1098,10 @@ int main(int argc, char **argv)
// --add-response-header // --add-response-header
cmdcfgs.emplace_back(SHRPX_OPT_ADD_RESPONSE_HEADER, optarg); cmdcfgs.emplace_back(SHRPX_OPT_ADD_RESPONSE_HEADER, optarg);
break; break;
case 56:
// --worker-frontend-connections
cmdcfgs.emplace_back(SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS, optarg);
break;
default: default:
break; break;
} }
@ -1185,6 +1195,11 @@ int main(int argc, char **argv)
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if(get_config()->worker_frontend_connections == 0) {
mod_config()->worker_frontend_connections =
std::numeric_limits<size_t>::max();
}
if(get_config()->http2_proxy + get_config()->http2_bridge + if(get_config()->http2_proxy + get_config()->http2_bridge +
get_config()->client_proxy + get_config()->client > 1) { get_config()->client_proxy + get_config()->client > 1) {
LOG(FATAL) << "--http2-proxy, --http2-bridge, --client-proxy and --client " LOG(FATAL) << "--http2-proxy, --http2-bridge, --client-proxy and --client "

View File

@ -35,6 +35,7 @@
#include "shrpx_http2_downstream_connection.h" #include "shrpx_http2_downstream_connection.h"
#include "shrpx_accesslog.h" #include "shrpx_accesslog.h"
#include "shrpx_ssl.h" #include "shrpx_ssl.h"
#include "shrpx_worker.h"
#ifdef HAVE_SPDYLAY #ifdef HAVE_SPDYLAY
#include "shrpx_spdy_upstream.h" #include "shrpx_spdy_upstream.h"
#endif // HAVE_SPDYLAY #endif // HAVE_SPDYLAY
@ -219,12 +220,14 @@ void upstream_http1_connhd_readcb(bufferevent *bev, void *arg)
ClientHandler::ClientHandler(bufferevent *bev, ClientHandler::ClientHandler(bufferevent *bev,
bufferevent_rate_limit_group *rate_limit_group, bufferevent_rate_limit_group *rate_limit_group,
int fd, SSL *ssl, int fd, SSL *ssl,
const char *ipaddr) const char *ipaddr,
WorkerStat *worker_stat)
: ipaddr_(ipaddr), : ipaddr_(ipaddr),
bev_(bev), bev_(bev),
http2session_(nullptr), http2session_(nullptr),
ssl_(ssl), ssl_(ssl),
reneg_shutdown_timerev_(nullptr), reneg_shutdown_timerev_(nullptr),
worker_stat_(worker_stat),
left_connhd_len_(NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN), left_connhd_len_(NGHTTP2_CLIENT_CONNECTION_PREFACE_LEN),
fd_(fd), fd_(fd),
should_close_after_write_(false), should_close_after_write_(false),
@ -233,6 +236,8 @@ ClientHandler::ClientHandler(bufferevent *bev,
{ {
int rv; int rv;
++worker_stat->num_connections;
rv = bufferevent_add_to_rate_limit_group(bev_, rate_limit_group); rv = bufferevent_add_to_rate_limit_group(bev_, rate_limit_group);
if(rv == -1) { if(rv == -1) {
CLOG(FATAL, this) << "bufferevent_add_to_rate_limit_group() failed"; CLOG(FATAL, this) << "bufferevent_add_to_rate_limit_group() failed";
@ -260,6 +265,8 @@ ClientHandler::~ClientHandler()
CLOG(INFO, this) << "Deleting"; CLOG(INFO, this) << "Deleting";
} }
--worker_stat_->num_connections;
if(reneg_shutdown_timerev_) { if(reneg_shutdown_timerev_) {
event_free(reneg_shutdown_timerev_); event_free(reneg_shutdown_timerev_);
} }

View File

@ -41,12 +41,14 @@ class Upstream;
class DownstreamConnection; class DownstreamConnection;
class Http2Session; class Http2Session;
class HttpsUpstream; class HttpsUpstream;
struct WorkerStat;
class ClientHandler { class ClientHandler {
public: public:
ClientHandler(bufferevent *bev, ClientHandler(bufferevent *bev,
bufferevent_rate_limit_group *rate_limit_group, bufferevent_rate_limit_group *rate_limit_group,
int fd, SSL *ssl, const char *ipaddr); int fd, SSL *ssl, const char *ipaddr,
WorkerStat *worker_stat);
~ClientHandler(); ~ClientHandler();
int on_read(); int on_read();
int on_event(); int on_event();
@ -95,6 +97,7 @@ private:
Http2Session *http2session_; Http2Session *http2session_;
SSL *ssl_; SSL *ssl_;
event *reneg_shutdown_timerev_; event *reneg_shutdown_timerev_;
WorkerStat *worker_stat_;
// The number of bytes of HTTP/2 client connection header to read // The number of bytes of HTTP/2 client connection header to read
size_t left_connhd_len_; size_t left_connhd_len_;
int fd_; int fd_;

View File

@ -118,6 +118,8 @@ const char SHRPX_OPT_FRONTEND_FRAME_DEBUG[] = "frontend-frame-debug";
const char SHRPX_OPT_PADDING[] = "padding"; const char SHRPX_OPT_PADDING[] = "padding";
const char SHRPX_OPT_ALTSVC[] = "altsvc"; const char SHRPX_OPT_ALTSVC[] = "altsvc";
const char SHRPX_OPT_ADD_RESPONSE_HEADER[] = "add-response-header"; const char SHRPX_OPT_ADD_RESPONSE_HEADER[] = "add-response-header";
const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[] =
"worker-frontend-connections";
namespace { namespace {
Config *config = nullptr; Config *config = nullptr;
@ -793,6 +795,21 @@ int parse_config(const char *opt, const char *optarg)
return 0; return 0;
} }
if(util::strieq(opt, SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS)) {
errno = 0;
auto n = strtoul(optarg, nullptr, 10);
if(errno != 0) {
LOG(ERROR) << "worker-frontend-connections: invalid argument: "
<< optarg;
return -1;
}
mod_config()->worker_frontend_connections = n;
return 0;
}
if(util::strieq(opt, "conf")) { if(util::strieq(opt, "conf")) {
LOG(WARNING) << "conf is ignored"; LOG(WARNING) << "conf is ignored";

View File

@ -108,6 +108,7 @@ extern const char SHRPX_OPT_FRONTEND_FRAME_DEBUG[];
extern const char SHRPX_OPT_PADDING[]; extern const char SHRPX_OPT_PADDING[];
extern const char SHRPX_OPT_ALTSVC[]; extern const char SHRPX_OPT_ALTSVC[];
extern const char SHRPX_OPT_ADD_RESPONSE_HEADER[]; extern const char SHRPX_OPT_ADD_RESPONSE_HEADER[];
extern const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[];
union sockaddr_union { union sockaddr_union {
sockaddr sa; sockaddr sa;
@ -212,6 +213,7 @@ struct Config {
size_t worker_write_rate; size_t worker_write_rate;
size_t worker_write_burst; size_t worker_write_burst;
size_t padding; size_t padding;
size_t worker_frontend_connections;
// Bit mask to disable SSL/TLS protocol versions. This will be // Bit mask to disable SSL/TLS protocol versions. This will be
// passed to SSL_CTX_set_options(). // passed to SSL_CTX_set_options().
long int tls_proto_mask; long int tls_proto_mask;

View File

@ -38,6 +38,9 @@
#include "shrpx_worker.h" #include "shrpx_worker.h"
#include "shrpx_config.h" #include "shrpx_config.h"
#include "shrpx_http2_session.h" #include "shrpx_http2_session.h"
#include "util.h"
using namespace nghttp2;
namespace shrpx { namespace shrpx {
@ -50,6 +53,7 @@ ListenHandler::ListenHandler(event_base *evbase, SSL_CTX *sv_ssl_ctx,
http2session_(nullptr), http2session_(nullptr),
rate_limit_group_(bufferevent_rate_limit_group_new rate_limit_group_(bufferevent_rate_limit_group_new
(evbase, get_config()->worker_rate_limit_cfg)), (evbase, get_config()->worker_rate_limit_cfg)),
worker_stat_(util::make_unique<WorkerStat>()),
num_worker_(0), num_worker_(0),
worker_round_robin_cnt_(0) worker_round_robin_cnt_(0)
{} {}
@ -110,12 +114,29 @@ int ListenHandler::accept_connection(evutil_socket_t fd,
LLOG(INFO, this) << "Accepted connection. fd=" << fd; LLOG(INFO, this) << "Accepted connection. fd=" << fd;
} }
if(num_worker_ == 0) { if(num_worker_ == 0) {
if(worker_stat_->num_connections >=
get_config()->worker_frontend_connections) {
if(LOG_ENABLED(INFO)) {
TLOG(INFO, this) << "Too many connections >="
<< get_config()->worker_frontend_connections;
}
close(fd);
return -1;
}
auto client = ssl::accept_connection(evbase_, rate_limit_group_, auto client = ssl::accept_connection(evbase_, rate_limit_group_,
sv_ssl_ctx_, fd, addr, addrlen); sv_ssl_ctx_, fd, addr, addrlen,
worker_stat_.get());
if(!client) { if(!client) {
LLOG(ERROR, this) << "ClientHandler creation failed"; LLOG(ERROR, this) << "ClientHandler creation failed";
return 0;
close(fd);
return -1;
} }
client->set_http2_session(http2session_); client->set_http2_session(http2session_);
return 0; return 0;
} }
@ -129,8 +150,10 @@ int ListenHandler::accept_connection(evutil_socket_t fd,
auto output = bufferevent_get_output(workers_[idx].bev); auto output = bufferevent_get_output(workers_[idx].bev);
if(evbuffer_add(output, &wev, sizeof(wev)) != 0) { if(evbuffer_add(output, &wev, sizeof(wev)) != 0) {
LLOG(FATAL, this) << "evbuffer_add() failed"; LLOG(FATAL, this) << "evbuffer_add() failed";
close(fd);
return -1; return -1;
} }
return 0; return 0;
} }

View File

@ -30,6 +30,8 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <memory>
#include <openssl/ssl.h> #include <openssl/ssl.h>
#include <event.h> #include <event.h>
@ -45,6 +47,7 @@ struct WorkerInfo {
}; };
class Http2Session; class Http2Session;
struct WorkerStat;
class ListenHandler { class ListenHandler {
public: public:
@ -65,6 +68,7 @@ private:
// multi-threaded case, see shrpx_worker.cc. // multi-threaded case, see shrpx_worker.cc.
Http2Session *http2session_; Http2Session *http2session_;
bufferevent_rate_limit_group *rate_limit_group_; bufferevent_rate_limit_group *rate_limit_group_;
std::unique_ptr<WorkerStat> worker_stat_;
size_t num_worker_; size_t num_worker_;
unsigned int worker_round_robin_cnt_; unsigned int worker_round_robin_cnt_;
}; };

View File

@ -49,6 +49,7 @@
#include "shrpx_client_handler.h" #include "shrpx_client_handler.h"
#include "shrpx_config.h" #include "shrpx_config.h"
#include "shrpx_accesslog.h" #include "shrpx_accesslog.h"
#include "shrpx_worker.h"
#include "util.h" #include "util.h"
using namespace nghttp2; using namespace nghttp2;
@ -467,59 +468,62 @@ ClientHandler* accept_connection
bufferevent_rate_limit_group *rate_limit_group, bufferevent_rate_limit_group *rate_limit_group,
SSL_CTX *ssl_ctx, SSL_CTX *ssl_ctx,
evutil_socket_t fd, evutil_socket_t fd,
sockaddr *addr, int addrlen) sockaddr *addr, int addrlen,
WorkerStat *worker_stat)
{ {
char host[NI_MAXHOST]; char host[NI_MAXHOST];
int rv; int rv;
rv = getnameinfo(addr, addrlen, host, sizeof(host), nullptr, 0, rv = getnameinfo(addr, addrlen, host, sizeof(host), nullptr, 0,
NI_NUMERICHOST); NI_NUMERICHOST);
if(rv == 0) { if(rv != 0) {
if(get_config()->accesslog) {
upstream_connect(host);
}
int val = 1;
rv = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<char *>(&val), sizeof(val));
if(rv == -1) {
LOG(WARNING) << "Setting option TCP_NODELAY failed: errno="
<< errno;
}
SSL *ssl = nullptr;
bufferevent *bev;
if(ssl_ctx) {
ssl = SSL_new(ssl_ctx);
if(!ssl) {
LOG(ERROR) << "SSL_new() failed: "
<< ERR_error_string(ERR_get_error(), nullptr);
return nullptr;
}
if(SSL_set_fd(ssl, fd) == 0) {
LOG(ERROR) << "SSL_set_fd() failed: "
<< ERR_error_string(ERR_get_error(), nullptr);
SSL_free(ssl);
return nullptr;
}
bev = bufferevent_openssl_socket_new(evbase, fd, ssl,
BUFFEREVENT_SSL_ACCEPTING,
BEV_OPT_DEFER_CALLBACKS);
} else {
bev = bufferevent_socket_new(evbase, fd, BEV_OPT_DEFER_CALLBACKS);
}
if(!bev) {
LOG(ERROR) << "bufferevent_socket_new() failed";
if(ssl) {
SSL_free(ssl);
}
return nullptr;
}
return new ClientHandler(bev, rate_limit_group, fd, ssl, host);
} else {
LOG(ERROR) << "getnameinfo() failed: " << gai_strerror(rv); LOG(ERROR) << "getnameinfo() failed: " << gai_strerror(rv);
return nullptr; return nullptr;
} }
if(get_config()->accesslog) {
upstream_connect(host);
}
int val = 1;
rv = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<char *>(&val), sizeof(val));
if(rv == -1) {
LOG(WARNING) << "Setting option TCP_NODELAY failed: errno="
<< errno;
}
SSL *ssl = nullptr;
bufferevent *bev;
if(ssl_ctx) {
ssl = SSL_new(ssl_ctx);
if(!ssl) {
LOG(ERROR) << "SSL_new() failed: "
<< ERR_error_string(ERR_get_error(), nullptr);
return nullptr;
}
if(SSL_set_fd(ssl, fd) == 0) {
LOG(ERROR) << "SSL_set_fd() failed: "
<< ERR_error_string(ERR_get_error(), nullptr);
SSL_free(ssl);
return nullptr;
}
bev = bufferevent_openssl_socket_new(evbase, fd, ssl,
BUFFEREVENT_SSL_ACCEPTING,
BEV_OPT_DEFER_CALLBACKS);
} else {
bev = bufferevent_socket_new(evbase, fd, BEV_OPT_DEFER_CALLBACKS);
}
if(!bev) {
LOG(ERROR) << "bufferevent_socket_new() failed";
if(ssl) {
SSL_free(ssl);
}
return nullptr;
}
return new ClientHandler(bev, rate_limit_group, fd, ssl, host, worker_stat);
} }
namespace { namespace {

View File

@ -37,6 +37,7 @@
namespace shrpx { namespace shrpx {
class ClientHandler; class ClientHandler;
struct WorkerStat;
namespace ssl { namespace ssl {
@ -50,7 +51,8 @@ ClientHandler* accept_connection
bufferevent_rate_limit_group *rate_limit_group, bufferevent_rate_limit_group *rate_limit_group,
SSL_CTX *ssl_ctx, SSL_CTX *ssl_ctx,
evutil_socket_t fd, evutil_socket_t fd,
sockaddr *addr, int addrlen); sockaddr *addr, int addrlen,
WorkerStat *worker_stat);
int check_cert(SSL *ssl); int check_cert(SSL *ssl);

View File

@ -30,6 +30,10 @@
#include "shrpx_log.h" #include "shrpx_log.h"
#include "shrpx_client_handler.h" #include "shrpx_client_handler.h"
#include "shrpx_http2_session.h" #include "shrpx_http2_session.h"
#include "shrpx_worker.h"
#include "util.h"
using namespace nghttp2;
namespace shrpx { namespace shrpx {
@ -40,7 +44,8 @@ ThreadEventReceiver::ThreadEventReceiver(event_base *evbase,
ssl_ctx_(ssl_ctx), ssl_ctx_(ssl_ctx),
http2session_(http2session), http2session_(http2session),
rate_limit_group_(bufferevent_rate_limit_group_new rate_limit_group_(bufferevent_rate_limit_group_new
(evbase_, get_config()->worker_rate_limit_cfg)) (evbase_, get_config()->worker_rate_limit_cfg)),
worker_stat_(util::make_unique<WorkerStat>())
{} {}
ThreadEventReceiver::~ThreadEventReceiver() ThreadEventReceiver::~ThreadEventReceiver()
@ -67,12 +72,27 @@ void ThreadEventReceiver::on_read(bufferevent *bev)
TLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd TLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd
<< ", addrlen=" << wev.client_addrlen; << ", addrlen=" << wev.client_addrlen;
} }
if(worker_stat_->num_connections >=
get_config()->worker_frontend_connections) {
if(LOG_ENABLED(INFO)) {
TLOG(INFO, this) << "Too many connections >= "
<< get_config()->worker_frontend_connections;
}
close(wev.client_fd);
continue;
}
auto evbase = bufferevent_get_base(bev); auto evbase = bufferevent_get_base(bev);
auto client_handler = ssl::accept_connection(evbase, rate_limit_group_, auto client_handler = ssl::accept_connection(evbase, rate_limit_group_,
ssl_ctx_, ssl_ctx_,
wev.client_fd, wev.client_fd,
&wev.client_addr.sa, &wev.client_addr.sa,
wev.client_addrlen); wev.client_addrlen,
worker_stat_.get());
if(client_handler) { if(client_handler) {
client_handler->set_http2_session(http2session_); client_handler->set_http2_session(http2session_);

View File

@ -27,6 +27,8 @@
#include "shrpx.h" #include "shrpx.h"
#include <memory>
#include <openssl/ssl.h> #include <openssl/ssl.h>
#include <event2/bufferevent.h> #include <event2/bufferevent.h>
@ -36,6 +38,7 @@
namespace shrpx { namespace shrpx {
class Http2Session; class Http2Session;
struct WorkerStat;
struct WorkerEvent { struct WorkerEvent {
sockaddr_union client_addr; sockaddr_union client_addr;
@ -56,6 +59,7 @@ private:
// mode. Not deleted by this object. // mode. Not deleted by this object.
Http2Session *http2session_; Http2Session *http2session_;
bufferevent_rate_limit_group *rate_limit_group_; bufferevent_rate_limit_group *rate_limit_group_;
std::unique_ptr<WorkerStat> worker_stat_;
}; };
} // namespace shrpx } // namespace shrpx

View File

@ -34,6 +34,12 @@
namespace shrpx { namespace shrpx {
struct WorkerStat {
WorkerStat() : num_connections(0) {}
size_t num_connections;
};
class Worker { class Worker {
public: public:
Worker(WorkerInfo *info); Worker(WorkerInfo *info);