Add nghttp2_session_consume() API

Reworked no automatic WINDOW_UPDATE feature.  We added new API
nghttp2_session_consume() which tells the library how many bytes are
consumed by the application.  Instead of submitting WINDOW_UPDATE by
the application, the library is now responsible to submit
WINDOW_UPDATE based on consumed bytes.  This is more reliable method,
since it enables us to properly send WINDOW_UPDATE for stream and
connection individually.  The previous implementation of nghttpx had
broken connection window management.
This commit is contained in:
Tatsuhiro Tsujikawa 2014-07-25 21:26:03 +09:00
parent 9f17bee51d
commit 079db14d45
18 changed files with 395 additions and 276 deletions

View File

@ -1505,31 +1505,17 @@ int nghttp2_option_new(nghttp2_option **option_ptr);
*/ */
void nghttp2_option_del(nghttp2_option *option); void nghttp2_option_del(nghttp2_option *option);
/**
* @function
*
* This option prevents the library from sending WINDOW_UPDATE for a
* stream automatically. If this option is set to nonzero, the
* library won't send WINDOW_UPDATE for a stream and the application
* is responsible for sending WINDOW_UPDATE using
* `nghttp2_submit_window_update`. By default, this option is set to
* zero.
*/
void nghttp2_option_set_no_auto_stream_window_update(nghttp2_option *option,
int val);
/** /**
* @function * @function
* *
* This option prevents the library from sending WINDOW_UPDATE for a * This option prevents the library from sending WINDOW_UPDATE for a
* connection automatically. If this option is set to nonzero, the * connection automatically. If this option is set to nonzero, the
* library won't send WINDOW_UPDATE for a connection and the * library won't send WINDOW_UPDATE for DATA until application calls
* application is responsible for sending WINDOW_UPDATE with stream ID * `nghttp2_session_consume()` to indicate the consumed amount of
* 0 using `nghttp2_submit_window_update`. By default, this option is * data. Don't use `nghttp2_submit_window_update()` for this purpose.
* set to zero. * By default, this option is set to zero.
*/ */
void nghttp2_option_set_no_auto_connection_window_update void nghttp2_option_set_no_auto_window_update(nghttp2_option *option, int val);
(nghttp2_option *option, int val);
/** /**
* @function * @function
@ -2067,6 +2053,28 @@ int nghttp2_session_terminate_session2(nghttp2_session *session,
uint32_t nghttp2_session_get_remote_settings(nghttp2_session *session, uint32_t nghttp2_session_get_remote_settings(nghttp2_session *session,
nghttp2_settings_id id); nghttp2_settings_id id);
/**
* @function
*
* Tells the |session| that |size| bytes for a stream denoted by
* |stream_id| were consumed by application and are ready to
* WINDOW_UPDATE. This function is intended to be used without
* automatic window update (see
* `nghttp2_option_set_no_auto_window_update()`).
*
* This function returns 0 if it succeeds, or one of the following
* negative error codes:
*
* :enum:`NGHTTP2_ERR_NOMEM`
* Out of memory.
* :enum:`NGHTTP2_ERR_INVALID_ARGUMENT`
* The |stream_id| is 0.
* :enum:`NGHTTP2_ERR_INVALID_STATE`
* Automatic WINDOW_UPDATE is not disabled.
*/
int nghttp2_session_consume(nghttp2_session *session, int32_t stream_id,
size_t size);
/** /**
* @function * @function
* *
@ -2609,12 +2617,11 @@ int nghttp2_submit_goaway(nghttp2_session *session, uint8_t flags,
* difference. * difference.
* *
* If the |window_size_increment| is negative, the local window size * If the |window_size_increment| is negative, the local window size
* is decreased by -|window_size_increment|. If * is decreased by -|window_size_increment|. If automatic
* :enum:`NGHTTP2_OPT_NO_AUTO_STREAM_WINDOW_UPDATE` (or * WINDOW_UPDATE is enabled
* :enum:`NGHTTP2_OPT_NO_AUTO_CONNECTION_WINDOW_UPDATE` if |stream_id| * (`nghttp2_option_set_no_auto_window_update()`), and the library
* is 0) is not set and the library decided that the WINDOW_UPDATE * decided that the WINDOW_UPDATE should be submitted, then
* should be submitted, then WINDOW_UPDATE is queued with the current * WINDOW_UPDATE is queued with the current received bytes count.
* received bytes count.
* *
* If the |window_size_increment| is 0, the function does nothing and * If the |window_size_increment| is 0, the function does nothing and
* returns 0. * returns 0.

View File

@ -40,18 +40,10 @@ void nghttp2_option_del(nghttp2_option *option)
free(option); free(option);
} }
void nghttp2_option_set_no_auto_stream_window_update(nghttp2_option *option, void nghttp2_option_set_no_auto_window_update(nghttp2_option *option, int val)
int val)
{ {
option->opt_set_mask |= NGHTTP2_OPT_NO_AUTO_STREAM_WINDOW_UPDATE; option->opt_set_mask |= NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE;
option->no_auto_stream_window_update = val; option->no_auto_window_update = val;
}
void nghttp2_option_set_no_auto_connection_window_update
(nghttp2_option *option, int val)
{
option->opt_set_mask |= NGHTTP2_OPT_NO_AUTO_CONNECTION_WINDOW_UPDATE;
option->no_auto_connection_window_update = val;
} }
void nghttp2_option_set_peer_max_concurrent_streams(nghttp2_option *option, void nghttp2_option_set_peer_max_concurrent_streams(nghttp2_option *option,

View File

@ -37,22 +37,12 @@
typedef enum { typedef enum {
/** /**
* This option prevents the library from sending WINDOW_UPDATE for a * This option prevents the library from sending WINDOW_UPDATE for a
* stream automatically. If this option is set to nonzero, the * connection automatically. If this option is set to nonzero, the
* library won't send WINDOW_UPDATE for a stream and the application * library won't send WINDOW_UPDATE for DATA until application calls
* is responsible for sending WINDOW_UPDATE using * nghttp2_session_consume() to indicate the amount of consumed
* `nghttp2_submit_window_update`. By default, this option is set to * DATA. By default, this option is set to zero.
* zero.
*/ */
NGHTTP2_OPT_NO_AUTO_STREAM_WINDOW_UPDATE = 1, NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE = 1,
/**
* This option prevents the library from sending WINDOW_UPDATE for a
* connection automatically. If this option is set to nonzero, the
* library won't send WINDOW_UPDATE for a connection and the
* application is responsible for sending WINDOW_UPDATE with stream
* ID 0 using `nghttp2_submit_window_update`. By default, this
* option is set to zero.
*/
NGHTTP2_OPT_NO_AUTO_CONNECTION_WINDOW_UPDATE = 1 << 1,
/** /**
* This option sets the SETTINGS_MAX_CONCURRENT_STREAMS value of * This option sets the SETTINGS_MAX_CONCURRENT_STREAMS value of
* remote endpoint as if it is received in SETTINGS frame. Without * remote endpoint as if it is received in SETTINGS frame. Without
@ -66,7 +56,7 @@ typedef enum {
* will be overwritten if the local endpoint receives * will be overwritten if the local endpoint receives
* SETTINGS_MAX_CONCURRENT_STREAMS from the remote endpoint. * SETTINGS_MAX_CONCURRENT_STREAMS from the remote endpoint.
*/ */
NGHTTP2_OPT_PEER_MAX_CONCURRENT_STREAMS = 1 << 2 NGHTTP2_OPT_PEER_MAX_CONCURRENT_STREAMS = 1 << 1
} nghttp2_option_flag; } nghttp2_option_flag;
/** /**
@ -83,13 +73,9 @@ struct nghttp2_option {
*/ */
uint32_t peer_max_concurrent_streams; uint32_t peer_max_concurrent_streams;
/** /**
* NGHTTP2_OPT_NO_AUTO_STREAM_WINDOW_UPDATE * NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE
*/ */
uint8_t no_auto_stream_window_update; uint8_t no_auto_window_update;
/**
* NGHTTP2_OPT_NO_AUTO_CONNECTION_WINDOW_UPDATE
*/
uint8_t no_auto_connection_window_update;
}; };
#endif /* NGHTTP2_OPTION_H */ #endif /* NGHTTP2_OPTION_H */

View File

@ -352,6 +352,7 @@ static int session_new(nghttp2_session **session_ptr,
(*session_ptr)->remote_window_size = NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE; (*session_ptr)->remote_window_size = NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE;
(*session_ptr)->recv_window_size = 0; (*session_ptr)->recv_window_size = 0;
(*session_ptr)->consumed_size = 0;
(*session_ptr)->recv_reduction = 0; (*session_ptr)->recv_reduction = 0;
(*session_ptr)->local_window_size = NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE; (*session_ptr)->local_window_size = NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE;
@ -383,19 +384,10 @@ static int session_new(nghttp2_session **session_ptr,
if(option) { if(option) {
if((option->opt_set_mask & NGHTTP2_OPT_NO_AUTO_STREAM_WINDOW_UPDATE) && if((option->opt_set_mask & NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE) &&
option->no_auto_stream_window_update) { option->no_auto_window_update) {
(*session_ptr)->opt_flags |= (*session_ptr)->opt_flags |= NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE;
NGHTTP2_OPTMASK_NO_AUTO_STREAM_WINDOW_UPDATE;
}
if((option->opt_set_mask & NGHTTP2_OPT_NO_AUTO_CONNECTION_WINDOW_UPDATE) &&
option->no_auto_connection_window_update) {
(*session_ptr)->opt_flags |=
NGHTTP2_OPTMASK_NO_AUTO_CONNECTION_WINDOW_UPDATE;
} }
@ -3175,10 +3167,11 @@ static int update_local_initial_window_size_func
return nghttp2_session_terminate_session(arg->session, return nghttp2_session_terminate_session(arg->session,
NGHTTP2_FLOW_CONTROL_ERROR); NGHTTP2_FLOW_CONTROL_ERROR);
} }
if(!(arg->session->opt_flags & if(!(arg->session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE)) {
NGHTTP2_OPTMASK_NO_AUTO_STREAM_WINDOW_UPDATE)) {
if(nghttp2_should_send_window_update(stream->local_window_size, if(nghttp2_should_send_window_update(stream->local_window_size,
stream->recv_window_size)) { stream->recv_window_size)) {
rv = nghttp2_session_add_window_update(arg->session, rv = nghttp2_session_add_window_update(arg->session,
NGHTTP2_FLAG_NONE, NGHTTP2_FLAG_NONE,
stream->stream_id, stream->stream_id,
@ -3876,9 +3869,9 @@ static int adjust_recv_window_size(int32_t *recv_window_size_ptr,
/* /*
* Accumulates received bytes |delta_size| for stream-level flow * Accumulates received bytes |delta_size| for stream-level flow
* control and decides whether to send WINDOW_UPDATE to that * control and decides whether to send WINDOW_UPDATE to that stream.
* stream. If NGHTTP2_OPT_NO_AUTO_STREAM_WINDOW_UPDATE is set, * If NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE is set, WINDOW_UPDATE will not
* WINDOW_UPDATE will not be sent. * be sent.
* *
* This function returns 0 if it succeeds, or one of the following * This function returns 0 if it succeeds, or one of the following
* negative error codes: * negative error codes:
@ -3902,7 +3895,7 @@ static int session_update_recv_stream_window_size
/* We don't have to send WINDOW_UPDATE if the data received is the /* We don't have to send WINDOW_UPDATE if the data received is the
last chunk in the incoming stream. */ last chunk in the incoming stream. */
if(send_window_update && if(send_window_update &&
!(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_STREAM_WINDOW_UPDATE)) { !(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE)) {
/* We have to use local_settings here because it is the constraint /* We have to use local_settings here because it is the constraint
the remote endpoint should honor. */ the remote endpoint should honor. */
if(nghttp2_should_send_window_update(stream->local_window_size, if(nghttp2_should_send_window_update(stream->local_window_size,
@ -3924,8 +3917,8 @@ static int session_update_recv_stream_window_size
/* /*
* Accumulates received bytes |delta_size| for connection-level flow * Accumulates received bytes |delta_size| for connection-level flow
* control and decides whether to send WINDOW_UPDATE to the * control and decides whether to send WINDOW_UPDATE to the
* connection. If NGHTTP2_OPT_NO_AUTO_CONNECTION_WINDOW_UPDATE is * connection. If NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE is set,
* set, WINDOW_UPDATE will not be sent. * WINDOW_UPDATE will not be sent.
* *
* This function returns 0 if it succeeds, or one of the following * This function returns 0 if it succeeds, or one of the following
* negative error codes: * negative error codes:
@ -3944,30 +3937,92 @@ static int session_update_recv_connection_window_size
return nghttp2_session_terminate_session(session, return nghttp2_session_terminate_session(session,
NGHTTP2_FLOW_CONTROL_ERROR); NGHTTP2_FLOW_CONTROL_ERROR);
} }
if(!(session->opt_flags & if(!(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE)) {
NGHTTP2_OPTMASK_NO_AUTO_CONNECTION_WINDOW_UPDATE)) {
if(nghttp2_should_send_window_update(session->local_window_size, if(nghttp2_should_send_window_update(session->local_window_size,
session->recv_window_size)) { session->recv_window_size)) {
/* Use stream ID 0 to update connection-level flow control /* Use stream ID 0 to update connection-level flow control
window */ window */
rv = nghttp2_session_add_window_update(session, rv = nghttp2_session_add_window_update(session, NGHTTP2_FLAG_NONE, 0,
NGHTTP2_FLAG_NONE, session->recv_window_size);
0, if(rv != 0) {
session->recv_window_size);
if(rv == 0) {
session->recv_window_size = 0;
/* recv_ign_window_size keeps track of ignored DATA bytes
before any connection-level WINDOW_UPDATE therefore, we can
reset it here. */
session->recv_ign_window_size = 0;
} else {
return rv; return rv;
} }
session->recv_window_size = 0;
} }
} }
return 0; return 0;
} }
static int session_update_stream_consumed_size
(nghttp2_session *session, nghttp2_stream *stream, size_t delta_size)
{
int32_t recv_size;
int rv;
if((size_t)stream->consumed_size > NGHTTP2_MAX_WINDOW_SIZE - delta_size) {
return nghttp2_session_terminate_session(session,
NGHTTP2_FLOW_CONTROL_ERROR);
}
stream->consumed_size += delta_size;
/* recv_window_size may be smaller than consumed_size, because it
may be decreased by negative value with
nghttp2_submit_window_update(). */
recv_size = nghttp2_min(stream->consumed_size, stream->recv_window_size);
if(nghttp2_should_send_window_update(stream->local_window_size, recv_size)) {
rv = nghttp2_session_add_window_update(session, NGHTTP2_FLAG_NONE,
stream->stream_id, recv_size);
if(rv != 0) {
return rv;
}
stream->recv_window_size -= recv_size;
stream->consumed_size -= recv_size;
}
return 0;
}
static int session_update_connection_consumed_size
(nghttp2_session *session, size_t delta_size)
{
int32_t recv_size;
int rv;
if((size_t)session->consumed_size > NGHTTP2_MAX_WINDOW_SIZE - delta_size) {
return nghttp2_session_terminate_session(session,
NGHTTP2_FLOW_CONTROL_ERROR);
}
session->consumed_size += delta_size;
/* recv_window_size may be smaller than consumed_size, because it
may be decreased by negative value with
nghttp2_submit_window_update(). */
recv_size = nghttp2_min(session->consumed_size, session->recv_window_size);
if(nghttp2_should_send_window_update(session->local_window_size,
recv_size)) {
rv = nghttp2_session_add_window_update(session, NGHTTP2_FLAG_NONE, 0,
recv_size);
if(rv != 0) {
return rv;
}
session->recv_window_size -= recv_size;
session->consumed_size -= recv_size;
}
return 0;
}
/* /*
* Checks that we can receive the DATA frame for stream, which is * Checks that we can receive the DATA frame for stream, which is
* indicated by |session->iframe.frame.hd.stream_id|. If it is a * indicated by |session->iframe.frame.hd.stream_id|. If it is a
@ -5033,6 +5088,14 @@ ssize_t nghttp2_session_mem_recv(nghttp2_session *session,
return rv; return rv;
} }
/* Pad Length field is consumed immediately */
rv = nghttp2_session_consume(session, iframe->frame.hd.stream_id,
readlen);
if(nghttp2_is_fatal(rv)) {
return rv;
}
stream = nghttp2_session_get_stream(session, stream = nghttp2_session_get_stream(session,
iframe->frame.hd.stream_id); iframe->frame.hd.stream_id);
if(stream) { if(stream) {
@ -5098,6 +5161,18 @@ ssize_t nghttp2_session_mem_recv(nghttp2_session *session,
data_readlen = inbound_frame_effective_readlen data_readlen = inbound_frame_effective_readlen
(iframe, iframe->payloadleft, readlen); (iframe, iframe->payloadleft, readlen);
padlen = readlen - data_readlen;
if(padlen > 0) {
/* Padding is considered as "consumed" immediately */
rv = nghttp2_session_consume(session, iframe->frame.hd.stream_id,
padlen);
if(nghttp2_is_fatal(rv)) {
return rv;
}
}
DEBUGF(fprintf(stderr, "recv: data_readlen=%zu\n", data_readlen)); DEBUGF(fprintf(stderr, "recv: data_readlen=%zu\n", data_readlen));
if(stream && data_readlen > 0 && if(stream && data_readlen > 0 &&
@ -5142,8 +5217,6 @@ ssize_t nghttp2_session_mem_recv(nghttp2_session *session,
readlen, iframe->payloadleft)); readlen, iframe->payloadleft));
if(readlen > 0) { if(readlen > 0) {
session->recv_ign_window_size += readlen;
/* Update connection-level flow control window for ignored /* Update connection-level flow control window for ignored
DATA frame too */ DATA frame too */
rv = session_update_recv_connection_window_size(session, readlen); rv = session_update_recv_connection_window_size(session, readlen);
@ -5151,20 +5224,14 @@ ssize_t nghttp2_session_mem_recv(nghttp2_session *session,
return rv; return rv;
} }
if((session->opt_flags & if(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE) {
NGHTTP2_OPTMASK_NO_AUTO_CONNECTION_WINDOW_UPDATE) &&
nghttp2_should_send_window_update
(session->local_window_size, session->recv_ign_window_size)) {
rv = nghttp2_session_add_window_update /* Ignored DATA is considered as "consumed" immediately. */
(session, NGHTTP2_FLAG_NONE, 0, session->recv_ign_window_size); rv = session_update_connection_consumed_size(session, readlen);
if(nghttp2_is_fatal(rv)) { if(nghttp2_is_fatal(rv)) {
return rv; return rv;
} }
session->recv_window_size -= session->recv_ign_window_size;
session->recv_ign_window_size = 0;
} }
} }
@ -5737,3 +5804,36 @@ int nghttp2_session_get_stream_remote_close(nghttp2_session* session,
return (stream->shut_flags & NGHTTP2_SHUT_RD) != 0; return (stream->shut_flags & NGHTTP2_SHUT_RD) != 0;
} }
int nghttp2_session_consume(nghttp2_session *session, int32_t stream_id,
size_t size)
{
int rv;
nghttp2_stream *stream;
if(stream_id == 0) {
return NGHTTP2_ERR_INVALID_ARGUMENT;
}
if(!(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE)) {
return NGHTTP2_ERR_INVALID_STATE;
}
rv = session_update_connection_consumed_size(session, size);
if(nghttp2_is_fatal(rv)) {
return rv;
}
stream = nghttp2_session_get_stream(session, stream_id);
if(stream) {
rv = session_update_stream_consumed_size(session, stream, size);
if(nghttp2_is_fatal(rv)) {
return rv;
}
}
return 0;
}

View File

@ -43,8 +43,7 @@
* Option flags. * Option flags.
*/ */
typedef enum { typedef enum {
NGHTTP2_OPTMASK_NO_AUTO_STREAM_WINDOW_UPDATE = 1 << 0, NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE = 1 << 0,
NGHTTP2_OPTMASK_NO_AUTO_CONNECTION_WINDOW_UPDATE = 1 << 1
} nghttp2_optmask; } nghttp2_optmask;
typedef enum { typedef enum {
@ -206,15 +205,10 @@ struct nghttp2_session {
WINDOW_UPDATE. This could be negative after submitting negative WINDOW_UPDATE. This could be negative after submitting negative
value to WINDOW_UPDATE. */ value to WINDOW_UPDATE. */
int32_t recv_window_size; int32_t recv_window_size;
/* The number of bytes in ignored DATA frame received without /* The number of bytes consumed by the application and now is
connection-level WINDOW_UPDATE. Since we do not call subject to WINDOW_UPDATE. This is only used when auto
on_data_chunk_recv_callback for ignored DATA chunk, if WINDOW_UPDATE is turned off. */
nghttp2_option_set_no_auto_connection_window_update is used, int32_t consumed_size;
application may not have a chance to send connection
WINDOW_UPDATE. To fix this, we accumulate those received bytes,
and if it exceeds certain number, we automatically send
connection-level WINDOW_UPDATE. */
int32_t recv_ign_window_size;
/* The amount of recv_window_size cut using submitting negative /* The amount of recv_window_size cut using submitting negative
value to WINDOW_UPDATE */ value to WINDOW_UPDATE */
int32_t recv_reduction; int32_t recv_reduction;

View File

@ -48,6 +48,7 @@ void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id,
stream->remote_window_size = remote_initial_window_size; stream->remote_window_size = remote_initial_window_size;
stream->local_window_size = local_initial_window_size; stream->local_window_size = local_initial_window_size;
stream->recv_window_size = 0; stream->recv_window_size = 0;
stream->consumed_size = 0;
stream->recv_reduction = 0; stream->recv_reduction = 0;
stream->blocked_sent = 0; stream->blocked_sent = 0;

View File

@ -151,6 +151,10 @@ struct nghttp2_stream {
WINDOW_UPDATE. This could be negative after submitting negative WINDOW_UPDATE. This could be negative after submitting negative
value to WINDOW_UPDATE */ value to WINDOW_UPDATE */
int32_t recv_window_size; int32_t recv_window_size;
/* The number of bytes consumed by the application and now is
subject to WINDOW_UPDATE. This is only used when auto
WINDOW_UPDATE is turned off. */
int32_t consumed_size;
/* The amount of recv_window_size cut using submitting negative /* The amount of recv_window_size cut using submitting negative
value to WINDOW_UPDATE */ value to WINDOW_UPDATE */
int32_t recv_reduction; int32_t recv_reduction;

View File

@ -348,27 +348,30 @@ int nghttp2_submit_window_update(nghttp2_session *session, uint8_t flags,
if(rv != 0) { if(rv != 0) {
return rv; return rv;
} }
/* recv_ign_window_size keeps track of ignored DATA bytes before
any connection-level WINDOW_UPDATE therefore, we can reset it
here. */
session->recv_ign_window_size = 0;
} else { } else {
stream = nghttp2_session_get_stream(session, stream_id); stream = nghttp2_session_get_stream(session, stream_id);
if(stream) { if(!stream) {
rv = nghttp2_adjust_local_window_size(&stream->local_window_size,
&stream->recv_window_size,
&stream->recv_reduction,
&window_size_increment);
if(rv != 0) {
return rv;
}
} else {
return 0; return 0;
} }
rv = nghttp2_adjust_local_window_size(&stream->local_window_size,
&stream->recv_window_size,
&stream->recv_reduction,
&window_size_increment);
if(rv != 0) {
return rv;
}
} }
if(window_size_increment > 0) { if(window_size_increment > 0) {
if(stream_id == 0) {
session->consumed_size =
nghttp2_max(0, session->consumed_size - window_size_increment);
} else {
stream->consumed_size =
nghttp2_max(0, stream->consumed_size - window_size_increment);
}
return nghttp2_session_add_window_update(session, flags, stream_id, return nghttp2_session_add_window_update(session, flags, stream_id,
window_size_increment); window_size_increment);
} }

View File

@ -562,9 +562,7 @@ void fill_default_config()
nghttp2_option_new(&mod_config()->http2_option); nghttp2_option_new(&mod_config()->http2_option);
nghttp2_option_set_no_auto_stream_window_update nghttp2_option_set_no_auto_window_update
(mod_config()->http2_option, 1);
nghttp2_option_set_no_auto_connection_window_update
(mod_config()->http2_option, 1); (mod_config()->http2_option, 1);
mod_config()->tls_proto_mask = 0; mod_config()->tls_proto_mask = 0;

View File

@ -46,6 +46,8 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
response_body_buf_(nullptr), response_body_buf_(nullptr),
request_headers_sum_(0), request_headers_sum_(0),
response_headers_sum_(0), response_headers_sum_(0),
request_datalen_(0),
response_datalen_(0),
stream_id_(stream_id), stream_id_(stream_id),
priority_(priority), priority_(priority),
downstream_stream_id_(-1), downstream_stream_id_(-1),
@ -453,7 +455,13 @@ int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen)
return -1; return -1;
} }
request_bodylen_ += datalen; request_bodylen_ += datalen;
return dconn_->push_upload_data_chunk(data, datalen); if(dconn_->push_upload_data_chunk(data, datalen) != 0) {
return -1;
}
request_datalen_ += datalen;
return 0;
} }
int Downstream::end_upload_data() int Downstream::end_upload_data()
@ -818,4 +826,29 @@ bool Downstream::get_expect_final_response() const
return expect_final_response_; return expect_final_response_;
} }
size_t Downstream::get_request_datalen() const
{
return request_datalen_;
}
void Downstream::reset_request_datalen()
{
request_datalen_ = 0;
}
void Downstream::add_response_datalen(size_t len)
{
response_datalen_ += len;
}
size_t Downstream::get_response_datalen() const
{
return response_datalen_;
}
void Downstream::reset_response_datalen()
{
response_datalen_ = 0;
}
} // namespace shrpx } // namespace shrpx

View File

@ -142,6 +142,8 @@ public:
void set_request_http2_expect_body(bool f); void set_request_http2_expect_body(bool f);
int push_upload_data_chunk(const uint8_t *data, size_t datalen); int push_upload_data_chunk(const uint8_t *data, size_t datalen);
int end_upload_data(); int end_upload_data();
size_t get_request_datalen() const;
void reset_request_datalen();
enum { enum {
INITIAL, INITIAL,
HEADER_COMPLETE, HEADER_COMPLETE,
@ -211,6 +213,9 @@ public:
bool get_non_final_response() const; bool get_non_final_response() const;
void set_expect_final_response(bool f); void set_expect_final_response(bool f);
bool get_expect_final_response() const; bool get_expect_final_response() const;
void add_response_datalen(size_t len);
size_t get_response_datalen() const;
void reset_response_datalen();
// Call this method when there is incoming data in downstream // Call this method when there is incoming data in downstream
// connection. // connection.
@ -250,6 +255,10 @@ private:
size_t request_headers_sum_; size_t request_headers_sum_;
size_t response_headers_sum_; size_t response_headers_sum_;
// The number of bytes not consumed by the application yet.
size_t request_datalen_;
size_t response_datalen_;
int32_t stream_id_; int32_t stream_id_;
int32_t priority_; int32_t priority_;
// stream ID in backend connection // stream ID in backend connection

View File

@ -67,6 +67,15 @@ Http2DownstreamConnection::~Http2DownstreamConnection()
if(submit_rst_stream(downstream_) == 0) { if(submit_rst_stream(downstream_) == 0) {
http2session_->notify(); http2session_->notify();
} }
if(downstream_->get_downstream_stream_id() != -1) {
http2session_->consume(downstream_->get_downstream_stream_id(),
downstream_->get_response_datalen());
downstream_->reset_response_datalen();
http2session_->notify();
}
} }
http2session_->remove_downstream_connection(this); http2session_->remove_downstream_connection(this);
// Downstream and DownstreamConnection may be deleted // Downstream and DownstreamConnection may be deleted
@ -122,6 +131,16 @@ void Http2DownstreamConnection::detach_downstream(Downstream *downstream)
if(submit_rst_stream(downstream) == 0) { if(submit_rst_stream(downstream) == 0) {
http2session_->notify(); http2session_->notify();
} }
if(downstream_->get_downstream_stream_id() != -1) {
http2session_->consume(downstream_->get_downstream_stream_id(),
downstream_->get_response_datalen());
downstream_->reset_response_datalen();
http2session_->notify();
}
downstream->set_downstream_connection(nullptr); downstream->set_downstream_connection(nullptr);
downstream_ = nullptr; downstream_ = nullptr;
@ -469,34 +488,29 @@ int Http2DownstreamConnection::end_upload_data()
int Http2DownstreamConnection::resume_read(IOCtrlReason reason) int Http2DownstreamConnection::resume_read(IOCtrlReason reason)
{ {
int rv1 = 0, rv2 = 0; int rv;
if(http2session_->get_state() == Http2Session::CONNECTED &&
http2session_->get_flow_control()) { if(http2session_->get_state() != Http2Session::CONNECTED ||
int32_t window_size_increment; !http2session_->get_flow_control()) {
window_size_increment = http2::determine_window_update_transmission
(http2session_->get_session(), 0);
if(window_size_increment != -1) {
rv1 = http2session_->submit_window_update(nullptr, window_size_increment);
if(rv1 == 0) {
http2session_->notify();
}
}
if(downstream_ && downstream_->get_downstream_stream_id() != -1) {
window_size_increment = http2::determine_window_update_transmission
(http2session_->get_session(), downstream_->get_downstream_stream_id());
if(window_size_increment != -1) {
rv2 = http2session_->submit_window_update(this, window_size_increment);
if(rv2 == 0) {
http2session_->notify();
}
}
}
}
if(rv1 == 0 && rv2 == 0) {
return 0; return 0;
} }
DLOG(WARNING, this) << "Sending WINDOW_UPDATE failed";
return -1; if(!downstream_ || downstream_->get_downstream_stream_id() == -1) {
return 0;
}
rv = http2session_->consume(downstream_->get_downstream_stream_id(),
downstream_->get_response_datalen());
if(rv != 0) {
return -1;
}
downstream_->reset_response_datalen();
http2session_->notify();
return 0;
} }
int Http2DownstreamConnection::on_read() int Http2DownstreamConnection::on_read()

View File

@ -58,7 +58,6 @@ Http2Session::Http2Session(event_base *evbase, SSL_CTX *ssl_ctx)
wrbev_(nullptr), wrbev_(nullptr),
rdbev_(nullptr), rdbev_(nullptr),
settings_timerev_(nullptr), settings_timerev_(nullptr),
recv_ign_window_size_(0),
fd_(-1), fd_(-1),
state_(DISCONNECTED), state_(DISCONNECTED),
notified_(false), notified_(false),
@ -661,28 +660,6 @@ int Http2Session::submit_rst_stream(int32_t stream_id,
return 0; return 0;
} }
int Http2Session::submit_window_update(Http2DownstreamConnection *dconn,
int32_t amount)
{
assert(state_ == CONNECTED);
int rv;
int32_t stream_id;
if(dconn) {
stream_id = dconn->get_downstream()->get_downstream_stream_id();
} else {
stream_id = 0;
recv_ign_window_size_ = 0;
}
rv = nghttp2_submit_window_update(session_, NGHTTP2_FLAG_NONE,
stream_id, amount);
if(rv < NGHTTP2_ERR_FATAL) {
SSLOG(FATAL, this) << "nghttp2_submit_window_update() failed: "
<< nghttp2_strerror(rv);
return -1;
}
return 0;
}
int Http2Session::submit_priority(Http2DownstreamConnection *dconn, int Http2Session::submit_priority(Http2DownstreamConnection *dconn,
int32_t pri) int32_t pri)
{ {
@ -768,6 +745,12 @@ int on_stream_close_callback
if(dconn) { if(dconn) {
auto downstream = dconn->get_downstream(); auto downstream = dconn->get_downstream();
if(downstream && downstream->get_downstream_stream_id() == stream_id) { if(downstream && downstream->get_downstream_stream_id() == stream_id) {
http2session->consume(downstream->get_downstream_stream_id(),
downstream->get_response_datalen());
downstream->reset_response_datalen();
if(error_code == NGHTTP2_NO_ERROR) { if(error_code == NGHTTP2_NO_ERROR) {
if(downstream->get_response_state() != Downstream::MSG_COMPLETE) { if(downstream->get_response_state() != Downstream::MSG_COMPLETE) {
downstream->set_response_state(Downstream::MSG_RESET); downstream->set_response_state(Downstream::MSG_RESET);
@ -1180,13 +1163,21 @@ int on_data_chunk_recv_callback(nghttp2_session *session,
(nghttp2_session_get_stream_user_data(session, stream_id)); (nghttp2_session_get_stream_user_data(session, stream_id));
if(!sd || !sd->dconn) { if(!sd || !sd->dconn) {
http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR); http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR);
http2session->handle_ign_data_chunk(len);
if(http2session->consume(stream_id, len) != 0) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
return 0; return 0;
} }
auto downstream = sd->dconn->get_downstream(); auto downstream = sd->dconn->get_downstream();
if(!downstream || downstream->get_downstream_stream_id() != stream_id) { if(!downstream || downstream->get_downstream_stream_id() != stream_id) {
http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR); http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR);
http2session->handle_ign_data_chunk(len);
if(http2session->consume(stream_id, len) != 0) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
return 0; return 0;
} }
@ -1194,7 +1185,11 @@ int on_data_chunk_recv_callback(nghttp2_session *session,
// HTTP. // HTTP.
if(downstream->get_non_final_response()) { if(downstream->get_non_final_response()) {
http2session->submit_rst_stream(stream_id, NGHTTP2_PROTOCOL_ERROR); http2session->submit_rst_stream(stream_id, NGHTTP2_PROTOCOL_ERROR);
http2session->handle_ign_data_chunk(len);
if(http2session->consume(stream_id, len) != 0) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
return 0; return 0;
} }
@ -1204,9 +1199,16 @@ int on_data_chunk_recv_callback(nghttp2_session *session,
rv = upstream->on_downstream_body(downstream, data, len, false); rv = upstream->on_downstream_body(downstream, data, len, false);
if(rv != 0) { if(rv != 0) {
http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR); http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR);
http2session->handle_ign_data_chunk(len);
if(http2session->consume(stream_id, len) != 0) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
downstream->set_response_state(Downstream::MSG_RESET); downstream->set_response_state(Downstream::MSG_RESET);
} }
downstream->add_response_datalen(len);
call_downstream_readcb(http2session, downstream); call_downstream_readcb(http2session, downstream);
return 0; return 0;
} }
@ -1546,16 +1548,17 @@ SSL* Http2Session::get_ssl() const
return ssl_; return ssl_;
} }
int Http2Session::handle_ign_data_chunk(size_t len) int Http2Session::consume(int32_t stream_id, size_t len)
{ {
int32_t window_size; int rv;
recv_ign_window_size_ += len; rv = nghttp2_session_consume(session_, stream_id, len);
window_size = nghttp2_session_get_effective_local_window_size(session_); if(rv != 0) {
SSLOG(WARNING, this) << "nghttp2_session_consume() returned error: "
<< nghttp2_strerror(rv);
if(recv_ign_window_size_ >= window_size / 2) { return -1;
submit_window_update(nullptr, recv_ign_window_size_);
} }
return 0; return 0;

View File

@ -70,10 +70,6 @@ public:
int submit_rst_stream(int32_t stream_id, nghttp2_error_code error_code); int submit_rst_stream(int32_t stream_id, nghttp2_error_code error_code);
// To send WINDOW_UPDATE for a connection, specify nullptr to
// |dconn|.
int submit_window_update(Http2DownstreamConnection *dconn, int32_t amount);
int submit_priority(Http2DownstreamConnection *dconn, int32_t pri); int submit_priority(Http2DownstreamConnection *dconn, int32_t pri);
int terminate_session(nghttp2_error_code error_code); int terminate_session(nghttp2_error_code error_code);
@ -108,7 +104,7 @@ public:
SSL* get_ssl() const; SSL* get_ssl() const;
int handle_ign_data_chunk(size_t len); int consume(int32_t stream_id, size_t len);
enum { enum {
// Disconnected // Disconnected
@ -140,7 +136,6 @@ private:
bufferevent *wrbev_; bufferevent *wrbev_;
bufferevent *rdbev_; bufferevent *rdbev_;
event *settings_timerev_; event *settings_timerev_;
int32_t recv_ign_window_size_;
// fd_ is used for proxy connection and no TLS connection. For // fd_ is used for proxy connection and no TLS connection. For
// direct or TLS connection, it may be -1 even after connection is // direct or TLS connection, it may be -1 even after connection is
// established. Use bufferevent_getfd(bev_) to get file descriptor // established. Use bufferevent_getfd(bev_) to get file descriptor

View File

@ -60,12 +60,17 @@ int on_stream_close_callback
ULOG(INFO, upstream) << "Stream stream_id=" << stream_id ULOG(INFO, upstream) << "Stream stream_id=" << stream_id
<< " is being closed"; << " is being closed";
} }
auto downstream = upstream->find_downstream(stream_id); auto downstream = upstream->find_downstream(stream_id);
if(!downstream) { if(!downstream) {
return 0; return 0;
} }
upstream->consume(stream_id, downstream->get_request_datalen());
downstream->reset_request_datalen();
if(downstream->get_request_state() == Downstream::CONNECT_FAIL) { if(downstream->get_request_state() == Downstream::CONNECT_FAIL) {
upstream->remove_downstream(downstream); upstream->remove_downstream(downstream);
@ -476,19 +481,21 @@ int on_data_chunk_recv_callback(nghttp2_session *session,
auto upstream = static_cast<Http2Upstream*>(user_data); auto upstream = static_cast<Http2Upstream*>(user_data);
auto downstream = upstream->find_downstream(stream_id); auto downstream = upstream->find_downstream(stream_id);
if(!downstream) { if(!downstream || !downstream->get_downstream_connection()) {
upstream->handle_ign_data_chunk(len); if(upstream->consume(stream_id, len) != 0) {
return 0; return NGHTTP2_ERR_CALLBACK_FAILURE;
} }
if(!downstream->get_downstream_connection()) {
upstream->handle_ign_data_chunk(len);
return 0; return 0;
} }
if(downstream->push_upload_data_chunk(data, len) != 0) { if(downstream->push_upload_data_chunk(data, len) != 0) {
upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR); upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
upstream->handle_ign_data_chunk(len);
if(upstream->consume(stream_id, len) != 0) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
return 0; return 0;
} }
@ -586,8 +593,7 @@ nghttp2_error_code infer_upstream_rst_stream_error_code
Http2Upstream::Http2Upstream(ClientHandler *handler) Http2Upstream::Http2Upstream(ClientHandler *handler)
: handler_(handler), : handler_(handler),
session_(nullptr), session_(nullptr),
settings_timerev_(nullptr), settings_timerev_(nullptr)
recv_ign_window_size_(0)
{ {
handler->set_upstream_timeouts(&get_config()->http2_upstream_read_timeout, handler->set_upstream_timeouts(&get_config()->http2_upstream_read_timeout,
&get_config()->upstream_write_timeout); &get_config()->upstream_write_timeout);
@ -989,30 +995,6 @@ int Http2Upstream::rst_stream(Downstream *downstream,
return 0; return 0;
} }
int Http2Upstream::window_update(Downstream *downstream,
int32_t window_size_increment)
{
int rv;
int32_t stream_id;
if(downstream) {
stream_id = downstream->get_stream_id();
} else {
stream_id = 0;
recv_ign_window_size_ = 0;
}
rv = nghttp2_submit_window_update(session_, NGHTTP2_FLAG_NONE,
stream_id, window_size_increment);
if(rv < NGHTTP2_ERR_FATAL) {
ULOG(FATAL, this) << "nghttp2_submit_window_update() failed: "
<< nghttp2_strerror(rv);
DIE();
}
return 0;
}
int Http2Upstream::terminate_session(nghttp2_error_code error_code) int Http2Upstream::terminate_session(nghttp2_error_code error_code)
{ {
int rv; int rv;
@ -1304,18 +1286,14 @@ void Http2Upstream::pause_read(IOCtrlReason reason)
int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream) int Http2Upstream::resume_read(IOCtrlReason reason, Downstream *downstream)
{ {
if(get_flow_control()) { if(get_flow_control()) {
int32_t window_size_increment; if(consume(downstream->get_stream_id(),
window_size_increment = http2::determine_window_update_transmission downstream->get_request_datalen()) != 0) {
(session_, 0); return -1;
if(window_size_increment != -1) {
window_update(nullptr, window_size_increment);
}
window_size_increment = http2::determine_window_update_transmission
(session_, downstream->get_stream_id());
if(window_size_increment != -1) {
window_update(downstream, window_size_increment);
} }
downstream->reset_request_datalen();
} }
return send(); return send();
} }
@ -1333,16 +1311,16 @@ int Http2Upstream::on_downstream_abort_request(Downstream *downstream,
return send(); return send();
} }
int Http2Upstream::handle_ign_data_chunk(size_t len) int Http2Upstream::consume(int32_t stream_id, size_t len)
{ {
int32_t window_size; int rv;
recv_ign_window_size_ += len; rv = nghttp2_session_consume(session_, stream_id, len);
window_size = nghttp2_session_get_effective_local_window_size(session_); if(rv != 0) {
ULOG(WARNING, this) << "nghttp2_session_consume() returned error: "
if(recv_ign_window_size_ >= window_size / 2) { << nghttp2_strerror(rv);
window_update(nullptr, recv_ign_window_size_); return -1;
} }
return 0; return 0;

View File

@ -60,9 +60,6 @@ public:
nghttp2_session* get_http2_session(); nghttp2_session* get_http2_session();
int rst_stream(Downstream *downstream, nghttp2_error_code error_code); int rst_stream(Downstream *downstream, nghttp2_error_code error_code);
// To send WINDOW_UPDATE for a connection, specify nullptr to
// |downstream|.
int window_update(Downstream *downstream, int32_t window_size_increment);
int terminate_session(nghttp2_error_code error_code); int terminate_session(nghttp2_error_code error_code);
int error_reply(Downstream *downstream, unsigned int status_code); int error_reply(Downstream *downstream, unsigned int status_code);
@ -81,16 +78,13 @@ public:
int upgrade_upstream(HttpsUpstream *upstream); int upgrade_upstream(HttpsUpstream *upstream);
int start_settings_timer(); int start_settings_timer();
void stop_settings_timer(); void stop_settings_timer();
int handle_ign_data_chunk(size_t len); int consume(int32_t stream_id, size_t len);
private: private:
DownstreamQueue downstream_queue_; DownstreamQueue downstream_queue_;
std::unique_ptr<HttpsUpstream> pre_upstream_; std::unique_ptr<HttpsUpstream> pre_upstream_;
ClientHandler *handler_; ClientHandler *handler_;
nghttp2_session *session_; nghttp2_session *session_;
event *settings_timerev_; event *settings_timerev_;
// Received DATA frame size while it is not sent to backend before
// any connection-level WINDOW_UPDATE
int32_t recv_ign_window_size_;
bool flow_control_; bool flow_control_;
}; };

View File

@ -406,6 +406,23 @@ int HttpsUpstream::on_write()
int rv = 0; int rv = 0;
auto downstream = get_downstream(); auto downstream = get_downstream();
if(downstream) { if(downstream) {
// We need to postpone detachment until all data are sent so that
// we can notify nghttp2 library all data consumed.
if(downstream->get_response_state() == Downstream::MSG_COMPLETE) {
auto dconn = downstream->get_downstream_connection();
if(downstream->get_response_connection_close()) {
// Connection close
downstream->set_downstream_connection(nullptr);
delete dconn;
} else {
// Keep-alive
dconn->detach_downstream(downstream);
}
}
rv = downstream->resume_read(SHRPX_NO_BUFFER); rv = downstream->resume_read(SHRPX_NO_BUFFER);
} }
return rv; return rv;
@ -493,17 +510,20 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
return; return;
} }
// If pending data exist, we defer detachment to correctly notify
// the all consumed data to nghttp2 library.
if(handler->get_outbuf_length() == 0) {
if(downstream->get_response_connection_close()) {
// Connection close
downstream->set_downstream_connection(nullptr);
if(downstream->get_response_connection_close()) { delete dconn;
// Connection close
downstream->set_downstream_connection(nullptr);
delete dconn; dconn = nullptr;
} else {
dconn = nullptr; // Keep-alive
} else { dconn->detach_downstream(downstream);
// Keep-alive }
dconn->detach_downstream(downstream);
} }
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {

View File

@ -4762,25 +4762,13 @@ void test_nghttp2_session_set_option(void)
nghttp2_option_new(&option); nghttp2_option_new(&option);
nghttp2_option_set_no_auto_stream_window_update(option, 1); nghttp2_option_set_no_auto_window_update(option, 1);
memset(&callbacks, 0, sizeof(nghttp2_session_callbacks)); memset(&callbacks, 0, sizeof(nghttp2_session_callbacks));
nghttp2_session_client_new2(&session, &callbacks, NULL, option); nghttp2_session_client_new2(&session, &callbacks, NULL, option);
CU_ASSERT(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_STREAM_WINDOW_UPDATE); CU_ASSERT(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE);
CU_ASSERT(!(session->opt_flags &
NGHTTP2_OPTMASK_NO_AUTO_CONNECTION_WINDOW_UPDATE));
nghttp2_session_del(session);
nghttp2_option_set_no_auto_stream_window_update(option, 0);
nghttp2_option_set_no_auto_connection_window_update(option, 1);
nghttp2_session_server_new2(&session, &callbacks, NULL, option);
CU_ASSERT(!(session->opt_flags &
NGHTTP2_OPTMASK_NO_AUTO_STREAM_WINDOW_UPDATE));
CU_ASSERT(session->opt_flags &
NGHTTP2_OPTMASK_NO_AUTO_CONNECTION_WINDOW_UPDATE);
nghttp2_session_del(session); nghttp2_session_del(session);
nghttp2_option_set_peer_max_concurrent_streams(option, 100); nghttp2_option_set_peer_max_concurrent_streams(option, 100);