Refactor nghttp2_stream

Combine deferred_data and data into data_item and merge deferred_flags
into flags.
This commit is contained in:
Tatsuhiro Tsujikawa 2014-04-02 22:55:49 +09:00
parent 35a45f9d47
commit ef40879b5f
5 changed files with 117 additions and 105 deletions

View File

@ -2308,6 +2308,8 @@ int nghttp2_submit_headers(nghttp2_session *session, uint8_t flags,
*
* :enum:`NGHTTP2_ERR_NOMEM`
* Out of memory.
* :enum:`NGHTTP2_ERR_DATA_EXIST`
* DATA has been already submitted and not fully processed yet.
*/
int nghttp2_submit_data(nghttp2_session *session, uint8_t flags,
int32_t stream_id,

View File

@ -451,7 +451,7 @@ void nghttp2_session_del(nghttp2_session *session)
free(session->inflight_iv);
/* Have to free streams first, so that we can check
stream->data->queued */
stream->data_item->queued */
nghttp2_map_each_free(&session->streams, nghttp2_free_streams, NULL);
nghttp2_map_free(&session->streams);
@ -688,9 +688,13 @@ int nghttp2_session_add_frame(nghttp2_session *session,
stream = nghttp2_session_get_stream(session, data_frame->hd.stream_id);
if(stream) {
item->weight = nghttp2_stream_group_shared_wait(stream->stream_group);
if(stream->data_item) {
rv = NGHTTP2_ERR_DATA_EXIST;
} else {
item->weight = nghttp2_stream_group_shared_wait(stream->stream_group);
rv = nghttp2_stream_attach_data(stream, item, &session->ob_pq);
rv = nghttp2_stream_attach_data(stream, item, &session->ob_pq);
}
}
} else {
@ -867,10 +871,10 @@ int nghttp2_session_close_stream(nghttp2_session *session, int32_t stream_id,
DEBUGF(fprintf(stderr, "stream: stream(%p)=%d close\n",
stream, stream->stream_id));
if(stream->data) {
if(stream->data_item) {
nghttp2_outbound_item *item;
item = stream->data;
item = stream->data_item;
rv = nghttp2_stream_detach_data(stream, &session->ob_pq);
@ -1364,8 +1368,6 @@ static size_t nghttp2_session_next_data_read(nghttp2_session *session,
* NGHTTP2_ERR_STREAM_SHUT_WR
* The transmission is not allowed for this stream (e.g., a frame
* with END_STREAM flag set has already sent)
* NGHTTP2_ERR_DEFERRED_DATA_EXIST
* Another DATA frame has already been deferred.
* NGHTTP2_ERR_STREAM_CLOSING
* RST_STREAM was queued for this stream.
* NGHTTP2_ERR_INVALID_STREAM_STATE
@ -1380,12 +1382,6 @@ static int nghttp2_session_predicate_data_send(nghttp2_session *session,
if(rv != 0) {
return rv;
}
if(stream->deferred_data != NULL) {
/* stream->deferred_data != NULL means previously queued DATA
frame has not been sent. We don't allow new DATA frame is sent
in this case. */
return NGHTTP2_ERR_DEFERRED_DATA_EXIST;
}
if(nghttp2_session_is_my_stream_id(session, stream_id)) {
/* Request body data */
/* If stream->state is NGHTTP2_STREAM_CLOSING, RST_STREAM was
@ -1689,18 +1685,14 @@ static int nghttp2_session_prep_frame(nghttp2_session *session,
data_frame = nghttp2_outbound_item_get_data_frame(item);
stream = nghttp2_session_get_stream(session, data_frame->hd.stream_id);
if(stream && stream->data && stream->data != item) {
/* We don't allow multiple DATA for a stream at the same
time. */
return NGHTTP2_ERR_DATA_EXIST;
if(stream) {
assert(stream->data_item == item);
}
rv = nghttp2_session_predicate_data_send(session, data_frame->hd.stream_id);
if(rv != 0) {
int rv2;
stream = nghttp2_session_get_stream(session, data_frame->hd.stream_id);
if(stream) {
rv2 = nghttp2_stream_detach_data(stream, &session->ob_pq);
@ -1716,7 +1708,8 @@ static int nghttp2_session_prep_frame(nghttp2_session *session,
next_readmax = nghttp2_session_next_data_read(session, stream);
if(next_readmax == 0) {
nghttp2_stream_defer_data(stream, item, NGHTTP2_DEFERRED_FLOW_CONTROL);
nghttp2_stream_defer_data(stream,
NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL);
session->aob.item = NULL;
nghttp2_active_outbound_item_reset(&session->aob);
return NGHTTP2_ERR_DEFERRED;
@ -1726,7 +1719,7 @@ static int nghttp2_session_prep_frame(nghttp2_session *session,
next_readmax,
data_frame);
if(framerv == NGHTTP2_ERR_DEFERRED) {
nghttp2_stream_defer_data(stream, item, NGHTTP2_DEFERRED_NONE);
nghttp2_stream_defer_data(stream, NGHTTP2_STREAM_FLAG_DEFERRED_USER);
session->aob.item = NULL;
nghttp2_active_outbound_item_reset(&session->aob);
return NGHTTP2_ERR_DEFERRED;
@ -1953,8 +1946,8 @@ static int nghttp2_session_after_frame_sent(nghttp2_session *session)
if(nghttp2_is_fatal(rv)) {
return rv;
}
/* If rv is not fatal, the only possible error is closed
stream, so we have nothing to do here. */
/* TODO nghttp2_submit_data() may fail if stream has already
DATA frame item. We might have to handle it here. */
}
break;
}
@ -2114,8 +2107,8 @@ static int nghttp2_session_after_frame_sent(nghttp2_session *session)
next_readmax = nghttp2_session_next_data_read(session, stream);
if(next_readmax == 0) {
nghttp2_stream_defer_data(stream, aob->item,
NGHTTP2_DEFERRED_FLOW_CONTROL);
nghttp2_stream_defer_data(stream,
NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL);
aob->item = NULL;
nghttp2_active_outbound_item_reset(aob);
@ -2130,7 +2123,7 @@ static int nghttp2_session_after_frame_sent(nghttp2_session *session)
return rv;
}
if(rv == NGHTTP2_ERR_DEFERRED) {
nghttp2_stream_defer_data(stream, aob->item, NGHTTP2_DEFERRED_NONE);
nghttp2_stream_defer_data(stream, NGHTTP2_STREAM_FLAG_DEFERRED_USER);
aob->item = NULL;
nghttp2_active_outbound_item_reset(aob);
@ -3086,8 +3079,7 @@ static int nghttp2_update_remote_initial_window_size_func
}
/* If window size gets positive, push deferred DATA frame to
outbound queue. */
if(stream->deferred_data &&
(stream->deferred_flags & NGHTTP2_DEFERRED_FLOW_CONTROL) &&
if(nghttp2_stream_check_deferred_by_flow_control(stream) &&
stream->remote_window_size > 0 &&
arg->session->remote_window_size > 0) {
@ -3581,8 +3573,7 @@ static int nghttp2_push_back_deferred_data_func(nghttp2_map_entry *entry,
/* If DATA frame is deferred due to flow control, push it back to
outbound queue. */
if(stream->deferred_data &&
(stream->deferred_flags & NGHTTP2_DEFERRED_FLOW_CONTROL) &&
if(nghttp2_stream_check_deferred_by_flow_control(stream) &&
stream->remote_window_size > 0) {
rv = nghttp2_stream_detach_deferred_data(stream, &session->ob_pq);
@ -3653,8 +3644,7 @@ static int session_on_stream_window_update_received
stream->remote_window_size += frame->window_update.window_size_increment;
if(stream->remote_window_size > 0 &&
session->remote_window_size > 0 &&
stream->deferred_data != NULL &&
(stream->deferred_flags & NGHTTP2_DEFERRED_FLOW_CONTROL)) {
nghttp2_stream_check_deferred_by_flow_control(stream)) {
rv = nghttp2_stream_detach_deferred_data(stream, &session->ob_pq);
@ -5477,11 +5467,15 @@ int nghttp2_session_resume_data(nghttp2_session *session, int32_t stream_id)
int rv;
nghttp2_stream *stream;
stream = nghttp2_session_get_stream(session, stream_id);
if(stream == NULL || stream->deferred_data == NULL ||
(stream->deferred_flags & NGHTTP2_DEFERRED_FLOW_CONTROL)) {
if(stream == NULL ||
nghttp2_stream_check_deferred_by_flow_control(stream)) {
return NGHTTP2_ERR_INVALID_ARGUMENT;
}
if(!nghttp2_stream_check_deferred_data(stream)) {
return 0;
}
rv = nghttp2_stream_detach_deferred_data(stream, &session->ob_pq);
if(nghttp2_is_fatal(rv)) {

View File

@ -42,9 +42,7 @@ void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id,
stream->state = initial_state;
stream->shut_flags = NGHTTP2_SHUT_NONE;
stream->stream_user_data = stream_user_data;
stream->data = NULL;
stream->deferred_data = NULL;
stream->deferred_flags = NGHTTP2_DEFERRED_NONE;
stream->data_item = NULL;
stream->remote_window_size = remote_initial_window_size;
stream->local_window_size = local_initial_window_size;
stream->recv_window_size = 0;
@ -65,10 +63,12 @@ void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id,
void nghttp2_stream_free(nghttp2_stream *stream)
{
nghttp2_outbound_item_free(stream->deferred_data);
free(stream->deferred_data);
if(stream->flags & NGHTTP2_STREAM_FLAG_DEFERRED_ALL) {
nghttp2_outbound_item_free(stream->data_item);
free(stream->data_item);
}
/* We don't free stream->data. */
/* We don't free stream->data_item otherwise. */
}
void nghttp2_stream_shutdown(nghttp2_stream *stream, nghttp2_shut_flag flag)
@ -81,22 +81,22 @@ static int stream_push_data(nghttp2_stream *stream, nghttp2_pq *pq)
int rv;
ssize_t weight;
assert(stream->data);
assert(stream->data->queued == 0);
assert(stream->data_item);
assert(stream->data_item->queued == 0);
weight = nghttp2_stream_group_shared_wait(stream->stream_group);
if(stream->data->weight > weight) {
stream->data->weight = weight;
if(stream->data_item->weight > weight) {
stream->data_item->weight = weight;
}
rv = nghttp2_pq_push(pq, stream->data);
rv = nghttp2_pq_push(pq, stream->data_item);
if(rv != 0) {
return rv;
}
stream->data->queued = 1;
stream->data_item->queued = 1;
return 0;
}
@ -190,7 +190,7 @@ static void stream_update_dep_set_rest(nghttp2_stream *stream)
/*
* Performs dfs starting |stream|, search stream which can become
* NGHTTP2_STREAM_DPRI_TOP and queues its data.
* NGHTTP2_STREAM_DPRI_TOP and queues its data_item.
*
* This function returns the number of stream marked as
* NGHTTP2_STREAM_DPRI_TOP (including already marked as such) if it
@ -226,7 +226,7 @@ static ssize_t stream_update_dep_set_top(nghttp2_stream *stream,
DEBUGF(fprintf(stderr, "stream: stream=%d data is top\n",
stream->stream_id));
if(!stream->data->queued) {
if(!stream->data_item->queued) {
rv = stream_push_data(stream, pq);
if(rv != 0) {
@ -329,16 +329,21 @@ static int stream_update_dep_on_detach_data(nghttp2_stream *stream,
}
int nghttp2_stream_attach_data(nghttp2_stream *stream,
nghttp2_outbound_item *data,
nghttp2_outbound_item *data_item,
nghttp2_pq *pq)
{
assert(stream->data == NULL);
assert(stream->deferred_data == NULL);
/* This function may be called when stream->data_item is not-NULL.
In this case, stream->data_item == data_item. */
assert((stream->flags & NGHTTP2_STREAM_FLAG_DEFERRED_ALL) == 0);
stream->data = data;
if(stream->data_item) {
assert(stream->data_item == data_item);
} else {
stream->data_item = data_item;
}
DEBUGF(fprintf(stderr, "stream: stream=%d attach data=%p\n",
stream->stream_id, data));
stream->stream_id, data_item));
return stream_update_dep_on_attach_data(stream, pq);
}
@ -346,40 +351,41 @@ int nghttp2_stream_attach_data(nghttp2_stream *stream,
int nghttp2_stream_detach_data(nghttp2_stream *stream, nghttp2_pq *pq)
{
DEBUGF(fprintf(stderr, "stream: stream=%d detach data=%p\n",
stream->stream_id, stream->data));
stream->stream_id, stream->data_item));
stream->data = NULL;
stream->data_item = NULL;
stream->flags &= ~NGHTTP2_STREAM_FLAG_DEFERRED_ALL;
return stream_update_dep_on_detach_data(stream, pq);
}
void nghttp2_stream_defer_data(nghttp2_stream *stream,
nghttp2_outbound_item *data,
uint8_t flags)
void nghttp2_stream_defer_data(nghttp2_stream *stream, uint8_t flags)
{
assert(stream->data);
assert(stream->data == data);
assert(stream->deferred_data == NULL);
assert(stream->data_item);
stream->deferred_data = data;
stream->deferred_flags = flags;
stream->data = NULL;
stream->flags |= flags;
}
int nghttp2_stream_detach_deferred_data(nghttp2_stream *stream,
nghttp2_pq *pq)
{
nghttp2_outbound_item *data;
assert(stream->data == NULL);
assert(stream->deferred_data);
assert(stream->data_item);
data = stream->deferred_data;
stream->flags &= ~NGHTTP2_STREAM_FLAG_DEFERRED_ALL;
stream->deferred_data = NULL;
stream->deferred_flags = NGHTTP2_DEFERRED_NONE;
return nghttp2_stream_attach_data(stream, stream->data_item, pq);
}
return nghttp2_stream_attach_data(stream, data, pq);
int nghttp2_stream_check_deferred_data(nghttp2_stream *stream)
{
return stream->data_item &&
(stream->flags & NGHTTP2_STREAM_FLAG_DEFERRED_ALL);
}
int nghttp2_stream_check_deferred_by_flow_control(nghttp2_stream *stream)
{
return stream->data_item &&
(stream->flags & NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL);
}
static int update_initial_window_size
@ -466,7 +472,7 @@ void nghttp2_stream_dep_insert(nghttp2_stream *dep_stream,
{
nghttp2_stream *si;
assert(stream->data == NULL);
assert(stream->data_item == NULL);
DEBUGF(fprintf(stderr,
"stream: dep_insert dep_stream(%p)=%d, stream(%p)=%d\n",
@ -493,7 +499,7 @@ void nghttp2_stream_dep_add(nghttp2_stream *dep_stream,
{
nghttp2_stream *last_sib;
assert(stream->data == NULL);
assert(stream->data_item == NULL);
DEBUGF(fprintf(stderr,
"stream: dep_add dep_stream(%p)=%d, stream(%p)=%d\n",

View File

@ -84,7 +84,15 @@ typedef enum {
/* Indicates that this stream is pushed stream */
NGHTTP2_STREAM_FLAG_PUSH = 0x01,
/* Indicates that this stream was closed */
NGHTTP2_STREAM_FLAG_CLOSED = 0x02
NGHTTP2_STREAM_FLAG_CLOSED = 0x02,
/* Indicates the DATA is deferred due to flow control. */
NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL = 0x04,
/* Indicates the DATA is deferred by user callback */
NGHTTP2_STREAM_FLAG_DEFERRED_USER = 0x08,
/* bitwise OR of NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL and
NGHTTP2_STREAM_FLAG_DEFERRED_USER. */
NGHTTP2_STREAM_FLAG_DEFERRED_ALL = 0x0c,
} nghttp2_stream_flag;
typedef enum {
@ -94,12 +102,6 @@ typedef enum {
NGHTTP2_STREAM_DPRI_REST = 0x04
} nghttp2_stream_dpri;
typedef enum {
NGHTTP2_DEFERRED_NONE = 0,
/* Indicates the DATA is deferred due to flow control. */
NGHTTP2_DEFERRED_FLOW_CONTROL = 0x01
} nghttp2_deferred_flag;
struct nghttp2_stream_group;
typedef struct nghttp2_stream_group nghttp2_stream_group;
@ -127,10 +129,8 @@ struct nghttp2_stream {
nghttp2_stream *closed_next;
/* The arbitrary data provided by user for this stream. */
void *stream_user_data;
/* Active DATA frame */
nghttp2_outbound_item *data;
/* Deferred DATA frame */
nghttp2_outbound_item *deferred_data;
/* DATA frame item */
nghttp2_outbound_item *data_item;
/* stream ID */
int32_t stream_id;
/* priority group this stream belongs to */
@ -162,9 +162,6 @@ struct nghttp2_stream {
uint8_t flags;
/* Bitwise OR of zero or more nghttp2_shut_flag values */
uint8_t shut_flags;
/* The flags for defered DATA. Bitwise OR of zero or more
nghttp2_deferred_flag values */
uint8_t deferred_flags;
};
void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id,
@ -183,21 +180,33 @@ void nghttp2_stream_free(nghttp2_stream *stream);
void nghttp2_stream_shutdown(nghttp2_stream *stream, nghttp2_shut_flag flag);
/*
* Defer DATA frame |data|. We won't call this function in the
* situation where stream->deferred_data != NULL. If |flags| is
* bitwise OR of zero or more nghttp2_deferred_flag values.
* Defer DATA frame |stream->data_item|. We won't call this function
* in the situation where |stream->data_item| == NULL. If |flags| is
* bitwise OR of zero or more of NGHTTP2_STREAM_FLAG_DEFERRED_USER and
* NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL. The |flags| indicates
* the reason of this action.
*/
void nghttp2_stream_defer_data(nghttp2_stream *stream,
nghttp2_outbound_item *data,
uint8_t flags);
void nghttp2_stream_defer_data(nghttp2_stream *stream, uint8_t flags);
/*
* Detaches deferred data from this stream. This function does not
* free deferred data.
* Detaches deferred data in this stream and it is back to active
* state. The flags NGHTTP2_STREAM_FLAG_DEFERRED_USER and
* NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL are cleared if they are
* set.
*/
int nghttp2_stream_detach_deferred_data(nghttp2_stream *stream,
nghttp2_pq *pq);
/*
* Returns nonzero if data item is deferred by whatever reason.
*/
int nghttp2_stream_check_deferred_data(nghttp2_stream *stream);
/*
* Returns nonzero if data item is deferred by flow control.
*/
int nghttp2_stream_check_deferred_by_flow_control(nghttp2_stream *stream);
/*
* Updates the remote window size with the new value
* |new_initial_window_size|. The |old_initial_window_size| is used to
@ -269,7 +278,7 @@ void nghttp2_stream_dep_add(nghttp2_stream *dep_stream,
void nghttp2_stream_dep_remove(nghttp2_stream *stream);
/*
* Attaches |data| to |stream|. Updates dpri members in this
* Attaches |data_item| to |stream|. Updates dpri members in this
* dependency tree.
*
* This function returns 0 if it succeeds, or one of the following
@ -279,12 +288,13 @@ void nghttp2_stream_dep_remove(nghttp2_stream *stream);
* Out of memory
*/
int nghttp2_stream_attach_data(nghttp2_stream *stream,
nghttp2_outbound_item *data,
nghttp2_outbound_item *data_item,
nghttp2_pq *pq);
/*
* Detaches |data| from |stream|. Updates dpri members in this
* dependency tree.
* Detaches |stream->data_item|. Updates dpri members in this
* dependency tree. This function does not free |stream->data_item|.
* The caller must free it.
*
* This function returns 0 if it succeeds, or one of the following
* negative error codes:

View File

@ -2201,13 +2201,13 @@ void test_nghttp2_session_on_window_update_received(void)
CU_ASSERT(1 == user_data.frame_recv_cb_called);
CU_ASSERT(NGHTTP2_INITIAL_WINDOW_SIZE+16*1024 == stream->remote_window_size);
nghttp2_stream_defer_data(stream, data_item, NGHTTP2_DEFERRED_FLOW_CONTROL);
nghttp2_stream_defer_data(stream, NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL);
CU_ASSERT(0 == nghttp2_session_on_window_update_received(session, &frame));
CU_ASSERT(2 == user_data.frame_recv_cb_called);
CU_ASSERT(NGHTTP2_INITIAL_WINDOW_SIZE+16*1024*2 ==
stream->remote_window_size);
CU_ASSERT(NULL == stream->deferred_data);
CU_ASSERT(0 == (stream->flags & NGHTTP2_STREAM_FLAG_DEFERRED_ALL));
nghttp2_frame_window_update_free(&frame.window_update);
@ -4233,9 +4233,9 @@ void test_nghttp2_session_data_read_temporal_failure(void)
CU_ASSERT(data_size - NGHTTP2_INITIAL_WINDOW_SIZE == ud.data_source_length);
stream = nghttp2_session_get_stream(session, 1);
CU_ASSERT(NULL != stream->deferred_data);
CU_ASSERT(NGHTTP2_CAT_DATA == stream->deferred_data->frame_cat);
data_frame = (nghttp2_private_data*)stream->deferred_data->frame;
CU_ASSERT(nghttp2_stream_check_deferred_by_flow_control(stream));
CU_ASSERT(NGHTTP2_CAT_DATA == stream->data_item->frame_cat);
data_frame = (nghttp2_private_data*)stream->data_item->frame;
data_frame->data_prd.read_callback =
temporal_failure_data_source_read_callback;