nghttpx: Check HTTP/2 downstream connection after certain idle time

Previously when requests are issued to HTTP/2 downstream connection,
but it turns out that connection is down, handlers of those requests
are deleted.  In some situations, we only know connection is down when
we write something to network, so we'd like to handle this kind of
situation in more robust manner.  In this change, certain seconds
passed after last network activity, we first issue PING frame to
downstream connection before issuing new HTTP request.  If writing
PING frame is failed, it means connection was lost.  In this case,
instead of deleting handler, pending requests are migrated to new
HTTP2/ downstream connection, so that it can continue without
affecting upstream connection.
This commit is contained in:
Tatsuhiro Tsujikawa 2014-12-09 01:30:15 +09:00
parent e0a2353c56
commit dce20c3e6a
10 changed files with 262 additions and 17 deletions

View File

@ -255,10 +255,11 @@ ssize_t http2_data_read_callback(nghttp2_session *session, int32_t stream_id,
int Http2DownstreamConnection::push_request_headers() { int Http2DownstreamConnection::push_request_headers() {
int rv; int rv;
if (http2session_->get_state() != Http2Session::CONNECTED) { if (!http2session_->can_push_request()) {
// The HTTP2 session to the backend has not been established. // The HTTP2 session to the backend has not been established or
// This function will be called again just after it is // connection is now being checked. This function will be called
// established. // again just after it is established.
http2session_->start_checking_connection();
return 0; return 0;
} }
if (!downstream_) { if (!downstream_) {

View File

@ -54,7 +54,8 @@ namespace shrpx {
Http2Session::Http2Session(event_base *evbase, SSL_CTX *ssl_ctx) Http2Session::Http2Session(event_base *evbase, SSL_CTX *ssl_ctx)
: evbase_(evbase), ssl_ctx_(ssl_ctx), ssl_(nullptr), session_(nullptr), : evbase_(evbase), ssl_ctx_(ssl_ctx), ssl_(nullptr), session_(nullptr),
bev_(nullptr), wrbev_(nullptr), rdbev_(nullptr), bev_(nullptr), wrbev_(nullptr), rdbev_(nullptr),
settings_timerev_(nullptr), fd_(-1), state_(DISCONNECTED), settings_timerev_(nullptr), connection_check_timerev_(nullptr), fd_(-1),
state_(DISCONNECTED), connection_check_state_(CONNECTION_CHECK_NONE),
notified_(false), flow_control_(false) {} notified_(false), flow_control_(false) {}
Http2Session::~Http2Session() { disconnect(); } Http2Session::~Http2Session() { disconnect(); }
@ -66,6 +67,11 @@ int Http2Session::disconnect() {
nghttp2_session_del(session_); nghttp2_session_del(session_);
session_ = nullptr; session_ = nullptr;
if (connection_check_timerev_) {
event_free(connection_check_timerev_);
connection_check_timerev_ = nullptr;
}
if (settings_timerev_) { if (settings_timerev_) {
event_free(settings_timerev_); event_free(settings_timerev_);
settings_timerev_ = nullptr; settings_timerev_ = nullptr;
@ -109,29 +115,40 @@ int Http2Session::disconnect() {
} }
notified_ = false; notified_ = false;
connection_check_state_ = CONNECTION_CHECK_NONE;
state_ = DISCONNECTED; state_ = DISCONNECTED;
// Delete all client handler associated to Downstream. When deleting // Delete all client handler associated to Downstream. When deleting
// Http2DownstreamConnection, it calls this object's // Http2DownstreamConnection, it calls this object's
// remove_downstream_connection(). The multiple // remove_downstream_connection(). The multiple
// Http2DownstreamConnection objects belong to the same ClientHandler // Http2DownstreamConnection objects belong to the same
// object. So first dump ClientHandler objects and delete them once // ClientHandler object. So first dump ClientHandler objects. We
// and for all. // want to allow creating new pending Http2DownstreamConnection with
std::vector<Http2DownstreamConnection *> vec(std::begin(dconns_), // this object. In order to achieve this, we first swap dconns_ and
std::end(dconns_)); // streams_. Upstream::on_downstream_reset() may add
// Http2DownstreamConnection.
std::set<Http2DownstreamConnection *> dconns;
dconns.swap(dconns_);
std::set<StreamData *> streams;
streams.swap(streams_);
std::set<ClientHandler *> handlers; std::set<ClientHandler *> handlers;
for (auto dc : vec) { for (auto dc : dconns) {
if (!dc->get_client_handler()) {
continue;
}
handlers.insert(dc->get_client_handler()); handlers.insert(dc->get_client_handler());
} }
for (auto &h : handlers) { for (auto h : handlers) {
delete h; if (h->get_upstream()->on_downstream_reset() != 0) {
delete h;
}
} }
dconns_.clear(); for (auto &s : streams) {
for (auto &s : streams_) {
delete s; delete s;
} }
streams_.clear();
return 0; return 0;
} }
@ -211,6 +228,11 @@ void readcb(bufferevent *bev, void *ptr) {
int rv; int rv;
auto http2session = static_cast<Http2Session *>(ptr); auto http2session = static_cast<Http2Session *>(ptr);
http2session->reset_timeouts(); http2session->reset_timeouts();
rv = http2session->connection_alive();
if (rv != 0) {
http2session->disconnect();
return;
}
rv = http2session->on_read(); rv = http2session->on_read();
if (rv != 0) { if (rv != 0) {
http2session->disconnect(); http2session->disconnect();
@ -220,12 +242,17 @@ void readcb(bufferevent *bev, void *ptr) {
namespace { namespace {
void writecb(bufferevent *bev, void *ptr) { void writecb(bufferevent *bev, void *ptr) {
int rv;
auto http2session = static_cast<Http2Session *>(ptr); auto http2session = static_cast<Http2Session *>(ptr);
http2session->reset_timeouts(); http2session->reset_timeouts();
rv = http2session->connection_alive();
if (rv != 0) {
http2session->disconnect();
return;
}
if (evbuffer_get_length(bufferevent_get_output(bev)) > 0) { if (evbuffer_get_length(bufferevent_get_output(bev)) > 0) {
return; return;
} }
int rv;
rv = http2session->on_write(); rv = http2session->on_write();
if (rv != 0) { if (rv != 0) {
http2session->disconnect(); http2session->disconnect();
@ -1271,6 +1298,15 @@ int on_frame_not_send_callback(nghttp2_session *session,
} }
} // namespace } // namespace
namespace {
void connection_check_timeout_cb(evutil_socket_t fd, short what, void *arg) {
auto http2session = static_cast<Http2Session *>(arg);
SSLOG(INFO, http2session) << "connection check required";
http2session->set_connection_check_state(
Http2Session::CONNECTION_CHECK_REQUIRED);
}
} // namespace
int Http2Session::on_connect() { int Http2Session::on_connect() {
int rv; int rv;
if (ssl_ctx_) { if (ssl_ctx_) {
@ -1395,6 +1431,17 @@ int Http2Session::on_connect() {
return 0; return 0;
} }
connection_check_timerev_ =
evtimer_new(evbase_, connection_check_timeout_cb, this);
if (connection_check_timerev_ == nullptr) {
return -1;
}
rv = reset_connection_check_timer();
if (rv != 0) {
return -1;
}
// submit pending request // submit pending request
for (auto dconn : dconns_) { for (auto dconn : dconns_) {
if (dconn->push_request_headers() == 0) { if (dconn->push_request_headers() == 0) {
@ -1566,4 +1613,82 @@ void Http2Session::reset_timeouts() {
&get_config()->downstream_write_timeout); &get_config()->downstream_write_timeout);
} }
bool Http2Session::can_push_request() const {
return state_ == CONNECTED &&
connection_check_state_ == CONNECTION_CHECK_NONE;
}
void Http2Session::start_checking_connection() {
if (state_ != CONNECTED ||
connection_check_state_ != CONNECTION_CHECK_REQUIRED) {
return;
}
connection_check_state_ = CONNECTION_CHECK_STARTED;
SSLOG(INFO, this) << "Start checking connection";
// If connection is down, we may get error when writing data. Issue
// ping frame to see whether connection is alive.
nghttp2_submit_ping(session_, NGHTTP2_FLAG_NONE, NULL);
notify();
}
int Http2Session::reset_connection_check_timer() {
int rv;
timeval timeout = {5, 0};
rv = evtimer_add(connection_check_timerev_, &timeout);
if (rv == -1) {
return -1;
}
return 0;
}
int Http2Session::connection_alive() {
int rv;
rv = reset_connection_check_timer();
if (rv != 0) {
return -1;
}
if (connection_check_state_ == CONNECTION_CHECK_NONE) {
return 0;
}
SSLOG(INFO, this) << "Connection alive";
connection_check_state_ = CONNECTION_CHECK_NONE;
// submit pending request
for (auto dconn : dconns_) {
auto downstream = dconn->get_downstream();
if (!downstream ||
(downstream->get_request_state() != Downstream::HEADER_COMPLETE &&
downstream->get_request_state() != Downstream::MSG_COMPLETE) ||
downstream->get_response_state() != Downstream::INITIAL) {
continue;
}
auto upstream = downstream->get_upstream();
if (dconn->push_request_headers() == 0) {
upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0);
continue;
}
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "backend request failed";
}
upstream->on_downstream_abort_request(downstream, 400);
}
return 0;
}
void Http2Session::set_connection_check_state(int state) {
connection_check_state_ = state;
}
} // namespace shrpx } // namespace shrpx

View File

@ -108,6 +108,20 @@ public:
void reset_timeouts(); void reset_timeouts();
// Returns true if request can be issued on downstream connection.
bool can_push_request() const;
// Initiates the connection checking if downstream connection has
// been established and connection checking is required.
void start_checking_connection();
// Resets connection check timer. After timeout, we require
// connection checking.
int reset_connection_check_timer();
// Signals that connection is alive. Internally
// reset_connection_check_timer() is called.
int connection_alive();
// Change connection check state.
void set_connection_check_state(int state);
enum { enum {
// Disconnected // Disconnected
DISCONNECTED, DISCONNECTED,
@ -125,6 +139,15 @@ public:
static const size_t OUTBUF_MAX_THRES = 64 * 1024; static const size_t OUTBUF_MAX_THRES = 64 * 1024;
enum {
// Connection checking is not required
CONNECTION_CHECK_NONE,
// Connection checking is required
CONNECTION_CHECK_REQUIRED,
// Connection checking has been started
CONNECTION_CHECK_STARTED
};
private: private:
std::set<Http2DownstreamConnection *> dconns_; std::set<Http2DownstreamConnection *> dconns_;
std::set<StreamData *> streams_; std::set<StreamData *> streams_;
@ -139,12 +162,14 @@ private:
bufferevent *wrbev_; bufferevent *wrbev_;
bufferevent *rdbev_; bufferevent *rdbev_;
event *settings_timerev_; event *settings_timerev_;
event *connection_check_timerev_;
// fd_ is used for proxy connection and no TLS connection. For // fd_ is used for proxy connection and no TLS connection. For
// direct or TLS connection, it may be -1 even after connection is // direct or TLS connection, it may be -1 even after connection is
// established. Use bufferevent_getfd(bev_) to get file descriptor // established. Use bufferevent_getfd(bev_) to get file descriptor
// in these cases. // in these cases.
int fd_; int fd_;
int state_; int state_;
int connection_check_state_;
bool notified_; bool notified_;
bool flow_control_; bool flow_control_;
}; };

View File

@ -1397,4 +1397,38 @@ void Http2Upstream::on_handler_delete() {
} }
} }
int Http2Upstream::on_downstream_reset() {
int rv;
for (auto &ent : downstream_queue_.get_active_downstreams()) {
auto downstream = ent.second.get();
if ((downstream->get_request_state() != Downstream::HEADER_COMPLETE &&
downstream->get_request_state() != Downstream::MSG_COMPLETE) ||
downstream->get_response_state() != Downstream::INITIAL) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
downstream->pop_downstream_connection();
continue;
}
// downstream connection is clean; we can retry with new
// downstream connection.
downstream->pop_downstream_connection();
rv = downstream->attach_downstream_connection(
handler_->get_downstream_connection());
if (rv != 0) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
downstream->pop_downstream_connection();
continue;
}
}
rv = send();
if (rv != 0) {
return -1;
}
return 0;
}
} // namespace shrpx } // namespace shrpx

View File

@ -75,6 +75,7 @@ public:
virtual int on_downstream_body_complete(Downstream *downstream); virtual int on_downstream_body_complete(Downstream *downstream);
virtual void on_handler_delete(); virtual void on_handler_delete();
virtual int on_downstream_reset();
virtual void reset_timeouts(); virtual void reset_timeouts();

View File

@ -936,4 +936,24 @@ void HttpsUpstream::on_handler_delete() {
} }
} }
int HttpsUpstream::on_downstream_reset() {
int rv;
if ((downstream_->get_request_state() != Downstream::HEADER_COMPLETE &&
downstream_->get_request_state() != Downstream::MSG_COMPLETE) ||
downstream_->get_response_state() != Downstream::INITIAL) {
// Return error so that caller can delete handler
return -1;
}
downstream_->pop_downstream_connection();
rv = downstream_->attach_downstream_connection(
handler_->get_downstream_connection());
if (rv != 0) {
return -1;
}
return 0;
}
} // namespace shrpx } // namespace shrpx

View File

@ -68,6 +68,7 @@ public:
virtual int on_downstream_body_complete(Downstream *downstream); virtual int on_downstream_body_complete(Downstream *downstream);
virtual void on_handler_delete(); virtual void on_handler_delete();
virtual int on_downstream_reset();
virtual void reset_timeouts(); virtual void reset_timeouts();

View File

@ -1091,4 +1091,38 @@ void SpdyUpstream::on_handler_delete() {
} }
} }
int SpdyUpstream::on_downstream_reset() {
int rv;
for (auto &ent : downstream_queue_.get_active_downstreams()) {
auto downstream = ent.second.get();
if ((downstream->get_request_state() != Downstream::HEADER_COMPLETE &&
downstream->get_request_state() != Downstream::MSG_COMPLETE) ||
downstream->get_response_state() != Downstream::INITIAL) {
rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
downstream->pop_downstream_connection();
continue;
}
// downstream connection is clean; we can retry with new
// downstream connection.
downstream->pop_downstream_connection();
rv = downstream->attach_downstream_connection(
handler_->get_downstream_connection());
if (rv != 0) {
rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
downstream->pop_downstream_connection();
continue;
}
}
rv = send();
if (rv != 0) {
return -1;
}
return 0;
}
} // namespace shrpx } // namespace shrpx

View File

@ -74,6 +74,7 @@ public:
virtual int on_downstream_body_complete(Downstream *downstream); virtual int on_downstream_body_complete(Downstream *downstream);
virtual void on_handler_delete(); virtual void on_handler_delete();
virtual int on_downstream_reset();
virtual void reset_timeouts(); virtual void reset_timeouts();

View File

@ -56,6 +56,9 @@ public:
virtual int on_downstream_body_complete(Downstream *downstream) = 0; virtual int on_downstream_body_complete(Downstream *downstream) = 0;
virtual void on_handler_delete() = 0; virtual void on_handler_delete() = 0;
// Called when downstream connection is reset. Currently this is
// only used by Http2Session.
virtual int on_downstream_reset() = 0;
virtual void pause_read(IOCtrlReason reason) = 0; virtual void pause_read(IOCtrlReason reason) = 0;
virtual int resume_read(IOCtrlReason reason, Downstream *downstream, virtual int resume_read(IOCtrlReason reason, Downstream *downstream,