Retain incoming closed streams for dependency tree

The number of closed stream to keep is limited by
MAX_CONCURRENT_STREAMS - current active stream.
This commit is contained in:
Tatsuhiro Tsujikawa 2014-03-30 17:41:54 +09:00
parent a9d97d9d35
commit 74daa16a1c
7 changed files with 209 additions and 7 deletions

View File

@ -128,6 +128,20 @@ int nghttp2_session_is_my_stream_id(nghttp2_session *session,
nghttp2_stream* nghttp2_session_get_stream(nghttp2_session *session,
int32_t stream_id)
{
nghttp2_stream *stream;
stream = (nghttp2_stream*)nghttp2_map_find(&session->streams, stream_id);
if(stream == NULL || (stream->flags & NGHTTP2_STREAM_FLAG_CLOSED)) {
return NULL;
}
return stream;
}
nghttp2_stream* nghttp2_session_get_stream_raw(nghttp2_session *session,
int32_t stream_id)
{
return (nghttp2_stream*)nghttp2_map_find(&session->streams, stream_id);
}
@ -510,7 +524,7 @@ int nghttp2_session_reprioritize_stream
old_stream_group = stream->stream_group;
dep_stream = nghttp2_session_get_stream(session, dep->stream_id);
dep_stream = nghttp2_session_get_stream_raw(session, dep->stream_id);
if(dep_stream == NULL) {
return 0;
@ -736,6 +750,10 @@ nghttp2_stream* nghttp2_session_open_stream(nghttp2_session *session,
dep_stream = NULL;
if(session->server && !nghttp2_session_is_my_stream_id(session, stream_id)) {
nghttp2_session_adjust_closed_stream(session, 1);
}
switch(pri_spec->pri_type) {
case NGHTTP2_PRIORITY_TYPE_GROUP:
pri_group_id = pri_spec->group.pri_group_id;
@ -743,7 +761,8 @@ nghttp2_stream* nghttp2_session_open_stream(nghttp2_session *session,
break;
case NGHTTP2_PRIORITY_TYPE_DEP:
dep_stream = nghttp2_session_get_stream(session, pri_spec->dep.stream_id);
dep_stream = nghttp2_session_get_stream_raw(session,
pri_spec->dep.stream_id);
if(dep_stream) {
pri_group_id = dep_stream->stream_group->pri_group_id;
@ -838,7 +857,6 @@ int nghttp2_session_close_stream(nghttp2_session *session, int32_t stream_id,
{
int rv;
nghttp2_stream *stream;
nghttp2_stream_group *stream_group;
stream = nghttp2_session_get_stream(session, stream_id);
@ -892,6 +910,29 @@ int nghttp2_session_close_stream(nghttp2_session *session, int32_t stream_id,
}
}
/* Closes both directions just in case they are not closed yet */
stream->flags |= NGHTTP2_STREAM_FLAG_CLOSED;
if(session->server && !nghttp2_session_is_my_stream_id(session, stream_id)) {
/* On server side, retain incoming stream object at most
MAX_CONCURRENT_STREAMS combined with the current active streams
to make dependency tree work better. */
nghttp2_session_keep_closed_stream(session, stream);
} else {
nghttp2_session_destroy_stream(session, stream);
}
return 0;
}
void nghttp2_session_destroy_stream(nghttp2_session *session,
nghttp2_stream *stream)
{
nghttp2_stream_group *stream_group;
DEBUGF(fprintf(stderr, "stream: destroy closed stream(%p)=%d\n",
stream, stream->stream_id));
nghttp2_stream_dep_remove(stream);
stream_group = stream->stream_group;
@ -899,11 +940,55 @@ int nghttp2_session_close_stream(nghttp2_session *session, int32_t stream_id,
nghttp2_stream_group_remove_stream(stream_group, stream);
nghttp2_session_close_stream_group_if_empty(session, stream_group);
nghttp2_map_remove(&session->streams, stream_id);
nghttp2_map_remove(&session->streams, stream->stream_id);
nghttp2_stream_free(stream);
free(stream);
}
return 0;
void nghttp2_session_keep_closed_stream(nghttp2_session *session,
nghttp2_stream *stream)
{
DEBUGF(fprintf(stderr, "stream: keep closed stream(%p)=%d\n",
stream, stream->stream_id));
if(session->closed_stream_tail) {
session->closed_stream_tail->closed_next = stream;
} else {
session->closed_stream_head = stream;
}
session->closed_stream_tail = stream;
++session->num_closed_streams;
nghttp2_session_adjust_closed_stream(session, 0);
}
void nghttp2_session_adjust_closed_stream(nghttp2_session *session,
ssize_t offset)
{
DEBUGF(fprintf(stderr, "stream: adjusting kept closed streams "
"num_closed_streams=%zu, num_incoming_streams=%zu, "
"max_concurrent_streams=%u\n",
session->num_closed_streams, session->num_incoming_streams,
session->pending_local_max_concurrent_stream));
while(session->num_closed_streams > 0 &&
session->num_closed_streams + session->num_incoming_streams + offset
> session->pending_local_max_concurrent_stream) {
nghttp2_stream *head_stream;
head_stream = session->closed_stream_head;
session->closed_stream_head = head_stream->closed_next;
if(session->closed_stream_tail == head_stream) {
session->closed_stream_tail = NULL;
}
nghttp2_session_destroy_stream(session, head_stream);
/* head_stream is now freed */
--session->num_closed_streams;
}
}
/*

View File

@ -130,6 +130,12 @@ struct nghttp2_session {
enqueue if priority is equal. */
int64_t next_seq;
void *user_data;
/* Points to the latest closed stream. NULL if there is no closed
stream. */
nghttp2_stream *closed_stream_head;
/* Points to the oldest closed stream. NULL if there is no closed
stream. */
nghttp2_stream *closed_stream_tail;
/* In-flight SETTINGS values. NULL does not necessarily mean there
is no in-flight SETTINGS. */
nghttp2_settings_entry *inflight_iv;
@ -142,6 +148,11 @@ struct nghttp2_session {
/* The number of incoming streams. This will be capped by
local_settings[NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]. */
size_t num_incoming_streams;
/* The number of closed streams still kept in |streams| hash. The
closed streams can be accessed through single linked list
|closed_stream_head|. The current implementation only keeps
incoming streams and session is initialized as server. */
size_t num_closed_streams;
/* The number of bytes allocated for nvbuf */
size_t nvbuflen;
/* Next Stream ID. Made unsigned int to detect >= (1 << 31). */
@ -330,6 +341,11 @@ nghttp2_stream* nghttp2_session_open_stream(nghttp2_session *session,
* is indicated by the |error_code|. When closing the stream,
* on_stream_close_callback will be called.
*
* If the session is initialized as server and |stream| is incoming
* stream, stream is just marked closed and this function calls
* nghttp2_session_keep_closed_stream() with |stream|. Otherwise,
* |stream| will be deleted from memory.
*
* This function returns 0 if it succeeds, or one the following
* negative error codes:
*
@ -343,6 +359,33 @@ nghttp2_stream* nghttp2_session_open_stream(nghttp2_session *session,
int nghttp2_session_close_stream(nghttp2_session *session, int32_t stream_id,
nghttp2_error_code error_code);
/*
* Deletes |stream| from memory. After this function returns, stream
* cannot be accessed.
*
*/
void nghttp2_session_destroy_stream(nghttp2_session *session,
nghttp2_stream *stream);
/*
* Tries to keep incoming closed stream |stream|. Due to the
* limitation of maximum number of streams in memory, |stream| is not
* closed and just deleted from memory (see
* nghttp2_session_destroy_stream).
*/
void nghttp2_session_keep_closed_stream(nghttp2_session *session,
nghttp2_stream *stream);
/*
* Deletes closed stream to ensure that number of incoming streams
* including active and closed is in the maximum number of allowed
* stream. If |offset| is nonzero, it is decreased from the maximum
* number of allowed stream when comparing number of active and closed
* stream and the maximum number.
*/
void nghttp2_session_adjust_closed_stream(nghttp2_session *session,
ssize_t offset);
/*
* If further receptions and transmissions over the stream |stream_id|
* are disallowed, close the stream with error code NGHTTP2_NO_ERROR.
@ -506,11 +549,19 @@ int nghttp2_session_on_data_received(nghttp2_session *session,
/*
* Returns nghttp2_stream* object whose stream ID is |stream_id|. It
* could be NULL if such stream does not exist.
* could be NULL if such stream does not exist. This function returns
* NULL if stream is marked as closed.
*/
nghttp2_stream* nghttp2_session_get_stream(nghttp2_session *session,
int32_t stream_id);
/*
* This function behaves like nghttp2_session_get_stream(), but it
* returns stream object even if it is marked as closed.
*/
nghttp2_stream* nghttp2_session_get_stream_raw(nghttp2_session *session,
int32_t stream_id);
/*
* Returns nghttp2_stream_group* object whose priority group ID is
* |pri_group_id|. It could be NULL if such priority group does not

View File

@ -53,6 +53,8 @@ void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id,
stream->sib_prev = NULL;
stream->sib_next = NULL;
stream->closed_next = NULL;
stream->stream_group = NULL;
stream->dpri = NGHTTP2_STREAM_DPRI_NO_DATA;
stream->num_substreams = 1;

View File

@ -82,7 +82,9 @@ typedef enum {
typedef enum {
NGHTTP2_STREAM_FLAG_NONE = 0,
/* Indicates that this stream is pushed stream */
NGHTTP2_STREAM_FLAG_PUSH = 0x01
NGHTTP2_STREAM_FLAG_PUSH = 0x01,
/* Indicates that this stream was closed */
NGHTTP2_STREAM_FLAG_CLOSED = 0x02
} nghttp2_stream_flag;
typedef enum {
@ -118,6 +120,11 @@ struct nghttp2_stream {
dep_prev and sib_prev are NULL. */
nghttp2_stream *dep_prev, *dep_next;
nghttp2_stream *sib_prev, *sib_next;
/* When stream is kept after closure, it may be kept in single
linked list pointed by nghttp2_session closed_stream_head.
closed_next points to the next stream object if it is the element
of the list. */
nghttp2_stream *closed_next;
/* The arbitrary data provided by user for this stream. */
void *stream_user_data;
/* Active DATA frame */

View File

@ -223,6 +223,8 @@ int main(int argc, char* argv[])
test_nghttp2_session_stream_attach_data) ||
!CU_add_test(pSuite, "session_stream_attach_data_subtree",
test_nghttp2_session_stream_attach_data_subtree) ||
!CU_add_test(pSuite, "session_stream_keep_closed_stream",
test_nghttp2_session_keep_closed_stream) ||
!CU_add_test(pSuite, "frame_pack_headers",
test_nghttp2_frame_pack_headers) ||
!CU_add_test(pSuite, "frame_pack_headers_frame_too_large",

View File

@ -5335,3 +5335,57 @@ void test_nghttp2_session_stream_attach_data_subtree(void)
nghttp2_session_del(session);
}
void test_nghttp2_session_keep_closed_stream(void)
{
nghttp2_session *session;
nghttp2_session_callbacks callbacks;
/* nghttp2_stream *stream; */
const size_t max_concurrent_streams = 5;
nghttp2_settings_entry iv = {
NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
max_concurrent_streams
};
size_t i;
memset(&callbacks, 0, sizeof(callbacks));
callbacks.send_callback = null_send_callback;
nghttp2_session_server_new(&session, &callbacks, NULL);
nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, &iv, 1);
nghttp2_session_send(session);
for(i = 0; i < max_concurrent_streams; ++i) {
open_stream(session, i * 2 + 1);
}
CU_ASSERT(0 == session->num_closed_streams);
nghttp2_session_close_stream(session, 1, NGHTTP2_NO_ERROR);
CU_ASSERT(1 == session->num_closed_streams);
CU_ASSERT(1 == session->closed_stream_tail->stream_id);
CU_ASSERT(session->closed_stream_tail == session->closed_stream_head);
nghttp2_session_close_stream(session, 5, NGHTTP2_NO_ERROR);
CU_ASSERT(2 == session->num_closed_streams);
CU_ASSERT(5 == session->closed_stream_tail->stream_id);
CU_ASSERT(1 == session->closed_stream_head->stream_id);
open_stream(session, 11);
CU_ASSERT(1 == session->num_closed_streams);
CU_ASSERT(5 == session->closed_stream_tail->stream_id);
CU_ASSERT(session->closed_stream_tail == session->closed_stream_head);
open_stream(session, 13);
CU_ASSERT(0 == session->num_closed_streams);
CU_ASSERT(NULL == session->closed_stream_tail);
CU_ASSERT(NULL == session->closed_stream_head);
nghttp2_session_del(session);
}

View File

@ -103,5 +103,6 @@ void test_nghttp2_session_stream_dep_add_subtree(void);
void test_nghttp2_session_stream_dep_remove_subtree(void);
void test_nghttp2_session_stream_attach_data(void);
void test_nghttp2_session_stream_attach_data_subtree(void);
void test_nghttp2_session_keep_closed_stream(void);
#endif /* NGHTTP2_SESSION_TEST_H */