Strict handling of max concurrent streams

Exceeding ACKed max concurrent streams results in connection error.
This change fixes the bug that num_{incoming,outgoing}_streams
is decremented wrongly if a stream is in reserved state and
RST_STREAM is send and its state is changed to NGHTTP2_STREAM_CLOSING.
This change also fixes the bug that transmission of push response
HEADERS does not increase num_outgoing_streams.
This commit is contained in:
Tatsuhiro Tsujikawa 2014-02-20 22:59:29 +09:00
parent 659c3b0aa0
commit 3395f7158f
3 changed files with 103 additions and 51 deletions

View File

@ -56,6 +56,18 @@ static int nghttp2_session_is_incoming_concurrent_streams_max
<= session->num_incoming_streams; <= session->num_incoming_streams;
} }
/*
* Returns non-zero if the number of incoming opened streams is larger
* than or equal to
* session->pending_local_max_concurrent_stream.
*/
static int nghttp2_session_is_incoming_concurrent_streams_pending_max
(nghttp2_session *session)
{
return session->pending_local_max_concurrent_stream
<= session->num_incoming_streams;
}
/* /*
* Returns non-zero if |lib_error| is non-fatal error. * Returns non-zero if |lib_error| is non-fatal error.
*/ */
@ -249,6 +261,9 @@ static int nghttp2_session_new(nghttp2_session **session_ptr,
(*session_ptr)->inflight_niv = -1; (*session_ptr)->inflight_niv = -1;
(*session_ptr)->pending_local_max_concurrent_stream =
NGHTTP2_INITIAL_MAX_CONCURRENT_STREAMS;
if(server) { if(server) {
(*session_ptr)->server = 1; (*session_ptr)->server = 1;
} }
@ -497,7 +512,13 @@ int nghttp2_session_add_frame(nghttp2_session *session,
case NGHTTP2_RST_STREAM: case NGHTTP2_RST_STREAM:
stream = nghttp2_session_get_stream(session, frame->hd.stream_id); stream = nghttp2_session_get_stream(session, frame->hd.stream_id);
if(stream) { if(stream) {
stream->state = NGHTTP2_STREAM_CLOSING; /* We rely on the stream state to decide whether number of
streams should be decremented or not. For purly reserved
streams, they are not counted to those numbers and we must
keep this state in order not to decrement the number. */
if(stream->state != NGHTTP2_STREAM_RESERVED) {
stream->state = NGHTTP2_STREAM_CLOSING;
}
} }
item->pri = -1; item->pri = -1;
break; break;
@ -1626,8 +1647,10 @@ static int nghttp2_session_after_frame_sent(nghttp2_session *session)
} }
break; break;
} }
case NGHTTP2_HCAT_RESPONSE:
case NGHTTP2_HCAT_PUSH_RESPONSE: case NGHTTP2_HCAT_PUSH_RESPONSE:
++session->num_outgoing_streams;
/* Fall through */
case NGHTTP2_HCAT_RESPONSE:
stream->state = NGHTTP2_STREAM_OPENED; stream->state = NGHTTP2_STREAM_OPENED;
if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
nghttp2_stream_shutdown(stream, NGHTTP2_SHUT_WR); nghttp2_stream_shutdown(stream, NGHTTP2_SHUT_WR);
@ -1690,13 +1713,12 @@ static int nghttp2_session_after_frame_sent(nghttp2_session *session)
if(frame->hd.flags & NGHTTP2_FLAG_ACK) { if(frame->hd.flags & NGHTTP2_FLAG_ACK) {
break; break;
} }
/* Only update max concurrent stream here. Applying it without /* Extract NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS here and use
ACK is safe because we can respond to the exceeding streams it to refuse the incoming streams with RST_STREAM. */
with REFUSED_STREAM and client will retry later. */
for(i = frame->settings.niv; i > 0; --i) { for(i = frame->settings.niv; i > 0; --i) {
if(frame->settings.iv[i - 1].settings_id == if(frame->settings.iv[i - 1].settings_id ==
NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS) { NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS) {
session->local_settings[NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] = session->pending_local_max_concurrent_stream =
frame->settings.iv[i - 1].value; frame->settings.iv[i - 1].value;
break; break;
} }
@ -2057,27 +2079,6 @@ static int session_detect_idle_stream(nghttp2_session *session,
return 0; return 0;
} }
/*
* Validates received HEADERS frame |frame| with NGHTTP2_HCAT_REQUEST
* or NGHTTP2_HCAT_PUSH_RESPONSE category, which both opens new
* stream.
*
* This function returns 0 if it succeeds, or non-zero
* nghttp2_error_code.
*/
static int nghttp2_session_validate_request_headers(nghttp2_session *session,
nghttp2_headers *frame)
{
if(nghttp2_session_is_incoming_concurrent_streams_max(session)) {
/* The spec does not clearly say what to do when max concurrent
streams number is reached. The mod_spdy sends
NGHTTP2_REFUSED_STREAM and we think it is reasonable. So we
follow it. */
return NGHTTP2_REFUSED_STREAM;
}
return 0;
}
/* /*
* Inflates header block in the memory pointed by |in| with |inlen| * Inflates header block in the memory pointed by |in| with |inlen|
* bytes. If this function returns NGHTTP2_ERR_PAUSE, the caller must * bytes. If this function returns NGHTTP2_ERR_PAUSE, the caller must
@ -2356,7 +2357,6 @@ int nghttp2_session_on_request_headers_received(nghttp2_session *session,
nghttp2_frame *frame) nghttp2_frame *frame)
{ {
int rv = 0; int rv = 0;
nghttp2_error_code error_code;
nghttp2_stream *stream; nghttp2_stream *stream;
if(frame->hd.stream_id == 0) { if(frame->hd.stream_id == 0) {
return nghttp2_session_inflate_handle_invalid_connection return nghttp2_session_inflate_handle_invalid_connection
@ -2375,11 +2375,13 @@ int nghttp2_session_on_request_headers_received(nghttp2_session *session,
} }
session->last_recv_stream_id = frame->hd.stream_id; session->last_recv_stream_id = frame->hd.stream_id;
error_code = nghttp2_session_validate_request_headers(session, if(nghttp2_session_is_incoming_concurrent_streams_max(session)) {
&frame->headers); return nghttp2_session_inflate_handle_invalid_connection
if(error_code != NGHTTP2_NO_ERROR) { (session, frame, NGHTTP2_ENHANCE_YOUR_CALM);
}
if(nghttp2_session_is_incoming_concurrent_streams_pending_max(session)) {
return nghttp2_session_inflate_handle_invalid_stream return nghttp2_session_inflate_handle_invalid_stream
(session, frame, error_code); (session, frame, NGHTTP2_REFUSED_STREAM);
} }
stream = nghttp2_session_open_stream(session, stream = nghttp2_session_open_stream(session,
@ -2444,10 +2446,16 @@ int nghttp2_session_on_push_response_headers_received(nghttp2_session *session,
/* We don't accept new stream after GOAWAY is sent or received. */ /* We don't accept new stream after GOAWAY is sent or received. */
return NGHTTP2_ERR_IGN_HEADER_BLOCK; return NGHTTP2_ERR_IGN_HEADER_BLOCK;
} }
rv = nghttp2_session_validate_request_headers(session, &frame->headers);
if(rv != 0) { if(nghttp2_session_is_incoming_concurrent_streams_max(session)) {
return nghttp2_session_inflate_handle_invalid_stream(session, frame, rv); return nghttp2_session_inflate_handle_invalid_connection
(session, frame, NGHTTP2_ENHANCE_YOUR_CALM);
} }
if(nghttp2_session_is_incoming_concurrent_streams_pending_max(session)) {
return nghttp2_session_inflate_handle_invalid_stream
(session, frame, NGHTTP2_REFUSED_STREAM);
}
nghttp2_stream_promise_fulfilled(stream); nghttp2_stream_promise_fulfilled(stream);
++session->num_incoming_streams; ++session->num_incoming_streams;
rv = session_call_on_begin_headers(session, frame); rv = session_call_on_begin_headers(session, frame);
@ -2749,9 +2757,7 @@ static int nghttp2_session_update_local_initial_window_size
/* /*
* Apply SETTINGS values |iv| having |niv| elements to the local * Apply SETTINGS values |iv| having |niv| elements to the local
* settings. SETTINGS_MAX_CONCURRENT_STREAMS is not applied here * settings.
* because it has been already applied on transmission of SETTINGS
* frame.
* *
* This function returns 0 if it succeeds, or one of the following * This function returns 0 if it succeeds, or one of the following
* negative error codes: * negative error codes:
@ -2803,13 +2809,12 @@ int nghttp2_session_update_local_settings(nghttp2_session *session,
} }
} }
for(i = 0; i < niv; ++i) { for(i = 0; i < niv; ++i) {
/* SETTINGS_MAX_CONCURRENT_STREAMS has already been applied on if(iv[i].settings_id > 0 && iv[i].settings_id <= NGHTTP2_SETTINGS_MAX) {
transmission of the SETTINGS frame. */
if(iv[i].settings_id > 0 && iv[i].settings_id <= NGHTTP2_SETTINGS_MAX &&
iv[i].settings_id != NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS) {
session->local_settings[iv[i].settings_id] = iv[i].value; session->local_settings[iv[i].settings_id] = iv[i].value;
} }
} }
session->pending_local_max_concurrent_stream =
NGHTTP2_INITIAL_MAX_CONCURRENT_STREAMS;
return 0; return 0;
} }

View File

@ -193,6 +193,9 @@ struct nghttp2_session {
uint32_t local_settings[NGHTTP2_SETTINGS_MAX+1]; uint32_t local_settings[NGHTTP2_SETTINGS_MAX+1];
/* Option flags. This is bitwise-OR of 0 or more of nghttp2_optmask. */ /* Option flags. This is bitwise-OR of 0 or more of nghttp2_optmask. */
uint32_t opt_flags; uint32_t opt_flags;
/* Unacked local SETTINGS_MAX_CONCURRENT_STREAMS value. We use this
to refuse the incoming stream if it exceeds this value. */
uint32_t pending_local_max_concurrent_stream;
/* Nonzero if the session is server side. */ /* Nonzero if the session is server side. */
uint8_t server; uint8_t server;
/* Flags indicating GOAWAY is sent and/or recieved. The flags are /* Flags indicating GOAWAY is sent and/or recieved. The flags are

View File

@ -1079,8 +1079,8 @@ void test_nghttp2_session_on_request_headers_received(void)
nghttp2_frame_headers_free(&frame.headers); nghttp2_frame_headers_free(&frame.headers);
/* More than max concurrent streams leads REFUSED_STREAM */ /* More than un-ACKed max concurrent streams leads REFUSED_STREAM */
session->local_settings[NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] = 1; session->pending_local_max_concurrent_stream = 1;
nghttp2_frame_headers_init(&frame.headers, nghttp2_frame_headers_init(&frame.headers,
NGHTTP2_FLAG_END_HEADERS | NGHTTP2_FLAG_PRIORITY, NGHTTP2_FLAG_END_HEADERS | NGHTTP2_FLAG_PRIORITY,
3, NGHTTP2_PRI_DEFAULT, NULL, 0); 3, NGHTTP2_PRI_DEFAULT, NULL, 0);
@ -1240,6 +1240,7 @@ void test_nghttp2_session_on_push_response_headers_received(void)
nghttp2_outbound_item *item; nghttp2_outbound_item *item;
memset(&callbacks, 0, sizeof(nghttp2_session_callbacks)); memset(&callbacks, 0, sizeof(nghttp2_session_callbacks));
callbacks.send_callback = null_send_callback;
callbacks.on_begin_headers_callback = on_begin_headers_callback; callbacks.on_begin_headers_callback = on_begin_headers_callback;
callbacks.on_invalid_frame_recv_callback = on_invalid_frame_recv_callback; callbacks.on_invalid_frame_recv_callback = on_invalid_frame_recv_callback;
@ -1262,8 +1263,9 @@ void test_nghttp2_session_on_push_response_headers_received(void)
CU_ASSERT(NGHTTP2_STREAM_OPENED == stream->state); CU_ASSERT(NGHTTP2_STREAM_OPENED == stream->state);
CU_ASSERT(1 == session->num_incoming_streams); CU_ASSERT(1 == session->num_incoming_streams);
/* If max concurrent streams limit is exceeded, RST_STREAMed */ /* If un-ACKed max concurrent streams limit is exceeded,
session->local_settings[NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] = 1; RST_STREAMed */
session->pending_local_max_concurrent_stream = 1;
stream = nghttp2_session_open_stream(session, 4, NGHTTP2_STREAM_FLAG_NONE, stream = nghttp2_session_open_stream(session, 4, NGHTTP2_STREAM_FLAG_NONE,
NGHTTP2_PRI_DEFAULT, NGHTTP2_PRI_DEFAULT,
NGHTTP2_STREAM_RESERVED, NULL); NGHTTP2_STREAM_RESERVED, NULL);
@ -1274,6 +1276,27 @@ void test_nghttp2_session_on_push_response_headers_received(void)
item = nghttp2_session_get_next_ob_item(session); item = nghttp2_session_get_next_ob_item(session);
CU_ASSERT(NGHTTP2_RST_STREAM == OB_CTRL_TYPE(item)); CU_ASSERT(NGHTTP2_RST_STREAM == OB_CTRL_TYPE(item));
CU_ASSERT(NGHTTP2_REFUSED_STREAM == OB_CTRL(item)->rst_stream.error_code); CU_ASSERT(NGHTTP2_REFUSED_STREAM == OB_CTRL(item)->rst_stream.error_code);
CU_ASSERT(1 == session->num_incoming_streams);
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(1 == session->num_incoming_streams);
/* If ACKed max concurrent streams limit is exceeded, GOAWAY is
issued */
session->local_settings[NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] = 1;
stream = nghttp2_session_open_stream(session, 6, NGHTTP2_STREAM_FLAG_NONE,
NGHTTP2_PRI_DEFAULT,
NGHTTP2_STREAM_RESERVED, NULL);
frame.hd.stream_id = 6;
CU_ASSERT(NGHTTP2_ERR_IGN_HEADER_BLOCK ==
nghttp2_session_on_push_response_headers_received
(session, &frame, stream));
item = nghttp2_session_get_next_ob_item(session);
CU_ASSERT(NGHTTP2_GOAWAY == OB_CTRL_TYPE(item));
CU_ASSERT(NGHTTP2_ENHANCE_YOUR_CALM == OB_CTRL(item)->goaway.error_code);
CU_ASSERT(1 == session->num_incoming_streams);
nghttp2_frame_headers_free(&frame.headers); nghttp2_frame_headers_free(&frame.headers);
nghttp2_session_del(session); nghttp2_session_del(session);
@ -1945,7 +1968,9 @@ void test_nghttp2_session_send_headers_push_reply(void)
nghttp2_frame_headers_init(&frame->headers, NGHTTP2_FLAG_END_HEADERS, 2, nghttp2_frame_headers_init(&frame->headers, NGHTTP2_FLAG_END_HEADERS, 2,
NGHTTP2_PRI_DEFAULT, NULL, 0); NGHTTP2_PRI_DEFAULT, NULL, 0);
nghttp2_session_add_frame(session, NGHTTP2_CAT_CTRL, frame, NULL); nghttp2_session_add_frame(session, NGHTTP2_CAT_CTRL, frame, NULL);
CU_ASSERT(0 == session->num_outgoing_streams);
CU_ASSERT(0 == nghttp2_session_send(session)); CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(1 == session->num_outgoing_streams);
stream = nghttp2_session_get_stream(session, 2); stream = nghttp2_session_get_stream(session, 2);
CU_ASSERT(NGHTTP2_STREAM_OPENED == stream->state); CU_ASSERT(NGHTTP2_STREAM_OPENED == stream->state);
@ -2718,9 +2743,7 @@ void test_nghttp2_submit_settings(void)
CU_ASSERT(0 == nghttp2_session_send(session)); CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(1 == ud.frame_send_cb_called); CU_ASSERT(1 == ud.frame_send_cb_called);
/* Only SETTINGS_MAX_CONCURRENT_STREAMS is applied on transmission */ CU_ASSERT(50 == session->pending_local_max_concurrent_stream);
CU_ASSERT(50 ==
session->local_settings[NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]);
nghttp2_frame_settings_init(&ack_frame.settings, NGHTTP2_FLAG_ACK, NULL, 0); nghttp2_frame_settings_init(&ack_frame.settings, NGHTTP2_FLAG_ACK, NULL, 0);
CU_ASSERT(0 == nghttp2_session_on_settings_received(session, &ack_frame, 0)); CU_ASSERT(0 == nghttp2_session_on_settings_received(session, &ack_frame, 0));
@ -2729,6 +2752,10 @@ void test_nghttp2_submit_settings(void)
CU_ASSERT(16*1024 == CU_ASSERT(16*1024 ==
session->local_settings[NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); session->local_settings[NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
CU_ASSERT(0 == session->hd_inflater.ctx.hd_table_bufsize_max); CU_ASSERT(0 == session->hd_inflater.ctx.hd_table_bufsize_max);
CU_ASSERT(50 ==
session->local_settings[NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]);
CU_ASSERT(NGHTTP2_INITIAL_MAX_CONCURRENT_STREAMS ==
session->pending_local_max_concurrent_stream);
nghttp2_session_del(session); nghttp2_session_del(session);
} }
@ -3207,20 +3234,37 @@ void test_nghttp2_session_max_concurrent_streams(void)
nghttp2_outbound_item *item; nghttp2_outbound_item *item;
memset(&callbacks, 0, sizeof(nghttp2_session_callbacks)); memset(&callbacks, 0, sizeof(nghttp2_session_callbacks));
callbacks.send_callback = null_send_callback;
nghttp2_session_server_new(&session, &callbacks, NULL); nghttp2_session_server_new(&session, &callbacks, NULL);
nghttp2_session_open_stream(session, 1, NGHTTP2_STREAM_FLAG_NONE, nghttp2_session_open_stream(session, 1, NGHTTP2_STREAM_FLAG_NONE,
NGHTTP2_PRI_DEFAULT, NGHTTP2_PRI_DEFAULT,
NGHTTP2_STREAM_OPENED, NULL); NGHTTP2_STREAM_OPENED, NULL);
/* Check un-ACKed SETTINGS_MAX_CONCURRENT_STREAMS */
nghttp2_frame_headers_init(&frame.headers, NGHTTP2_FLAG_END_HEADERS, 3, nghttp2_frame_headers_init(&frame.headers, NGHTTP2_FLAG_END_HEADERS, 3,
NGHTTP2_PRI_DEFAULT, NULL, 0); NGHTTP2_PRI_DEFAULT, NULL, 0);
session->local_settings[NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] = 1; session->pending_local_max_concurrent_stream = 1;
CU_ASSERT(NGHTTP2_ERR_IGN_HEADER_BLOCK == CU_ASSERT(NGHTTP2_ERR_IGN_HEADER_BLOCK ==
nghttp2_session_on_request_headers_received(session, &frame)); nghttp2_session_on_request_headers_received(session, &frame));
item = nghttp2_session_get_ob_pq_top(session); item = nghttp2_session_get_ob_pq_top(session);
CU_ASSERT(NGHTTP2_RST_STREAM == OB_CTRL_TYPE(item)); CU_ASSERT(NGHTTP2_RST_STREAM == OB_CTRL_TYPE(item));
CU_ASSERT(NGHTTP2_REFUSED_STREAM == OB_CTRL(item)->rst_stream.error_code) CU_ASSERT(NGHTTP2_REFUSED_STREAM == OB_CTRL(item)->rst_stream.error_code);
CU_ASSERT(0 == nghttp2_session_send(session));
/* Check ACKed SETTINGS_MAX_CONCURRENT_STREAMS */
session->local_settings[NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] = 1;
frame.hd.stream_id = 5;
CU_ASSERT(NGHTTP2_ERR_IGN_HEADER_BLOCK ==
nghttp2_session_on_request_headers_received(session, &frame));
item = nghttp2_session_get_ob_pq_top(session);
CU_ASSERT(NGHTTP2_GOAWAY == OB_CTRL_TYPE(item));
CU_ASSERT(NGHTTP2_ENHANCE_YOUR_CALM == OB_CTRL(item)->goaway.error_code);
nghttp2_frame_headers_free(&frame.headers); nghttp2_frame_headers_free(&frame.headers);
nghttp2_session_del(session); nghttp2_session_del(session);