From cea76226b1367e6bbd05f3a07b5a9ec6a7e20371 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Thu, 1 Oct 2015 01:19:57 +0900 Subject: [PATCH] Avoid excessive WINDOW_UPDATE queuing --- lib/nghttp2_session.c | 175 ++++++++++++++++++++++++----------- lib/nghttp2_session.h | 4 + lib/nghttp2_stream.c | 1 + lib/nghttp2_stream.h | 4 + tests/nghttp2_session_test.c | 132 ++++++++++++++++++++++++++ 5 files changed, 262 insertions(+), 54 deletions(-) diff --git a/lib/nghttp2_session.c b/lib/nghttp2_session.c index 540039fe..4b39ef4c 100644 --- a/lib/nghttp2_session.c +++ b/lib/nghttp2_session.c @@ -733,7 +733,18 @@ int nghttp2_session_add_item(nghttp2_session *session, if (stream) { stream->state = NGHTTP2_STREAM_CLOSING; } - /* fall through */ + nghttp2_outbound_queue_push(&session->ob_reg, item); + item->queued = 1; + break; + case NGHTTP2_WINDOW_UPDATE: + if (stream) { + stream->window_update_queued = 1; + } else if (frame->hd.stream_id == 0) { + session->window_update_queued = 1; + } + nghttp2_outbound_queue_push(&session->ob_reg, item); + item->queued = 1; + break; default: nghttp2_outbound_queue_push(&session->ob_reg, item); item->queued = 1; @@ -2206,6 +2217,21 @@ static void reschedule_stream(nghttp2_stream *stream) { nghttp2_stream_reschedule(stream); } +static int session_update_stream_consumed_size(nghttp2_session *session, + nghttp2_stream *stream, + size_t delta_size); + +static int session_update_connection_consumed_size(nghttp2_session *session, + size_t delta_size); + +static int session_update_recv_connection_window_size(nghttp2_session *session, + size_t delta_size); + +static int session_update_recv_stream_window_size(nghttp2_session *session, + nghttp2_stream *stream, + size_t delta_size, + int send_window_update); + /* * Called after a frame is sent. This function runs * on_frame_send_callback and handles stream closure upon END_STREAM @@ -2367,6 +2393,44 @@ static int session_after_frame_sent1(nghttp2_session *session) { break; } + case NGHTTP2_WINDOW_UPDATE: + rv = 0; + + if (frame->hd.stream_id == 0) { + session->window_update_queued = 0; + if (session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE) { + rv = session_update_connection_consumed_size(session, 0); + } else { + rv = session_update_recv_connection_window_size(session, 0); + } + } else { + nghttp2_stream *stream; + + stream = nghttp2_session_get_stream(session, frame->hd.stream_id); + if (!stream) { + break; + } + + stream->window_update_queued = 0; + + /* We don't have to send WINDOW_UPDATE if END_STREAM from peer + is seen. */ + if (stream->shut_flags & NGHTTP2_SHUT_RD) { + break; + } + + if (session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE) { + rv = session_update_stream_consumed_size(session, stream, 0); + } else { + rv = session_update_recv_stream_window_size(session, stream, 0, 1); + } + } + + if (nghttp2_is_fatal(rv)) { + return rv; + } + + break; default: break; } @@ -3755,19 +3819,19 @@ static int update_local_initial_window_size_func(nghttp2_map_entry *entry, return nghttp2_session_terminate_session(arg->session, NGHTTP2_FLOW_CONTROL_ERROR); } - if (!(arg->session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE)) { + if (!(arg->session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE) && + stream->window_update_queued == 0 && + nghttp2_should_send_window_update(stream->local_window_size, + stream->recv_window_size)) { - if (nghttp2_should_send_window_update(stream->local_window_size, - stream->recv_window_size)) { - - rv = nghttp2_session_add_window_update(arg->session, NGHTTP2_FLAG_NONE, - stream->stream_id, - stream->recv_window_size); - if (rv != 0) { - return rv; - } - stream->recv_window_size = 0; + rv = nghttp2_session_add_window_update(arg->session, NGHTTP2_FLAG_NONE, + stream->stream_id, + stream->recv_window_size); + if (rv != 0) { + return rv; } + + stream->recv_window_size = 0; } return 0; } @@ -4413,21 +4477,21 @@ static int session_update_recv_stream_window_size(nghttp2_session *session, } /* We don't have to send WINDOW_UPDATE if the data received is the last chunk in the incoming stream. */ + /* We have to use local_settings here because it is the constraint + the remote endpoint should honor. */ if (send_window_update && - !(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE)) { - /* We have to use local_settings here because it is the constraint - the remote endpoint should honor. */ - if (nghttp2_should_send_window_update(stream->local_window_size, - stream->recv_window_size)) { - rv = nghttp2_session_add_window_update(session, NGHTTP2_FLAG_NONE, - stream->stream_id, - stream->recv_window_size); - if (rv == 0) { - stream->recv_window_size = 0; - } else { - return rv; - } + !(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE) && + stream->window_update_queued == 0 && + nghttp2_should_send_window_update(stream->local_window_size, + stream->recv_window_size)) { + rv = nghttp2_session_add_window_update(session, NGHTTP2_FLAG_NONE, + stream->stream_id, + stream->recv_window_size); + if (rv != 0) { + return rv; } + + stream->recv_window_size = 0; } return 0; } @@ -4453,20 +4517,19 @@ static int session_update_recv_connection_window_size(nghttp2_session *session, return nghttp2_session_terminate_session(session, NGHTTP2_FLOW_CONTROL_ERROR); } - if (!(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE)) { - - if (nghttp2_should_send_window_update(session->local_window_size, - session->recv_window_size)) { - /* Use stream ID 0 to update connection-level flow control - window */ - rv = nghttp2_session_add_window_update(session, NGHTTP2_FLAG_NONE, 0, - session->recv_window_size); - if (rv != 0) { - return rv; - } - - session->recv_window_size = 0; + if (!(session->opt_flags & NGHTTP2_OPTMASK_NO_AUTO_WINDOW_UPDATE) && + session->window_update_queued == 0 && + nghttp2_should_send_window_update(session->local_window_size, + session->recv_window_size)) { + /* Use stream ID 0 to update connection-level flow control + window */ + rv = nghttp2_session_add_window_update(session, NGHTTP2_FLAG_NONE, 0, + session->recv_window_size); + if (rv != 0) { + return rv; } + + session->recv_window_size = 0; } return 0; } @@ -4474,6 +4537,7 @@ static int session_update_recv_connection_window_size(nghttp2_session *session, static int session_update_consumed_size(nghttp2_session *session, int32_t *consumed_size_ptr, int32_t *recv_window_size_ptr, + uint8_t window_update_queued, int32_t stream_id, size_t delta_size, int32_t local_window_size) { int32_t recv_size; @@ -4486,21 +4550,23 @@ static int session_update_consumed_size(nghttp2_session *session, *consumed_size_ptr += (int32_t)delta_size; - /* recv_window_size may be smaller than consumed_size, because it - may be decreased by negative value with - nghttp2_submit_window_update(). */ - recv_size = nghttp2_min(*consumed_size_ptr, *recv_window_size_ptr); + if (window_update_queued == 0) { + /* recv_window_size may be smaller than consumed_size, because it + may be decreased by negative value with + nghttp2_submit_window_update(). */ + recv_size = nghttp2_min(*consumed_size_ptr, *recv_window_size_ptr); - if (nghttp2_should_send_window_update(local_window_size, recv_size)) { - rv = nghttp2_session_add_window_update(session, NGHTTP2_FLAG_NONE, - stream_id, recv_size); + if (nghttp2_should_send_window_update(local_window_size, recv_size)) { + rv = nghttp2_session_add_window_update(session, NGHTTP2_FLAG_NONE, + stream_id, recv_size); - if (rv != 0) { - return rv; + if (rv != 0) { + return rv; + } + + *recv_window_size_ptr -= recv_size; + *consumed_size_ptr -= recv_size; } - - *recv_window_size_ptr -= recv_size; - *consumed_size_ptr -= recv_size; } return 0; @@ -4511,14 +4577,15 @@ static int session_update_stream_consumed_size(nghttp2_session *session, size_t delta_size) { return session_update_consumed_size( session, &stream->consumed_size, &stream->recv_window_size, - stream->stream_id, delta_size, stream->local_window_size); + stream->window_update_queued, stream->stream_id, delta_size, + stream->local_window_size); } static int session_update_connection_consumed_size(nghttp2_session *session, size_t delta_size) { - return session_update_consumed_size(session, &session->consumed_size, - &session->recv_window_size, 0, delta_size, - session->local_window_size); + return session_update_consumed_size( + session, &session->consumed_size, &session->recv_window_size, + session->window_update_queued, 0, delta_size, session->local_window_size); } /* diff --git a/lib/nghttp2_session.h b/lib/nghttp2_session.h index a6c2d45e..67860b20 100644 --- a/lib/nghttp2_session.h +++ b/lib/nghttp2_session.h @@ -291,6 +291,10 @@ struct nghttp2_session { /* Flags indicating GOAWAY is sent and/or recieved. The flags are composed by bitwise OR-ing nghttp2_goaway_flag. */ uint8_t goaway_flags; + /* This flag is used to reduce excessive queuing of WINDOW_UPDATE to + this session. The nonzero does not necessarily mean + WINDOW_UPDATE is not queued. */ + uint8_t window_update_queued; }; /* Struct used when updating initial window size of each active diff --git a/lib/nghttp2_stream.c b/lib/nghttp2_stream.c index e8dfd0c2..3a5e4ad0 100644 --- a/lib/nghttp2_stream.c +++ b/lib/nghttp2_stream.c @@ -58,6 +58,7 @@ void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id, stream->recv_window_size = 0; stream->consumed_size = 0; stream->recv_reduction = 0; + stream->window_update_queued = 0; stream->dep_prev = NULL; stream->dep_next = NULL; diff --git a/lib/nghttp2_stream.h b/lib/nghttp2_stream.h index a464f68c..c8aca99a 100644 --- a/lib/nghttp2_stream.h +++ b/lib/nghttp2_stream.h @@ -206,6 +206,10 @@ struct nghttp2_stream { uint64_t cycle; /* Last written length of frame payload */ size_t last_writelen; + /* This flag is used to reduce excessive queuing of WINDOW_UPDATE to + this stream. The nonzero does not necessarily mean WINDOW_UPDATE + is not queued. */ + uint8_t window_update_queued; }; void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id, diff --git a/tests/nghttp2_session_test.c b/tests/nghttp2_session_test.c index 2e0eed34..70d1c0d8 100644 --- a/tests/nghttp2_session_test.c +++ b/tests/nghttp2_session_test.c @@ -690,6 +690,7 @@ void test_nghttp2_session_recv_data(void) { callbacks.send_callback = null_send_callback; callbacks.on_data_chunk_recv_callback = on_data_chunk_recv_callback; callbacks.on_frame_recv_callback = on_frame_recv_callback; + callbacks.on_frame_send_callback = on_frame_send_callback; nghttp2_session_client_new(&session, &callbacks, &ud); @@ -797,6 +798,68 @@ void test_nghttp2_session_recv_data(void) { CU_ASSERT(NGHTTP2_PROTOCOL_ERROR == item->frame.goaway.error_code); nghttp2_session_del(session); + + /* Check window_update_queued flag in both session and stream */ + nghttp2_session_server_new(&session, &callbacks, &ud); + + hd.length = 4096; + hd.type = NGHTTP2_DATA; + hd.flags = NGHTTP2_FLAG_NONE; + hd.stream_id = 1; + nghttp2_frame_pack_frame_hd(data, &hd); + + stream = open_stream(session, 1); + + /* Send 32767 bytes of DATA. In our current flow control algorithm, + it triggers first WINDOW_UPDATE of window_size_increment + 32767. */ + for (i = 0; i < 7; ++i) { + rv = nghttp2_session_mem_recv(session, data, NGHTTP2_FRAME_HDLEN + 4096); + CU_ASSERT(NGHTTP2_FRAME_HDLEN + 4096 == rv); + } + + hd.length = 4095; + nghttp2_frame_pack_frame_hd(data, &hd); + rv = nghttp2_session_mem_recv(session, data, NGHTTP2_FRAME_HDLEN + 4095); + CU_ASSERT(NGHTTP2_FRAME_HDLEN + 4095 == rv); + + /* Now 2 WINDOW_UPDATEs for session and stream should be queued. */ + CU_ASSERT(0 == stream->recv_window_size); + CU_ASSERT(0 == session->recv_window_size); + CU_ASSERT(1 == stream->window_update_queued); + CU_ASSERT(1 == session->window_update_queued); + + /* Then send 32768 bytes of DATA. Since we have not sent queued + WINDOW_UDPATE frame, recv_window_size should not be decreased */ + hd.length = 4096; + nghttp2_frame_pack_frame_hd(data, &hd); + + for (i = 0; i < 8; ++i) { + rv = nghttp2_session_mem_recv(session, data, NGHTTP2_FRAME_HDLEN + 4096); + CU_ASSERT(NGHTTP2_FRAME_HDLEN + 4096 == rv); + } + + /* WINDOW_UPDATE is blocked for session and stream, so + recv_window_size must not be decreased. */ + CU_ASSERT(32768 == stream->recv_window_size); + CU_ASSERT(32768 == session->recv_window_size); + CU_ASSERT(1 == stream->window_update_queued); + CU_ASSERT(1 == session->window_update_queued); + + ud.frame_send_cb_called = 0; + + /* This sends queued WINDOW_UPDATES. And then check + recv_window_size, and queue WINDOW_UPDATEs for both session and + stream, and send them at once. */ + CU_ASSERT(0 == nghttp2_session_send(session)); + + CU_ASSERT(4 == ud.frame_send_cb_called); + CU_ASSERT(0 == stream->recv_window_size); + CU_ASSERT(0 == session->recv_window_size); + CU_ASSERT(0 == stream->window_update_queued); + CU_ASSERT(0 == session->window_update_queued); + + nghttp2_session_del(session); } void test_nghttp2_session_recv_data_no_auto_flow_control(void) { @@ -810,9 +873,11 @@ void test_nghttp2_session_recv_data_no_auto_flow_control(void) { ssize_t rv; size_t sendlen; nghttp2_stream *stream; + size_t i; memset(&callbacks, 0, sizeof(nghttp2_session_callbacks)); callbacks.send_callback = null_send_callback; + callbacks.on_frame_send_callback = on_frame_send_callback; nghttp2_option_new(&option); nghttp2_option_set_no_auto_window_update(option, 1); @@ -873,6 +938,73 @@ void test_nghttp2_session_recv_data_no_auto_flow_control(void) { was not honored. */ CU_ASSERT((int32_t)hd.length == session->consumed_size); + nghttp2_session_del(session); + + /* Check window_update_queued flag in both session and stream */ + nghttp2_session_server_new2(&session, &callbacks, &ud, option); + + stream = open_stream(session, 1); + + hd.length = 4096; + hd.type = NGHTTP2_DATA; + hd.flags = NGHTTP2_FLAG_NONE; + hd.stream_id = 1; + nghttp2_frame_pack_frame_hd(data, &hd); + + /* Receive up to 65535 bytes of DATA */ + for (i = 0; i < 15; ++i) { + rv = nghttp2_session_mem_recv(session, data, NGHTTP2_FRAME_HDLEN + 4096); + CU_ASSERT(NGHTTP2_FRAME_HDLEN + 4096 == rv); + } + + hd.length = 4095; + nghttp2_frame_pack_frame_hd(data, &hd); + + rv = nghttp2_session_mem_recv(session, data, NGHTTP2_FRAME_HDLEN + 4095); + CU_ASSERT(NGHTTP2_FRAME_HDLEN + 4095 == rv); + + CU_ASSERT(65535 == session->recv_window_size); + CU_ASSERT(65535 == stream->recv_window_size); + + /* The first call of nghttp2_session_consume_connection() will queue + WINDOW_UPDATE. Next call does not. */ + nghttp2_session_consume_connection(session, 32767); + nghttp2_session_consume_connection(session, 32768); + + CU_ASSERT(32768 == session->recv_window_size); + CU_ASSERT(65535 == stream->recv_window_size); + CU_ASSERT(1 == session->window_update_queued); + CU_ASSERT(0 == stream->window_update_queued); + + ud.frame_send_cb_called = 0; + + /* This will send WINDOW_UPDATE, and check whether we should send + WINDOW_UPDATE, and queue and send it at once. */ + CU_ASSERT(0 == nghttp2_session_send(session)); + CU_ASSERT(0 == session->recv_window_size); + CU_ASSERT(65535 == stream->recv_window_size); + CU_ASSERT(0 == session->window_update_queued); + CU_ASSERT(0 == stream->window_update_queued); + CU_ASSERT(2 == ud.frame_send_cb_called); + + /* Do the same for stream */ + nghttp2_session_consume_stream(session, 1, 32767); + nghttp2_session_consume_stream(session, 1, 32768); + + CU_ASSERT(0 == session->recv_window_size); + CU_ASSERT(32768 == stream->recv_window_size); + CU_ASSERT(0 == session->window_update_queued); + CU_ASSERT(1 == stream->window_update_queued); + + ud.frame_send_cb_called = 0; + + CU_ASSERT(0 == nghttp2_session_send(session)); + CU_ASSERT(0 == session->recv_window_size); + CU_ASSERT(0 == stream->recv_window_size); + CU_ASSERT(0 == session->window_update_queued); + CU_ASSERT(0 == stream->window_update_queued); + CU_ASSERT(2 == ud.frame_send_cb_called); + nghttp2_session_del(session); nghttp2_option_del(option); }