Avoid excessive WINDOW_UPDATE queuing

This commit is contained in:
Tatsuhiro Tsujikawa 2015-10-01 01:19:57 +09:00
parent 2aa8d9642c
commit cea76226b1
5 changed files with 262 additions and 54 deletions

View File

@ -733,7 +733,18 @@ int nghttp2_session_add_item(nghttp2_session *session,
if (stream) { if (stream) {
stream->state = NGHTTP2_STREAM_CLOSING; stream->state = NGHTTP2_STREAM_CLOSING;
} }
/* fall through */ nghttp2_outbound_queue_push(&session->ob_reg, item);
item->queued = 1;
break;
case NGHTTP2_WINDOW_UPDATE:
if (stream) {
stream->window_update_queued = 1;
} else if (frame->hd.stream_id == 0) {
session->window_update_queued = 1;
}
nghttp2_outbound_queue_push(&session->ob_reg, item);
item->queued = 1;
break;
default: default:
nghttp2_outbound_queue_push(&session->ob_reg, item); nghttp2_outbound_queue_push(&session->ob_reg, item);
item->queued = 1; item->queued = 1;
@ -2206,6 +2217,21 @@ static void reschedule_stream(nghttp2_stream *stream) {
nghttp2_stream_reschedule(stream); nghttp2_stream_reschedule(stream);
} }
static int session_update_stream_consumed_size(nghttp2_session *session,
nghttp2_stream *stream,
size_t delta_size);
static int session_update_connection_consumed_size(nghttp2_session *session,
size_t delta_size);
static int session_update_recv_connection_window_size(nghttp2_session *session,
size_t delta_size);
static int session_update_recv_stream_window_size(nghttp2_session *session,
nghttp2_stream *stream,
size_t delta_size,
int send_window_update);
/* /*
* Called after a frame is sent. This function runs * Called after a frame is sent. This function runs
* on_frame_send_callback and handles stream closure upon END_STREAM * on_frame_send_callback and handles stream closure upon END_STREAM
@ -2367,6 +2393,44 @@ static int session_after_frame_sent1(nghttp2_session *session) {
break; break;
} }
case NGHTTP2_WINDOW_UPDATE:
rv = 0;
if (frame->hd.stream_id == 0) {
session->window_update_queued = 0;
if (session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE) {
rv = session_update_connection_consumed_size(session, 0);
} else {
rv = session_update_recv_connection_window_size(session, 0);
}
} else {
nghttp2_stream *stream;
stream = nghttp2_session_get_stream(session, frame->hd.stream_id);
if (!stream) {
break;
}
stream->window_update_queued = 0;
/* We don't have to send WINDOW_UPDATE if END_STREAM from peer
is seen. */
if (stream->shut_flags & NGHTTP2_SHUT_RD) {
break;
}
if (session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE) {
rv = session_update_stream_consumed_size(session, stream, 0);
} else {
rv = session_update_recv_stream_window_size(session, stream, 0, 1);
}
}
if (nghttp2_is_fatal(rv)) {
return rv;
}
break;
default: default:
break; break;
} }
@ -3755,9 +3819,9 @@ static int update_local_initial_window_size_func(nghttp2_map_entry *entry,
return nghttp2_session_terminate_session(arg->session, return nghttp2_session_terminate_session(arg->session,
NGHTTP2_FLOW_CONTROL_ERROR); NGHTTP2_FLOW_CONTROL_ERROR);
} }
if (!(arg->session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE)) { if (!(arg->session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE) &&
stream->window_update_queued == 0 &&
if (nghttp2_should_send_window_update(stream->local_window_size, nghttp2_should_send_window_update(stream->local_window_size,
stream->recv_window_size)) { stream->recv_window_size)) {
rv = nghttp2_session_add_window_update(arg->session, NGHTTP2_FLAG_NONE, rv = nghttp2_session_add_window_update(arg->session, NGHTTP2_FLAG_NONE,
@ -3766,9 +3830,9 @@ static int update_local_initial_window_size_func(nghttp2_map_entry *entry,
if (rv != 0) { if (rv != 0) {
return rv; return rv;
} }
stream->recv_window_size = 0; stream->recv_window_size = 0;
} }
}
return 0; return 0;
} }
@ -4413,21 +4477,21 @@ static int session_update_recv_stream_window_size(nghttp2_session *session,
} }
/* We don't have to send WINDOW_UPDATE if the data received is the /* We don't have to send WINDOW_UPDATE if the data received is the
last chunk in the incoming stream. */ last chunk in the incoming stream. */
if (send_window_update &&
!(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE)) {
/* We have to use local_settings here because it is the constraint /* We have to use local_settings here because it is the constraint
the remote endpoint should honor. */ the remote endpoint should honor. */
if (nghttp2_should_send_window_update(stream->local_window_size, if (send_window_update &&
!(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE) &&
stream->window_update_queued == 0 &&
nghttp2_should_send_window_update(stream->local_window_size,
stream->recv_window_size)) { stream->recv_window_size)) {
rv = nghttp2_session_add_window_update(session, NGHTTP2_FLAG_NONE, rv = nghttp2_session_add_window_update(session, NGHTTP2_FLAG_NONE,
stream->stream_id, stream->stream_id,
stream->recv_window_size); stream->recv_window_size);
if (rv == 0) { if (rv != 0) {
stream->recv_window_size = 0;
} else {
return rv; return rv;
} }
}
stream->recv_window_size = 0;
} }
return 0; return 0;
} }
@ -4453,9 +4517,9 @@ static int session_update_recv_connection_window_size(nghttp2_session *session,
return nghttp2_session_terminate_session(session, return nghttp2_session_terminate_session(session,
NGHTTP2_FLOW_CONTROL_ERROR); NGHTTP2_FLOW_CONTROL_ERROR);
} }
if (!(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE)) { if (!(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE) &&
session->window_update_queued == 0 &&
if (nghttp2_should_send_window_update(session->local_window_size, nghttp2_should_send_window_update(session->local_window_size,
session->recv_window_size)) { session->recv_window_size)) {
/* Use stream ID 0 to update connection-level flow control /* Use stream ID 0 to update connection-level flow control
window */ window */
@ -4467,13 +4531,13 @@ static int session_update_recv_connection_window_size(nghttp2_session *session,
session->recv_window_size = 0; session->recv_window_size = 0;
} }
}
return 0; return 0;
} }
static int session_update_consumed_size(nghttp2_session *session, static int session_update_consumed_size(nghttp2_session *session,
int32_t *consumed_size_ptr, int32_t *consumed_size_ptr,
int32_t *recv_window_size_ptr, int32_t *recv_window_size_ptr,
uint8_t window_update_queued,
int32_t stream_id, size_t delta_size, int32_t stream_id, size_t delta_size,
int32_t local_window_size) { int32_t local_window_size) {
int32_t recv_size; int32_t recv_size;
@ -4486,6 +4550,7 @@ static int session_update_consumed_size(nghttp2_session *session,
*consumed_size_ptr += (int32_t)delta_size; *consumed_size_ptr += (int32_t)delta_size;
if (window_update_queued == 0) {
/* recv_window_size may be smaller than consumed_size, because it /* recv_window_size may be smaller than consumed_size, because it
may be decreased by negative value with may be decreased by negative value with
nghttp2_submit_window_update(). */ nghttp2_submit_window_update(). */
@ -4502,6 +4567,7 @@ static int session_update_consumed_size(nghttp2_session *session,
*recv_window_size_ptr -= recv_size; *recv_window_size_ptr -= recv_size;
*consumed_size_ptr -= recv_size; *consumed_size_ptr -= recv_size;
} }
}
return 0; return 0;
} }
@ -4511,14 +4577,15 @@ static int session_update_stream_consumed_size(nghttp2_session *session,
size_t delta_size) { size_t delta_size) {
return session_update_consumed_size( return session_update_consumed_size(
session, &stream->consumed_size, &stream->recv_window_size, session, &stream->consumed_size, &stream->recv_window_size,
stream->stream_id, delta_size, stream->local_window_size); stream->window_update_queued, stream->stream_id, delta_size,
stream->local_window_size);
} }
static int session_update_connection_consumed_size(nghttp2_session *session, static int session_update_connection_consumed_size(nghttp2_session *session,
size_t delta_size) { size_t delta_size) {
return session_update_consumed_size(session, &session->consumed_size, return session_update_consumed_size(
&session->recv_window_size, 0, delta_size, session, &session->consumed_size, &session->recv_window_size,
session->local_window_size); session->window_update_queued, 0, delta_size, session->local_window_size);
} }
/* /*

View File

@ -291,6 +291,10 @@ struct nghttp2_session {
/* Flags indicating GOAWAY is sent and/or recieved. The flags are /* Flags indicating GOAWAY is sent and/or recieved. The flags are
composed by bitwise OR-ing nghttp2_goaway_flag. */ composed by bitwise OR-ing nghttp2_goaway_flag. */
uint8_t goaway_flags; uint8_t goaway_flags;
/* This flag is used to reduce excessive queuing of WINDOW_UPDATE to
this session. The nonzero does not necessarily mean
WINDOW_UPDATE is not queued. */
uint8_t window_update_queued;
}; };
/* Struct used when updating initial window size of each active /* Struct used when updating initial window size of each active

View File

@ -58,6 +58,7 @@ void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id,
stream->recv_window_size = 0; stream->recv_window_size = 0;
stream->consumed_size = 0; stream->consumed_size = 0;
stream->recv_reduction = 0; stream->recv_reduction = 0;
stream->window_update_queued = 0;
stream->dep_prev = NULL; stream->dep_prev = NULL;
stream->dep_next = NULL; stream->dep_next = NULL;

View File

@ -206,6 +206,10 @@ struct nghttp2_stream {
uint64_t cycle; uint64_t cycle;
/* Last written length of frame payload */ /* Last written length of frame payload */
size_t last_writelen; size_t last_writelen;
/* This flag is used to reduce excessive queuing of WINDOW_UPDATE to
this stream. The nonzero does not necessarily mean WINDOW_UPDATE
is not queued. */
uint8_t window_update_queued;
}; };
void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id, void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id,

View File

@ -690,6 +690,7 @@ void test_nghttp2_session_recv_data(void) {
callbacks.send_callback = null_send_callback; callbacks.send_callback = null_send_callback;
callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback; callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback;
callbacks.on_frame_recv_callback = on_frame_recv_callback; callbacks.on_frame_recv_callback = on_frame_recv_callback;
callbacks.on_frame_send_callback = on_frame_send_callback;
nghttp2_session_client_new(&session, &callbacks, &ud); nghttp2_session_client_new(&session, &callbacks, &ud);
@ -797,6 +798,68 @@ void test_nghttp2_session_recv_data(void) {
CU_ASSERT(NGHTTP2_PROTOCOL_ERROR == item->frame.goaway.error_code); CU_ASSERT(NGHTTP2_PROTOCOL_ERROR == item->frame.goaway.error_code);
nghttp2_session_del(session); nghttp2_session_del(session);
/* Check window_update_queued flag in both session and stream */
nghttp2_session_server_new(&session, &callbacks, &ud);
hd.length = 4096;
hd.type = NGHTTP2_DATA;
hd.flags = NGHTTP2_FLAG_NONE;
hd.stream_id = 1;
nghttp2_frame_pack_frame_hd(data, &hd);
stream = open_stream(session, 1);
/* Send 32767 bytes of DATA. In our current flow control algorithm,
it triggers first WINDOW_UPDATE of window_size_increment
32767. */
for (i = 0; i < 7; ++i) {
rv = nghttp2_session_mem_recv(session, data, NGHTTP2_FRAME_HDLEN + 4096);
CU_ASSERT(NGHTTP2_FRAME_HDLEN + 4096 == rv);
}
hd.length = 4095;
nghttp2_frame_pack_frame_hd(data, &hd);
rv = nghttp2_session_mem_recv(session, data, NGHTTP2_FRAME_HDLEN + 4095);
CU_ASSERT(NGHTTP2_FRAME_HDLEN + 4095 == rv);
/* Now 2 WINDOW_UPDATEs for session and stream should be queued. */
CU_ASSERT(0 == stream->recv_window_size);
CU_ASSERT(0 == session->recv_window_size);
CU_ASSERT(1 == stream->window_update_queued);
CU_ASSERT(1 == session->window_update_queued);
/* Then send 32768 bytes of DATA. Since we have not sent queued
WINDOW_UDPATE frame, recv_window_size should not be decreased */
hd.length = 4096;
nghttp2_frame_pack_frame_hd(data, &hd);
for (i = 0; i < 8; ++i) {
rv = nghttp2_session_mem_recv(session, data, NGHTTP2_FRAME_HDLEN + 4096);
CU_ASSERT(NGHTTP2_FRAME_HDLEN + 4096 == rv);
}
/* WINDOW_UPDATE is blocked for session and stream, so
recv_window_size must not be decreased. */
CU_ASSERT(32768 == stream->recv_window_size);
CU_ASSERT(32768 == session->recv_window_size);
CU_ASSERT(1 == stream->window_update_queued);
CU_ASSERT(1 == session->window_update_queued);
ud.frame_send_cb_called = 0;
/* This sends queued WINDOW_UPDATES. And then check
recv_window_size, and queue WINDOW_UPDATEs for both session and
stream, and send them at once. */
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(4 == ud.frame_send_cb_called);
CU_ASSERT(0 == stream->recv_window_size);
CU_ASSERT(0 == session->recv_window_size);
CU_ASSERT(0 == stream->window_update_queued);
CU_ASSERT(0 == session->window_update_queued);
nghttp2_session_del(session);
} }
void test_nghttp2_session_recv_data_no_auto_flow_control(void) { void test_nghttp2_session_recv_data_no_auto_flow_control(void) {
@ -810,9 +873,11 @@ void test_nghttp2_session_recv_data_no_auto_flow_control(void) {
ssize_t rv; ssize_t rv;
size_t sendlen; size_t sendlen;
nghttp2_stream *stream; nghttp2_stream *stream;
size_t i;
memset(&callbacks, 0, sizeof(nghttp2_session_callbacks)); memset(&callbacks, 0, sizeof(nghttp2_session_callbacks));
callbacks.send_callback = null_send_callback; callbacks.send_callback = null_send_callback;
callbacks.on_frame_send_callback = on_frame_send_callback;
nghttp2_option_new(&option); nghttp2_option_new(&option);
nghttp2_option_set_no_auto_window_update(option, 1); nghttp2_option_set_no_auto_window_update(option, 1);
@ -873,6 +938,73 @@ void test_nghttp2_session_recv_data_no_auto_flow_control(void) {
was not honored. */ was not honored. */
CU_ASSERT((int32_t)hd.length == session->consumed_size); CU_ASSERT((int32_t)hd.length == session->consumed_size);
nghttp2_session_del(session);
/* Check window_update_queued flag in both session and stream */
nghttp2_session_server_new2(&session, &callbacks, &ud, option);
stream = open_stream(session, 1);
hd.length = 4096;
hd.type = NGHTTP2_DATA;
hd.flags = NGHTTP2_FLAG_NONE;
hd.stream_id = 1;
nghttp2_frame_pack_frame_hd(data, &hd);
/* Receive up to 65535 bytes of DATA */
for (i = 0; i < 15; ++i) {
rv = nghttp2_session_mem_recv(session, data, NGHTTP2_FRAME_HDLEN + 4096);
CU_ASSERT(NGHTTP2_FRAME_HDLEN + 4096 == rv);
}
hd.length = 4095;
nghttp2_frame_pack_frame_hd(data, &hd);
rv = nghttp2_session_mem_recv(session, data, NGHTTP2_FRAME_HDLEN + 4095);
CU_ASSERT(NGHTTP2_FRAME_HDLEN + 4095 == rv);
CU_ASSERT(65535 == session->recv_window_size);
CU_ASSERT(65535 == stream->recv_window_size);
/* The first call of nghttp2_session_consume_connection() will queue
WINDOW_UPDATE. Next call does not. */
nghttp2_session_consume_connection(session, 32767);
nghttp2_session_consume_connection(session, 32768);
CU_ASSERT(32768 == session->recv_window_size);
CU_ASSERT(65535 == stream->recv_window_size);
CU_ASSERT(1 == session->window_update_queued);
CU_ASSERT(0 == stream->window_update_queued);
ud.frame_send_cb_called = 0;
/* This will send WINDOW_UPDATE, and check whether we should send
WINDOW_UPDATE, and queue and send it at once. */
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(0 == session->recv_window_size);
CU_ASSERT(65535 == stream->recv_window_size);
CU_ASSERT(0 == session->window_update_queued);
CU_ASSERT(0 == stream->window_update_queued);
CU_ASSERT(2 == ud.frame_send_cb_called);
/* Do the same for stream */
nghttp2_session_consume_stream(session, 1, 32767);
nghttp2_session_consume_stream(session, 1, 32768);
CU_ASSERT(0 == session->recv_window_size);
CU_ASSERT(32768 == stream->recv_window_size);
CU_ASSERT(0 == session->window_update_queued);
CU_ASSERT(1 == stream->window_update_queued);
ud.frame_send_cb_called = 0;
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(0 == session->recv_window_size);
CU_ASSERT(0 == stream->recv_window_size);
CU_ASSERT(0 == session->window_update_queued);
CU_ASSERT(0 == stream->window_update_queued);
CU_ASSERT(2 == ud.frame_send_cb_called);
nghttp2_session_del(session); nghttp2_session_del(session);
nghttp2_option_del(option); nghttp2_option_del(option);
} }