nghttpx: Add stream level timeout for HTTP/2 and SPDY upstream/downstream

This commit is contained in:
Tatsuhiro Tsujikawa 2014-08-09 18:47:45 +09:00
parent 4679188069
commit 76703f79fa
14 changed files with 466 additions and 13 deletions

View File

@ -493,6 +493,12 @@ void fill_default_config()
mod_config()->downstream_write_timeout.tv_sec = 60;
mod_config()->downstream_write_timeout.tv_usec = 0;
// Read timeout for HTTP/2 stream
mod_config()->stream_read_timeout = {30, 0};
// Write timeout for HTTP/2 stream
mod_config()->stream_write_timeout = {30, 0};
// Timeout for pooled (idle) connections
mod_config()->downstream_idle_read_timeout.tv_sec = 60;
@ -680,6 +686,15 @@ Timeout:
connections.
Default: )"
<< get_config()->upstream_write_timeout.tv_sec << R"(
--stream-read-timeout=<SEC>
Specify read timeout for HTTP/2 and SPDY streams.
Default: )"
<< get_config()->stream_read_timeout.tv_sec << R"(
--stream-write-timeout=<SEC>
Specify write timeout for HTTP/2 and SPDY
streams.
Default: )"
<< get_config()->stream_write_timeout.tv_sec << R"(
--backend-read-timeout=<SEC>
Specify read timeout for backend connection.
Default: )"
@ -982,6 +997,8 @@ int main(int argc, char **argv)
{"accesslog-syslog", no_argument, &flag, 57},
{"errorlog-file", required_argument, &flag, 58},
{"errorlog-syslog", no_argument, &flag, 59},
{"stream-read-timeout", required_argument, &flag, 60},
{"stream-write-timeout", required_argument, &flag, 61},
{nullptr, 0, nullptr, 0 }
};
@ -1242,6 +1259,14 @@ int main(int argc, char **argv)
// --errorlog-syslog
cmdcfgs.emplace_back(SHRPX_OPT_ERRORLOG_SYSLOG, "yes");
break;
case 60:
// --stream-read-timeout
cmdcfgs.emplace_back(SHRPX_OPT_STREAM_READ_TIMEOUT, optarg);
break;
case 61:
// --stream-write-timeout
cmdcfgs.emplace_back(SHRPX_OPT_STREAM_WRITE_TIMEOUT, optarg);
break;
default:
break;
}

View File

@ -74,6 +74,8 @@ const char SHRPX_OPT_FRONTEND_READ_TIMEOUT[] = "frontend-read-timeout";
const char SHRPX_OPT_FRONTEND_WRITE_TIMEOUT[] = "frontend-write-timeout";
const char SHRPX_OPT_BACKEND_READ_TIMEOUT[] = "backend-read-timeout";
const char SHRPX_OPT_BACKEND_WRITE_TIMEOUT[] = "backend-write-timeout";
const char SHRPX_OPT_STREAM_READ_TIMEOUT[] = "stream-read-timeout";
const char SHRPX_OPT_STREAM_WRITE_TIMEOUT[] = "stream-write-timeout";
const char SHRPX_OPT_ACCESSLOG_FILE[] = "accesslog-file";
const char SHRPX_OPT_ACCESSLOG_SYSLOG[] = "accesslog-syslog";
const char SHRPX_OPT_ERRORLOG_FILE[] = "errorlog-file";
@ -403,6 +405,20 @@ int parse_config(const char *opt, const char *optarg)
return 0;
}
if(util::strieq(opt, SHRPX_OPT_STREAM_READ_TIMEOUT)) {
timeval tv = {strtol(optarg, nullptr, 10), 0};
mod_config()->stream_read_timeout = tv;
return 0;
}
if(util::strieq(opt, SHRPX_OPT_STREAM_WRITE_TIMEOUT)) {
timeval tv = {strtol(optarg, nullptr, 10), 0};
mod_config()->stream_write_timeout = tv;
return 0;
}
if(util::strieq(opt, SHRPX_OPT_ACCESSLOG_FILE)) {
mod_config()->accesslog_file = strcopy(optarg);

View File

@ -71,6 +71,8 @@ extern const char SHRPX_OPT_FRONTEND_READ_TIMEOUT[];
extern const char SHRPX_OPT_FRONTEND_WRITE_TIMEOUT[];
extern const char SHRPX_OPT_BACKEND_READ_TIMEOUT[];
extern const char SHRPX_OPT_BACKEND_WRITE_TIMEOUT[];
extern const char SHRPX_OPT_STREAM_READ_TIMEOUT[];
extern const char SHRPX_OPT_STREAM_WRITE_TIMEOUT[];
extern const char SHRPX_OPT_ACCESSLOG_FILE[];
extern const char SHRPX_OPT_ACCESSLOG_SYSLOG[];
extern const char SHRPX_OPT_ERRORLOG_FILE[];
@ -162,6 +164,8 @@ struct Config {
timeval upstream_write_timeout;
timeval downstream_read_timeout;
timeval downstream_write_timeout;
timeval stream_read_timeout;
timeval stream_write_timeout;
timeval downstream_idle_read_timeout;
std::unique_ptr<char[]> host;
std::unique_ptr<char[]> private_key_file;

View File

@ -44,6 +44,10 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
upstream_(upstream),
dconn_(nullptr),
response_body_buf_(nullptr),
upstream_rtimerev_(nullptr),
upstream_wtimerev_(nullptr),
downstream_rtimerev_(nullptr),
downstream_wtimerev_(nullptr),
request_headers_sum_(0),
response_headers_sum_(0),
request_datalen_(0),
@ -82,6 +86,18 @@ Downstream::~Downstream()
// Passing NULL to evbuffer_free() causes segmentation fault.
evbuffer_free(response_body_buf_);
}
if(upstream_rtimerev_) {
event_free(upstream_rtimerev_);
}
if(upstream_wtimerev_) {
event_free(upstream_wtimerev_);
}
if(downstream_rtimerev_) {
event_free(downstream_rtimerev_);
}
if(downstream_wtimerev_) {
event_free(downstream_wtimerev_);
}
if(dconn_) {
delete dconn_;
}
@ -880,4 +896,213 @@ bool Downstream::response_pseudo_header_allowed() const
return pseudo_header_allowed(response_headers_);
}
namespace {
void upstream_timeoutcb(evutil_socket_t fd, short event, void *arg)
{
auto downstream = static_cast<Downstream*>(arg);
auto upstream = downstream->get_upstream();
auto which = event == EV_READ ? "read" : "write";
if(LOG_ENABLED(INFO)) {
DLOG(INFO, downstream) << "upstream timeout stream_id="
<< downstream->get_stream_id()
<< " event=" << which;
}
downstream->disable_upstream_rtimer();
downstream->disable_upstream_wtimer();
upstream->on_timeout(downstream);
}
} // namespace
namespace {
void upstream_rtimeoutcb(evutil_socket_t fd, short event, void *arg)
{
upstream_timeoutcb(fd, EV_READ, arg);
}
} // namespace
namespace {
void upstream_wtimeoutcb(evutil_socket_t fd, short event, void *arg)
{
upstream_timeoutcb(fd, EV_WRITE, arg);
}
} // namespace
namespace {
event* init_timer(event_base *evbase, event_callback_fn cb, void *arg)
{
auto timerev = evtimer_new(evbase, cb, arg);
if(timerev == nullptr) {
LOG(WARNING) << "timer initialization failed";
return nullptr;
}
return timerev;
}
} // namespace
void Downstream::init_upstream_timer()
{
auto evbase = upstream_->get_client_handler()->get_evbase();
upstream_rtimerev_ = init_timer(evbase, upstream_rtimeoutcb, this);
upstream_wtimerev_ = init_timer(evbase, upstream_wtimeoutcb, this);
}
namespace {
void reset_timer(event *timer, const timeval *timeout)
{
if(!timer) {
return;
}
event_add(timer, timeout);
}
} // namespace
namespace {
void try_reset_timer(event *timer, const timeval *timeout)
{
if(!timer) {
return;
}
if(!evtimer_pending(timer, nullptr)) {
return;
}
event_add(timer, timeout);
}
} // namespace
namespace {
void ensure_timer(event *timer, const timeval *timeout)
{
if(!timer) {
return;
}
if(evtimer_pending(timer, nullptr)) {
return;
}
event_add(timer, timeout);
}
} // namespace
namespace {
void disable_timer(event *timer)
{
if(!timer) {
return;
}
event_del(timer);
}
} // namespace
void Downstream::reset_upstream_rtimer()
{
reset_timer(upstream_rtimerev_, &get_config()->stream_read_timeout);
try_reset_timer(upstream_wtimerev_, &get_config()->stream_write_timeout);
}
void Downstream::reset_upstream_wtimer()
{
reset_timer(upstream_wtimerev_, &get_config()->stream_write_timeout);
try_reset_timer(upstream_rtimerev_, &get_config()->stream_read_timeout);
}
void Downstream::ensure_upstream_wtimer()
{
ensure_timer(upstream_wtimerev_, &get_config()->stream_write_timeout);
}
void Downstream::disable_upstream_rtimer()
{
disable_timer(upstream_rtimerev_);
}
void Downstream::disable_upstream_wtimer()
{
disable_timer(upstream_wtimerev_);
}
namespace {
void downstream_timeoutcb(evutil_socket_t fd, short event, void *arg)
{
auto downstream = static_cast<Downstream*>(arg);
auto which = event == EV_READ ? "read" : "write";
if(LOG_ENABLED(INFO)) {
DLOG(INFO, downstream) << "downstream timeout stream_id="
<< downstream->get_downstream_stream_id()
<< " event=" << which;
}
downstream->disable_downstream_rtimer();
downstream->disable_downstream_wtimer();
auto dconn = downstream->get_downstream_connection();
if(dconn) {
dconn->on_timeout();
}
}
} // namespace
namespace {
void downstream_rtimeoutcb(evutil_socket_t fd, short event, void *arg)
{
downstream_timeoutcb(fd, EV_READ, arg);
}
} // namespace
namespace {
void downstream_wtimeoutcb(evutil_socket_t fd, short event, void *arg)
{
downstream_timeoutcb(fd, EV_WRITE, arg);
}
} // namespace
void Downstream::init_downstream_timer()
{
auto evbase = upstream_->get_client_handler()->get_evbase();
downstream_rtimerev_ = init_timer(evbase, downstream_rtimeoutcb, this);
downstream_wtimerev_ = init_timer(evbase, downstream_wtimeoutcb, this);
}
void Downstream::reset_downstream_rtimer()
{
reset_timer(downstream_rtimerev_, &get_config()->stream_read_timeout);
try_reset_timer(downstream_wtimerev_, &get_config()->stream_write_timeout);
}
void Downstream::reset_downstream_wtimer()
{
reset_timer(downstream_wtimerev_, &get_config()->stream_write_timeout);
try_reset_timer(downstream_rtimerev_, &get_config()->stream_read_timeout);
}
void Downstream::ensure_downstream_wtimer()
{
ensure_timer(downstream_wtimerev_, &get_config()->stream_write_timeout);
}
void Downstream::disable_downstream_rtimer()
{
disable_timer(downstream_rtimerev_);
}
void Downstream::disable_downstream_wtimer()
{
disable_timer(downstream_wtimerev_);
}
} // namespace shrpx

View File

@ -232,6 +232,33 @@ public:
bool get_rst_stream_after_end_stream() const;
void set_rst_stream_after_end_stream(bool f);
// Initializes upstream timers, but they are not pending.
void init_upstream_timer();
// Makes upstream read timer pending. If it is already pending,
// timeout value is reset. This function also resets write timer if
// it is already pending.
void reset_upstream_rtimer();
// Makes upstream write timer pending. If it is already pending,
// timeout value is reset. This function also resets read timer if
// it is already pending.
void reset_upstream_wtimer();
// Makes upstream write timer pending. If it is already pending, do
// nothing.
void ensure_upstream_wtimer();
// Disables upstream read timer.
void disable_upstream_rtimer();
// Disables upstream write timer.
void disable_upstream_wtimer();
// Downstream timer functions. They works in a similar way just
// like the upstream timer function.
void init_downstream_timer();
void reset_downstream_rtimer();
void reset_downstream_wtimer();
void ensure_downstream_wtimer();
void disable_downstream_rtimer();
void disable_downstream_wtimer();
private:
Headers request_headers_;
Headers response_headers_;
@ -255,6 +282,12 @@ private:
// body. nghttp2 library reads data from this in the callback.
evbuffer *response_body_buf_;
event *upstream_rtimerev_;
event *upstream_wtimerev_;
event *downstream_rtimerev_;
event *downstream_wtimerev_;
size_t request_headers_sum_;
size_t response_headers_sum_;

View File

@ -54,6 +54,7 @@ public:
virtual int on_read() = 0;
virtual int on_write() = 0;
virtual int on_timeout() { return 0; }
virtual void on_upstream_change(Upstream *uptream) = 0;
virtual int on_priority_change(int32_t pri) = 0;

View File

@ -64,6 +64,9 @@ Http2DownstreamConnection::~Http2DownstreamConnection()
evbuffer_free(request_body_buf_);
}
if(downstream_) {
downstream_->disable_downstream_rtimer();
downstream_->disable_downstream_wtimer();
if(submit_rst_stream(downstream_) == 0) {
http2session_->notify();
}
@ -120,6 +123,9 @@ int Http2DownstreamConnection::attach_downstream(Downstream *downstream)
}
downstream->set_downstream_connection(this);
downstream_ = downstream;
downstream_->init_downstream_timer();
return 0;
}
@ -142,12 +148,15 @@ void Http2DownstreamConnection::detach_downstream(Downstream *downstream)
}
downstream->set_downstream_connection(nullptr);
downstream->disable_downstream_rtimer();
downstream->disable_downstream_wtimer();
downstream_ = nullptr;
client_handler_->pool_downstream_connection(this);
}
int Http2DownstreamConnection::submit_rst_stream(Downstream *downstream)
int Http2DownstreamConnection::submit_rst_stream(Downstream *downstream,
nghttp2_error_code error_code)
{
int rv = -1;
if(http2session_->get_state() == Http2Session::CONNECTED &&
@ -163,8 +172,7 @@ int Http2DownstreamConnection::submit_rst_stream(Downstream *downstream)
<< downstream->get_downstream_stream_id();
}
rv = http2session_->submit_rst_stream
(downstream->get_downstream_stream_id(),
NGHTTP2_INTERNAL_ERROR);
(downstream->get_downstream_stream_id(), error_code);
}
}
return rv;
@ -205,6 +213,8 @@ ssize_t http2_data_read_callback(nghttp2_session *session,
!downstream->get_upgraded())) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
} else {
downstream->disable_downstream_wtimer();
return NGHTTP2_ERR_DEFERRED;
}
break;
@ -227,6 +237,9 @@ ssize_t http2_data_read_callback(nghttp2_session *session,
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
break;
}
downstream->disable_downstream_wtimer();
return NGHTTP2_ERR_DEFERRED;
}
}
@ -242,6 +255,13 @@ ssize_t http2_data_read_callback(nghttp2_session *session,
break;
}
}
if(evbuffer_get_length(body) > 0 && !downstream->get_output_buffer_full()) {
downstream->reset_downstream_wtimer();
} else {
downstream->disable_downstream_wtimer();
}
return nread;
}
} // namespace
@ -455,6 +475,7 @@ int Http2DownstreamConnection::push_request_headers()
}
downstream_->clear_request_headers();
downstream_->reset_downstream_wtimer();
http2session_->notify();
return 0;
@ -473,6 +494,9 @@ int Http2DownstreamConnection::push_upload_data_chunk(const uint8_t *data,
if(rv != 0) {
return -1;
}
downstream_->ensure_downstream_wtimer();
http2session_->notify();
}
return 0;
@ -486,6 +510,9 @@ int Http2DownstreamConnection::end_upload_data()
if(rv != 0) {
return -1;
}
downstream_->ensure_downstream_wtimer();
http2session_->notify();
}
return 0;
@ -586,4 +613,13 @@ int Http2DownstreamConnection::on_priority_change(int32_t pri)
return 0;
}
int Http2DownstreamConnection::on_timeout()
{
if(!downstream_) {
return 0;
}
return submit_rst_stream(downstream_, NGHTTP2_NO_ERROR);
}
} // namespace shrpx

View File

@ -57,6 +57,7 @@ public:
virtual int on_read();
virtual int on_write();
virtual int on_timeout();
virtual void on_upstream_change(Upstream *upstream) {}
virtual int on_priority_change(int32_t pri);
@ -69,7 +70,9 @@ public:
void attach_stream_data(StreamData *sd);
StreamData* detach_stream_data();
int submit_rst_stream(Downstream *downstream);
int submit_rst_stream
(Downstream *downstream,
nghttp2_error_code error_code = NGHTTP2_INTERNAL_ERROR);
private:
Http2Session *http2session_;
evbuffer *request_body_buf_;

View File

@ -999,7 +999,7 @@ int on_response_headers(Http2Session *http2session,
if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream) != 0) {
// If resume_read fails, just drop connection. Not ideal.
delete upstream->get_client_handler();
return 0;
return -1;
}
downstream->set_request_state(Downstream::HEADER_COMPLETE);
if(LOG_ENABLED(INFO)) {
@ -1017,7 +1017,7 @@ int on_response_headers(Http2Session *http2session,
NGHTTP2_PROTOCOL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
}
call_downstream_readcb(http2session, downstream);
return 0;
}
} // namespace
@ -1051,6 +1051,8 @@ int on_frame_recv_callback
} else if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
downstream->disable_downstream_rtimer();
if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
downstream->set_response_state(Downstream::MSG_COMPLETE);
@ -1082,17 +1084,16 @@ int on_frame_recv_callback
rv = on_response_headers(http2session, downstream, session, frame);
if(rv != 0) {
return rv;
return 0;
}
}
if(frame->headers.cat == NGHTTP2_HCAT_HEADERS) {
if(downstream->get_expect_final_response()) {
rv = on_response_headers(http2session, downstream, session, frame);
if(rv != 0) {
return rv;
return 0;
}
} else if((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) {
http2session->submit_rst_stream(frame->hd.stream_id,
@ -1102,6 +1103,9 @@ int on_frame_recv_callback
}
if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
downstream->disable_downstream_rtimer();
if(downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
downstream->set_response_state(Downstream::MSG_COMPLETE);
@ -1113,7 +1117,13 @@ int on_frame_recv_callback
downstream->set_response_state(Downstream::MSG_RESET);
}
}
} else {
downstream->reset_downstream_rtimer();
}
// This may delete downstream
call_downstream_readcb(http2session, downstream);
break;
}
case NGHTTP2_RST_STREAM: {
@ -1214,6 +1224,8 @@ int on_data_chunk_recv_callback(nghttp2_session *session,
return 0;
}
downstream->reset_downstream_rtimer();
downstream->add_response_bodylen(len);
auto upstream = downstream->get_upstream();
@ -1240,6 +1252,31 @@ int on_frame_send_callback(nghttp2_session* session,
const nghttp2_frame *frame, void *user_data)
{
auto http2session = static_cast<Http2Session*>(user_data);
if(frame->hd.type == NGHTTP2_DATA || frame->hd.type == NGHTTP2_HEADERS) {
if((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) {
return 0;
}
auto sd = static_cast<StreamData*>
(nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if(!sd || !sd->dconn) {
return 0;
}
auto downstream = sd->dconn->get_downstream();
if(!downstream ||
downstream->get_downstream_stream_id() != frame->hd.stream_id) {
return 0;
}
downstream->reset_downstream_rtimer();
return 0;
}
if(frame->hd.type == NGHTTP2_SETTINGS &&
(frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
if(http2session->start_settings_timer() != 0) {

View File

@ -216,6 +216,7 @@ int on_header_callback(nghttp2_session *session,
if(!downstream) {
return 0;
}
if(downstream->get_request_headers_sum() > Downstream::MAX_HEADERS_SUM) {
if(downstream->get_response_state() == Downstream::MSG_COMPLETE) {
return 0;
@ -273,6 +274,8 @@ int on_begin_headers_callback(nghttp2_session *session,
0);
upstream->add_downstream(downstream);
downstream->init_upstream_timer();
downstream->reset_upstream_rtimer();
downstream->init_response_body_buf();
// Although, we deprecated minor version from HTTP/2, we supply
@ -387,6 +390,8 @@ int on_request_headers(Http2Upstream *upstream,
}
downstream->set_request_state(Downstream::HEADER_COMPLETE);
if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
downstream->disable_upstream_rtimer();
downstream->set_request_state(Downstream::MSG_COMPLETE);
}
@ -407,15 +412,18 @@ int on_frame_recv_callback
switch(frame->hd.type) {
case NGHTTP2_DATA: {
auto downstream = upstream->find_downstream(frame->hd.stream_id);
if(!downstream) {
return 0;
}
if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
auto downstream = upstream->find_downstream(frame->hd.stream_id);
if(!downstream) {
return 0;
}
downstream->disable_upstream_rtimer();
downstream->end_upload_data();
downstream->set_request_state(Downstream::MSG_COMPLETE);
}
break;
}
case NGHTTP2_HEADERS: {
@ -425,10 +433,14 @@ int on_frame_recv_callback
}
if(frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
downstream->reset_upstream_rtimer();
return on_request_headers(upstream, downstream, session, frame);
}
if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
downstream->disable_upstream_rtimer();
downstream->end_upload_data();
downstream->set_request_state(Downstream::MSG_COMPLETE);
} else {
@ -496,6 +508,8 @@ int on_data_chunk_recv_callback(nghttp2_session *session,
return 0;
}
downstream->reset_upstream_rtimer();
if(downstream->push_upload_data_chunk(data, len) != 0) {
upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
@ -1033,6 +1047,7 @@ ssize_t downstream_data_read_callback(nghttp2_session *session,
if(nread == 0 &&
downstream->get_response_state() == Downstream::MSG_COMPLETE) {
if(!downstream->get_upgraded()) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
@ -1053,6 +1068,12 @@ ssize_t downstream_data_read_callback(nghttp2_session *session,
}
}
if(evbuffer_get_length(body) > 0) {
downstream->reset_upstream_wtimer();
} else {
downstream->disable_upstream_wtimer();
}
if(nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) {
if(downstream->resume_read(SHRPX_NO_BUFFER) != 0) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
@ -1260,11 +1281,15 @@ int Http2Upstream::on_downstream_body(Downstream *downstream,
if(flush) {
nghttp2_session_resume_data(session_, downstream->get_stream_id());
downstream->ensure_upstream_wtimer();
}
if(evbuffer_get_length(body) >= INBUF_MAX_THRES) {
if(!flush) {
nghttp2_session_resume_data(session_, downstream->get_stream_id());
downstream->ensure_upstream_wtimer();
}
downstream->pause_read(SHRPX_NO_BUFFER);
@ -1281,6 +1306,8 @@ int Http2Upstream::on_downstream_body_complete(Downstream *downstream)
DLOG(INFO, downstream) << "HTTP response completed";
}
nghttp2_session_resume_data(session_, downstream->get_stream_id());
downstream->ensure_upstream_wtimer();
return 0;
}
@ -1351,4 +1378,16 @@ void Http2Upstream::log_response_headers
<< ss.str();
}
int Http2Upstream::on_timeout(Downstream *downstream)
{
if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Stream timeout stream_id="
<< downstream->get_stream_id();
}
rst_stream(downstream, NGHTTP2_NO_ERROR);
return 0;
}
} // namespace shrpx

View File

@ -46,6 +46,7 @@ public:
virtual int on_read();
virtual int on_write();
virtual int on_event();
virtual int on_timeout(Downstream *downstream);
virtual int on_downstream_abort_request(Downstream *downstream,
unsigned int status_code);
int send();

View File

@ -157,6 +157,8 @@ void on_ctrl_recv_callback
frame->syn_stream.stream_id,
frame->syn_stream.pri);
upstream->add_downstream(downstream);
downstream->init_upstream_timer();
downstream->reset_upstream_rtimer();
downstream->init_response_body_buf();
auto nv = frame->syn_stream.nv;
@ -243,6 +245,7 @@ void on_ctrl_recv_callback
}
downstream->set_request_state(Downstream::HEADER_COMPLETE);
if(frame->syn_stream.hd.flags & SPDYLAY_CTRL_FLAG_FIN) {
downstream->disable_upstream_rtimer();
downstream->set_request_state(Downstream::MSG_COMPLETE);
}
break;
@ -267,6 +270,8 @@ void on_data_chunk_recv_callback(spdylay_session *session,
return;
}
downstream->reset_upstream_rtimer();
if(downstream->push_upload_data_chunk(data, len) != 0) {
upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
upstream->handle_ign_data_chunk(len);
@ -317,6 +322,7 @@ void on_data_recv_callback(spdylay_session *session, uint8_t flags,
auto upstream = static_cast<SpdyUpstream*>(user_data);
auto downstream = upstream->find_downstream(stream_id);
if(downstream && (flags & SPDYLAY_DATA_FLAG_FIN)) {
downstream->disable_upstream_rtimer();
downstream->end_upload_data();
downstream->set_request_state(Downstream::MSG_COMPLETE);
}
@ -795,6 +801,12 @@ ssize_t spdy_data_read_callback(spdylay_session *session,
}
}
if(evbuffer_get_length(body) > 0) {
downstream->reset_upstream_wtimer();
} else {
downstream->disable_upstream_wtimer();
}
if(nread == 0 && *eof != 1) {
if(downstream->resume_read(SHRPX_NO_BUFFER) != 0) {
return SPDYLAY_ERR_CALLBACK_FAILURE;
@ -993,11 +1005,15 @@ int SpdyUpstream::on_downstream_body(Downstream *downstream,
if(flush) {
spdylay_session_resume_data(session_, downstream->get_stream_id());
downstream->ensure_upstream_wtimer();
}
if(evbuffer_get_length(body) >= INBUF_MAX_THRES) {
if(!flush) {
spdylay_session_resume_data(session_, downstream->get_stream_id());
downstream->ensure_upstream_wtimer();
}
downstream->pause_read(SHRPX_NO_BUFFER);
@ -1013,7 +1029,10 @@ int SpdyUpstream::on_downstream_body_complete(Downstream *downstream)
if(LOG_ENABLED(INFO)) {
DLOG(INFO, downstream) << "HTTP response completed";
}
spdylay_session_resume_data(session_, downstream->get_stream_id());
downstream->ensure_upstream_wtimer();
return 0;
}
@ -1094,4 +1113,16 @@ int SpdyUpstream::handle_ign_data_chunk(size_t len)
return 0;
}
int SpdyUpstream::on_timeout(Downstream *downstream)
{
if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Stream timeout stream_id="
<< downstream->get_stream_id();
}
rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
return 0;
}
} // namespace shrpx

View File

@ -44,6 +44,7 @@ public:
virtual int on_read();
virtual int on_write();
virtual int on_event();
virtual int on_timeout(Downstream *downstream);
virtual int on_downstream_abort_request(Downstream *downstream,
unsigned int status_code);
int send();

View File

@ -42,6 +42,7 @@ public:
virtual int on_read() = 0;
virtual int on_write() = 0;
virtual int on_event() = 0;
virtual int on_timeout(Downstream *downstream) { return 0; };
virtual int on_downstream_abort_request(Downstream *downstream,
unsigned int status_code) = 0;
virtual bufferevent_data_cb get_downstream_readcb() = 0;