nghttpx: Worker wide blocker which is used when socket(2) is failed

This commit is contained in:
Tatsuhiro Tsujikawa 2016-02-21 15:27:19 +09:00
parent c9a4f293a1
commit 11c8803b92
4 changed files with 36 additions and 4 deletions

View File

@ -254,8 +254,15 @@ int Http2Session::initiate_connection() {
auto &groups = worker_->get_downstream_addr_groups(); auto &groups = worker_->get_downstream_addr_groups();
auto &addrs = groups[group_].addrs; auto &addrs = groups[group_].addrs;
auto worker_blocker = worker_->get_connect_blocker();
if (state_ == DISCONNECTED) { if (state_ == DISCONNECTED) {
if (worker_blocker->blocked()) {
DLOG(INFO, this)
<< "Worker wide backend connection was blocked temporarily";
return -1;
}
auto &next_downstream = worker_->get_dgrp(group_)->next; auto &next_downstream = worker_->get_dgrp(group_)->next;
auto end = next_downstream; auto end = next_downstream;
@ -305,10 +312,12 @@ int Http2Session::initiate_connection() {
conn_.fd = util::create_nonblock_socket(proxy.addr.su.storage.ss_family); conn_.fd = util::create_nonblock_socket(proxy.addr.su.storage.ss_family);
if (conn_.fd == -1) { if (conn_.fd == -1) {
connect_blocker->on_failure(); worker_blocker->on_failure();
return -1; return -1;
} }
worker_blocker->on_success();
rv = connect(conn_.fd, &proxy.addr.su.sa, proxy.addr.len); rv = connect(conn_.fd, &proxy.addr.su.sa, proxy.addr.len);
if (rv != 0 && errno != EINPROGRESS) { if (rv != 0 && errno != EINPROGRESS) {
SSLOG(ERROR, this) << "Failed to connect to the proxy " << proxy.host SSLOG(ERROR, this) << "Failed to connect to the proxy " << proxy.host
@ -373,10 +382,12 @@ int Http2Session::initiate_connection() {
conn_.fd = conn_.fd =
util::create_nonblock_socket(addr_->addr.su.storage.ss_family); util::create_nonblock_socket(addr_->addr.su.storage.ss_family);
if (conn_.fd == -1) { if (conn_.fd == -1) {
connect_blocker->on_failure(); worker_blocker->on_failure();
return -1; return -1;
} }
worker_blocker->on_success();
rv = connect(conn_.fd, rv = connect(conn_.fd,
// TODO maybe not thread-safe? // TODO maybe not thread-safe?
const_cast<sockaddr *>(&addr_->addr.su.sa), const_cast<sockaddr *>(&addr_->addr.su.sa),
@ -400,10 +411,12 @@ int Http2Session::initiate_connection() {
util::create_nonblock_socket(addr_->addr.su.storage.ss_family); util::create_nonblock_socket(addr_->addr.su.storage.ss_family);
if (conn_.fd == -1) { if (conn_.fd == -1) {
connect_blocker->on_failure(); worker_blocker->on_failure();
return -1; return -1;
} }
worker_blocker->on_success();
rv = connect(conn_.fd, const_cast<sockaddr *>(&addr_->addr.su.sa), rv = connect(conn_.fd, const_cast<sockaddr *>(&addr_->addr.su.sa),
addr_->addr.len); addr_->addr.len);
if (rv != 0 && errno != EINPROGRESS) { if (rv != 0 && errno != EINPROGRESS) {

View File

@ -144,6 +144,13 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
DCLOG(INFO, this) << "Attaching to DOWNSTREAM:" << downstream; DCLOG(INFO, this) << "Attaching to DOWNSTREAM:" << downstream;
} }
auto worker_blocker = worker_->get_connect_blocker();
if (worker_blocker->blocked()) {
DLOG(INFO, this)
<< "Worker wide backend connection was blocked temporarily";
return SHRPX_ERR_NETWORK;
}
auto &downstreamconf = get_config()->conn.downstream; auto &downstreamconf = get_config()->conn.downstream;
if (conn_.fd == -1) { if (conn_.fd == -1) {
@ -189,11 +196,13 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
auto error = errno; auto error = errno;
DCLOG(WARN, this) << "socket() failed; errno=" << error; DCLOG(WARN, this) << "socket() failed; errno=" << error;
connect_blocker->on_failure(); worker_blocker->on_failure();
return SHRPX_ERR_NETWORK; return SHRPX_ERR_NETWORK;
} }
worker_blocker->on_success();
int rv; int rv;
rv = connect(conn_.fd, &addr.addr.su.sa, addr.addr.len); rv = connect(conn_.fd, &addr.addr.su.sa, addr.addr.len);
if (rv != 0 && errno != EINPROGRESS) { if (rv != 0 && errno != EINPROGRESS) {

View File

@ -81,6 +81,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
cert_tree_(cert_tree), cert_tree_(cert_tree),
ticket_keys_(ticket_keys), ticket_keys_(ticket_keys),
downstream_addr_groups_(get_config()->conn.downstream.addr_groups), downstream_addr_groups_(get_config()->conn.downstream.addr_groups),
connect_blocker_(make_unique<ConnectBlocker>(randgen_, loop_)),
graceful_shutdown_(false) { graceful_shutdown_(false) {
ev_async_init(&w_, eventcb); ev_async_init(&w_, eventcb);
w_.data = this; w_.data = this;
@ -373,4 +374,8 @@ std::vector<DownstreamAddrGroup> &Worker::get_downstream_addr_groups() {
return downstream_addr_groups_; return downstream_addr_groups_;
} }
ConnectBlocker *Worker::get_connect_blocker() const {
return connect_blocker_.get();
}
} // namespace shrpx } // namespace shrpx

View File

@ -165,6 +165,8 @@ public:
std::vector<DownstreamAddrGroup> &get_downstream_addr_groups(); std::vector<DownstreamAddrGroup> &get_downstream_addr_groups();
ConnectBlocker *get_connect_blocker() const;
private: private:
#ifndef NOTHREADS #ifndef NOTHREADS
std::future<void> fut_; std::future<void> fut_;
@ -198,6 +200,9 @@ private:
std::shared_ptr<TicketKeys> ticket_keys_; std::shared_ptr<TicketKeys> ticket_keys_;
std::vector<DownstreamAddrGroup> downstream_addr_groups_; std::vector<DownstreamAddrGroup> downstream_addr_groups_;
// Worker level blocker for downstream connection. For example,
// this is used when file decriptor is exhausted.
std::unique_ptr<ConnectBlocker> connect_blocker_;
bool graceful_shutdown_; bool graceful_shutdown_;
}; };