nghttpx: Fix possible flow control issue

Previously we only update consumed flow control window when number of
bytes read in nghttp2 and spdylay callback is 0.  Now we notify
nghttp2 library the consumed bytes even if number of bytes read > 0.
This change also uses newly added spdylay_session_consume() API, so we
require spdylay >= 1.3.0.
This commit is contained in:
Tatsuhiro Tsujikawa 2014-08-21 21:22:16 +09:00
parent 69b9ce6b68
commit d5dcbf6f3b
16 changed files with 133 additions and 132 deletions

View File

@ -312,7 +312,7 @@ fi
# spdylay (for src/nghttpx and src/h2load)
have_spdylay=no
if test "x${request_spdylay}" != "xno"; then
PKG_CHECK_MODULES([LIBSPDYLAY], [libspdylay >= 1.2.3],
PKG_CHECK_MODULES([LIBSPDYLAY], [libspdylay >= 1.3.0],
[have_spdylay=yes], [have_spdylay=no])
if test "x${have_spdylay}" = "xyes"; then
AC_DEFINE([HAVE_SPDYLAY], [1], [Define to 1 if you have `spdylay` library.])

View File

@ -150,13 +150,13 @@ void Downstream::pause_read(IOCtrlReason reason)
}
}
int Downstream::resume_read(IOCtrlReason reason)
int Downstream::resume_read(IOCtrlReason reason, size_t consumed)
{
if(dconn_) {
return dconn_->resume_read(reason);
} else {
return 0;
return dconn_->resume_read(reason, consumed);
}
return 0;
}
void Downstream::force_resume_read()
@ -873,6 +873,12 @@ size_t Downstream::get_request_datalen() const
return request_datalen_;
}
void Downstream::dec_request_datalen(size_t len)
{
assert(request_datalen_ >= len);
request_datalen_ -= len;
}
void Downstream::reset_request_datalen()
{
request_datalen_ = 0;
@ -883,6 +889,12 @@ void Downstream::add_response_datalen(size_t len)
response_datalen_ += len;
}
void Downstream::dec_response_datalen(size_t len)
{
assert(response_datalen_ >= len);
response_datalen_ -= len;
}
size_t Downstream::get_response_datalen() const
{
return response_datalen_;

View File

@ -59,7 +59,7 @@ public:
void set_priority(int32_t pri);
int32_t get_priority() const;
void pause_read(IOCtrlReason reason);
int resume_read(IOCtrlReason reason);
int resume_read(IOCtrlReason reason, size_t consumed);
void force_resume_read();
// Set stream ID for downstream HTTP2 connection.
void set_downstream_stream_id(int32_t stream_id);
@ -151,6 +151,7 @@ public:
int push_upload_data_chunk(const uint8_t *data, size_t datalen);
int end_upload_data();
size_t get_request_datalen() const;
void dec_request_datalen(size_t len);
void reset_request_datalen();
bool request_pseudo_header_allowed() const;
bool expect_response_body() const;
@ -224,6 +225,7 @@ public:
void set_expect_final_response(bool f);
bool get_expect_final_response() const;
void add_response_datalen(size_t len);
void dec_response_datalen(size_t len);
size_t get_response_datalen() const;
void reset_response_datalen();
bool response_pseudo_header_allowed() const;

View File

@ -47,7 +47,7 @@ public:
virtual int end_upload_data() = 0;
virtual void pause_read(IOCtrlReason reason) = 0;
virtual int resume_read(IOCtrlReason reason) = 0;
virtual int resume_read(IOCtrlReason reason, size_t consumed) = 0;
virtual void force_resume_read() = 0;
virtual bool get_output_buffer_full() = 0;

View File

@ -203,11 +203,12 @@ ssize_t http2_data_read_callback(nghttp2_session *session,
DCLOG(FATAL, dconn) << "evbuffer_remove() failed";
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
if(nread == 0) {
if(nread > 0) {
// This is important because it will handle flow control
// stuff.
if(downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER,
downstream) != 0) {
if(downstream->get_upstream()->resume_read
(SHRPX_NO_BUFFER, downstream, nread) != 0) {
// In this case, downstream may be deleted.
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
@ -218,40 +219,32 @@ ssize_t http2_data_read_callback(nghttp2_session *session,
return NGHTTP2_ERR_DEFERRED;
}
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
if(!downstream->get_upgrade_request() ||
(downstream->get_response_state() == Downstream::HEADER_COMPLETE &&
!downstream->get_upgraded())) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
} else {
downstream->disable_downstream_wtimer();
break;
}
return NGHTTP2_ERR_DEFERRED;
}
break;
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
if(!downstream->get_upgrade_request() ||
(downstream->get_response_state() == Downstream::HEADER_COMPLETE &&
!downstream->get_upgraded())) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
} else {
if(evbuffer_get_length(body) == 0) {
// Check get_request_state() == MSG_COMPLETE just in case
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
break;
}
downstream->disable_downstream_wtimer();
downstream->disable_downstream_wtimer();
return NGHTTP2_ERR_DEFERRED;
}
}
} else {
// Send WINDOW_UPDATE before buffer is empty to avoid delay
// because of RTT.
if(!downstream->get_output_buffer_full() &&
downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER,
downstream) == -1) {
// In this case, downstream may be deleted.
return NGHTTP2_ERR_DEFERRED;
}
break;
} else {
if(evbuffer_get_length(body) == 0) {
// Check get_request_state() == MSG_COMPLETE just in case
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
break;
}
downstream->disable_downstream_wtimer();
return NGHTTP2_ERR_DEFERRED;
}
}
}
@ -517,7 +510,8 @@ int Http2DownstreamConnection::end_upload_data()
return 0;
}
int Http2DownstreamConnection::resume_read(IOCtrlReason reason)
int Http2DownstreamConnection::resume_read
(IOCtrlReason reason, size_t consumed)
{
int rv;
@ -530,17 +524,21 @@ int Http2DownstreamConnection::resume_read(IOCtrlReason reason)
return 0;
}
rv = http2session_->consume(downstream_->get_downstream_stream_id(),
downstream_->get_response_datalen());
if(consumed > 0) {
assert(downstream_->get_response_datalen() >= consumed);
if(rv != 0) {
return -1;
rv = http2session_->consume(downstream_->get_downstream_stream_id(),
consumed);
if(rv != 0) {
return -1;
}
downstream_->dec_response_datalen(consumed);
http2session_->notify();
}
downstream_->reset_response_datalen();
http2session_->notify();
return 0;
}

View File

@ -50,7 +50,7 @@ public:
virtual int end_upload_data();
virtual void pause_read(IOCtrlReason reason) {}
virtual int resume_read(IOCtrlReason reason);
virtual int resume_read(IOCtrlReason reason, size_t consumed);
virtual void force_resume_read() {}
virtual bool get_output_buffer_full();

View File

@ -1037,7 +1037,7 @@ int on_response_headers(Http2Session *http2session,
if(downstream->get_upgraded()) {
downstream->set_response_connection_close(true);
// On upgrade sucess, both ends can send data
if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream) != 0) {
if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream, 0) != 0) {
// If resume_read fails, just drop connection. Not ideal.
delete upstream->get_client_handler();
return -1;

View File

@ -886,9 +886,7 @@ void downstream_writecb(bufferevent *bev, void *ptr)
return;
}
auto dconn = static_cast<DownstreamConnection*>(ptr);
auto downstream = dconn->get_downstream();
auto upstream = static_cast<Http2Upstream*>(downstream->get_upstream());
upstream->resume_read(SHRPX_NO_BUFFER, downstream);
dconn->on_write();
}
} // namespace
@ -1101,15 +1099,14 @@ ssize_t downstream_data_read_callback(nghttp2_session *session,
downstream->disable_upstream_wtimer();
}
if(nread == 0) {
if(downstream->resume_read(SHRPX_NO_BUFFER) != 0) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
if(((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) {
return NGHTTP2_ERR_DEFERRED;
}
if(nread > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nread) != 0) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
if(nread == 0 && ((*data_flags) & NGHTTP2_DATA_FLAG_EOF) == 0) {
return NGHTTP2_ERR_DEFERRED;
}
return nread;
}
} // namespace
@ -1361,15 +1358,17 @@ bool Http2Upstream::get_flow_control() const
void Http2Upstream::pause_read(IOCtrlReason reason)
{}
int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream)
int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
size_t consumed)
{
if(get_flow_control()) {
if(consume(downstream->get_stream_id(),
downstream->get_request_datalen()) != 0) {
assert(downstream->get_request_datalen() >= consumed);
if(consume(downstream->get_stream_id(), consumed) != 0) {
return -1;
}
downstream->reset_request_datalen();
downstream->dec_request_datalen(consumed);
}
return send();

View File

@ -65,7 +65,8 @@ public:
int error_reply(Downstream *downstream, unsigned int status_code);
virtual void pause_read(IOCtrlReason reason);
virtual int resume_read(IOCtrlReason reason, Downstream *downstream);
virtual int resume_read(IOCtrlReason reason, Downstream *downstream,
size_t consumed);
virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream,

View File

@ -408,7 +408,7 @@ void HttpDownstreamConnection::pause_read(IOCtrlReason reason)
ioctrl_.pause_read(reason);
}
int HttpDownstreamConnection::resume_read(IOCtrlReason reason)
int HttpDownstreamConnection::resume_read(IOCtrlReason reason, size_t consumed)
{
ioctrl_.resume_read(reason);
return 0;
@ -475,7 +475,7 @@ int htp_hdrs_completecb(http_parser *htp)
if(downstream->get_upgraded()) {
// Upgrade complete, read until EOF in both ends
if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream) != 0) {
if(upstream->resume_read(SHRPX_MSG_BLOCK, downstream, 0) != 0) {
return -1;
}
downstream->set_request_state(Downstream::HEADER_COMPLETE);
@ -652,6 +652,9 @@ int HttpDownstreamConnection::on_read()
int HttpDownstreamConnection::on_write()
{
auto upstream = downstream_->get_upstream();
upstream->resume_read(SHRPX_NO_BUFFER, downstream_,
downstream_->get_request_datalen());
return 0;
}

View File

@ -49,7 +49,7 @@ public:
virtual int end_upload_data();
virtual void pause_read(IOCtrlReason reason);
virtual int resume_read(IOCtrlReason reason);
virtual int resume_read(IOCtrlReason reason, size_t consumed);
virtual void force_resume_read();
virtual bool get_output_buffer_full();

View File

@ -422,7 +422,8 @@ int HttpsUpstream::on_write()
}
}
rv = downstream->resume_read(SHRPX_NO_BUFFER);
rv = downstream->resume_read(SHRPX_NO_BUFFER,
downstream->get_response_datalen());
}
return rv;
}
@ -442,7 +443,8 @@ void HttpsUpstream::pause_read(IOCtrlReason reason)
ioctrl_.pause_read(reason);
}
int HttpsUpstream::resume_read(IOCtrlReason reason, Downstream *downstream)
int HttpsUpstream::resume_read(IOCtrlReason reason, Downstream *downstream,
size_t consumed)
{
if(ioctrl_.resume_read(reason)) {
// Process remaining data in input buffer here because these bytes
@ -491,7 +493,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
upstream->delete_downstream();
// Process next HTTP request
if(upstream->resume_read(SHRPX_MSG_BLOCK, 0) == -1) {
if(upstream->resume_read(SHRPX_MSG_BLOCK, nullptr, 0) == -1) {
return;
}
}
@ -537,7 +539,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
upstream->delete_downstream();
// Process next HTTP request
if(upstream->resume_read(SHRPX_MSG_BLOCK, 0) == -1) {
if(upstream->resume_read(SHRPX_MSG_BLOCK, nullptr, 0) == -1) {
return;
}
@ -579,7 +581,7 @@ void https_downstream_writecb(bufferevent *bev, void *ptr)
auto downstream = dconn->get_downstream();
auto upstream = static_cast<HttpsUpstream*>(downstream->get_upstream());
// May return -1
upstream->resume_read(SHRPX_NO_BUFFER, downstream);
upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0);
}
} // namespace
@ -631,7 +633,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
}
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
upstream->delete_downstream();
if(upstream->resume_read(SHRPX_MSG_BLOCK, 0) == -1) {
if(upstream->resume_read(SHRPX_MSG_BLOCK, nullptr, 0) == -1) {
return;
}
}
@ -661,7 +663,7 @@ void https_downstream_eventcb(bufferevent *bev, short events, void *ptr)
}
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
upstream->delete_downstream();
if(upstream->resume_read(SHRPX_MSG_BLOCK, 0) == -1) {
if(upstream->resume_read(SHRPX_MSG_BLOCK, nullptr, 0) == -1) {
return;
}
}
@ -903,6 +905,7 @@ int HttpsUpstream::on_downstream_body(Downstream *downstream,
ULOG(FATAL, this) << "evbuffer_add() failed";
return -1;
}
if(downstream->get_chunked_response()) {
if(evbuffer_add(output, "\r\n", 2) != 0) {
ULOG(FATAL, this) << "evbuffer_add() failed";

View File

@ -59,7 +59,8 @@ public:
int error_reply(unsigned int status_code);
virtual void pause_read(IOCtrlReason reason);
virtual int resume_read(IOCtrlReason reason, Downstream *downstream);
virtual int resume_read(IOCtrlReason reason, Downstream *downstream,
size_t consumed);
virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream,

View File

@ -107,6 +107,10 @@ void on_stream_close_callback
return;
}
upstream->consume(stream_id, downstream->get_request_datalen());
downstream->reset_request_datalen();
if(downstream->get_request_state() == Downstream::CONNECT_FAIL) {
upstream->remove_downstream(downstream);
// downstrea was deleted
@ -295,7 +299,8 @@ void on_data_chunk_recv_callback(spdylay_session *session,
auto downstream = upstream->find_downstream(stream_id);
if(!downstream) {
upstream->handle_ign_data_chunk(len);
upstream->consume(stream_id, len);
return;
}
@ -303,7 +308,9 @@ void on_data_chunk_recv_callback(spdylay_session *session,
if(downstream->push_upload_data_chunk(data, len) != 0) {
upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
upstream->handle_ign_data_chunk(len);
upstream->consume(stream_id, len);
return;
}
@ -456,7 +463,7 @@ SpdyUpstream::SpdyUpstream(uint16_t version, ClientHandler *handler)
flow_control_ = true;
initial_window_size_ = 1 << get_config()->http2_upstream_window_bits;
rv = spdylay_session_set_option(session_,
SPDYLAY_OPT_NO_AUTO_WINDOW_UPDATE, &val,
SPDYLAY_OPT_NO_AUTO_WINDOW_UPDATE2, &val,
sizeof(val));
assert(rv == 0);
} else {
@ -619,9 +626,7 @@ void spdy_downstream_writecb(bufferevent *bev, void *ptr)
return;
}
auto dconn = static_cast<DownstreamConnection*>(ptr);
auto downstream = dconn->get_downstream();
auto upstream = static_cast<SpdyUpstream*>(downstream->get_upstream());
upstream->resume_read(SHRPX_NO_BUFFER, downstream);
dconn->on_write();
}
} // namespace
@ -834,14 +839,12 @@ ssize_t spdy_data_read_callback(spdylay_session *session,
downstream->disable_upstream_wtimer();
}
if(nread == 0) {
if(downstream->resume_read(SHRPX_NO_BUFFER) != 0) {
return SPDYLAY_ERR_CALLBACK_FAILURE;
}
if(nread > 0 && downstream->resume_read(SHRPX_NO_BUFFER, nread) != 0) {
return SPDYLAY_ERR_CALLBACK_FAILURE;
}
if(*eof != 1) {
return SPDYLAY_ERR_DEFERRED;
}
if(nread == 0 && *eof != 1) {
return SPDYLAY_ERR_DEFERRED;
}
return nread;
@ -1091,40 +1094,19 @@ bool SpdyUpstream::get_flow_control() const
void SpdyUpstream::pause_read(IOCtrlReason reason)
{}
namespace {
int32_t determine_window_update_transmission(spdylay_session *session,
int32_t stream_id)
{
int32_t recv_length, window_size;
if(stream_id == 0) {
recv_length = spdylay_session_get_recv_data_length(session);
window_size = 1 << get_config()->http2_upstream_connection_window_bits;
} else {
recv_length = spdylay_session_get_stream_recv_data_length
(session, stream_id);
window_size = 1 << get_config()->http2_upstream_window_bits;
}
if(recv_length != -1 && recv_length >= window_size / 2) {
return recv_length;
}
return -1;
}
} // namespace
int SpdyUpstream::resume_read(IOCtrlReason reason, Downstream *downstream)
int SpdyUpstream::resume_read(IOCtrlReason reason, Downstream *downstream,
size_t consumed)
{
if(get_flow_control()) {
int32_t delta;
delta = determine_window_update_transmission(session_, 0);
if(delta != -1) {
window_update(0, delta);
}
delta = determine_window_update_transmission
(session_, downstream->get_stream_id());
if(delta != -1) {
window_update(downstream, delta);
assert(downstream->get_request_datalen() >= consumed);
if(consume(downstream->get_stream_id(), consumed) != 0) {
return -1;
}
downstream->dec_request_datalen(consumed);
}
return send();
}
@ -1142,24 +1124,22 @@ int SpdyUpstream::on_downstream_abort_request(Downstream *downstream,
return send();
}
int SpdyUpstream::handle_ign_data_chunk(size_t len)
int SpdyUpstream::consume(int32_t stream_id, size_t len)
{
int32_t window_size;
int rv;
if(spdylay_session_get_recv_data_length(session_) == -1) {
// No connection flow control
return 0;
}
rv = spdylay_session_consume(session_, stream_id, len);
window_size = 1 << get_config()->http2_upstream_connection_window_bits;
if(recv_ign_window_size_ >= window_size / 2) {
window_update(0, recv_ign_window_size_);
if(rv != 0) {
ULOG(WARNING, this) << "spdylay_session_consume() returned error: "
<< spdylay_strerror(rv);
return -1;
}
return 0;
}
int SpdyUpstream::on_timeout(Downstream *downstream)
{
if(LOG_ENABLED(INFO)) {

View File

@ -65,7 +65,8 @@ public:
int error_reply(Downstream *downstream, unsigned int status_code);
virtual void pause_read(IOCtrlReason reason);
virtual int resume_read(IOCtrlReason reason, Downstream *downstream);
virtual int resume_read(IOCtrlReason reason, Downstream *downstream,
size_t consumed);
virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream,
@ -74,7 +75,7 @@ public:
bool get_flow_control() const;
int handle_ign_data_chunk(size_t len);
int consume(int32_t stream_id, size_t len);
void maintain_downstream_concurrency();
void initiate_downstream(std::unique_ptr<Downstream> downstream);

View File

@ -57,7 +57,8 @@ public:
virtual int on_downstream_body_complete(Downstream *downstream) = 0;
virtual void pause_read(IOCtrlReason reason) = 0;
virtual int resume_read(IOCtrlReason reason, Downstream *downstream) = 0;
virtual int resume_read(IOCtrlReason reason, Downstream *downstream,
size_t consumed) = 0;
};
} // namespace shrpx