Handle situation where request HEADERS in queue is reset by RST_STREAM

Previously we did not handle the situation where RST_STREAM is
submitted against a stream while requet HEADERS which opens that
stream is still in queue.  Due to max concurrent streams limit,
RST_STREAM is sent first, and then request HEADERS, which effectively
voids RST_STREAM.

In this commit, we checks RST_STREAM against currently pending request
HEADERS in queue and if stream ID matches, we mark that HEADERS as
canceled and RST_STREAM is not sent in this case.  The library will
call on_frame_not_sent_callback for the canceled HEADERS with error
code from RST_STREAM.
This commit is contained in:
Tatsuhiro Tsujikawa 2015-02-13 23:41:48 +09:00
parent 011e3b325d
commit 442572c1f4
5 changed files with 138 additions and 31 deletions

View File

@ -44,6 +44,12 @@
typedef struct {
nghttp2_data_provider data_prd;
void *stream_user_data;
/* error code when request HEADERS is canceled by RST_STREAM while
it is in queue. */
uint32_t error_code;
/* nonzero if request HEADERS is canceled. The error code is stored
in |error_code|. */
uint8_t canceled;
/* nonzero if this item should be attached to stream object to make
it under priority control */
uint8_t attach_stream;

View File

@ -130,3 +130,17 @@ void nghttp2_pq_update(nghttp2_pq *pq, nghttp2_pq_item_cb fun, void *arg) {
}
}
}
int nghttp2_pq_each(nghttp2_pq *pq, nghttp2_pq_item_cb fun, void *arg) {
size_t i;
if (pq->length == 0) {
return 0;
}
for (i = 0; i < pq->length; ++i) {
if ((*fun)(pq->q[i], arg)) {
return 1;
}
}
return 0;
}

View File

@ -109,4 +109,13 @@ typedef int (*nghttp2_pq_item_cb)(void *item, void *arg);
*/
void nghttp2_pq_update(nghttp2_pq *pq, nghttp2_pq_item_cb fun, void *arg);
/*
* Applys |fun| to each item in |pq|. The |arg| is passed as arg
* parameter to callback function. This function must not change the
* ordering key. If the return value from callback is nonzero, this
* function returns 1 immediately without iterating remaining items.
* Otherwise this function returns 0.
*/
int nghttp2_pq_each(nghttp2_pq *pq, nghttp2_pq_item_cb fun, void *arg);
#endif /* NGHTTP2_PQ_H */

View File

@ -772,6 +772,30 @@ int nghttp2_session_add_item(nghttp2_session *session,
return 0;
}
typedef struct {
int32_t stream_id;
uint32_t error_code;
} nghttp2_rst_target;
static int cancel_pending_request(void *pq_item, void *arg) {
nghttp2_outbound_item *item;
nghttp2_rst_target *t;
nghttp2_headers_aux_data *aux_data;
item = pq_item;
t = arg;
aux_data = &item->aux_data.headers;
if (item->frame.hd.stream_id != t->stream_id || aux_data->canceled) {
return 0;
}
aux_data->error_code = t->error_code;
aux_data->canceled = 1;
return 1;
}
int nghttp2_session_add_rst_stream(nghttp2_session *session, int32_t stream_id,
uint32_t error_code) {
int rv;
@ -779,6 +803,7 @@ int nghttp2_session_add_rst_stream(nghttp2_session *session, int32_t stream_id,
nghttp2_frame *frame;
nghttp2_stream *stream;
nghttp2_mem *mem;
nghttp2_rst_target t = {stream_id, error_code};
mem = &session->mem;
stream = nghttp2_session_get_stream(session, stream_id);
@ -786,6 +811,26 @@ int nghttp2_session_add_rst_stream(nghttp2_session *session, int32_t stream_id,
return 0;
}
/* Cancel pending request HEADERS in ob_ss_pq if this RST_STREAM
refers to that stream. */
if (!session->server && nghttp2_session_is_my_stream_id(session, stream_id) &&
nghttp2_pq_top(&session->ob_ss_pq)) {
nghttp2_outbound_item *top;
nghttp2_frame *headers_frame;
top = nghttp2_pq_top(&session->ob_ss_pq);
headers_frame = &top->frame;
assert(headers_frame->hd.type == NGHTTP2_HEADERS);
if (headers_frame->hd.stream_id <= stream_id &&
(uint32_t)stream_id < session->next_stream_id) {
if (nghttp2_pq_each(&session->ob_ss_pq, cancel_pending_request, &t)) {
return 0;
}
}
}
item = nghttp2_mem_malloc(mem, sizeof(nghttp2_outbound_item));
if (item == NULL) {
return NGHTTP2_ERR_NOMEM;
@ -1243,7 +1288,7 @@ static int session_predicate_for_stream_send(nghttp2_session *session,
}
/*
* This function checks HEADERS frame |frame|, which opens stream, can
* This function checks request HEADERS frame, which opens stream, can
* be sent at this time.
*
* This function returns 0 if it succeeds, or one of the following
@ -1253,8 +1298,14 @@ static int session_predicate_for_stream_send(nghttp2_session *session,
* New stream cannot be created because of GOAWAY: session is
* going down or received last_stream_id is strictly less than
* frame->hd.stream_id.
* NGHTTP2_ERR_STREAM_CLOSING
* request HEADERS was canceled by RST_STREAM while it is in queue.
*/
static int session_predicate_request_headers_send(nghttp2_session *session) {
static int session_predicate_request_headers_send(nghttp2_session *session,
nghttp2_outbound_item *item) {
if (item->aux_data.headers.canceled) {
return NGHTTP2_ERR_STREAM_CLOSING;
}
/* If we are terminating session (NGHTTP2_GOAWAY_TERM_ON_SEND) or
GOAWAY was received from peer, new request is not allowed. */
if (session->goaway_flags &
@ -1683,7 +1734,7 @@ static int session_prep_frame(nghttp2_session *session,
return NGHTTP2_ERR_NOMEM;
}
rv = session_predicate_request_headers_send(session);
rv = session_predicate_request_headers_send(session, item);
if (rv != 0) {
return rv;
}
@ -2653,6 +2704,9 @@ static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session,
break;
}
if (rv < 0) {
int32_t opened_stream_id = 0;
uint32_t error_code = NGHTTP2_INTERNAL_ERROR;
DEBUGF(fprintf(stderr, "send: frame preparation failed with %s\n",
nghttp2_strerror(rv)));
/* TODO If the error comes from compressor, the connection
@ -2660,7 +2714,6 @@ static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session,
if (item->frame.hd.type != NGHTTP2_DATA &&
session->callbacks.on_frame_not_send_callback && is_non_fatal(rv)) {
nghttp2_frame *frame = &item->frame;
int32_t opened_stream_id = 0;
/* The library is responsible for the transmission of
WINDOW_UPDATE frame, so we don't call error callback for
it. */
@ -2673,29 +2726,33 @@ static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session,
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
/* We have to close stream opened by failed request HEADERS
or PUSH_PROMISE. */
switch (item->frame.hd.type) {
case NGHTTP2_HEADERS:
if (item->frame.headers.cat == NGHTTP2_HCAT_REQUEST) {
opened_stream_id = item->frame.hd.stream_id;
}
/* We have to close stream opened by failed request HEADERS
or PUSH_PROMISE. */
switch (item->frame.hd.type) {
case NGHTTP2_HEADERS:
if (item->frame.headers.cat == NGHTTP2_HCAT_REQUEST) {
opened_stream_id = item->frame.hd.stream_id;
if (item->aux_data.headers.canceled) {
error_code = item->aux_data.headers.error_code;
}
break;
case NGHTTP2_PUSH_PROMISE:
opened_stream_id = item->frame.push_promise.promised_stream_id;
break;
}
if (opened_stream_id) {
/* careful not to override rv */
int rv2;
rv2 = nghttp2_session_close_stream(session, opened_stream_id,
NGHTTP2_NO_ERROR);
break;
case NGHTTP2_PUSH_PROMISE:
opened_stream_id = item->frame.push_promise.promised_stream_id;
break;
}
if (opened_stream_id) {
/* careful not to override rv */
int rv2;
rv2 = nghttp2_session_close_stream(session, opened_stream_id,
error_code);
if (nghttp2_is_fatal(rv2)) {
return rv2;
}
if (nghttp2_is_fatal(rv2)) {
return rv2;
}
}
nghttp2_outbound_item_free(item, mem);
nghttp2_mem_free(mem, item);
active_outbound_item_reset(aob, mem);

View File

@ -64,6 +64,7 @@ typedef struct {
uint8_t not_sent_frame_type;
int not_sent_error;
int stream_close_cb_called;
uint32_t stream_close_error_code;
size_t data_source_length;
int32_t stream_id;
size_t block_count;
@ -351,6 +352,7 @@ static int on_stream_close_callback(nghttp2_session *session _U_,
void *user_data) {
my_user_data *my_data = (my_user_data *)user_data;
++my_data->stream_close_cb_called;
my_data->stream_close_error_code = error_code;
return 0;
}
@ -6774,27 +6776,46 @@ void test_nghttp2_session_reset_pending_headers(void) {
nghttp2_session_callbacks callbacks;
nghttp2_stream *stream;
int32_t stream_id;
my_user_data ud;
memset(&callbacks, 0, sizeof(nghttp2_session_callbacks));
callbacks.send_callback = null_send_callback;
callbacks.on_frame_send_callback = on_frame_send_callback;
callbacks.on_frame_not_send_callback = on_frame_not_send_callback;
callbacks.on_stream_close_callback = on_stream_close_callback;
nghttp2_session_client_new(&session, &callbacks, NULL);
nghttp2_session_client_new(&session, &callbacks, &ud);
/* See that if request HEADERS and RST_STREAM were submitted in this
order, HEADERS is sent first. This is useful feature since
client can issue RST_STREAM in things go wrong while preparing
data for HEADERS, but this may be rare in practice. On the other
hand, we don't have same property for PUSH_PROMISE and RST_STREAM
to reserved stream. We may fix this if this is a significant
problem. */
stream_id = nghttp2_submit_request(session, NULL, NULL, 0, NULL, NULL);
CU_ASSERT(stream_id >= 1);
nghttp2_submit_rst_stream(session, NGHTTP2_FLAG_NONE, stream_id,
NGHTTP2_NO_ERROR);
NGHTTP2_CANCEL);
session->remote_settings.max_concurrent_streams = 0;
/* RST_STREAM cancels pending HEADERS and is not actually sent. */
ud.frame_send_cb_called = 0;
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(0 == ud.frame_send_cb_called);
stream = nghttp2_session_get_stream(session, stream_id);
CU_ASSERT(NULL == stream);
/* See HEADERS is not sent. on_stream_close is called just like
transmission failure. */
session->remote_settings.max_concurrent_streams = 1;
ud.frame_not_send_cb_called = 0;
ud.stream_close_error_code = 0;
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(1 == ud.frame_not_send_cb_called);
CU_ASSERT(NGHTTP2_HEADERS == ud.not_sent_frame_type);
CU_ASSERT(NGHTTP2_CANCEL == ud.stream_close_error_code);
stream = nghttp2_session_get_stream(session, stream_id);
CU_ASSERT(NULL == stream);