nghttpx: Implement connection blocker for HTTP/1 backend

This commit is contained in:
Tatsuhiro Tsujikawa 2014-08-19 23:36:04 +09:00
parent fb62a5ed4f
commit c7e9fe8154
12 changed files with 224 additions and 6 deletions

View File

@ -113,7 +113,8 @@ NGHTTPX_SRCS = \
shrpx_ssl.cc shrpx_ssl.h \ shrpx_ssl.cc shrpx_ssl.h \
shrpx_thread_event_receiver.cc shrpx_thread_event_receiver.h \ shrpx_thread_event_receiver.cc shrpx_thread_event_receiver.h \
shrpx_worker.cc shrpx_worker.h \ shrpx_worker.cc shrpx_worker.h \
shrpx_worker_config.cc shrpx_worker_config.h shrpx_worker_config.cc shrpx_worker_config.h \
shrpx_connect_blocker.cc shrpx_connect_blocker.h
if HAVE_SPDYLAY if HAVE_SPDYLAY
NGHTTPX_SRCS += shrpx_spdy_upstream.cc shrpx_spdy_upstream.h NGHTTPX_SRCS += shrpx_spdy_upstream.cc shrpx_spdy_upstream.h

View File

@ -570,6 +570,8 @@ int event_loop()
listener_handler->create_worker_thread(get_config()->num_worker); listener_handler->create_worker_thread(get_config()->num_worker);
} else if(get_config()->downstream_proto == PROTO_HTTP2) { } else if(get_config()->downstream_proto == PROTO_HTTP2) {
listener_handler->create_http2_session(); listener_handler->create_http2_session();
} else {
listener_handler->create_http1_connect_blocker();
} }
#ifndef NOTHREADS #ifndef NOTHREADS

View File

@ -526,6 +526,17 @@ Http2Session* ClientHandler::get_http2_session() const
return http2session_; return http2session_;
} }
void ClientHandler::set_http1_connect_blocker
(ConnectBlocker *http1_connect_blocker)
{
http1_connect_blocker_ = http1_connect_blocker;
}
ConnectBlocker* ClientHandler::get_http1_connect_blocker() const
{
return http1_connect_blocker_;
}
size_t ClientHandler::get_left_connhd_len() const size_t ClientHandler::get_left_connhd_len() const
{ {
return left_connhd_len_; return left_connhd_len_;

View File

@ -41,6 +41,7 @@ class Upstream;
class DownstreamConnection; class DownstreamConnection;
class Http2Session; class Http2Session;
class HttpsUpstream; class HttpsUpstream;
class ConnectBlocker;
struct WorkerStat; struct WorkerStat;
class ClientHandler { class ClientHandler {
@ -71,6 +72,8 @@ public:
SSL* get_ssl() const; SSL* get_ssl() const;
void set_http2_session(Http2Session *http2session); void set_http2_session(Http2Session *http2session);
Http2Session* get_http2_session() const; Http2Session* get_http2_session() const;
void set_http1_connect_blocker(ConnectBlocker *http1_connect_blocker);
ConnectBlocker* get_http1_connect_blocker() const;
size_t get_left_connhd_len() const; size_t get_left_connhd_len() const;
void set_left_connhd_len(size_t left); void set_left_connhd_len(size_t left);
// Call this function when HTTP/2 connection header is received at // Call this function when HTTP/2 connection header is received at
@ -95,6 +98,7 @@ private:
// Shared HTTP2 session for each thread. NULL if backend is not // Shared HTTP2 session for each thread. NULL if backend is not
// HTTP2. Not deleted by this object. // HTTP2. Not deleted by this object.
Http2Session *http2session_; Http2Session *http2session_;
ConnectBlocker *http1_connect_blocker_;
SSL *ssl_; SSL *ssl_;
event *reneg_shutdown_timerev_; event *reneg_shutdown_timerev_;
WorkerStat *worker_stat_; WorkerStat *worker_stat_;

View File

@ -0,0 +1,92 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2014 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_connect_blocker.h"
namespace shrpx {
namespace {
const int INITIAL_SLEEP = 2;
} // namespace
ConnectBlocker::ConnectBlocker()
: timerev_(nullptr),
sleep_(INITIAL_SLEEP)
{}
ConnectBlocker::~ConnectBlocker()
{
if(timerev_) {
event_free(timerev_);
}
}
namespace {
void connect_blocker_cb(evutil_socket_t sig, short events, void *arg)
{
if(LOG_ENABLED(INFO)) {
LOG(INFO) << "unblock downstream connection";
}
}
} // namespace
int ConnectBlocker::init(event_base *evbase)
{
timerev_ = evtimer_new(evbase, connect_blocker_cb, this);
if(timerev_ == nullptr) {
return -1;
}
return 0;
}
bool ConnectBlocker::blocked() const
{
return evtimer_pending(timerev_, nullptr);
}
void ConnectBlocker::on_success()
{
sleep_ = INITIAL_SLEEP;
}
void ConnectBlocker::on_failure()
{
int rv;
sleep_ = std::min(128, sleep_ * 2);
LOG(WARNING) << "connect failure, start sleeping " << sleep_;
timeval t = {sleep_, 0};
rv = evtimer_add(timerev_, &t);
if(rv == -1) {
LOG(ERROR) << "evtimer_add for ConnectBlocker timerev_ failed";
}
}
} // namespace shrpx

View File

@ -0,0 +1,55 @@
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2014 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_CONNECT_BLOCKER_H
#define SHRPX_CONNECT_BLOCKER_H
#include "shrpx.h"
#include <event2/event.h>
namespace shrpx {
class ConnectBlocker {
public:
ConnectBlocker();
~ConnectBlocker();
int init(event_base *evbase);
// Returns true if making connection is not allowed.
bool blocked() const;
// Call this function if connect operation succeeded. This will
// reset sleep_ to minimum value.
void on_success();
// Call this function if connect operation failed. This will start
// timer and blocks connection establishment for sleep_ seconds.
void on_failure();
private:
event *timerev_;
int sleep_;
};
} // namespace
#endif // SHRPX_CONNECT_BLOCKER_H

View File

@ -31,6 +31,7 @@
#include "shrpx_error.h" #include "shrpx_error.h"
#include "shrpx_http.h" #include "shrpx_http.h"
#include "shrpx_worker_config.h" #include "shrpx_worker_config.h"
#include "shrpx_connect_blocker.h"
#include "http2.h" #include "http2.h"
#include "util.h" #include "util.h"
@ -76,12 +77,20 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream)
} }
auto upstream = downstream->get_upstream(); auto upstream = downstream->get_upstream();
if(!bev_) { if(!bev_) {
auto connect_blocker = client_handler_->get_http1_connect_blocker();
if(connect_blocker->blocked()) {
return -1;
}
auto evbase = client_handler_->get_evbase(); auto evbase = client_handler_->get_evbase();
auto fd = socket(get_config()->downstream_addr.storage.ss_family, auto fd = socket(get_config()->downstream_addr.storage.ss_family,
SOCK_STREAM | SOCK_CLOEXEC, 0); SOCK_STREAM | SOCK_CLOEXEC, 0);
if(fd == -1) { if(fd == -1) {
connect_blocker->on_failure();
return SHRPX_ERR_NETWORK; return SHRPX_ERR_NETWORK;
} }
@ -89,6 +98,8 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream)
(evbase, fd, (evbase, fd,
BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS); BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
if(!bev_) { if(!bev_) {
connect_blocker->on_failure();
DCLOG(INFO, this) << "bufferevent_socket_new() failed"; DCLOG(INFO, this) << "bufferevent_socket_new() failed";
close(fd); close(fd);
@ -100,10 +111,15 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream)
const_cast<sockaddr*>(&get_config()->downstream_addr.sa), const_cast<sockaddr*>(&get_config()->downstream_addr.sa),
get_config()->downstream_addrlen); get_config()->downstream_addrlen);
if(rv != 0) { if(rv != 0) {
connect_blocker->on_failure();
bufferevent_free(bev_); bufferevent_free(bev_);
bev_ = nullptr; bev_ = nullptr;
return SHRPX_ERR_NETWORK; return SHRPX_ERR_NETWORK;
} }
connect_blocker->on_success();
if(LOG_ENABLED(INFO)) { if(LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "Connecting to downstream server"; DCLOG(INFO, this) << "Connecting to downstream server";
} }

View File

@ -38,6 +38,7 @@
#include "shrpx_worker_config.h" #include "shrpx_worker_config.h"
#include "shrpx_config.h" #include "shrpx_config.h"
#include "shrpx_http2_session.h" #include "shrpx_http2_session.h"
#include "shrpx_connect_blocker.h"
#include "util.h" #include "util.h"
using namespace nghttp2; using namespace nghttp2;
@ -220,6 +221,8 @@ int ListenHandler::accept_connection(evutil_socket_t fd,
} }
client->set_http2_session(http2session_.get()); client->set_http2_session(http2session_.get());
client->set_http1_connect_blocker(http1_connect_blocker_.get());
return 0; return 0;
} }
size_t idx = worker_round_robin_cnt_ % workers_.size(); size_t idx = worker_round_robin_cnt_ % workers_.size();
@ -253,6 +256,20 @@ int ListenHandler::create_http2_session()
return rv; return rv;
} }
int ListenHandler::create_http1_connect_blocker()
{
int rv;
http1_connect_blocker_ = util::make_unique<ConnectBlocker>();
rv = http1_connect_blocker_->init(evbase_);
if(rv != 0) {
return -1;
}
return 0;
}
const WorkerStat* ListenHandler::get_worker_stat() const const WorkerStat* ListenHandler::get_worker_stat() const
{ {
return worker_stat_.get(); return worker_stat_.get();

View File

@ -55,6 +55,7 @@ struct WorkerInfo {
}; };
class Http2Session; class Http2Session;
class ConnectBlocker;
struct WorkerStat; struct WorkerStat;
class ListenHandler { class ListenHandler {
@ -66,6 +67,7 @@ public:
void worker_reopen_log_files(); void worker_reopen_log_files();
event_base* get_evbase() const; event_base* get_evbase() const;
int create_http2_session(); int create_http2_session();
int create_http1_connect_blocker();
const WorkerStat* get_worker_stat() const; const WorkerStat* get_worker_stat() const;
void set_evlistener4(evconnlistener *evlistener4); void set_evlistener4(evconnlistener *evlistener4);
evconnlistener* get_evlistener4() const; evconnlistener* get_evlistener4() const;
@ -86,6 +88,7 @@ private:
// Shared backend HTTP2 session. NULL if multi-threaded. In // Shared backend HTTP2 session. NULL if multi-threaded. In
// multi-threaded case, see shrpx_worker.cc. // multi-threaded case, see shrpx_worker.cc.
std::unique_ptr<Http2Session> http2session_; std::unique_ptr<Http2Session> http2session_;
std::unique_ptr<ConnectBlocker> http1_connect_blocker_;
bufferevent_rate_limit_group *rate_limit_group_; bufferevent_rate_limit_group *rate_limit_group_;
evconnlistener *evlistener4_; evconnlistener *evlistener4_;
evconnlistener *evlistener6_; evconnlistener *evlistener6_;

View File

@ -40,10 +40,12 @@ namespace shrpx {
ThreadEventReceiver::ThreadEventReceiver(event_base *evbase, ThreadEventReceiver::ThreadEventReceiver(event_base *evbase,
SSL_CTX *ssl_ctx, SSL_CTX *ssl_ctx,
Http2Session *http2session) Http2Session *http2session,
ConnectBlocker *http1_connect_blocker)
: evbase_(evbase), : evbase_(evbase),
ssl_ctx_(ssl_ctx), ssl_ctx_(ssl_ctx),
http2session_(http2session), http2session_(http2session),
http1_connect_blocker_(http1_connect_blocker),
rate_limit_group_(bufferevent_rate_limit_group_new rate_limit_group_(bufferevent_rate_limit_group_new
(evbase_, get_config()->worker_rate_limit_cfg)), (evbase_, get_config()->worker_rate_limit_cfg)),
worker_stat_(util::make_unique<WorkerStat>()) worker_stat_(util::make_unique<WorkerStat>())
@ -124,6 +126,7 @@ void ThreadEventReceiver::on_read(bufferevent *bev)
worker_stat_.get()); worker_stat_.get());
if(client_handler) { if(client_handler) {
client_handler->set_http2_session(http2session_); client_handler->set_http2_session(http2session_);
client_handler->set_http1_connect_blocker(http1_connect_blocker_);
if(LOG_ENABLED(INFO)) { if(LOG_ENABLED(INFO)) {
TLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created"; TLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created";

View File

@ -38,6 +38,7 @@
namespace shrpx { namespace shrpx {
class Http2Session; class Http2Session;
class ConnectBlocker;
struct WorkerStat; struct WorkerStat;
enum WorkerEventType { enum WorkerEventType {
@ -60,7 +61,8 @@ struct WorkerEvent {
class ThreadEventReceiver { class ThreadEventReceiver {
public: public:
ThreadEventReceiver(event_base *evbase, SSL_CTX *ssl_ctx, ThreadEventReceiver(event_base *evbase, SSL_CTX *ssl_ctx,
Http2Session *http2session); Http2Session *http2session,
ConnectBlocker *http1_connect_blocker);
~ThreadEventReceiver(); ~ThreadEventReceiver();
void on_read(bufferevent *bev); void on_read(bufferevent *bev);
private: private:
@ -69,6 +71,7 @@ private:
// Shared HTTP2 session for each thread. NULL if not client // Shared HTTP2 session for each thread. NULL if not client
// mode. Not deleted by this object. // mode. Not deleted by this object.
Http2Session *http2session_; Http2Session *http2session_;
ConnectBlocker *http1_connect_blocker_;
bufferevent_rate_limit_group *rate_limit_group_; bufferevent_rate_limit_group *rate_limit_group_;
std::unique_ptr<WorkerStat> worker_stat_; std::unique_ptr<WorkerStat> worker_stat_;
}; };

View File

@ -37,6 +37,7 @@
#include "shrpx_log.h" #include "shrpx_log.h"
#include "shrpx_http2_session.h" #include "shrpx_http2_session.h"
#include "shrpx_worker_config.h" #include "shrpx_worker_config.h"
#include "shrpx_connect_blocker.h"
#include "util.h" #include "util.h"
using namespace nghttp2; using namespace nghttp2;
@ -93,15 +94,25 @@ void Worker::run()
return; return;
} }
std::unique_ptr<Http2Session> http2session; std::unique_ptr<Http2Session> http2session;
std::unique_ptr<ConnectBlocker> http1_connect_blocker;
if(get_config()->downstream_proto == PROTO_HTTP2) { if(get_config()->downstream_proto == PROTO_HTTP2) {
http2session = util::make_unique<Http2Session>(evbase.get(), cl_ssl_ctx_); http2session = util::make_unique<Http2Session>(evbase.get(), cl_ssl_ctx_);
if(http2session->init_notification() == -1) { if(http2session->init_notification() == -1) {
DIE(); DIE();
} }
} else {
http1_connect_blocker = util::make_unique<ConnectBlocker>();
if(http1_connect_blocker->init(evbase.get()) == -1) {
DIE();
} }
auto receiver = util::make_unique<ThreadEventReceiver>(evbase.get(), }
auto receiver = util::make_unique<ThreadEventReceiver>
(evbase.get(),
sv_ssl_ctx_, sv_ssl_ctx_,
http2session.get()); http2session.get(),
http1_connect_blocker.get());
bufferevent_enable(bev.get(), EV_READ); bufferevent_enable(bev.get(), EV_READ);
bufferevent_setcb(bev.get(), readcb, nullptr, eventcb, receiver.get()); bufferevent_setcb(bev.get(), readcb, nullptr, eventcb, receiver.get());