From 74daa16a1ca3c787d26985e0d5f6fdaa4dd79a73 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sun, 30 Mar 2014 17:41:54 +0900 Subject: [PATCH] Retain incoming closed streams for dependency tree The number of closed stream to keep is limited by MAX_CONCURRENT_STREAMS - current active stream. --- lib/nghttp2_session.c | 95 ++++++++++++++++++++++++++++++++++-- lib/nghttp2_session.h | 53 +++++++++++++++++++- lib/nghttp2_stream.c | 2 + lib/nghttp2_stream.h | 9 +++- tests/main.c | 2 + tests/nghttp2_session_test.c | 54 ++++++++++++++++++++ tests/nghttp2_session_test.h | 1 + 7 files changed, 209 insertions(+), 7 deletions(-) diff --git a/lib/nghttp2_session.c b/lib/nghttp2_session.c index 58d20cf4..ddaa809b 100644 --- a/lib/nghttp2_session.c +++ b/lib/nghttp2_session.c @@ -128,6 +128,20 @@ int nghttp2_session_is_my_stream_id(nghttp2_session *session, nghttp2_stream* nghttp2_session_get_stream(nghttp2_session *session, int32_t stream_id) +{ + nghttp2_stream *stream; + + stream = (nghttp2_stream*)nghttp2_map_find(&session->streams, stream_id); + + if(stream == NULL || (stream->flags & NGHTTP2_STREAM_FLAG_CLOSED)) { + return NULL; + } + + return stream; +} + +nghttp2_stream* nghttp2_session_get_stream_raw(nghttp2_session *session, + int32_t stream_id) { return (nghttp2_stream*)nghttp2_map_find(&session->streams, stream_id); } @@ -510,7 +524,7 @@ int nghttp2_session_reprioritize_stream old_stream_group = stream->stream_group; - dep_stream = nghttp2_session_get_stream(session, dep->stream_id); + dep_stream = nghttp2_session_get_stream_raw(session, dep->stream_id); if(dep_stream == NULL) { return 0; @@ -736,6 +750,10 @@ nghttp2_stream* nghttp2_session_open_stream(nghttp2_session *session, dep_stream = NULL; + if(session->server && !nghttp2_session_is_my_stream_id(session, stream_id)) { + nghttp2_session_adjust_closed_stream(session, 1); + } + switch(pri_spec->pri_type) { case NGHTTP2_PRIORITY_TYPE_GROUP: pri_group_id = pri_spec->group.pri_group_id; @@ -743,7 +761,8 @@ nghttp2_stream* nghttp2_session_open_stream(nghttp2_session *session, break; case NGHTTP2_PRIORITY_TYPE_DEP: - dep_stream = nghttp2_session_get_stream(session, pri_spec->dep.stream_id); + dep_stream = nghttp2_session_get_stream_raw(session, + pri_spec->dep.stream_id); if(dep_stream) { pri_group_id = dep_stream->stream_group->pri_group_id; @@ -838,7 +857,6 @@ int nghttp2_session_close_stream(nghttp2_session *session, int32_t stream_id, { int rv; nghttp2_stream *stream; - nghttp2_stream_group *stream_group; stream = nghttp2_session_get_stream(session, stream_id); @@ -892,6 +910,29 @@ int nghttp2_session_close_stream(nghttp2_session *session, int32_t stream_id, } } + /* Closes both directions just in case they are not closed yet */ + stream->flags |= NGHTTP2_STREAM_FLAG_CLOSED; + + if(session->server && !nghttp2_session_is_my_stream_id(session, stream_id)) { + /* On server side, retain incoming stream object at most + MAX_CONCURRENT_STREAMS combined with the current active streams + to make dependency tree work better. */ + nghttp2_session_keep_closed_stream(session, stream); + } else { + nghttp2_session_destroy_stream(session, stream); + } + + return 0; +} + +void nghttp2_session_destroy_stream(nghttp2_session *session, + nghttp2_stream *stream) +{ + nghttp2_stream_group *stream_group; + + DEBUGF(fprintf(stderr, "stream: destroy closed stream(%p)=%d\n", + stream, stream->stream_id)); + nghttp2_stream_dep_remove(stream); stream_group = stream->stream_group; @@ -899,11 +940,55 @@ int nghttp2_session_close_stream(nghttp2_session *session, int32_t stream_id, nghttp2_stream_group_remove_stream(stream_group, stream); nghttp2_session_close_stream_group_if_empty(session, stream_group); - nghttp2_map_remove(&session->streams, stream_id); + nghttp2_map_remove(&session->streams, stream->stream_id); nghttp2_stream_free(stream); free(stream); +} - return 0; +void nghttp2_session_keep_closed_stream(nghttp2_session *session, + nghttp2_stream *stream) +{ + DEBUGF(fprintf(stderr, "stream: keep closed stream(%p)=%d\n", + stream, stream->stream_id)); + + if(session->closed_stream_tail) { + session->closed_stream_tail->closed_next = stream; + } else { + session->closed_stream_head = stream; + } + session->closed_stream_tail = stream; + + ++session->num_closed_streams; + + nghttp2_session_adjust_closed_stream(session, 0); +} + +void nghttp2_session_adjust_closed_stream(nghttp2_session *session, + ssize_t offset) +{ + DEBUGF(fprintf(stderr, "stream: adjusting kept closed streams " + "num_closed_streams=%zu, num_incoming_streams=%zu, " + "max_concurrent_streams=%u\n", + session->num_closed_streams, session->num_incoming_streams, + session->pending_local_max_concurrent_stream)); + + while(session->num_closed_streams > 0 && + session->num_closed_streams + session->num_incoming_streams + offset + > session->pending_local_max_concurrent_stream) { + nghttp2_stream *head_stream; + + head_stream = session->closed_stream_head; + + session->closed_stream_head = head_stream->closed_next; + + if(session->closed_stream_tail == head_stream) { + session->closed_stream_tail = NULL; + } + + nghttp2_session_destroy_stream(session, head_stream); + /* head_stream is now freed */ + --session->num_closed_streams; + } } /* diff --git a/lib/nghttp2_session.h b/lib/nghttp2_session.h index b74d481c..f406d573 100644 --- a/lib/nghttp2_session.h +++ b/lib/nghttp2_session.h @@ -130,6 +130,12 @@ struct nghttp2_session { enqueue if priority is equal. */ int64_t next_seq; void *user_data; + /* Points to the latest closed stream. NULL if there is no closed + stream. */ + nghttp2_stream *closed_stream_head; + /* Points to the oldest closed stream. NULL if there is no closed + stream. */ + nghttp2_stream *closed_stream_tail; /* In-flight SETTINGS values. NULL does not necessarily mean there is no in-flight SETTINGS. */ nghttp2_settings_entry *inflight_iv; @@ -142,6 +148,11 @@ struct nghttp2_session { /* The number of incoming streams. This will be capped by local_settings[NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]. */ size_t num_incoming_streams; + /* The number of closed streams still kept in |streams| hash. The + closed streams can be accessed through single linked list + |closed_stream_head|. The current implementation only keeps + incoming streams and session is initialized as server. */ + size_t num_closed_streams; /* The number of bytes allocated for nvbuf */ size_t nvbuflen; /* Next Stream ID. Made unsigned int to detect >= (1 << 31). */ @@ -330,6 +341,11 @@ nghttp2_stream* nghttp2_session_open_stream(nghttp2_session *session, * is indicated by the |error_code|. When closing the stream, * on_stream_close_callback will be called. * + * If the session is initialized as server and |stream| is incoming + * stream, stream is just marked closed and this function calls + * nghttp2_session_keep_closed_stream() with |stream|. Otherwise, + * |stream| will be deleted from memory. + * * This function returns 0 if it succeeds, or one the following * negative error codes: * @@ -343,6 +359,33 @@ nghttp2_stream* nghttp2_session_open_stream(nghttp2_session *session, int nghttp2_session_close_stream(nghttp2_session *session, int32_t stream_id, nghttp2_error_code error_code); +/* + * Deletes |stream| from memory. After this function returns, stream + * cannot be accessed. + * + */ +void nghttp2_session_destroy_stream(nghttp2_session *session, + nghttp2_stream *stream); + +/* + * Tries to keep incoming closed stream |stream|. Due to the + * limitation of maximum number of streams in memory, |stream| is not + * closed and just deleted from memory (see + * nghttp2_session_destroy_stream). + */ +void nghttp2_session_keep_closed_stream(nghttp2_session *session, + nghttp2_stream *stream); + +/* + * Deletes closed stream to ensure that number of incoming streams + * including active and closed is in the maximum number of allowed + * stream. If |offset| is nonzero, it is decreased from the maximum + * number of allowed stream when comparing number of active and closed + * stream and the maximum number. + */ +void nghttp2_session_adjust_closed_stream(nghttp2_session *session, + ssize_t offset); + /* * If further receptions and transmissions over the stream |stream_id| * are disallowed, close the stream with error code NGHTTP2_NO_ERROR. @@ -506,11 +549,19 @@ int nghttp2_session_on_data_received(nghttp2_session *session, /* * Returns nghttp2_stream* object whose stream ID is |stream_id|. It - * could be NULL if such stream does not exist. + * could be NULL if such stream does not exist. This function returns + * NULL if stream is marked as closed. */ nghttp2_stream* nghttp2_session_get_stream(nghttp2_session *session, int32_t stream_id); +/* + * This function behaves like nghttp2_session_get_stream(), but it + * returns stream object even if it is marked as closed. + */ +nghttp2_stream* nghttp2_session_get_stream_raw(nghttp2_session *session, + int32_t stream_id); + /* * Returns nghttp2_stream_group* object whose priority group ID is * |pri_group_id|. It could be NULL if such priority group does not diff --git a/lib/nghttp2_stream.c b/lib/nghttp2_stream.c index 5373b972..e58e7dd4 100644 --- a/lib/nghttp2_stream.c +++ b/lib/nghttp2_stream.c @@ -53,6 +53,8 @@ void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id, stream->sib_prev = NULL; stream->sib_next = NULL; + stream->closed_next = NULL; + stream->stream_group = NULL; stream->dpri = NGHTTP2_STREAM_DPRI_NO_DATA; stream->num_substreams = 1; diff --git a/lib/nghttp2_stream.h b/lib/nghttp2_stream.h index ce632b9d..b5300950 100644 --- a/lib/nghttp2_stream.h +++ b/lib/nghttp2_stream.h @@ -82,7 +82,9 @@ typedef enum { typedef enum { NGHTTP2_STREAM_FLAG_NONE = 0, /* Indicates that this stream is pushed stream */ - NGHTTP2_STREAM_FLAG_PUSH = 0x01 + NGHTTP2_STREAM_FLAG_PUSH = 0x01, + /* Indicates that this stream was closed */ + NGHTTP2_STREAM_FLAG_CLOSED = 0x02 } nghttp2_stream_flag; typedef enum { @@ -118,6 +120,11 @@ struct nghttp2_stream { dep_prev and sib_prev are NULL. */ nghttp2_stream *dep_prev, *dep_next; nghttp2_stream *sib_prev, *sib_next; + /* When stream is kept after closure, it may be kept in single + linked list pointed by nghttp2_session closed_stream_head. + closed_next points to the next stream object if it is the element + of the list. */ + nghttp2_stream *closed_next; /* The arbitrary data provided by user for this stream. */ void *stream_user_data; /* Active DATA frame */ diff --git a/tests/main.c b/tests/main.c index 03076584..91468adb 100644 --- a/tests/main.c +++ b/tests/main.c @@ -223,6 +223,8 @@ int main(int argc, char* argv[]) test_nghttp2_session_stream_attach_data) || !CU_add_test(pSuite, "session_stream_attach_data_subtree", test_nghttp2_session_stream_attach_data_subtree) || + !CU_add_test(pSuite, "session_stream_keep_closed_stream", + test_nghttp2_session_keep_closed_stream) || !CU_add_test(pSuite, "frame_pack_headers", test_nghttp2_frame_pack_headers) || !CU_add_test(pSuite, "frame_pack_headers_frame_too_large", diff --git a/tests/nghttp2_session_test.c b/tests/nghttp2_session_test.c index da9c7108..c421ea6e 100644 --- a/tests/nghttp2_session_test.c +++ b/tests/nghttp2_session_test.c @@ -5335,3 +5335,57 @@ void test_nghttp2_session_stream_attach_data_subtree(void) nghttp2_session_del(session); } + +void test_nghttp2_session_keep_closed_stream(void) +{ + nghttp2_session *session; + nghttp2_session_callbacks callbacks; + /* nghttp2_stream *stream; */ + const size_t max_concurrent_streams = 5; + nghttp2_settings_entry iv = { + NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + max_concurrent_streams + }; + size_t i; + + memset(&callbacks, 0, sizeof(callbacks)); + callbacks.send_callback = null_send_callback; + + nghttp2_session_server_new(&session, &callbacks, NULL); + + nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, &iv, 1); + + nghttp2_session_send(session); + + for(i = 0; i < max_concurrent_streams; ++i) { + open_stream(session, i * 2 + 1); + } + + CU_ASSERT(0 == session->num_closed_streams); + + nghttp2_session_close_stream(session, 1, NGHTTP2_NO_ERROR); + + CU_ASSERT(1 == session->num_closed_streams); + CU_ASSERT(1 == session->closed_stream_tail->stream_id); + CU_ASSERT(session->closed_stream_tail == session->closed_stream_head); + + nghttp2_session_close_stream(session, 5, NGHTTP2_NO_ERROR); + + CU_ASSERT(2 == session->num_closed_streams); + CU_ASSERT(5 == session->closed_stream_tail->stream_id); + CU_ASSERT(1 == session->closed_stream_head->stream_id); + + open_stream(session, 11); + + CU_ASSERT(1 == session->num_closed_streams); + CU_ASSERT(5 == session->closed_stream_tail->stream_id); + CU_ASSERT(session->closed_stream_tail == session->closed_stream_head); + + open_stream(session, 13); + + CU_ASSERT(0 == session->num_closed_streams); + CU_ASSERT(NULL == session->closed_stream_tail); + CU_ASSERT(NULL == session->closed_stream_head); + + nghttp2_session_del(session); +} diff --git a/tests/nghttp2_session_test.h b/tests/nghttp2_session_test.h index 20425b5c..40d8cd85 100644 --- a/tests/nghttp2_session_test.h +++ b/tests/nghttp2_session_test.h @@ -103,5 +103,6 @@ void test_nghttp2_session_stream_dep_add_subtree(void); void test_nghttp2_session_stream_dep_remove_subtree(void); void test_nghttp2_session_stream_attach_data(void); void test_nghttp2_session_stream_attach_data_subtree(void); +void test_nghttp2_session_keep_closed_stream(void); #endif /* NGHTTP2_SESSION_TEST_H */