Add connection-level flow control

This commit is contained in:
Tatsuhiro Tsujikawa 2013-07-16 20:54:24 +09:00
parent 3ed5c78a2c
commit d54cfb88ff
2 changed files with 173 additions and 34 deletions

View File

@ -671,22 +671,30 @@ static int nghttp2_session_predicate_window_update_send
}
/*
* Returns the maximum length of next data read. If the flow control
* is enabled, the return value takes into account the current window
* size.
* Returns the maximum length of next data read. If the
* connection-level and/or stream-wise flow control are enabled, the
* return value takes into account those current window sizes.
*/
static size_t nghttp2_session_next_data_read(nghttp2_session *session,
nghttp2_stream *stream)
{
/* TODO implement connection-level flow control here */
if(stream->remote_flow_control == 0) {
if(session->remote_flow_control == 0 && stream->remote_flow_control == 0) {
return NGHTTP2_DATA_PAYLOAD_LENGTH;
} else if(stream->window_size > 0) {
return stream->window_size < NGHTTP2_DATA_PAYLOAD_LENGTH ?
stream->window_size : NGHTTP2_DATA_PAYLOAD_LENGTH;
} else {
int32_t session_window_size =
session->remote_flow_control ? session->window_size : INT32_MAX;
int32_t stream_window_size =
stream->remote_flow_control ? stream->window_size : INT32_MAX;
int32_t window_size = nghttp2_min(session_window_size,
stream_window_size);
if(window_size > 0) {
return window_size < NGHTTP2_DATA_PAYLOAD_LENGTH ?
window_size : NGHTTP2_DATA_PAYLOAD_LENGTH;
} else {
return 0;
}
}
}
/*
@ -1282,10 +1290,14 @@ int nghttp2_session_send(nghttp2_session *session)
if(session->aob.item->frame_cat == NGHTTP2_CAT_DATA) {
nghttp2_data *frame;
nghttp2_stream *stream;
uint16_t len = nghttp2_get_uint16(&session->aob.framebuf[0]);
frame = nghttp2_outbound_item_get_data_frame(session->aob.item);
stream = nghttp2_session_get_stream(session, frame->hd.stream_id);
if(stream && stream->remote_flow_control) {
stream->window_size -= nghttp2_get_uint16(&session->aob.framebuf[0]);
stream->window_size -= len;
}
if(session->remote_flow_control) {
session->window_size -= len;
}
}
if(session->aob.framebufoff == session->aob.framebuflen) {
@ -1574,9 +1586,11 @@ static int nghttp2_update_initial_window_size_func(nghttp2_map_entry *entry,
arg->old_window_size);
/* If window size gets positive, push deferred DATA frame to
outbound queue. */
if(stream->window_size > 0 &&
stream->deferred_data &&
(stream->deferred_flags & NGHTTP2_DEFERRED_FLOW_CONTROL)) {
if(stream->deferred_data &&
(stream->deferred_flags & NGHTTP2_DEFERRED_FLOW_CONTROL) &&
stream->window_size > 0 &&
(arg->session->remote_flow_control == 0 ||
arg->session->window_size > 0)) {
int rv;
rv = nghttp2_pq_push(&arg->session->ob_pq, stream->deferred_data);
if(rv == 0) {
@ -1693,11 +1707,18 @@ int nghttp2_session_on_settings_received(nghttp2_session *session,
break;
case NGHTTP2_SETTINGS_FLOW_CONTROL_OPTIONS:
if(entry->value == 1) {
if(session->remote_settings[entry->settings_id] == 0) {
rv = nghttp2_session_disable_flow_control(session);
if(rv != 0) {
return rv;
}
}
} else if(session->remote_settings[entry->settings_id] == 1) {
/* Re-enabling flow control is subject to connection-level
error(?) */
return nghttp2_session_fail_session(session,
NGHTTP2_FLOW_CONTROL_ERROR);
}
break;
}
session->remote_settings[entry->settings_id] = entry->value;
@ -1728,16 +1749,76 @@ int nghttp2_session_on_goaway_received(nghttp2_session *session,
return 0;
}
static int nghttp2_push_back_deferred_data_func(nghttp2_map_entry *entry,
void *ptr)
{
nghttp2_session *session;
nghttp2_stream *stream;
session = (nghttp2_session*)ptr;
stream = (nghttp2_stream*)entry;
/* If DATA frame is deferred due to flow control, push it back to
outbound queue. */
if(stream->deferred_data &&
(stream->deferred_flags & NGHTTP2_DEFERRED_FLOW_CONTROL) &&
(stream->remote_flow_control == 0 || stream->window_size > 0)) {
int rv;
rv = nghttp2_pq_push(&session->ob_pq, stream->deferred_data);
if(rv == 0) {
nghttp2_stream_detach_deferred_data(stream);
} else {
/* FATAL */
assert(rv < NGHTTP2_ERR_FATAL);
return rv;
}
}
return 0;
}
/*
* Push back deferred DATA frames to queue if they are deferred due to
* connection-level flow control.
*/
static int nghttp2_session_push_back_deferred_data(nghttp2_session *session)
{
return nghttp2_map_each(&session->streams,
nghttp2_push_back_deferred_data_func, session);
}
int nghttp2_session_on_window_update_received(nghttp2_session *session,
nghttp2_frame *frame)
{
if(frame->hd.stream_id == 0) {
/* Handle connection-level flow control here */
/* Handle connection-level flow control */
if(session->remote_flow_control == 0) {
/* The sepc says receiving WINDOW_UPDATE from peer when flow
control is disabled is error, but disabling flow control and
receiving WINDOW_UPDATE are asynchronous, so it is hard to
determine that the peer is misbehaving or not without
measuring RTT. For now, we just ignore such frames. */
return 0;
}
if(INT32_MAX - frame->window_update.window_size_increment <
session->window_size) {
return nghttp2_session_fail_session
(session, NGHTTP2_FLOW_CONTROL_ERROR);
}
session->window_size += frame->window_update.window_size_increment;
nghttp2_session_call_on_frame_received(session, frame);
/* To queue the DATA deferred by connection-level flow-control, we
have to check all streams. Bad. */
if(session->window_size > 0) {
return nghttp2_session_push_back_deferred_data(session);
} else {
return 0;
}
} else {
nghttp2_stream *stream;
stream = nghttp2_session_get_stream(session, frame->hd.stream_id);
if(stream) {
if(stream->remote_flow_control == 0) {
/* Same reason with connection-level flow control */
return 0;
}
if(INT32_MAX - frame->window_update.window_size_increment <
stream->window_size) {
int r;
@ -1747,6 +1828,8 @@ int nghttp2_session_on_window_update_received(nghttp2_session *session,
} else {
stream->window_size += frame->window_update.window_size_increment;
if(stream->window_size > 0 &&
(session->remote_flow_control == 0 ||
session->window_size > 0) &&
stream->deferred_data != NULL &&
(stream->deferred_flags & NGHTTP2_DEFERRED_FLOW_CONTROL)) {
int r;
@ -2020,6 +2103,20 @@ static int nghttp2_session_process_data_frame(nghttp2_session *session)
}
}
static int32_t adjust_recv_window_size(int32_t recv_window_size, int32_t delta)
{
/* If NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE is set and the application
does not send WINDOW_UPDATE and the remote endpoint keeps
sending data, stream->recv_window_size will eventually
overflow. */
if(recv_window_size > INT32_MAX - delta) {
recv_window_size = INT32_MAX;
} else {
recv_window_size += delta;
}
return recv_window_size;
}
/*
* Accumulates received bytes |delta_size| and decides whether to send
* WINDOW_UPDATE. If NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE is set,
@ -2035,16 +2132,9 @@ static int nghttp2_session_update_recv_window_size(nghttp2_session *session,
nghttp2_stream *stream,
int32_t delta_size)
{
if(stream) {
/* If NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE is set and the application
does not send WINDOW_UPDATE and the remote endpoint keeps
sending data, stream->recv_window_size will eventually
overflow. */
if(stream->recv_window_size > INT32_MAX - delta_size) {
stream->recv_window_size = INT32_MAX;
} else {
stream->recv_window_size += delta_size;
}
if(stream && stream->local_flow_control) {
stream->recv_window_size = adjust_recv_window_size
(stream->recv_window_size, delta_size);
if(!(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE)) {
/* This is just a heuristics. */
/* We have to use local_settings here because it is the constraint
@ -2064,6 +2154,28 @@ static int nghttp2_session_update_recv_window_size(nghttp2_session *session,
}
}
}
if(session->local_flow_control) {
session->recv_window_size = adjust_recv_window_size
(session->recv_window_size, delta_size);
if(!(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE)) {
/* Same heuristics above */
if((size_t)session->recv_window_size*2 >=
NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE) {
int r;
/* Use stream ID 0 to update connection-level flow control
window */
r = nghttp2_session_add_window_update(session,
NGHTTP2_FLAG_NONE,
0,
session->recv_window_size);
if(r == 0) {
session->recv_window_size = 0;
} else {
return r;
}
}
}
}
return 0;
}
@ -2255,7 +2367,7 @@ ssize_t nghttp2_session_mem_recv(nghttp2_session *session,
(data_flags & NGHTTP2_FLAG_END_STREAM) == 0)) {
nghttp2_stream *stream;
stream = nghttp2_session_get_stream(session, data_stream_id);
if(stream->local_flow_control) {
if(session->local_flow_control || stream->local_flow_control) {
r = nghttp2_session_update_recv_window_size(session,
stream,
readlen);

View File

@ -1894,11 +1894,19 @@ void test_nghttp2_session_flow_control(void)
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(64*1024 == ud.data_source_length);
/* Back 32KiB */
/* Back 32KiB in stream window */
nghttp2_frame_window_update_init(&frame.window_update, NGHTTP2_FLAG_NONE,
1, 32*1024);
nghttp2_session_on_window_update_received(session, &frame);
/* Send nothing because of connection-level window */
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(64*1024 == ud.data_source_length);
/* Back 32KiB in connection-level window */
frame.hd.stream_id = 0;
nghttp2_session_on_window_update_received(session, &frame);
/* Sends another 32KiB data */
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(32*1024 == ud.data_source_length);
@ -1914,18 +1922,26 @@ void test_nghttp2_session_flow_control(void)
new_initial_window_size;
CU_ASSERT(-48*1024 == stream->window_size);
/* Back 48KiB */
/* Back 48KiB to stream window */
frame.hd.stream_id = 1;
frame.window_update.window_size_increment = 48*1024;
nghttp2_session_on_window_update_received(session, &frame);
/* Nothing is sent because window_size is less than 0 */
/* Nothing is sent because window_size is 0 */
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(32*1024 == ud.data_source_length);
/* Back 16KiB */
/* Back 16KiB in stream window */
frame.hd.stream_id = 1;
frame.window_update.window_size_increment = 16*1024;
nghttp2_session_on_window_update_received(session, &frame);
/* Back 24KiB in connection-level window */
frame.hd.stream_id = 0;
frame.window_update.window_size_increment = 24*1024;
nghttp2_session_on_window_update_received(session, &frame);
/* Sends another 16KiB data */
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(16*1024 == ud.data_source_length);
@ -1938,7 +1954,16 @@ void test_nghttp2_session_flow_control(void)
nghttp2_session_on_settings_received(session, &settings_frame);
nghttp2_frame_settings_free(&settings_frame.settings);
/* Sends another 16KiB data */
/* Sends another 8KiB data */
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(8*1024 == ud.data_source_length);
/* Back 8KiB in connection-level window */
frame.hd.stream_id = 0;
frame.window_update.window_size_increment = 8*1024;
nghttp2_session_on_window_update_received(session, &frame);
/* Sends last 8KiB data */
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(0 == ud.data_source_length);
CU_ASSERT(nghttp2_session_get_stream(session, 1)->shut_flags &
@ -1981,10 +2006,12 @@ void test_nghttp2_session_data_read_temporal_failure(void)
data_frame->data_prd.read_callback =
temporal_failure_data_source_read_callback;
/* Back 64KiB */
/* Back 64KiB to both connection-level and stream-wise window */
nghttp2_frame_window_update_init(&frame.window_update, NGHTTP2_FLAG_NONE,
1, 64*1024);
nghttp2_session_on_window_update_received(session, &frame);
frame.hd.stream_id = 0;
nghttp2_session_on_window_update_received(session, &frame);
nghttp2_frame_window_update_free(&frame.window_update);
/* Sending data will fail (soft fail) and treated as stream error */