From 76703f79fa6d7ac521dbd637865dd5d4dbfaa5ad Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sat, 9 Aug 2014 18:47:45 +0900 Subject: [PATCH] nghttpx: Add stream level timeout for HTTP/2 and SPDY upstream/downstream --- src/shrpx.cc | 25 +++ src/shrpx_config.cc | 16 ++ src/shrpx_config.h | 4 + src/shrpx_downstream.cc | 225 +++++++++++++++++++++++ src/shrpx_downstream.h | 33 ++++ src/shrpx_downstream_connection.h | 1 + src/shrpx_http2_downstream_connection.cc | 42 ++++- src/shrpx_http2_downstream_connection.h | 5 +- src/shrpx_http2_session.cc | 47 ++++- src/shrpx_http2_upstream.cc | 47 ++++- src/shrpx_http2_upstream.h | 1 + src/shrpx_spdy_upstream.cc | 31 ++++ src/shrpx_spdy_upstream.h | 1 + src/shrpx_upstream.h | 1 + 14 files changed, 466 insertions(+), 13 deletions(-) diff --git a/src/shrpx.cc b/src/shrpx.cc index 2cf67e66..d4b203c9 100644 --- a/src/shrpx.cc +++ b/src/shrpx.cc @@ -493,6 +493,12 @@ void fill_default_config() mod_config()->downstream_write_timeout.tv_sec = 60; mod_config()->downstream_write_timeout.tv_usec = 0; + // Read timeout for HTTP/2 stream + mod_config()->stream_read_timeout = {30, 0}; + + // Write timeout for HTTP/2 stream + mod_config()->stream_write_timeout = {30, 0}; + // Timeout for pooled (idle) connections mod_config()->downstream_idle_read_timeout.tv_sec = 60; @@ -680,6 +686,15 @@ Timeout: connections. Default: )" << get_config()->upstream_write_timeout.tv_sec << R"( + --stream-read-timeout= + Specify read timeout for HTTP/2 and SPDY streams. + Default: )" + << get_config()->stream_read_timeout.tv_sec << R"( + --stream-write-timeout= + Specify write timeout for HTTP/2 and SPDY + streams. + Default: )" + << get_config()->stream_write_timeout.tv_sec << R"( --backend-read-timeout= Specify read timeout for backend connection. Default: )" @@ -982,6 +997,8 @@ int main(int argc, char **argv) {"accesslog-syslog", no_argument, &flag, 57}, {"errorlog-file", required_argument, &flag, 58}, {"errorlog-syslog", no_argument, &flag, 59}, + {"stream-read-timeout", required_argument, &flag, 60}, + {"stream-write-timeout", required_argument, &flag, 61}, {nullptr, 0, nullptr, 0 } }; @@ -1242,6 +1259,14 @@ int main(int argc, char **argv) // --errorlog-syslog cmdcfgs.emplace_back(SHRPX_OPT_ERRORLOG_SYSLOG, "yes"); break; + case 60: + // --stream-read-timeout + cmdcfgs.emplace_back(SHRPX_OPT_STREAM_READ_TIMEOUT, optarg); + break; + case 61: + // --stream-write-timeout + cmdcfgs.emplace_back(SHRPX_OPT_STREAM_WRITE_TIMEOUT, optarg); + break; default: break; } diff --git a/src/shrpx_config.cc b/src/shrpx_config.cc index fe16a87a..9c2b3cec 100644 --- a/src/shrpx_config.cc +++ b/src/shrpx_config.cc @@ -74,6 +74,8 @@ const char SHRPX_OPT_FRONTEND_READ_TIMEOUT[] = "frontend-read-timeout"; const char SHRPX_OPT_FRONTEND_WRITE_TIMEOUT[] = "frontend-write-timeout"; const char SHRPX_OPT_BACKEND_READ_TIMEOUT[] = "backend-read-timeout"; const char SHRPX_OPT_BACKEND_WRITE_TIMEOUT[] = "backend-write-timeout"; +const char SHRPX_OPT_STREAM_READ_TIMEOUT[] = "stream-read-timeout"; +const char SHRPX_OPT_STREAM_WRITE_TIMEOUT[] = "stream-write-timeout"; const char SHRPX_OPT_ACCESSLOG_FILE[] = "accesslog-file"; const char SHRPX_OPT_ACCESSLOG_SYSLOG[] = "accesslog-syslog"; const char SHRPX_OPT_ERRORLOG_FILE[] = "errorlog-file"; @@ -403,6 +405,20 @@ int parse_config(const char *opt, const char *optarg) return 0; } + if(util::strieq(opt, SHRPX_OPT_STREAM_READ_TIMEOUT)) { + timeval tv = {strtol(optarg, nullptr, 10), 0}; + mod_config()->stream_read_timeout = tv; + + return 0; + } + + if(util::strieq(opt, SHRPX_OPT_STREAM_WRITE_TIMEOUT)) { + timeval tv = {strtol(optarg, nullptr, 10), 0}; + mod_config()->stream_write_timeout = tv; + + return 0; + } + if(util::strieq(opt, SHRPX_OPT_ACCESSLOG_FILE)) { mod_config()->accesslog_file = strcopy(optarg); diff --git a/src/shrpx_config.h b/src/shrpx_config.h index 3e057df8..2ebeff83 100644 --- a/src/shrpx_config.h +++ b/src/shrpx_config.h @@ -71,6 +71,8 @@ extern const char SHRPX_OPT_FRONTEND_READ_TIMEOUT[]; extern const char SHRPX_OPT_FRONTEND_WRITE_TIMEOUT[]; extern const char SHRPX_OPT_BACKEND_READ_TIMEOUT[]; extern const char SHRPX_OPT_BACKEND_WRITE_TIMEOUT[]; +extern const char SHRPX_OPT_STREAM_READ_TIMEOUT[]; +extern const char SHRPX_OPT_STREAM_WRITE_TIMEOUT[]; extern const char SHRPX_OPT_ACCESSLOG_FILE[]; extern const char SHRPX_OPT_ACCESSLOG_SYSLOG[]; extern const char SHRPX_OPT_ERRORLOG_FILE[]; @@ -162,6 +164,8 @@ struct Config { timeval upstream_write_timeout; timeval downstream_read_timeout; timeval downstream_write_timeout; + timeval stream_read_timeout; + timeval stream_write_timeout; timeval downstream_idle_read_timeout; std::unique_ptr host; std::unique_ptr private_key_file; diff --git a/src/shrpx_downstream.cc b/src/shrpx_downstream.cc index 4424ce2b..7e213293 100644 --- a/src/shrpx_downstream.cc +++ b/src/shrpx_downstream.cc @@ -44,6 +44,10 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority) upstream_(upstream), dconn_(nullptr), response_body_buf_(nullptr), + upstream_rtimerev_(nullptr), + upstream_wtimerev_(nullptr), + downstream_rtimerev_(nullptr), + downstream_wtimerev_(nullptr), request_headers_sum_(0), response_headers_sum_(0), request_datalen_(0), @@ -82,6 +86,18 @@ Downstream::~Downstream() // Passing NULL to evbuffer_free() causes segmentation fault. evbuffer_free(response_body_buf_); } + if(upstream_rtimerev_) { + event_free(upstream_rtimerev_); + } + if(upstream_wtimerev_) { + event_free(upstream_wtimerev_); + } + if(downstream_rtimerev_) { + event_free(downstream_rtimerev_); + } + if(downstream_wtimerev_) { + event_free(downstream_wtimerev_); + } if(dconn_) { delete dconn_; } @@ -880,4 +896,213 @@ bool Downstream::response_pseudo_header_allowed() const return pseudo_header_allowed(response_headers_); } +namespace { +void upstream_timeoutcb(evutil_socket_t fd, short event, void *arg) +{ + auto downstream = static_cast(arg); + auto upstream = downstream->get_upstream(); + + auto which = event == EV_READ ? "read" : "write"; + + if(LOG_ENABLED(INFO)) { + DLOG(INFO, downstream) << "upstream timeout stream_id=" + << downstream->get_stream_id() + << " event=" << which; + } + + downstream->disable_upstream_rtimer(); + downstream->disable_upstream_wtimer(); + + upstream->on_timeout(downstream); +} +} // namespace + +namespace { +void upstream_rtimeoutcb(evutil_socket_t fd, short event, void *arg) +{ + upstream_timeoutcb(fd, EV_READ, arg); +} +} // namespace + +namespace { +void upstream_wtimeoutcb(evutil_socket_t fd, short event, void *arg) +{ + upstream_timeoutcb(fd, EV_WRITE, arg); +} +} // namespace + +namespace { +event* init_timer(event_base *evbase, event_callback_fn cb, void *arg) +{ + auto timerev = evtimer_new(evbase, cb, arg); + + if(timerev == nullptr) { + LOG(WARNING) << "timer initialization failed"; + return nullptr; + } + + return timerev; +} +} // namespace + +void Downstream::init_upstream_timer() +{ + auto evbase = upstream_->get_client_handler()->get_evbase(); + + upstream_rtimerev_ = init_timer(evbase, upstream_rtimeoutcb, this); + upstream_wtimerev_ = init_timer(evbase, upstream_wtimeoutcb, this); +} + +namespace { +void reset_timer(event *timer, const timeval *timeout) +{ + if(!timer) { + return; + } + + event_add(timer, timeout); +} +} // namespace + +namespace { +void try_reset_timer(event *timer, const timeval *timeout) +{ + if(!timer) { + return; + } + + if(!evtimer_pending(timer, nullptr)) { + return; + } + + event_add(timer, timeout); +} +} // namespace + +namespace { +void ensure_timer(event *timer, const timeval *timeout) +{ + if(!timer) { + return; + } + + if(evtimer_pending(timer, nullptr)) { + return; + } + + event_add(timer, timeout); +} +} // namespace + +namespace { +void disable_timer(event *timer) +{ + if(!timer) { + return; + } + + event_del(timer); +} +} // namespace + +void Downstream::reset_upstream_rtimer() +{ + reset_timer(upstream_rtimerev_, &get_config()->stream_read_timeout); + try_reset_timer(upstream_wtimerev_, &get_config()->stream_write_timeout); +} + +void Downstream::reset_upstream_wtimer() +{ + reset_timer(upstream_wtimerev_, &get_config()->stream_write_timeout); + try_reset_timer(upstream_rtimerev_, &get_config()->stream_read_timeout); +} + +void Downstream::ensure_upstream_wtimer() +{ + ensure_timer(upstream_wtimerev_, &get_config()->stream_write_timeout); +} + +void Downstream::disable_upstream_rtimer() +{ + disable_timer(upstream_rtimerev_); +} + +void Downstream::disable_upstream_wtimer() +{ + disable_timer(upstream_wtimerev_); +} + +namespace { +void downstream_timeoutcb(evutil_socket_t fd, short event, void *arg) +{ + auto downstream = static_cast(arg); + + auto which = event == EV_READ ? "read" : "write"; + + if(LOG_ENABLED(INFO)) { + DLOG(INFO, downstream) << "downstream timeout stream_id=" + << downstream->get_downstream_stream_id() + << " event=" << which; + } + + downstream->disable_downstream_rtimer(); + downstream->disable_downstream_wtimer(); + + auto dconn = downstream->get_downstream_connection(); + + if(dconn) { + dconn->on_timeout(); + } +} +} // namespace + +namespace { +void downstream_rtimeoutcb(evutil_socket_t fd, short event, void *arg) +{ + downstream_timeoutcb(fd, EV_READ, arg); +} +} // namespace + +namespace { +void downstream_wtimeoutcb(evutil_socket_t fd, short event, void *arg) +{ + downstream_timeoutcb(fd, EV_WRITE, arg); +} +} // namespace + +void Downstream::init_downstream_timer() +{ + auto evbase = upstream_->get_client_handler()->get_evbase(); + + downstream_rtimerev_ = init_timer(evbase, downstream_rtimeoutcb, this); + downstream_wtimerev_ = init_timer(evbase, downstream_wtimeoutcb, this); +} + +void Downstream::reset_downstream_rtimer() +{ + reset_timer(downstream_rtimerev_, &get_config()->stream_read_timeout); + try_reset_timer(downstream_wtimerev_, &get_config()->stream_write_timeout); +} + +void Downstream::reset_downstream_wtimer() +{ + reset_timer(downstream_wtimerev_, &get_config()->stream_write_timeout); + try_reset_timer(downstream_rtimerev_, &get_config()->stream_read_timeout); +} + +void Downstream::ensure_downstream_wtimer() +{ + ensure_timer(downstream_wtimerev_, &get_config()->stream_write_timeout); +} + +void Downstream::disable_downstream_rtimer() +{ + disable_timer(downstream_rtimerev_); +} + +void Downstream::disable_downstream_wtimer() +{ + disable_timer(downstream_wtimerev_); +} + } // namespace shrpx diff --git a/src/shrpx_downstream.h b/src/shrpx_downstream.h index 98b4a713..490aa2e5 100644 --- a/src/shrpx_downstream.h +++ b/src/shrpx_downstream.h @@ -232,6 +232,33 @@ public: bool get_rst_stream_after_end_stream() const; void set_rst_stream_after_end_stream(bool f); + + // Initializes upstream timers, but they are not pending. + void init_upstream_timer(); + // Makes upstream read timer pending. If it is already pending, + // timeout value is reset. This function also resets write timer if + // it is already pending. + void reset_upstream_rtimer(); + // Makes upstream write timer pending. If it is already pending, + // timeout value is reset. This function also resets read timer if + // it is already pending. + void reset_upstream_wtimer(); + // Makes upstream write timer pending. If it is already pending, do + // nothing. + void ensure_upstream_wtimer(); + // Disables upstream read timer. + void disable_upstream_rtimer(); + // Disables upstream write timer. + void disable_upstream_wtimer(); + + // Downstream timer functions. They works in a similar way just + // like the upstream timer function. + void init_downstream_timer(); + void reset_downstream_rtimer(); + void reset_downstream_wtimer(); + void ensure_downstream_wtimer(); + void disable_downstream_rtimer(); + void disable_downstream_wtimer(); private: Headers request_headers_; Headers response_headers_; @@ -255,6 +282,12 @@ private: // body. nghttp2 library reads data from this in the callback. evbuffer *response_body_buf_; + event *upstream_rtimerev_; + event *upstream_wtimerev_; + + event *downstream_rtimerev_; + event *downstream_wtimerev_; + size_t request_headers_sum_; size_t response_headers_sum_; diff --git a/src/shrpx_downstream_connection.h b/src/shrpx_downstream_connection.h index 48b17c94..8b56a1e7 100644 --- a/src/shrpx_downstream_connection.h +++ b/src/shrpx_downstream_connection.h @@ -54,6 +54,7 @@ public: virtual int on_read() = 0; virtual int on_write() = 0; + virtual int on_timeout() { return 0; } virtual void on_upstream_change(Upstream *uptream) = 0; virtual int on_priority_change(int32_t pri) = 0; diff --git a/src/shrpx_http2_downstream_connection.cc b/src/shrpx_http2_downstream_connection.cc index 549ecd13..ea2d2558 100644 --- a/src/shrpx_http2_downstream_connection.cc +++ b/src/shrpx_http2_downstream_connection.cc @@ -64,6 +64,9 @@ Http2DownstreamConnection::~Http2DownstreamConnection() evbuffer_free(request_body_buf_); } if(downstream_) { + downstream_->disable_downstream_rtimer(); + downstream_->disable_downstream_wtimer(); + if(submit_rst_stream(downstream_) == 0) { http2session_->notify(); } @@ -120,6 +123,9 @@ int Http2DownstreamConnection::attach_downstream(Downstream *downstream) } downstream->set_downstream_connection(this); downstream_ = downstream; + + downstream_->init_downstream_timer(); + return 0; } @@ -142,12 +148,15 @@ void Http2DownstreamConnection::detach_downstream(Downstream *downstream) } downstream->set_downstream_connection(nullptr); + downstream->disable_downstream_rtimer(); + downstream->disable_downstream_wtimer(); downstream_ = nullptr; client_handler_->pool_downstream_connection(this); } -int Http2DownstreamConnection::submit_rst_stream(Downstream *downstream) +int Http2DownstreamConnection::submit_rst_stream(Downstream *downstream, + nghttp2_error_code error_code) { int rv = -1; if(http2session_->get_state() == Http2Session::CONNECTED && @@ -163,8 +172,7 @@ int Http2DownstreamConnection::submit_rst_stream(Downstream *downstream) << downstream->get_downstream_stream_id(); } rv = http2session_->submit_rst_stream - (downstream->get_downstream_stream_id(), - NGHTTP2_INTERNAL_ERROR); + (downstream->get_downstream_stream_id(), error_code); } } return rv; @@ -205,6 +213,8 @@ ssize_t http2_data_read_callback(nghttp2_session *session, !downstream->get_upgraded())) { *data_flags |= NGHTTP2_DATA_FLAG_EOF; } else { + downstream->disable_downstream_wtimer(); + return NGHTTP2_ERR_DEFERRED; } break; @@ -227,6 +237,9 @@ ssize_t http2_data_read_callback(nghttp2_session *session, *data_flags |= NGHTTP2_DATA_FLAG_EOF; break; } + + downstream->disable_downstream_wtimer(); + return NGHTTP2_ERR_DEFERRED; } } @@ -242,6 +255,13 @@ ssize_t http2_data_read_callback(nghttp2_session *session, break; } } + + if(evbuffer_get_length(body) > 0 && !downstream->get_output_buffer_full()) { + downstream->reset_downstream_wtimer(); + } else { + downstream->disable_downstream_wtimer(); + } + return nread; } } // namespace @@ -455,6 +475,7 @@ int Http2DownstreamConnection::push_request_headers() } downstream_->clear_request_headers(); + downstream_->reset_downstream_wtimer(); http2session_->notify(); return 0; @@ -473,6 +494,9 @@ int Http2DownstreamConnection::push_upload_data_chunk(const uint8_t *data, if(rv != 0) { return -1; } + + downstream_->ensure_downstream_wtimer(); + http2session_->notify(); } return 0; @@ -486,6 +510,9 @@ int Http2DownstreamConnection::end_upload_data() if(rv != 0) { return -1; } + + downstream_->ensure_downstream_wtimer(); + http2session_->notify(); } return 0; @@ -586,4 +613,13 @@ int Http2DownstreamConnection::on_priority_change(int32_t pri) return 0; } +int Http2DownstreamConnection::on_timeout() +{ + if(!downstream_) { + return 0; + } + + return submit_rst_stream(downstream_, NGHTTP2_NO_ERROR); +} + } // namespace shrpx diff --git a/src/shrpx_http2_downstream_connection.h b/src/shrpx_http2_downstream_connection.h index e3353d7c..18fd11e6 100644 --- a/src/shrpx_http2_downstream_connection.h +++ b/src/shrpx_http2_downstream_connection.h @@ -57,6 +57,7 @@ public: virtual int on_read(); virtual int on_write(); + virtual int on_timeout(); virtual void on_upstream_change(Upstream *upstream) {} virtual int on_priority_change(int32_t pri); @@ -69,7 +70,9 @@ public: void attach_stream_data(StreamData *sd); StreamData* detach_stream_data(); - int submit_rst_stream(Downstream *downstream); + int submit_rst_stream + (Downstream *downstream, + nghttp2_error_code error_code = NGHTTP2_INTERNAL_ERROR); private: Http2Session *http2session_; evbuffer *request_body_buf_; diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc index da4f6087..899dc6a9 100644 --- a/src/shrpx_http2_session.cc +++ b/src/shrpx_http2_session.cc @@ -999,7 +999,7 @@ int on_response_headers(Http2Session *http2session, if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream) != 0) { // If resume_read fails, just drop connection. Not ideal. delete upstream->get_client_handler(); - return 0; + return -1; } downstream->set_request_state(Downstream::HEADER_COMPLETE); if(LOG_ENABLED(INFO)) { @@ -1017,7 +1017,7 @@ int on_response_headers(Http2Session *http2session, NGHTTP2_PROTOCOL_ERROR); downstream->set_response_state(Downstream::MSG_RESET); } - call_downstream_readcb(http2session, downstream); + return 0; } } // namespace @@ -1051,6 +1051,8 @@ int on_frame_recv_callback } else if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { + downstream->disable_downstream_rtimer(); + if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { downstream->set_response_state(Downstream::MSG_COMPLETE); @@ -1082,17 +1084,16 @@ int on_frame_recv_callback rv = on_response_headers(http2session, downstream, session, frame); if(rv != 0) { - return rv; + return 0; } } if(frame->headers.cat == NGHTTP2_HCAT_HEADERS) { if(downstream->get_expect_final_response()) { - rv = on_response_headers(http2session, downstream, session, frame); if(rv != 0) { - return rv; + return 0; } } else if((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) { http2session->submit_rst_stream(frame->hd.stream_id, @@ -1102,6 +1103,9 @@ int on_frame_recv_callback } if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { + + downstream->disable_downstream_rtimer(); + if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) { downstream->set_response_state(Downstream::MSG_COMPLETE); @@ -1113,7 +1117,13 @@ int on_frame_recv_callback downstream->set_response_state(Downstream::MSG_RESET); } } + } else { + downstream->reset_downstream_rtimer(); } + + // This may delete downstream + call_downstream_readcb(http2session, downstream); + break; } case NGHTTP2_RST_STREAM: { @@ -1214,6 +1224,8 @@ int on_data_chunk_recv_callback(nghttp2_session *session, return 0; } + downstream->reset_downstream_rtimer(); + downstream->add_response_bodylen(len); auto upstream = downstream->get_upstream(); @@ -1240,6 +1252,31 @@ int on_frame_send_callback(nghttp2_session* session, const nghttp2_frame *frame, void *user_data) { auto http2session = static_cast(user_data); + + if(frame->hd.type == NGHTTP2_DATA || frame->hd.type == NGHTTP2_HEADERS) { + if((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) { + return 0; + } + + auto sd = static_cast + (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)); + + if(!sd || !sd->dconn) { + return 0; + } + + auto downstream = sd->dconn->get_downstream(); + + if(!downstream || + downstream->get_downstream_stream_id() != frame->hd.stream_id) { + return 0; + } + + downstream->reset_downstream_rtimer(); + + return 0; + } + if(frame->hd.type == NGHTTP2_SETTINGS && (frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) { if(http2session->start_settings_timer() != 0) { diff --git a/src/shrpx_http2_upstream.cc b/src/shrpx_http2_upstream.cc index d639a3a8..8706a050 100644 --- a/src/shrpx_http2_upstream.cc +++ b/src/shrpx_http2_upstream.cc @@ -216,6 +216,7 @@ int on_header_callback(nghttp2_session *session, if(!downstream) { return 0; } + if(downstream->get_request_headers_sum() > Downstream::MAX_HEADERS_SUM) { if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { return 0; @@ -273,6 +274,8 @@ int on_begin_headers_callback(nghttp2_session *session, 0); upstream->add_downstream(downstream); + downstream->init_upstream_timer(); + downstream->reset_upstream_rtimer(); downstream->init_response_body_buf(); // Although, we deprecated minor version from HTTP/2, we supply @@ -387,6 +390,8 @@ int on_request_headers(Http2Upstream *upstream, } downstream->set_request_state(Downstream::HEADER_COMPLETE); if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { + downstream->disable_upstream_rtimer(); + downstream->set_request_state(Downstream::MSG_COMPLETE); } @@ -407,15 +412,18 @@ int on_frame_recv_callback switch(frame->hd.type) { case NGHTTP2_DATA: { + auto downstream = upstream->find_downstream(frame->hd.stream_id); + if(!downstream) { + return 0; + } + if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { - auto downstream = upstream->find_downstream(frame->hd.stream_id); - if(!downstream) { - return 0; - } + downstream->disable_upstream_rtimer(); downstream->end_upload_data(); downstream->set_request_state(Downstream::MSG_COMPLETE); } + break; } case NGHTTP2_HEADERS: { @@ -425,10 +433,14 @@ int on_frame_recv_callback } if(frame->headers.cat == NGHTTP2_HCAT_REQUEST) { + downstream->reset_upstream_rtimer(); + return on_request_headers(upstream, downstream, session, frame); } if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { + downstream->disable_upstream_rtimer(); + downstream->end_upload_data(); downstream->set_request_state(Downstream::MSG_COMPLETE); } else { @@ -496,6 +508,8 @@ int on_data_chunk_recv_callback(nghttp2_session *session, return 0; } + downstream->reset_upstream_rtimer(); + if(downstream->push_upload_data_chunk(data, len) != 0) { upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); @@ -1033,6 +1047,7 @@ ssize_t downstream_data_read_callback(nghttp2_session *session, if(nread == 0 && downstream->get_response_state() == Downstream::MSG_COMPLETE) { + if(!downstream->get_upgraded()) { *data_flags |= NGHTTP2_DATA_FLAG_EOF; @@ -1053,6 +1068,12 @@ ssize_t downstream_data_read_callback(nghttp2_session *session, } } + if(evbuffer_get_length(body) > 0) { + downstream->reset_upstream_wtimer(); + } else { + downstream->disable_upstream_wtimer(); + } + if(nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) { if(downstream->resume_read(SHRPX_NO_BUFFER) != 0) { return NGHTTP2_ERR_CALLBACK_FAILURE; @@ -1260,11 +1281,15 @@ int Http2Upstream::on_downstream_body(Downstream *downstream, if(flush) { nghttp2_session_resume_data(session_, downstream->get_stream_id()); + + downstream->ensure_upstream_wtimer(); } if(evbuffer_get_length(body) >= INBUF_MAX_THRES) { if(!flush) { nghttp2_session_resume_data(session_, downstream->get_stream_id()); + + downstream->ensure_upstream_wtimer(); } downstream->pause_read(SHRPX_NO_BUFFER); @@ -1281,6 +1306,8 @@ int Http2Upstream::on_downstream_body_complete(Downstream *downstream) DLOG(INFO, downstream) << "HTTP response completed"; } nghttp2_session_resume_data(session_, downstream->get_stream_id()); + downstream->ensure_upstream_wtimer(); + return 0; } @@ -1351,4 +1378,16 @@ void Http2Upstream::log_response_headers << ss.str(); } +int Http2Upstream::on_timeout(Downstream *downstream) +{ + if(LOG_ENABLED(INFO)) { + ULOG(INFO, this) << "Stream timeout stream_id=" + << downstream->get_stream_id(); + } + + rst_stream(downstream, NGHTTP2_NO_ERROR); + + return 0; +} + } // namespace shrpx diff --git a/src/shrpx_http2_upstream.h b/src/shrpx_http2_upstream.h index 34b0a3bb..44b95bf5 100644 --- a/src/shrpx_http2_upstream.h +++ b/src/shrpx_http2_upstream.h @@ -46,6 +46,7 @@ public: virtual int on_read(); virtual int on_write(); virtual int on_event(); + virtual int on_timeout(Downstream *downstream); virtual int on_downstream_abort_request(Downstream *downstream, unsigned int status_code); int send(); diff --git a/src/shrpx_spdy_upstream.cc b/src/shrpx_spdy_upstream.cc index 84302a05..5b8cbfa4 100644 --- a/src/shrpx_spdy_upstream.cc +++ b/src/shrpx_spdy_upstream.cc @@ -157,6 +157,8 @@ void on_ctrl_recv_callback frame->syn_stream.stream_id, frame->syn_stream.pri); upstream->add_downstream(downstream); + downstream->init_upstream_timer(); + downstream->reset_upstream_rtimer(); downstream->init_response_body_buf(); auto nv = frame->syn_stream.nv; @@ -243,6 +245,7 @@ void on_ctrl_recv_callback } downstream->set_request_state(Downstream::HEADER_COMPLETE); if(frame->syn_stream.hd.flags & SPDYLAY_CTRL_FLAG_FIN) { + downstream->disable_upstream_rtimer(); downstream->set_request_state(Downstream::MSG_COMPLETE); } break; @@ -267,6 +270,8 @@ void on_data_chunk_recv_callback(spdylay_session *session, return; } + downstream->reset_upstream_rtimer(); + if(downstream->push_upload_data_chunk(data, len) != 0) { upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); upstream->handle_ign_data_chunk(len); @@ -317,6 +322,7 @@ void on_data_recv_callback(spdylay_session *session, uint8_t flags, auto upstream = static_cast(user_data); auto downstream = upstream->find_downstream(stream_id); if(downstream && (flags & SPDYLAY_DATA_FLAG_FIN)) { + downstream->disable_upstream_rtimer(); downstream->end_upload_data(); downstream->set_request_state(Downstream::MSG_COMPLETE); } @@ -795,6 +801,12 @@ ssize_t spdy_data_read_callback(spdylay_session *session, } } + if(evbuffer_get_length(body) > 0) { + downstream->reset_upstream_wtimer(); + } else { + downstream->disable_upstream_wtimer(); + } + if(nread == 0 && *eof != 1) { if(downstream->resume_read(SHRPX_NO_BUFFER) != 0) { return SPDYLAY_ERR_CALLBACK_FAILURE; @@ -993,11 +1005,15 @@ int SpdyUpstream::on_downstream_body(Downstream *downstream, if(flush) { spdylay_session_resume_data(session_, downstream->get_stream_id()); + + downstream->ensure_upstream_wtimer(); } if(evbuffer_get_length(body) >= INBUF_MAX_THRES) { if(!flush) { spdylay_session_resume_data(session_, downstream->get_stream_id()); + + downstream->ensure_upstream_wtimer(); } downstream->pause_read(SHRPX_NO_BUFFER); @@ -1013,7 +1029,10 @@ int SpdyUpstream::on_downstream_body_complete(Downstream *downstream) if(LOG_ENABLED(INFO)) { DLOG(INFO, downstream) << "HTTP response completed"; } + spdylay_session_resume_data(session_, downstream->get_stream_id()); + downstream->ensure_upstream_wtimer(); + return 0; } @@ -1094,4 +1113,16 @@ int SpdyUpstream::handle_ign_data_chunk(size_t len) return 0; } +int SpdyUpstream::on_timeout(Downstream *downstream) +{ + if(LOG_ENABLED(INFO)) { + ULOG(INFO, this) << "Stream timeout stream_id=" + << downstream->get_stream_id(); + } + + rst_stream(downstream, SPDYLAY_INTERNAL_ERROR); + + return 0; +} + } // namespace shrpx diff --git a/src/shrpx_spdy_upstream.h b/src/shrpx_spdy_upstream.h index e14545d7..c2155f8f 100644 --- a/src/shrpx_spdy_upstream.h +++ b/src/shrpx_spdy_upstream.h @@ -44,6 +44,7 @@ public: virtual int on_read(); virtual int on_write(); virtual int on_event(); + virtual int on_timeout(Downstream *downstream); virtual int on_downstream_abort_request(Downstream *downstream, unsigned int status_code); int send(); diff --git a/src/shrpx_upstream.h b/src/shrpx_upstream.h index d48b14b2..ae8793ab 100644 --- a/src/shrpx_upstream.h +++ b/src/shrpx_upstream.h @@ -42,6 +42,7 @@ public: virtual int on_read() = 0; virtual int on_write() = 0; virtual int on_event() = 0; + virtual int on_timeout(Downstream *downstream) { return 0; }; virtual int on_downstream_abort_request(Downstream *downstream, unsigned int status_code) = 0; virtual bufferevent_data_cb get_downstream_readcb() = 0;