From 5b59e46e2bb4a5b07cf040e6e10a4f7457fa06cd Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sun, 16 Aug 2015 19:01:10 +0900 Subject: [PATCH] Rewrite priority handling We now use priority queue per stream, which contains the stream which has ready to send a frame, or one of its descendants have a frame to send. We maintain invariant that if a stream is queued, then its ancestors are also queued (except for root). When we re-schedule stream after transmission, we re-schedule all ancestors, so that streams on the other path can get a chance to send. This is basically the same mechanism h2o project uses, but there are differences in the details. --- lib/nghttp2_helper.h | 7 + lib/nghttp2_outbound_item.h | 3 +- lib/nghttp2_pq.c | 60 ++- lib/nghttp2_pq.h | 21 +- lib/nghttp2_session.c | 361 +++++---------- lib/nghttp2_session.h | 66 ++- lib/nghttp2_stream.c | 864 ++++++++++++----------------------- lib/nghttp2_stream.h | 123 ++--- tests/main.c | 2 + tests/nghttp2_pq_test.c | 168 ++++++- tests/nghttp2_pq_test.h | 2 + tests/nghttp2_session_test.c | 547 ++++++++++++---------- 12 files changed, 1056 insertions(+), 1168 deletions(-) diff --git a/lib/nghttp2_helper.h b/lib/nghttp2_helper.h index 0a7f8c81..979b64af 100644 --- a/lib/nghttp2_helper.h +++ b/lib/nghttp2_helper.h @@ -30,6 +30,7 @@ #endif /* HAVE_CONFIG_H */ #include +#include #include #include "nghttp2_mem.h" @@ -39,6 +40,12 @@ #define lstreq(A, B, N) ((sizeof((A)) - 1) == (N) && memcmp((A), (B), (N)) == 0) +#define nghttp2_struct_of(ptr, type, member) \ + ({ \ + const typeof(((type *)0)->member) *nghttp2__mptr = (ptr); \ + (type *)(void *)((char *)nghttp2__mptr - __builtin_offsetof(type, member)); \ + }) + /* * Copies 2 byte unsigned integer |n| in host byte order to |buf| in * network byte order. diff --git a/lib/nghttp2_outbound_item.h b/lib/nghttp2_outbound_item.h index 0be5cb76..c16d5dfe 100644 --- a/lib/nghttp2_outbound_item.h +++ b/lib/nghttp2_outbound_item.h @@ -112,7 +112,8 @@ struct nghttp2_outbound_item { proportional to effective weight (inside a tree). */ uint64_t cycle; nghttp2_outbound_item *qnext; - /* nonzero if this object is queued. */ + /* nonzero if this object is queued, except for DATA or HEADERS + which are attached to stream as item. */ uint8_t queued; }; diff --git a/lib/nghttp2_pq.c b/lib/nghttp2_pq.c index f04c189f..065176e5 100644 --- a/lib/nghttp2_pq.c +++ b/lib/nghttp2_pq.c @@ -24,13 +24,15 @@ */ #include "nghttp2_pq.h" +#include +#include + +#include "nghttp2_helper.h" + int nghttp2_pq_init(nghttp2_pq *pq, nghttp2_less less, nghttp2_mem *mem) { pq->mem = mem; - pq->capacity = 128; - pq->q = nghttp2_mem_malloc(mem, pq->capacity * sizeof(void *)); - if (pq->q == NULL) { - return NGHTTP2_ERR_NOMEM; - } + pq->capacity = 0; + pq->q = NULL; pq->length = 0; pq->less = less; return 0; @@ -42,9 +44,11 @@ void nghttp2_pq_free(nghttp2_pq *pq) { } static void swap(nghttp2_pq *pq, size_t i, size_t j) { - void *t = pq->q[i]; + nghttp2_pq_entry *t = pq->q[i]; pq->q[i] = pq->q[j]; + pq->q[i]->index = i; pq->q[j] = t; + pq->q[j]->index = j; } static void bubble_up(nghttp2_pq *pq, size_t index) { @@ -59,24 +63,29 @@ static void bubble_up(nghttp2_pq *pq, size_t index) { } } -int nghttp2_pq_push(nghttp2_pq *pq, void *item) { +int nghttp2_pq_push(nghttp2_pq *pq, nghttp2_pq_entry *item) { if (pq->capacity <= pq->length) { void *nq; + size_t ncapacity; + + ncapacity = nghttp2_max(4, (pq->capacity * 2)); + nq = nghttp2_mem_realloc(pq->mem, pq->q, - (pq->capacity * 2) * sizeof(void *)); + ncapacity * sizeof(nghttp2_pq_entry *)); if (nq == NULL) { return NGHTTP2_ERR_NOMEM; } - pq->capacity *= 2; + pq->capacity = ncapacity; pq->q = nq; } pq->q[pq->length] = item; + item->index = pq->length; ++pq->length; bubble_up(pq, pq->length - 1); return 0; } -void *nghttp2_pq_top(nghttp2_pq *pq) { +nghttp2_pq_entry *nghttp2_pq_top(nghttp2_pq *pq) { if (pq->length == 0) { return NULL; } else { @@ -85,11 +94,9 @@ void *nghttp2_pq_top(nghttp2_pq *pq) { } static void bubble_down(nghttp2_pq *pq, size_t index) { - size_t lchild = index * 2 + 1; + size_t i, j = index * 2 + 1; size_t minindex = index; - size_t i, j; - for (i = 0; i < 2; ++i) { - j = lchild + i; + for (i = 0; i < 2; ++i, ++j) { if (j >= pq->length) { break; } @@ -106,11 +113,31 @@ static void bubble_down(nghttp2_pq *pq, size_t index) { void nghttp2_pq_pop(nghttp2_pq *pq) { if (pq->length > 0) { pq->q[0] = pq->q[pq->length - 1]; + pq->q[0]->index = 0; --pq->length; bubble_down(pq, 0); } } +void nghttp2_pq_remove(nghttp2_pq *pq, nghttp2_pq_entry *item) { + assert(pq->q[item->index] == item); + + if (item->index == pq->length - 1) { + --pq->length; + return; + } + + pq->q[item->index] = pq->q[pq->length - 1]; + pq->q[item->index]->index = item->index; + --pq->length; + + if (pq->less(item, pq->q[item->index])) { + bubble_down(pq, item->index); + } else { + bubble_up(pq, item->index); + } +} + int nghttp2_pq_empty(nghttp2_pq *pq) { return pq->length == 0; } size_t nghttp2_pq_size(nghttp2_pq *pq) { return pq->length; } @@ -144,3 +171,8 @@ int nghttp2_pq_each(nghttp2_pq *pq, nghttp2_pq_item_cb fun, void *arg) { } return 0; } + +void nghttp2_pq_increase_key(nghttp2_pq *pq, nghttp2_pq_entry *item) { + assert(pq->q[item->index] == item); + bubble_down(pq, item->index); +} diff --git a/lib/nghttp2_pq.h b/lib/nghttp2_pq.h index 1775d038..f4cb6db3 100644 --- a/lib/nghttp2_pq.h +++ b/lib/nghttp2_pq.h @@ -35,9 +35,11 @@ /* Implementation of priority queue */ +typedef struct { size_t index; } nghttp2_pq_entry; + typedef struct { /* The pointer to the pointer to the item stored */ - void **q; + nghttp2_pq_entry **q; /* Memory allocator */ nghttp2_mem *mem; /* The number of items sotred */ @@ -75,13 +77,13 @@ void nghttp2_pq_free(nghttp2_pq *pq); * NGHTTP2_ERR_NOMEM * Out of memory. */ -int nghttp2_pq_push(nghttp2_pq *pq, void *item); +int nghttp2_pq_push(nghttp2_pq *pq, nghttp2_pq_entry *item); /* * Returns item at the top of the queue |pq|. If the queue is empty, * this function returns NULL. */ -void *nghttp2_pq_top(nghttp2_pq *pq); +nghttp2_pq_entry *nghttp2_pq_top(nghttp2_pq *pq); /* * Pops item at the top of the queue |pq|. The popped item is not @@ -99,7 +101,7 @@ int nghttp2_pq_empty(nghttp2_pq *pq); */ size_t nghttp2_pq_size(nghttp2_pq *pq); -typedef int (*nghttp2_pq_item_cb)(void *item, void *arg); +typedef int (*nghttp2_pq_item_cb)(nghttp2_pq_entry *item, void *arg); /* * Updates each item in |pq| using function |fun| and re-construct @@ -118,4 +120,15 @@ void nghttp2_pq_update(nghttp2_pq *pq, nghttp2_pq_item_cb fun, void *arg); */ int nghttp2_pq_each(nghttp2_pq *pq, nghttp2_pq_item_cb fun, void *arg); +/* + * Performs "increase-key" operation against |item|, assuming |item| + * is in |pq|, and its key is already updated. + */ +void nghttp2_pq_increase_key(nghttp2_pq *pq, nghttp2_pq_entry *item); + +/* + * Removes |item| from priority queue. + */ +void nghttp2_pq_remove(nghttp2_pq *pq, nghttp2_pq_entry *item); + #endif /* NGHTTP2_PQ_H */ diff --git a/lib/nghttp2_session.c b/lib/nghttp2_session.c index 1c5d8306..54fe19db 100644 --- a/lib/nghttp2_session.c +++ b/lib/nghttp2_session.c @@ -34,6 +34,7 @@ #include "nghttp2_priority_spec.h" #include "nghttp2_option.h" #include "nghttp2_http.h" +#include "nghttp2_pq.h" /* * Returns non-zero if the number of outgoing opened streams is larger @@ -223,15 +224,6 @@ nghttp2_stream *nghttp2_session_get_stream_raw(nghttp2_session *session, return (nghttp2_stream *)nghttp2_map_find(&session->streams, stream_id); } -static int outbound_item_less(const void *lhsx, const void *rhsx) { - const nghttp2_outbound_item *lhs, *rhs; - - lhs = (const nghttp2_outbound_item *)lhsx; - rhs = (const nghttp2_outbound_item *)rhsx; - - return (lhs->cycle < rhs->cycle) ? 1 : 0; -} - static void session_inbound_frame_reset(nghttp2_session *session) { nghttp2_inbound_frame *iframe = &session->iframe; nghttp2_mem *mem = &session->mem; @@ -332,11 +324,6 @@ static int session_new(nghttp2_session **session_ptr, /* next_stream_id is initialized in either nghttp2_session_client_new2 or nghttp2_session_server_new2 */ - rv = nghttp2_pq_init(&(*session_ptr)->ob_da_pq, outbound_item_less, mem); - if (rv != 0) { - goto fail_ob_da_pq; - } - rv = nghttp2_hd_deflate_init(&(*session_ptr)->hd_deflater, mem); if (rv != 0) { goto fail_hd_deflater; @@ -352,7 +339,7 @@ static int session_new(nghttp2_session **session_ptr, nghttp2_stream_init(&(*session_ptr)->root, 0, NGHTTP2_STREAM_FLAG_NONE, NGHTTP2_STREAM_INITIAL, NGHTTP2_DEFAULT_WEIGHT, 0, 0, - NULL); + NULL, mem); (*session_ptr)->remote_window_size = NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE; (*session_ptr)->recv_window_size = 0; @@ -444,8 +431,6 @@ fail_map: fail_hd_inflater: nghttp2_hd_deflate_free(&(*session_ptr)->hd_deflater); fail_hd_deflater: - nghttp2_pq_free(&(*session_ptr)->ob_da_pq); -fail_ob_da_pq: nghttp2_mem_free(mem, *session_ptr); fail_session: return rv; @@ -541,16 +526,6 @@ static int free_streams(nghttp2_map_entry *entry, void *ptr) { return 0; } -static void ob_pq_free(nghttp2_pq *pq, nghttp2_mem *mem) { - while (!nghttp2_pq_empty(pq)) { - nghttp2_outbound_item *item = (nghttp2_outbound_item *)nghttp2_pq_top(pq); - nghttp2_outbound_item_free(item, mem); - nghttp2_mem_free(mem, item); - nghttp2_pq_pop(pq); - } - nghttp2_pq_free(pq); -} - static void ob_q_free(nghttp2_outbound_queue *q, nghttp2_mem *mem) { nghttp2_outbound_item *item, *next; for (item = q->head; item;) { @@ -620,7 +595,7 @@ void nghttp2_session_del(nghttp2_session *session) { ob_q_free(&session->ob_urgent, mem); ob_q_free(&session->ob_reg, mem); ob_q_free(&session->ob_syn, mem); - ob_pq_free(&session->ob_da_pq, mem); + active_outbound_item_reset(&session->aob, mem); session_inbound_frame_reset(session); nghttp2_hd_deflate_free(&session->hd_deflater); @@ -667,14 +642,14 @@ nghttp2_session_reprioritize_stream(nghttp2_session *session, if (pri_spec->stream_id == 0) { dep_stream = &session->root; - } else if (nghttp2_stream_dep_subtree_find(stream, dep_stream)) { + } else if (nghttp2_stream_dep_find_ancestor(dep_stream, stream)) { DEBUGF(fprintf(stderr, "stream: cycle detected, dep_stream(%p)=%d " "stream(%p)=%d\n", dep_stream, dep_stream->stream_id, stream, stream->stream_id)); nghttp2_stream_dep_remove_subtree(dep_stream); - rv = nghttp2_stream_dep_add_subtree(&session->root, dep_stream, session); + rv = nghttp2_stream_dep_add_subtree(&session->root, dep_stream); if (rv != 0) { return rv; } @@ -686,9 +661,9 @@ nghttp2_session_reprioritize_stream(nghttp2_session *session, stream->weight = pri_spec->weight; if (pri_spec->exclusive) { - rv = nghttp2_stream_dep_insert_subtree(dep_stream, stream, session); + rv = nghttp2_stream_dep_insert_subtree(dep_stream, stream); } else { - rv = nghttp2_stream_dep_add_subtree(dep_stream, stream, session); + rv = nghttp2_stream_dep_add_subtree(dep_stream, stream); } if (rv != 0) { @@ -719,19 +694,19 @@ int nghttp2_session_add_item(nghttp2_session *session, /* TODO If 2 HEADERS are submitted for reserved stream, then both of them are queued into ob_syn, which is not desirable. */ - if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) { + if (frame->headers.cat == NGHTTP2_HCAT_REQUEST || + (stream && stream->state == NGHTTP2_STREAM_RESERVED)) { nghttp2_outbound_queue_push(&session->ob_syn, item); item->queued = 1; break; } - if (stream && (stream->state == NGHTTP2_STREAM_RESERVED || - item->aux_data.headers.attach_stream)) { + if (stream && item->aux_data.headers.attach_stream) { if (stream->item) { return NGHTTP2_ERR_DATA_EXIST; } - rv = nghttp2_stream_attach_item(stream, item, session); + rv = nghttp2_stream_attach_item(stream, item); if (rv != 0) { return rv; @@ -769,7 +744,7 @@ int nghttp2_session_add_item(nghttp2_session *session, return NGHTTP2_ERR_DATA_EXIST; } - rv = nghttp2_stream_attach_item(stream, item, session); + rv = nghttp2_stream_attach_item(stream, item); if (rv != 0) { return rv; @@ -865,12 +840,18 @@ nghttp2_stream *nghttp2_session_open_stream(nghttp2_session *session, assert(stream->state == NGHTTP2_STREAM_IDLE); assert(nghttp2_stream_in_dep_tree(stream)); nghttp2_session_detach_idle_stream(session, stream); - nghttp2_stream_dep_remove(stream); + rv = nghttp2_stream_dep_remove(stream); + if (rv != 0) { + return NULL; + } } else { if (session->server && initial_state != NGHTTP2_STREAM_IDLE && !nghttp2_session_is_my_stream_id(session, stream_id)) { - nghttp2_session_adjust_closed_stream(session, 1); + rv = nghttp2_session_adjust_closed_stream(session, 1); + if (rv != 0) { + return NULL; + } } stream = nghttp2_mem_malloc(mem, sizeof(nghttp2_stream)); @@ -916,7 +897,7 @@ nghttp2_stream *nghttp2_session_open_stream(nghttp2_session *session, nghttp2_stream_init(stream, stream_id, flags, initial_state, pri_spec->weight, session->remote_settings.initial_window_size, session->local_settings.initial_window_size, - stream_user_data); + stream_user_data, mem); if (stream_alloc) { rv = nghttp2_map_insert(&session->streams, &stream->map_entry); @@ -942,7 +923,10 @@ nghttp2_stream *nghttp2_session_open_stream(nghttp2_session *session, /* Idle stream does not count toward the concurrent streams limit. This is used as anchor node in dependency tree. */ assert(session->server); - nghttp2_session_keep_idle_stream(session, stream); + rv = nghttp2_session_keep_idle_stream(session, stream); + if (rv != 0) { + return NULL; + } break; default: if (nghttp2_session_is_my_stream_id(session, stream_id)) { @@ -968,7 +952,10 @@ nghttp2_stream *nghttp2_session_open_stream(nghttp2_session *session, assert(dep_stream); if (pri_spec->exclusive) { - nghttp2_stream_dep_insert(dep_stream, stream); + rv = nghttp2_stream_dep_insert(dep_stream, stream); + if (rv != 0) { + return NULL; + } } else { nghttp2_stream_dep_add(dep_stream, stream); } @@ -997,7 +984,7 @@ int nghttp2_session_close_stream(nghttp2_session *session, int32_t stream_id, item = stream->item; - rv = nghttp2_stream_detach_item(stream, session); + rv = nghttp2_stream_detach_item(stream); if (rv != 0) { return rv; @@ -1045,17 +1032,21 @@ int nghttp2_session_close_stream(nghttp2_session *session, int32_t stream_id, /* On server side, retain stream at most MAX_CONCURRENT_STREAMS combined with the current active incoming streams to make dependency tree work better. */ - nghttp2_session_keep_closed_stream(session, stream); + rv = nghttp2_session_keep_closed_stream(session, stream); } else { - nghttp2_session_destroy_stream(session, stream); + rv = nghttp2_session_destroy_stream(session, stream); + } + if (rv != 0) { + return rv; } return 0; } -void nghttp2_session_destroy_stream(nghttp2_session *session, - nghttp2_stream *stream) { +int nghttp2_session_destroy_stream(nghttp2_session *session, + nghttp2_stream *stream) { nghttp2_mem *mem; + int rv; DEBUGF(fprintf(stderr, "stream: destroy closed stream(%p)=%d\n", stream, stream->stream_id)); @@ -1063,16 +1054,23 @@ void nghttp2_session_destroy_stream(nghttp2_session *session, mem = &session->mem; if (nghttp2_stream_in_dep_tree(stream)) { - nghttp2_stream_dep_remove(stream); + rv = nghttp2_stream_dep_remove(stream); + if (rv != 0) { + return rv; + } } nghttp2_map_remove(&session->streams, stream->stream_id); nghttp2_stream_free(stream); nghttp2_mem_free(mem, stream); + + return 0; } -void nghttp2_session_keep_closed_stream(nghttp2_session *session, - nghttp2_stream *stream) { +int nghttp2_session_keep_closed_stream(nghttp2_session *session, + nghttp2_stream *stream) { + int rv; + DEBUGF(fprintf(stderr, "stream: keep closed stream(%p)=%d, state=%d\n", stream, stream->stream_id, stream->state)); @@ -1086,11 +1084,18 @@ void nghttp2_session_keep_closed_stream(nghttp2_session *session, ++session->num_closed_streams; - nghttp2_session_adjust_closed_stream(session, 0); + rv = nghttp2_session_adjust_closed_stream(session, 0); + if (rv != 0) { + return rv; + } + + return 0; } -void nghttp2_session_keep_idle_stream(nghttp2_session *session, - nghttp2_stream *stream) { +int nghttp2_session_keep_idle_stream(nghttp2_session *session, + nghttp2_stream *stream) { + int rv; + DEBUGF(fprintf(stderr, "stream: keep idle stream(%p)=%d, state=%d\n", stream, stream->stream_id, stream->state)); @@ -1104,7 +1109,12 @@ void nghttp2_session_keep_idle_stream(nghttp2_session *session, ++session->num_idle_streams; - nghttp2_session_adjust_idle_stream(session); + rv = nghttp2_session_adjust_idle_stream(session); + if (rv != 0) { + return rv; + } + + return 0; } void nghttp2_session_detach_idle_stream(nghttp2_session *session, @@ -1135,9 +1145,10 @@ void nghttp2_session_detach_idle_stream(nghttp2_session *session, --session->num_idle_streams; } -void nghttp2_session_adjust_closed_stream(nghttp2_session *session, - ssize_t offset) { +int nghttp2_session_adjust_closed_stream(nghttp2_session *session, + ssize_t offset) { size_t num_stream_max; + int rv; num_stream_max = nghttp2_min(session->local_settings.max_concurrent_streams, session->pending_local_max_concurrent_stream); @@ -1152,12 +1163,22 @@ void nghttp2_session_adjust_closed_stream(nghttp2_session *session, session->num_closed_streams + session->num_incoming_streams + offset > num_stream_max) { nghttp2_stream *head_stream; + nghttp2_stream *next; head_stream = session->closed_stream_head; assert(head_stream); - session->closed_stream_head = head_stream->closed_next; + next = head_stream->closed_next; + + rv = nghttp2_session_destroy_stream(session, head_stream); + if (rv != 0) { + return rv; + } + + /* head_stream is now freed */ + + session->closed_stream_head = next; if (session->closed_stream_head) { session->closed_stream_head->closed_prev = NULL; @@ -1165,14 +1186,15 @@ void nghttp2_session_adjust_closed_stream(nghttp2_session *session, session->closed_stream_tail = NULL; } - nghttp2_session_destroy_stream(session, head_stream); - /* head_stream is now freed */ --session->num_closed_streams; } + + return 0; } -void nghttp2_session_adjust_idle_stream(nghttp2_session *session) { +int nghttp2_session_adjust_idle_stream(nghttp2_session *session) { size_t max; + int rv; /* Make minimum number of idle streams 2 so that allocating 2 streams at once is easy. This happens when PRIORITY frame to @@ -1188,11 +1210,21 @@ void nghttp2_session_adjust_idle_stream(nghttp2_session *session) { while (session->num_idle_streams > max) { nghttp2_stream *head; + nghttp2_stream *next; head = session->idle_stream_head; assert(head); - session->idle_stream_head = head->closed_next; + next = head->closed_next; + + rv = nghttp2_session_destroy_stream(session, head); + if (rv != 0) { + return rv; + } + + /* head is now destroyed */ + + session->idle_stream_head = next; if (session->idle_stream_head) { session->idle_stream_head->closed_prev = NULL; @@ -1200,10 +1232,10 @@ void nghttp2_session_adjust_idle_stream(nghttp2_session *session) { session->idle_stream_tail = NULL; } - nghttp2_session_destroy_stream(session, head); - /* head is now destroyed */ --session->num_idle_streams; } + + return 0; } /* @@ -1746,7 +1778,7 @@ static int session_prep_frame(nghttp2_session *session, if (stream && stream->item == item) { int rv2; - rv2 = nghttp2_stream_detach_item(stream, session); + rv2 = nghttp2_stream_detach_item(stream); if (nghttp2_is_fatal(rv2)) { return rv2; @@ -1908,7 +1940,7 @@ static int session_prep_frame(nghttp2_session *session, if (stream) { int rv2; - rv2 = nghttp2_stream_detach_item(stream, session); + rv2 = nghttp2_stream_detach_item(stream); if (nghttp2_is_fatal(rv2)) { return rv2; @@ -1927,8 +1959,8 @@ static int session_prep_frame(nghttp2_session *session, queue when session->remote_window_size > 0 */ assert(session->remote_window_size > 0); - rv = nghttp2_stream_defer_item( - stream, NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL, session); + rv = nghttp2_stream_defer_item(stream, + NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL); if (nghttp2_is_fatal(rv)) { return rv; @@ -1943,8 +1975,7 @@ static int session_prep_frame(nghttp2_session *session, next_readmax, frame, &item->aux_data.data, stream); if (rv == NGHTTP2_ERR_DEFERRED) { - rv = nghttp2_stream_defer_item(stream, NGHTTP2_STREAM_FLAG_DEFERRED_USER, - session); + rv = nghttp2_stream_defer_item(stream, NGHTTP2_STREAM_FLAG_DEFERRED_USER); if (nghttp2_is_fatal(rv)) { return rv; @@ -1955,7 +1986,7 @@ static int session_prep_frame(nghttp2_session *session, return NGHTTP2_ERR_DEFERRED; } if (rv == NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE) { - rv = nghttp2_stream_detach_item(stream, session); + rv = nghttp2_stream_detach_item(stream); if (nghttp2_is_fatal(rv)) { return rv; @@ -1971,7 +2002,7 @@ static int session_prep_frame(nghttp2_session *session, if (rv != 0) { int rv2; - rv2 = nghttp2_stream_detach_item(stream, session); + rv2 = nghttp2_stream_detach_item(stream); if (nghttp2_is_fatal(rv2)) { return rv2; @@ -1999,9 +2030,8 @@ nghttp2_session_get_next_ob_item(nghttp2_session *session) { } } - if (session->remote_window_size > 0 && - !nghttp2_pq_empty(&session->ob_da_pq)) { - return nghttp2_pq_top(&session->ob_da_pq); + if (session->remote_window_size > 0) { + return nghttp2_stream_next_outbound_item(&session->root); } return NULL; @@ -2034,17 +2064,8 @@ nghttp2_session_pop_next_ob_item(nghttp2_session *session) { } } - if (session->remote_window_size > 0 && - !nghttp2_pq_empty(&session->ob_da_pq)) { - item = nghttp2_pq_top(&session->ob_da_pq); - nghttp2_pq_pop(&session->ob_da_pq); - - if (nghttp2_pq_empty(&session->ob_da_pq)) { - session->last_cycle = 0; - } - - item->queued = 0; - return item; + if (session->remote_window_size > 0) { + return nghttp2_stream_next_outbound_item(&session->root); } return NULL; @@ -2150,24 +2171,10 @@ static int session_close_stream_on_goaway(nghttp2_session *session, return 0; } -static void session_outbound_item_schedule(nghttp2_session *session, - nghttp2_outbound_item *item, - int32_t weight) { - /* Schedule next write. Offset proportional to the write size. - Stream with heavier weight is scheduled earlier. */ - size_t delta = item->frame.hd.length * NGHTTP2_MAX_WEIGHT / weight; +static void reschedule_stream(nghttp2_stream *stream) { + stream->last_writelen = stream->item->frame.hd.length; - if (session->last_cycle < item->cycle) { - session->last_cycle = item->cycle; - } - - /* We pretend to ignore overflow given that the value range of - item->cycle, which is uint64_t. nghttp2 won't explode even when - overflow occurs, there might be some disturbance of priority. We - also reset session->last_cycle to 0, when there is no DATA frame - to send (queue is empty), so the possibility of overflow is - generally very small. */ - item->cycle = session->last_cycle + delta; + nghttp2_stream_reschedule(stream); } /* @@ -2218,7 +2225,7 @@ static int session_after_frame_sent1(nghttp2_session *session) { } if (stream->item == item) { - rv = nghttp2_stream_detach_item(stream, session); + rv = nghttp2_stream_detach_item(stream); if (nghttp2_is_fatal(rv)) { return rv; @@ -2352,7 +2359,7 @@ static int session_after_frame_sent1(nghttp2_session *session) { } if (stream && aux_data->eof) { - rv = nghttp2_stream_detach_item(stream, session); + rv = nghttp2_stream_detach_item(stream); if (nghttp2_is_fatal(rv)) { return rv; @@ -2446,7 +2453,6 @@ static int session_after_frame_sent2(nghttp2_session *session) { return 0; } else { - nghttp2_outbound_item *next_item; nghttp2_stream *stream; nghttp2_data_aux_data *aux_data; @@ -2471,7 +2477,7 @@ static int session_after_frame_sent2(nghttp2_session *session) { further data. */ if (nghttp2_session_predicate_data_send(session, stream) != 0) { if (stream) { - rv = nghttp2_stream_detach_item(stream, session); + rv = nghttp2_stream_detach_item(stream); if (nghttp2_is_fatal(rv)) { return rv; @@ -2483,115 +2489,9 @@ static int session_after_frame_sent2(nghttp2_session *session) { return 0; } - /* Assuming stream is not NULL */ - assert(stream); - next_item = nghttp2_session_get_next_ob_item(session); - - /* If priority of this stream is higher or equal to other stream - waiting at the top of the queue, we continue to send this - data. */ - if (stream->dpri == NGHTTP2_STREAM_DPRI_TOP && - (next_item == NULL || (next_item->frame.hd.type == NGHTTP2_DATA && - outbound_item_less(item, next_item)))) { - size_t next_readmax; - - next_readmax = nghttp2_session_next_data_read(session, stream); - - if (next_readmax == 0) { - - if (session->remote_window_size == 0 && - stream->remote_window_size > 0) { - - /* If DATA cannot be sent solely due to connection level - window size, just push item to queue again. We never pop - DATA item while connection level window size is 0. */ - rv = nghttp2_pq_push(&session->ob_da_pq, aob->item); - - if (nghttp2_is_fatal(rv)) { - return rv; - } - - aob->item->queued = 1; - } else { - rv = nghttp2_stream_defer_item( - stream, NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL, session); - - if (nghttp2_is_fatal(rv)) { - return rv; - } - } - - aob->item = NULL; - active_outbound_item_reset(aob, mem); - - return 0; - } - - nghttp2_bufs_reset(framebufs); - - rv = nghttp2_session_pack_data(session, framebufs, next_readmax, frame, - aux_data, stream); - if (nghttp2_is_fatal(rv)) { - return rv; - } - if (rv == NGHTTP2_ERR_DEFERRED) { - rv = nghttp2_stream_defer_item( - stream, NGHTTP2_STREAM_FLAG_DEFERRED_USER, session); - - if (nghttp2_is_fatal(rv)) { - return rv; - } - - aob->item = NULL; - active_outbound_item_reset(aob, mem); - - return 0; - } - - if (rv == NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE) { - /* Stop DATA frame chain and issue RST_STREAM to close the - stream. We don't return - NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE intentionally. */ - rv = nghttp2_session_add_rst_stream(session, frame->hd.stream_id, - NGHTTP2_INTERNAL_ERROR); - - if (nghttp2_is_fatal(rv)) { - return rv; - } - - rv = nghttp2_stream_detach_item(stream, session); - - if (nghttp2_is_fatal(rv)) { - return rv; - } - - active_outbound_item_reset(aob, mem); - - return 0; - } - assert(rv == 0); - - if (aux_data->no_copy) { - aob->state = NGHTTP2_OB_SEND_NO_COPY; - } else { - aob->state = NGHTTP2_OB_SEND_DATA; - } - - return 0; - } - - if (stream->dpri == NGHTTP2_STREAM_DPRI_TOP) { - rv = nghttp2_pq_push(&session->ob_da_pq, aob->item); - - if (nghttp2_is_fatal(rv)) { - return rv; - } - - aob->item->queued = 1; - } - aob->item = NULL; active_outbound_item_reset(&session->aob, mem); + return 0; } /* Unreachable */ @@ -2648,22 +2548,6 @@ static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session, return 0; } - if (item->frame.hd.type == NGHTTP2_DATA || - item->frame.hd.type == NGHTTP2_HEADERS) { - nghttp2_frame *frame; - nghttp2_stream *stream; - - frame = &item->frame; - stream = nghttp2_session_get_stream(session, frame->hd.stream_id); - - if (stream && item == stream->item && - stream->dpri != NGHTTP2_STREAM_DPRI_TOP) { - /* We have DATA with higher priority in queue within the - same dependency tree. */ - break; - } - } - rv = session_prep_frame(session, item); if (rv == NGHTTP2_ERR_DEFERRED) { DEBUGF(fprintf(stderr, "send: frame transmission deferred\n")); @@ -2835,7 +2719,7 @@ static ssize_t nghttp2_session_mem_send_internal(nghttp2_session *session, } if (rv == NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE) { - rv = nghttp2_stream_detach_item(stream, session); + rv = nghttp2_stream_detach_item(stream); if (nghttp2_is_fatal(rv)) { return rv; @@ -3794,7 +3678,7 @@ static int update_remote_initial_window_size_func(nghttp2_map_entry *entry, nghttp2_stream_check_deferred_by_flow_control(stream)) { rv = nghttp2_stream_resume_deferred_item( - stream, NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL, arg->session); + stream, NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL); if (nghttp2_is_fatal(rv)) { return rv; @@ -4365,7 +4249,7 @@ static int session_on_stream_window_update_received(nghttp2_session *session, nghttp2_stream_check_deferred_by_flow_control(stream)) { rv = nghttp2_stream_resume_deferred_item( - stream, NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL, session); + stream, NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL); if (nghttp2_is_fatal(rv)) { return rv; @@ -5970,7 +5854,7 @@ int nghttp2_session_want_write(nghttp2_session *session) { if (session->aob.item == NULL && nghttp2_outbound_queue_top(&session->ob_urgent) == NULL && nghttp2_outbound_queue_top(&session->ob_reg) == NULL && - (nghttp2_pq_empty(&session->ob_da_pq) || + (nghttp2_pq_empty(&session->root.obq) || session->remote_window_size == 0) && (nghttp2_outbound_queue_top(&session->ob_syn) == NULL || session_is_outgoing_concurrent_streams_max(session))) { @@ -6330,8 +6214,7 @@ int nghttp2_session_pack_data(nghttp2_session *session, nghttp2_bufs *bufs, return rv; } - session_outbound_item_schedule( - session, stream->item, nghttp2_stream_compute_effective_weight(stream)); + reschedule_stream(stream); return 0; } @@ -6367,8 +6250,8 @@ int nghttp2_session_resume_data(nghttp2_session *session, int32_t stream_id) { return NGHTTP2_ERR_INVALID_ARGUMENT; } - rv = nghttp2_stream_resume_deferred_item( - stream, NGHTTP2_STREAM_FLAG_DEFERRED_USER, session); + rv = nghttp2_stream_resume_deferred_item(stream, + NGHTTP2_STREAM_FLAG_DEFERRED_USER); if (nghttp2_is_fatal(rv)) { return rv; @@ -6380,8 +6263,8 @@ int nghttp2_session_resume_data(nghttp2_session *session, int32_t stream_id) { size_t nghttp2_session_get_outbound_queue_size(nghttp2_session *session) { return nghttp2_outbound_queue_size(&session->ob_urgent) + nghttp2_outbound_queue_size(&session->ob_reg) + - nghttp2_outbound_queue_size(&session->ob_syn) + - nghttp2_pq_size(&session->ob_da_pq); + nghttp2_outbound_queue_size(&session->ob_syn); + /* TODO account for item attached to stream */ } int32_t diff --git a/lib/nghttp2_session.h b/lib/nghttp2_session.h index 5400a22c..27e6aca7 100644 --- a/lib/nghttp2_session.h +++ b/lib/nghttp2_session.h @@ -30,7 +30,6 @@ #endif /* HAVE_CONFIG_H */ #include -#include "nghttp2_pq.h" #include "nghttp2_map.h" #include "nghttp2_frame.h" #include "nghttp2_hd.h" @@ -163,8 +162,6 @@ struct nghttp2_session { response) frame, which are subject to SETTINGS_MAX_CONCURRENT_STREAMS limit. */ nghttp2_outbound_queue ob_syn; - /* Queue for DATA frame */ - nghttp2_pq /* */ ob_da_pq; nghttp2_active_outbound_item aob; nghttp2_inbound_frame iframe; nghttp2_hd_deflater hd_deflater; @@ -436,26 +433,43 @@ int nghttp2_session_close_stream(nghttp2_session *session, int32_t stream_id, * Deletes |stream| from memory. After this function returns, stream * cannot be accessed. * + * This function returns 0 if it succeeds, or one the following + * negative error codes: + * + * NGHTTP2_ERR_NOMEM + * Out of memory */ -void nghttp2_session_destroy_stream(nghttp2_session *session, - nghttp2_stream *stream); +int nghttp2_session_destroy_stream(nghttp2_session *session, + nghttp2_stream *stream); /* * Tries to keep incoming closed stream |stream|. Due to the * limitation of maximum number of streams in memory, |stream| is not * closed and just deleted from memory (see * nghttp2_session_destroy_stream). + * + * This function returns 0 if it succeeds, or one the following + * negative error codes: + * + * NGHTTP2_ERR_NOMEM + * Out of memory */ -void nghttp2_session_keep_closed_stream(nghttp2_session *session, - nghttp2_stream *stream); +int nghttp2_session_keep_closed_stream(nghttp2_session *session, + nghttp2_stream *stream); /* * Appends |stream| to linked list |session->idle_stream_head|. We * apply fixed limit for list size. To fit into that limit, one or * more oldest streams are removed from list as necessary. + * + * This function returns 0 if it succeeds, or one the following + * negative error codes: + * + * NGHTTP2_ERR_NOMEM + * Out of memory */ -void nghttp2_session_keep_idle_stream(nghttp2_session *session, - nghttp2_stream *stream); +int nghttp2_session_keep_idle_stream(nghttp2_session *session, + nghttp2_stream *stream); /* * Detaches |stream| from idle streams linked list. @@ -469,15 +483,27 @@ void nghttp2_session_detach_idle_stream(nghttp2_session *session, * stream. If |offset| is nonzero, it is decreased from the maximum * number of allowed stream when comparing number of active and closed * stream and the maximum number. + * + * This function returns 0 if it succeeds, or one the following + * negative error codes: + * + * NGHTTP2_ERR_NOMEM + * Out of memory */ -void nghttp2_session_adjust_closed_stream(nghttp2_session *session, - ssize_t offset); +int nghttp2_session_adjust_closed_stream(nghttp2_session *session, + ssize_t offset); /* * Deletes idle stream to ensure that number of idle streams is in * certain limit. + * + * This function returns 0 if it succeeds, or one the following + * negative error codes: + * + * NGHTTP2_ERR_NOMEM + * Out of memory */ -void nghttp2_session_adjust_idle_stream(nghttp2_session *session); +int nghttp2_session_adjust_idle_stream(nghttp2_session *session); /* * If further receptions and transmissions over the stream |stream_id| @@ -699,21 +725,21 @@ int nghttp2_session_pack_data(nghttp2_session *session, nghttp2_bufs *bufs, nghttp2_stream *stream); /* - * Pops and returns next item to send. If there is no such item, + * Pops and returns next item to send. If there is no such item, * returns NULL. This function takes into account max concurrent - * streams. That means if session->ob_pq is empty but - * session->ob_ss_pq has item and max concurrent streams is reached, - * then this function returns NULL. + * streams. That means if session->ob_syn has item and max concurrent + * streams is reached, the even if other queues contain items, then + * this function returns NULL. */ nghttp2_outbound_item * nghttp2_session_pop_next_ob_item(nghttp2_session *session); /* - * Returns next item to send. If there is no such item, this function + * Returns next item to send. If there is no such item, this function * returns NULL. This function takes into account max concurrent - * streams. That means if session->ob_pq is empty but - * session->ob_ss_pq has item and max concurrent streams is reached, - * then this function returns NULL. + * streams. That means if session->ob_syn has item and max concurrent + * streams is reached, the even if other queues contain items, then + * this function returns NULL. */ nghttp2_outbound_item * nghttp2_session_get_next_ob_item(nghttp2_session *session); diff --git a/lib/nghttp2_stream.c b/lib/nghttp2_stream.c index eccf8e62..3816712b 100644 --- a/lib/nghttp2_stream.c +++ b/lib/nghttp2_stream.c @@ -30,12 +30,23 @@ #include "nghttp2_session.h" #include "nghttp2_helper.h" +static int stream_weight_less(const void *lhsx, const void *rhsx) { + const nghttp2_stream *lhs, *rhs; + + lhs = nghttp2_struct_of(lhsx, nghttp2_stream, pq_entry); + rhs = nghttp2_struct_of(rhsx, nghttp2_stream, pq_entry); + + return lhs->cycle <= rhs->cycle; +} + void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id, uint8_t flags, nghttp2_stream_state initial_state, int32_t weight, int32_t remote_initial_window_size, int32_t local_initial_window_size, - void *stream_user_data) { + void *stream_user_data, nghttp2_mem *mem) { nghttp2_map_entry_init(&stream->map_entry, stream_id); + nghttp2_pq_init(&stream->obq, stream_weight_less, mem); + stream->stream_id = stream_id; stream->flags = flags; stream->state = initial_state; @@ -56,142 +67,155 @@ void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id, stream->closed_prev = NULL; stream->closed_next = NULL; - stream->dpri = NGHTTP2_STREAM_DPRI_NO_ITEM; stream->weight = weight; stream->sum_dep_weight = 0; - stream->sum_norest_weight = 0; stream->http_flags = NGHTTP2_HTTP_FLAG_NONE; stream->content_length = -1; stream->recv_content_length = 0; stream->status_code = -1; + + stream->queued = 0; + stream->descendant_last_cycle = 0; + stream->cycle = 0; + stream->last_writelen = 0; } -void nghttp2_stream_free(nghttp2_stream *stream _U_) { +void nghttp2_stream_free(nghttp2_stream *stream) { + nghttp2_pq_free(&stream->obq); /* We don't free stream->item. If it is assigned to aob, then - active_outbound_item_reset() will delete it. If it is queued, - then it is deleted when pq is deleted in nghttp2_session_del(). - Otherwise, nghttp2_session_del() will delete it. */ + active_outbound_item_reset() will delete it. Otherwise, + nghttp2_stream_close() or session_del() will delete it. */ } void nghttp2_stream_shutdown(nghttp2_stream *stream, nghttp2_shut_flag flag) { stream->shut_flags |= flag; } -static int stream_push_item(nghttp2_stream *stream, nghttp2_session *session) { - /* This is required for Android NDK r10d */ - int rv = 0; - nghttp2_outbound_item *item; +/* + * Returns nonzero if |stream| is active. This function does not take + * into account its descendants. + */ +static int stream_active(nghttp2_stream *stream) { + return stream->item && + (stream->flags & NGHTTP2_STREAM_FLAG_DEFERRED_ALL) == 0; +} - assert(stream->item); - assert(stream->item->queued == 0); +/* + * Returns nonzero if |stream| or one of its descendants is active + */ +static int stream_subtree_active(nghttp2_stream *stream) { + return stream_active(stream) || !nghttp2_pq_empty(&stream->obq); +} - item = stream->item; +/* + * Returns next cycle for |stream|. + */ +static uint64_t stream_next_cycle(nghttp2_stream *stream, uint64_t last_cycle) { + return last_cycle + + stream->last_writelen * NGHTTP2_MAX_WEIGHT / stream->weight; +} - /* If item is now sent, don't push it to the queue. Otherwise, we - may push same item twice. */ - if (session->aob.item == item) { - return 0; - } +static int stream_obq_push(nghttp2_stream *dep_stream, nghttp2_stream *stream) { + int rv; - switch (item->frame.hd.type) { - case NGHTTP2_DATA: - /* Penalize item by delaying scheduling according to effective - weight. This will delay low priority stream, which is good. - OTOH, this may incur delay for high priority item. Will - see. */ - item->cycle = session->last_cycle + - NGHTTP2_DATA_PAYLOADLEN * NGHTTP2_MAX_WEIGHT / - nghttp2_stream_compute_effective_weight(stream); + for (; dep_stream && !stream->queued; + stream = dep_stream, dep_stream = dep_stream->dep_prev) { + stream->cycle = + stream_next_cycle(stream, dep_stream->descendant_last_cycle); - rv = nghttp2_pq_push(&session->ob_da_pq, item); + DEBUGF(fprintf(stderr, "stream: stream=%d obq push cycle=%ld\n", + stream->stream_id, stream->cycle)); + + DEBUGF(fprintf(stderr, "stream: push stream %d to stream %d\n", + stream->stream_id, dep_stream->stream_id)); + + rv = nghttp2_pq_push(&dep_stream->obq, &stream->pq_entry); if (rv != 0) { return rv; } - break; - case NGHTTP2_HEADERS: - if (stream->state == NGHTTP2_STREAM_RESERVED) { - nghttp2_outbound_queue_push(&session->ob_syn, item); - } else { - nghttp2_outbound_queue_push(&session->ob_reg, item); - } - break; - default: - /* should not reach here */ - assert(0); + stream->queued = 1; } - item->queued = 1; - return 0; } -typedef enum { - DFS_NOERROR, - /* Don't traverse descendants */ - DFS_SKIP_DESCENDANT, - /* Stop traversal, and return immediately */ - DFS_ABORT -} dfs_error_code; - -/* depth first traversal, starting at |stream|. |precb| is, if non - * NULL, called against stream before traversing its descendants. - * |postcb| is, if non NULL, called against stream just after - * traversing its all descendants. |data| is arbitrary pointer, which - * gets passed to |precb| and |postcb|. - * - * The application can change dfs behaviour by adjusting return value - * from |precb|. Returning DFS_NOERROR will resume traversal. - * Returning DFS_SKIP_DESCENDANT will skip all traversal for the - * descendant streams. Returning DFS_ABORT will immediately return - * from this function, and dfs returns DFS_ABORT. Returning any other - * values will also make this function return immediately, and dfs - * returns the value |precb| returned. +/* + * Removes |stream| from parent's obq. If removal of |stream| makes + * parent's obq empty, and parent is not active, then parent is also + * removed. This process is repeated recursively. */ -static int dfs(nghttp2_stream *stream, - int (*precb)(nghttp2_stream *stream, void *data), - void (*postcb)(nghttp2_stream *stream, void *data), void *data) { - int rv; - nghttp2_stream *start; +static void stream_obq_remove(nghttp2_stream *stream) { + nghttp2_stream *dep_stream; - start = stream; + dep_stream = stream->dep_prev; - for (;;) { - if (precb) { - rv = precb(stream, data); - switch (rv) { - case DFS_NOERROR: - break; - case DFS_SKIP_DESCENDANT: - goto back; - case DFS_ABORT: - default: - return rv; - } - } - if (!stream->dep_next) { - goto back; - } - stream = stream->dep_next; - continue; + if (!stream->queued) { + return; + } - back: - for (;;) { - if (postcb) { - postcb(stream, data); - } - if (stream == start) { - return 0; - } - if (stream->sib_next) { - stream = stream->sib_next; - break; - } - stream = stream->dep_prev; + for (; dep_stream; stream = dep_stream, dep_stream = dep_stream->dep_prev) { + DEBUGF(fprintf(stderr, "stream: remove stream %d from stream %d\n", + stream->stream_id, dep_stream->stream_id)); + + nghttp2_pq_remove(&dep_stream->obq, &stream->pq_entry); + + assert(stream->queued); + + stream->queued = 0; + stream->cycle = 0; + stream->descendant_last_cycle = 0; + + if (stream_subtree_active(dep_stream)) { + return; } } } +/* + * Moves |stream| from |src|'s obq to |dest|'s obq. Removal from + * |src|'s obq is just done calling nghttp2_pq_remove(), so it does + * not recursively remove |src| and ancestors, like + * stream_obq_remove(). + */ +static int stream_obq_move(nghttp2_stream *dest, nghttp2_stream *src, + nghttp2_stream *stream) { + if (!stream->queued) { + return 0; + } + + DEBUGF(fprintf(stderr, "stream: remove stream %d from stream %d (move)\n", + stream->stream_id, src->stream_id)); + + nghttp2_pq_remove(&src->obq, &stream->pq_entry); + stream->queued = 0; + + return stream_obq_push(dest, stream); +} + +void nghttp2_stream_reschedule(nghttp2_stream *stream) { + nghttp2_stream *dep_stream; + + assert(stream->queued); + + dep_stream = stream->dep_prev; + + for (; dep_stream; stream = dep_stream, dep_stream = dep_stream->dep_prev) { + dep_stream->descendant_last_cycle = + nghttp2_max(dep_stream->descendant_last_cycle, stream->cycle); + + stream->cycle = + stream_next_cycle(stream, dep_stream->descendant_last_cycle); + + DEBUGF(fprintf(stderr, "stream: stream=%d obq resched cycle=%ld\n", + stream->stream_id, stream->cycle)); + + nghttp2_pq_increase_key(&dep_stream->obq, &stream->pq_entry); + + dep_stream->last_writelen = stream->last_writelen; + } +} + static nghttp2_stream *stream_last_sib(nghttp2_stream *stream) { for (; stream->sib_next; stream = stream->sib_next) ; @@ -206,228 +230,77 @@ int32_t nghttp2_stream_dep_distributed_weight(nghttp2_stream *stream, return nghttp2_max(1, weight); } -static int stream_update_dep_set_rest_precb(nghttp2_stream *stream, - void *data _U_) { - DEBUGF(fprintf(stderr, "stream: stream=%d is rest\n", stream->stream_id)); +#ifdef STREAM_DEP_DEBUG - if (stream->dpri == NGHTTP2_STREAM_DPRI_REST) { - return DFS_SKIP_DESCENDANT; +static void ensure_inactive(nghttp2_stream *stream) { + nghttp2_stream *si; + + if (stream->queued) { + fprintf(stderr, "stream(%p)=%d, stream->queued = 1; want 0\n", stream, + stream->stream_id); + assert(0); } - if (stream->dpri == NGHTTP2_STREAM_DPRI_TOP) { - stream->dpri = NGHTTP2_STREAM_DPRI_REST; - return DFS_SKIP_DESCENDANT; + if (stream_active(stream)) { + fprintf(stderr, "stream(%p)=%d, stream_active(stream) = 1; want 0\n", + stream, stream->stream_id); + assert(0); } - return DFS_NOERROR; -} - -static void stream_update_dep_set_rest(nghttp2_stream *stream) { - dfs(stream, stream_update_dep_set_rest_precb, NULL, NULL); -} - -static int stream_update_dep_set_top_precb(nghttp2_stream *stream, - void *data _U_) { - stream->sum_norest_weight = 0; - - if (stream->dpri == NGHTTP2_STREAM_DPRI_TOP) { - return DFS_SKIP_DESCENDANT; + if (!nghttp2_pq_empty(&stream->obq)) { + fprintf(stderr, "stream(%p)=%d, nghttp2_pq_size() = %zu; want 0\n", stream, + stream->stream_id, nghttp2_pq_size(&stream->obq)); + assert(0); } - if (stream->dpri == NGHTTP2_STREAM_DPRI_REST) { - DEBUGF( - fprintf(stderr, "stream: stream=%d item is top\n", stream->stream_id)); - - stream->dpri = NGHTTP2_STREAM_DPRI_TOP; - - return DFS_SKIP_DESCENDANT; - } - - return DFS_NOERROR; -} - -static void stream_update_dep_set_top_postcb(nghttp2_stream *stream, - void *data) { - nghttp2_stream *start; - - start = data; - - if (start == stream) { - return; - } - - if (stream->dpri == NGHTTP2_STREAM_DPRI_TOP || - (stream->dpri == NGHTTP2_STREAM_DPRI_NO_ITEM && - stream->sum_norest_weight > 0)) { - stream->dep_prev->sum_norest_weight += stream->weight; + for (si = stream->dep_next; si; si = si->sib_next) { + ensure_inactive(si); } } -/* - * Performs dfs starting |stream|, search stream which can become - * NGHTTP2_STREAM_DPRI_TOP and set its dpri. This function also - * updates sum_norest_weight if stream->dpri == - * NGHTTP2_STREAM_DPRI_NO_ITEM. This function returns nonzero if - * stream's subtree contains at least one NGHTTP2_STRAEM_DPRI_TOP - * stream. - */ -static int stream_update_dep_set_top(nghttp2_stream *stream) { - dfs(stream, stream_update_dep_set_top_precb, stream_update_dep_set_top_postcb, - stream); - return stream->dpri == NGHTTP2_STREAM_DPRI_TOP || - (stream->dpri == NGHTTP2_STREAM_DPRI_NO_ITEM && - stream->sum_norest_weight > 0); -} +static void check_queued(nghttp2_stream *stream) { + nghttp2_stream *si; + int queued; -static int stream_update_dep_queue_top_precb(nghttp2_stream *stream, - void *data) { - int rv; - nghttp2_session *session; - - session = data; - - if (stream->dpri == NGHTTP2_STREAM_DPRI_REST) { - return DFS_SKIP_DESCENDANT; - } - - if (stream->dpri == NGHTTP2_STREAM_DPRI_TOP) { - if (!stream->item->queued) { - DEBUGF(fprintf(stderr, "stream: stream=%d enqueue\n", stream->stream_id)); - rv = stream_push_item(stream, session); - - if (rv != 0) { - return rv; + if (stream->queued) { + if (!stream_subtree_active(stream)) { + fprintf(stderr, + "stream(%p)=%d, stream->queued == 1, but " + "stream_active() == %d and nghttp2_pq_size(&stream->obq) = %zu\n", + stream, stream->stream_id, stream_active(stream), + nghttp2_pq_size(&stream->obq)); + assert(0); + } + if (!stream_active(stream)) { + queued = 0; + for (si = stream->dep_next; si; si = si->sib_next) { + if (si->queued) { + ++queued; + } + } + if (queued == 0) { + fprintf(stderr, "stream(%p)=%d, stream->queued == 1, and " + "!stream_active(), but no descendants is queued\n", + stream, stream->stream_id); + assert(0); } } - return DFS_SKIP_DESCENDANT; - } - - if (stream->sum_norest_weight == 0) { - return DFS_SKIP_DESCENDANT; - } - - return DFS_NOERROR; -} - -/* - * Performs dfs starting |stream|, and dueue stream whose dpri is - * NGHTTP2_STREAM_DPRI_TOP and has not been queued yet. - * - * This function returns 0 if it succeeds, or one of the following - * negative error codes: - * - * NGHTTP2_ERR_NOMEM - * Out of memory. - */ -static int stream_update_dep_queue_top(nghttp2_stream *stream, - nghttp2_session *session) { - return dfs(stream, stream_update_dep_queue_top_precb, NULL, session); -} - -/* - * Updates stream->sum_norest_weight recursively towards root. - * |delta| must not be 0. We have to gather effective sum of weight - * of descendants. |delta| is added to stream->sum_norest_weight. If - * stream->sum_norest_weight becomes 0, we have to update parent - * stream, decreasing its sum_norest_weight by stream->weight. If - * stream->sum_norest_weight becomes from 0 to positive, then we have - * to update parent stream, increasing its sum_norest_weight by - * stream->weight. Otherwise, we stop recursive call. - */ -static void stream_update_dep_sum_norest_weight(nghttp2_stream *stream, - int32_t delta) { - int32_t old; - - for (;;) { - if (!stream) { - return; - } - - assert(delta != 0); - assert(stream->sum_norest_weight + delta >= 0); - - old = stream->sum_norest_weight; - stream->sum_norest_weight += delta; - - if (old == 0) { - assert(delta > 0); - delta = stream->weight; - stream = stream->dep_prev; - continue; - } - - assert(old > 0); - - if (stream->sum_norest_weight == 0) { - delta = -stream->weight; - stream = stream->dep_prev; - continue; - } - - break; - } -} - -/* - * Returns stream whose dpri is NGHTTP2_STREAM_DPRI_NO_ITEM along the - * path following stream->dep_prev (stream's ancestors, including - * itself). In other words, find stream which blocks the descendant - * streams. If there is no such stream, returns NULL. - */ -static nghttp2_stream *stream_get_dep_blocking(nghttp2_stream *stream) { - for (; stream; stream = stream->dep_prev) { - if (stream->dpri != NGHTTP2_STREAM_DPRI_NO_ITEM) { - return stream; - } - } - return NULL; -} - -#ifdef STREAM_DEP_DEBUG - -static void ensure_rest_or_no_item(nghttp2_stream *stream) { - nghttp2_stream *si; - switch (stream->dpri) { - case NGHTTP2_STREAM_DPRI_TOP: - fprintf(stderr, "NGHTTP2_STREAM_DPRI_TOP; want REST or NO_ITEM\n"); - assert(0); - break; - case NGHTTP2_STREAM_DPRI_REST: - case NGHTTP2_STREAM_DPRI_NO_ITEM: for (si = stream->dep_next; si; si = si->sib_next) { - ensure_rest_or_no_item(si); + check_queued(si); } - break; - default: - fprintf(stderr, "invalid dpri %d\n", stream->dpri); - assert(0); - } -} - -static void check_dpri(nghttp2_stream *stream) { - nghttp2_stream *si; - switch (stream->dpri) { - case NGHTTP2_STREAM_DPRI_TOP: - if (!stream->item->queued) { - fprintf(stderr, "stream->item->queued is not nonzero while it is in " - "NGHTTP2_STREAM_DPRI_TOP\n"); + } else { + if (stream_active(stream) || !nghttp2_pq_empty(&stream->obq)) { + fprintf(stderr, "stream(%p) = %d, stream->queued == 0, but " + "stream_active(stream) == %d and " + "nghttp2_pq_size(&stream->obq) = %zu\n", + stream, stream->stream_id, stream_active(stream), + nghttp2_pq_size(&stream->obq)); assert(0); } - /* fall through */ - case NGHTTP2_STREAM_DPRI_REST: for (si = stream->dep_next; si; si = si->sib_next) { - ensure_rest_or_no_item(si); + ensure_inactive(si); } - break; - case NGHTTP2_STREAM_DPRI_NO_ITEM: - for (si = stream->dep_next; si; si = si->sib_next) { - check_dpri(si); - } - break; - default: - fprintf(stderr, "invalid dpri %d\n", stream->dpri); - assert(0); } } @@ -447,33 +320,6 @@ static void check_sum_dep(nghttp2_stream *stream) { } } -static int check_sum_norest(nghttp2_stream *stream) { - nghttp2_stream *si; - int32_t n = 0; - switch (stream->dpri) { - case NGHTTP2_STREAM_DPRI_TOP: - return 1; - case NGHTTP2_STREAM_DPRI_REST: - return 0; - case NGHTTP2_STREAM_DPRI_NO_ITEM: - for (si = stream->dep_next; si; si = si->sib_next) { - if (check_sum_norest(si)) { - n += si->weight; - } - } - break; - default: - fprintf(stderr, "invalid dpri %d\n", stream->dpri); - assert(0); - } - if (n != stream->sum_norest_weight) { - fprintf(stderr, "stream(%p)=%d, sum_norest_weight = %d; want %d\n", stream, - stream->stream_id, n, stream->sum_norest_weight); - assert(0); - } - return n > 0; -} - static void check_dep_prev(nghttp2_stream *stream) { nghttp2_stream *si; for (si = stream->dep_next; si; si = si->sib_next) { @@ -489,6 +335,8 @@ static void check_dep_prev(nghttp2_stream *stream) { #ifdef STREAM_DEP_DEBUG static void validate_tree(nghttp2_stream *stream) { + nghttp2_stream *si; + if (!stream) { return; } @@ -496,82 +344,43 @@ static void validate_tree(nghttp2_stream *stream) { for (; stream->dep_prev; stream = stream->dep_prev) ; - check_dpri(stream); + assert(stream->stream_id == 0); + assert(!stream->queued); + + fprintf(stderr, "checking...\n"); + if (nghttp2_pq_empty(&stream->obq)) { + fprintf(stderr, "root obq empty\n"); + for (si = stream->dep_next; si; si = si->sib_next) { + ensure_inactive(si); + } + } else { + for (si = stream->dep_next; si; si = si->sib_next) { + check_queued(si); + } + } + check_sum_dep(stream); - check_sum_norest(stream); check_dep_prev(stream); } #else /* !STREAM_DEP_DEBUG */ static void validate_tree(nghttp2_stream *stream _U_) {} #endif /* !STREAM_DEP_DEBUG*/ -static int stream_update_dep_on_attach_item(nghttp2_stream *stream, - nghttp2_session *session) { - nghttp2_stream *blocking_stream, *si; +static int stream_update_dep_on_attach_item(nghttp2_stream *stream) { int rv; - stream->dpri = NGHTTP2_STREAM_DPRI_REST; - - blocking_stream = stream_get_dep_blocking(stream->dep_prev); - - /* If we found REST or TOP in ascendants, we don't have to update - any metadata. */ - if (blocking_stream) { - validate_tree(stream); - return 0; - } - - stream->dpri = NGHTTP2_STREAM_DPRI_TOP; - if (stream->sum_norest_weight == 0) { - stream_update_dep_sum_norest_weight(stream->dep_prev, stream->weight); - } else { - for (si = stream->dep_next; si; si = si->sib_next) { - stream_update_dep_set_rest(si); - } - } - - if (!stream->item->queued) { - DEBUGF(fprintf(stderr, "stream: stream=%d enqueue\n", stream->stream_id)); - rv = stream_push_item(stream, session); - - if (rv != 0) { - return rv; - } + rv = stream_obq_push(stream->dep_prev, stream); + if (rv != 0) { + return rv; } validate_tree(stream); return 0; } -static int stream_update_dep_on_detach_item(nghttp2_stream *stream, - nghttp2_session *session) { - int rv; - - if (stream->dpri == NGHTTP2_STREAM_DPRI_REST) { - stream->dpri = NGHTTP2_STREAM_DPRI_NO_ITEM; - validate_tree(stream); - return 0; - } - - if (stream->dpri == NGHTTP2_STREAM_DPRI_NO_ITEM) { - /* nghttp2_stream_defer_item() does not clear stream->item, but - set dpri = NGHTTP2_STREAM_DPRI_NO_ITEM. Catch this case - here. */ - validate_tree(stream); - return 0; - } - - stream->dpri = NGHTTP2_STREAM_DPRI_NO_ITEM; - - if (stream_update_dep_set_top(stream) == 0) { - stream_update_dep_sum_norest_weight(stream->dep_prev, -stream->weight); - validate_tree(stream); - return 0; - } - - rv = stream_update_dep_queue_top(stream, session); - if (rv != 0) { - return rv; +static int stream_update_dep_on_detach_item(nghttp2_stream *stream) { + if (nghttp2_pq_empty(&stream->obq)) { + stream_obq_remove(stream); } validate_tree(stream); @@ -580,8 +389,9 @@ static int stream_update_dep_on_detach_item(nghttp2_stream *stream, } int nghttp2_stream_attach_item(nghttp2_stream *stream, - nghttp2_outbound_item *item, - nghttp2_session *session) { + nghttp2_outbound_item *item) { + int rv; + assert((stream->flags & NGHTTP2_STREAM_FLAG_DEFERRED_ALL) == 0); assert(stream->item == NULL); @@ -590,22 +400,30 @@ int nghttp2_stream_attach_item(nghttp2_stream *stream, stream->item = item; - return stream_update_dep_on_attach_item(stream, session); + rv = stream_update_dep_on_attach_item(stream); + if (rv != 0) { + /* This may relave stream->queued == 1, but stream->item == NULL. + But only consequence of this error is fatal one, and session + destruction. In that execution path, these inconsistency does + not matter. */ + stream->item = NULL; + return rv; + } + + return 0; } -int nghttp2_stream_detach_item(nghttp2_stream *stream, - nghttp2_session *session) { +int nghttp2_stream_detach_item(nghttp2_stream *stream) { DEBUGF(fprintf(stderr, "stream: stream=%d detach item=%p\n", stream->stream_id, stream->item)); stream->item = NULL; stream->flags &= ~NGHTTP2_STREAM_FLAG_DEFERRED_ALL; - return stream_update_dep_on_detach_item(stream, session); + return stream_update_dep_on_detach_item(stream); } -int nghttp2_stream_defer_item(nghttp2_stream *stream, uint8_t flags, - nghttp2_session *session) { +int nghttp2_stream_defer_item(nghttp2_stream *stream, uint8_t flags) { assert(stream->item); DEBUGF(fprintf(stderr, "stream: stream=%d defer item=%p cause=%02x\n", @@ -613,11 +431,10 @@ int nghttp2_stream_defer_item(nghttp2_stream *stream, uint8_t flags, stream->flags |= flags; - return stream_update_dep_on_detach_item(stream, session); + return stream_update_dep_on_detach_item(stream); } -int nghttp2_stream_resume_deferred_item(nghttp2_stream *stream, uint8_t flags, - nghttp2_session *session) { +int nghttp2_stream_resume_deferred_item(nghttp2_stream *stream, uint8_t flags) { assert(stream->item); DEBUGF(fprintf(stderr, "stream: stream=%d resume item=%p flags=%02x\n", @@ -629,7 +446,7 @@ int nghttp2_stream_resume_deferred_item(nghttp2_stream *stream, uint8_t flags, return 0; } - return stream_update_dep_on_attach_item(stream, session); + return stream_update_dep_on_attach_item(stream); } int nghttp2_stream_check_deferred_item(nghttp2_stream *stream) { @@ -675,54 +492,20 @@ void nghttp2_stream_promise_fulfilled(nghttp2_stream *stream) { stream->flags &= ~NGHTTP2_STREAM_FLAG_PUSH; } -nghttp2_stream *nghttp2_stream_get_dep_root(nghttp2_stream *stream) { - for (; stream->dep_prev; stream = stream->dep_prev) - ; - return stream; -} - -static int stream_dep_subtree_find_precb(nghttp2_stream *stream, void *data) { - nghttp2_stream *target; - - target = data; - - if (target == stream) { - return DFS_ABORT; - } - - return DFS_NOERROR; -} - -int nghttp2_stream_dep_subtree_find(nghttp2_stream *stream, - nghttp2_stream *target) { - return dfs(stream, stream_dep_subtree_find_precb, NULL, target) == DFS_ABORT; -} - -int32_t nghttp2_stream_compute_effective_weight(nghttp2_stream *stream) { - int32_t weight; - - assert(stream->dep_prev); - - weight = stream->weight * 100; - - for (;;) { - stream = stream->dep_prev; - /* Not consider weight of root; it could make weight too small */ - if (!stream || !stream->dep_prev) { - break; +int nghttp2_stream_dep_find_ancestor(nghttp2_stream *stream, + nghttp2_stream *target) { + for (; stream; stream = stream->dep_prev) { + if (stream == target) { + return 1; } - weight = stream->weight * weight / stream->sum_norest_weight; } - - return nghttp2_max(1, weight / 100); + return 0; } -void nghttp2_stream_dep_insert(nghttp2_stream *dep_stream, - nghttp2_stream *stream) { +int nghttp2_stream_dep_insert(nghttp2_stream *dep_stream, + nghttp2_stream *stream) { nghttp2_stream *si; - nghttp2_stream *blocking_stream; - - assert(stream->dpri == NGHTTP2_STREAM_DPRI_NO_ITEM); + int rv; DEBUGF(fprintf(stderr, "stream: dep_insert dep_stream(%p)=%d, stream(%p)=%d\n", @@ -731,17 +514,21 @@ void nghttp2_stream_dep_insert(nghttp2_stream *dep_stream, stream->sum_dep_weight = dep_stream->sum_dep_weight; dep_stream->sum_dep_weight = stream->weight; - blocking_stream = stream_get_dep_blocking(dep_stream); - - stream->sum_norest_weight = 0; - if (dep_stream->dep_next) { for (si = dep_stream->dep_next; si; si = si->sib_next) { si->dep_prev = stream; - if (!blocking_stream && (si->dpri == NGHTTP2_STREAM_DPRI_TOP || - (si->dpri == NGHTTP2_STREAM_DPRI_NO_ITEM && - si->sum_norest_weight))) { - stream->sum_norest_weight += si->weight; + if (si->queued) { + rv = stream_obq_move(stream, dep_stream, si); + if (rv != 0) { + return rv; + } + } + } + + if (stream_subtree_active(stream)) { + rv = stream_obq_push(dep_stream, stream); + if (rv != 0) { + return rv; } } @@ -751,11 +538,9 @@ void nghttp2_stream_dep_insert(nghttp2_stream *dep_stream, dep_stream->dep_next = stream; stream->dep_prev = dep_stream; - if (stream->sum_norest_weight) { - dep_stream->sum_norest_weight = stream->weight; - } - validate_tree(stream); + + return 0; } static void set_dep_prev(nghttp2_stream *stream, nghttp2_stream *dep) { @@ -869,8 +654,6 @@ static void unlink_dep(nghttp2_stream *stream) { void nghttp2_stream_dep_add(nghttp2_stream *dep_stream, nghttp2_stream *stream) { - assert(stream->dpri == NGHTTP2_STREAM_DPRI_NO_ITEM); - DEBUGF(fprintf(stderr, "stream: dep_add dep_stream(%p)=%d, stream(%p)=%d\n", dep_stream, dep_stream->stream_id, stream, stream->stream_id)); @@ -885,37 +668,27 @@ void nghttp2_stream_dep_add(nghttp2_stream *dep_stream, validate_tree(stream); } -void nghttp2_stream_dep_remove(nghttp2_stream *stream) { - nghttp2_stream *dep_prev, *si, *blocking_stream; - int32_t sum_dep_weight_delta, sum_norest_weight_delta; - - assert(stream->dpri == NGHTTP2_STREAM_DPRI_NO_ITEM); +int nghttp2_stream_dep_remove(nghttp2_stream *stream) { + nghttp2_stream *dep_prev, *si; + int32_t sum_dep_weight_delta; + int rv; DEBUGF(fprintf(stderr, "stream: dep_remove stream(%p)=%d\n", stream, stream->stream_id)); - blocking_stream = stream_get_dep_blocking(stream->dep_prev); - /* Distribute weight of |stream| to direct descendants */ sum_dep_weight_delta = -stream->weight; - sum_norest_weight_delta = 0; - - /* blocking_stream == NULL means that ascendants are all - NGHTTP2_STREAM_DPRI_NO_ITEM */ - if (!blocking_stream && stream->sum_norest_weight) { - sum_norest_weight_delta -= stream->weight; - } - for (si = stream->dep_next; si; si = si->sib_next) { si->weight = nghttp2_stream_dep_distributed_weight(stream, si->weight); sum_dep_weight_delta += si->weight; - if (!blocking_stream && - (si->dpri == NGHTTP2_STREAM_DPRI_TOP || - (si->dpri == NGHTTP2_STREAM_DPRI_NO_ITEM && si->sum_norest_weight))) { - sum_norest_weight_delta += si->weight; + if (si->queued) { + rv = stream_obq_move(stream->dep_prev, stream, si); + if (rv != 0) { + return rv; + } } } @@ -924,7 +697,10 @@ void nghttp2_stream_dep_remove(nghttp2_stream *stream) { dep_prev = stream->dep_prev; dep_prev->sum_dep_weight += sum_dep_weight_delta; - dep_prev->sum_norest_weight += sum_norest_weight_delta; + + if (stream->queued) { + stream_obq_remove(stream); + } if (stream->sib_prev) { unlink_sib(stream); @@ -933,7 +709,6 @@ void nghttp2_stream_dep_remove(nghttp2_stream *stream) { } stream->sum_dep_weight = 0; - stream->sum_norest_weight = 0; stream->dep_prev = NULL; stream->dep_next = NULL; @@ -941,14 +716,14 @@ void nghttp2_stream_dep_remove(nghttp2_stream *stream) { stream->sib_next = NULL; validate_tree(dep_prev); + + return 0; } int nghttp2_stream_dep_insert_subtree(nghttp2_stream *dep_stream, - nghttp2_stream *stream, - nghttp2_session *session) { + nghttp2_stream *stream) { nghttp2_stream *last_sib; nghttp2_stream *dep_next; - nghttp2_stream *blocking_stream; nghttp2_stream *si; int rv; @@ -956,24 +731,12 @@ int nghttp2_stream_dep_insert_subtree(nghttp2_stream *dep_stream, "stream(%p)=%d\n", dep_stream, dep_stream->stream_id, stream, stream->stream_id)); - blocking_stream = stream_get_dep_blocking(dep_stream); - - if (blocking_stream) { - stream_update_dep_set_rest(stream); - } + stream->sum_dep_weight += dep_stream->sum_dep_weight; + dep_stream->sum_dep_weight = stream->weight; if (dep_stream->dep_next) { - stream->sum_dep_weight += dep_stream->sum_dep_weight; - dep_stream->sum_dep_weight = stream->weight; - dep_next = dep_stream->dep_next; - if (!blocking_stream && dep_stream->sum_norest_weight) { - for (si = dep_next; si; si = si->sib_next) { - stream_update_dep_set_rest(si); - } - } - link_dep(dep_stream, stream); if (stream->dep_next) { @@ -986,36 +749,22 @@ int nghttp2_stream_dep_insert_subtree(nghttp2_stream *dep_stream, for (si = dep_next; si; si = si->sib_next) { si->dep_prev = stream; + if (si->queued) { + rv = stream_obq_move(stream, dep_stream, si); + if (rv != 0) { + return rv; + } + } } } else { link_dep(dep_stream, stream); - - assert(dep_stream->sum_dep_weight == 0); - dep_stream->sum_dep_weight = stream->weight; } - if (blocking_stream) { - validate_tree(dep_stream); - return 0; - } - - if (stream_update_dep_set_top(stream) == 0) { - validate_tree(dep_stream); - return 0; - } - - /* If dep_stream has stream whose dpri is NGHTTP2_DPRI_TOP in its - subtree, parent stream already accounted dep_stream->weight in - its sum_norest_weight */ - if (dep_stream->sum_norest_weight == 0) { - stream_update_dep_sum_norest_weight(dep_stream->dep_prev, - dep_stream->weight); - } - dep_stream->sum_norest_weight = stream->weight; - - rv = stream_update_dep_queue_top(stream, session); - if (rv != 0) { - return rv; + if (stream_subtree_active(stream)) { + rv = stream_obq_push(dep_stream, stream); + if (rv != 0) { + return rv; + } } validate_tree(dep_stream); @@ -1024,55 +773,26 @@ int nghttp2_stream_dep_insert_subtree(nghttp2_stream *dep_stream, } int nghttp2_stream_dep_add_subtree(nghttp2_stream *dep_stream, - nghttp2_stream *stream, - nghttp2_session *session) { - nghttp2_stream *blocking_stream; + nghttp2_stream *stream) { int rv; DEBUGF(fprintf(stderr, "stream: dep_add_subtree dep_stream(%p)=%d " "stream(%p)=%d\n", dep_stream, dep_stream->stream_id, stream, stream->stream_id)); - if (dep_stream->dep_next) { - dep_stream->sum_dep_weight += stream->weight; + dep_stream->sum_dep_weight += stream->weight; + if (dep_stream->dep_next) { insert_link_dep(dep_stream, stream); } else { link_dep(dep_stream, stream); - - assert(dep_stream->sum_dep_weight == 0); - dep_stream->sum_dep_weight = stream->weight; } - blocking_stream = stream_get_dep_blocking(dep_stream); - - if (blocking_stream) { - /* We cannot make any assumption for stream if its dpri is not - NGHTTP2_DPRI_TOP. Just dfs under stream here. */ - stream_update_dep_set_rest(stream); - - validate_tree(dep_stream); - return 0; - } - - if (stream->dpri == NGHTTP2_STREAM_DPRI_TOP) { - stream_update_dep_sum_norest_weight(dep_stream, stream->weight); - validate_tree(dep_stream); - return 0; - } - - if (stream_update_dep_set_top(stream) == 0) { - validate_tree(dep_stream); - return 0; - } - - /* Newly added subtree contributes to dep_stream's - sum_norest_weight */ - stream_update_dep_sum_norest_weight(dep_stream, stream->weight); - - rv = stream_update_dep_queue_top(stream, session); - if (rv != 0) { - return rv; + if (stream_subtree_active(stream)) { + rv = stream_obq_push(dep_stream, stream); + if (rv != 0) { + return rv; + } } validate_tree(dep_stream); @@ -1081,7 +801,7 @@ int nghttp2_stream_dep_add_subtree(nghttp2_stream *dep_stream, } void nghttp2_stream_dep_remove_subtree(nghttp2_stream *stream) { - nghttp2_stream *next, *dep_prev, *blocking_stream; + nghttp2_stream *next, *dep_prev; DEBUGF(fprintf(stderr, "stream: dep_remove_subtree stream(%p)=%d\n", stream, stream->stream_id)); @@ -1104,12 +824,8 @@ void nghttp2_stream_dep_remove_subtree(nghttp2_stream *stream) { dep_prev->sum_dep_weight -= stream->weight; - blocking_stream = stream_get_dep_blocking(dep_prev); - - if (!blocking_stream && (stream->dpri == NGHTTP2_STREAM_DPRI_TOP || - (stream->dpri == NGHTTP2_STREAM_DPRI_NO_ITEM && - stream->sum_norest_weight))) { - stream_update_dep_sum_norest_weight(dep_prev, -stream->weight); + if (stream->queued) { + stream_obq_remove(stream); } validate_tree(dep_prev); @@ -1123,3 +839,19 @@ int nghttp2_stream_in_dep_tree(nghttp2_stream *stream) { return stream->dep_prev || stream->dep_next || stream->sib_prev || stream->sib_next; } + +nghttp2_outbound_item * +nghttp2_stream_next_outbound_item(nghttp2_stream *stream) { + nghttp2_pq_entry *ent; + + for (;;) { + if (stream_active(stream)) { + return stream->item; + } + ent = nghttp2_pq_top(&stream->obq); + if (!ent) { + return NULL; + } + stream = nghttp2_struct_of(ent, nghttp2_stream, pq_entry); + } +} diff --git a/lib/nghttp2_stream.h b/lib/nghttp2_stream.h index 72d1ae72..241388d8 100644 --- a/lib/nghttp2_stream.h +++ b/lib/nghttp2_stream.h @@ -131,13 +131,6 @@ typedef enum { NGHTTP2_HTTP_FLAG_EXPECT_FINAL_RESPONSE = 1 << 13 } nghttp2_http_flag; -typedef enum { - NGHTTP2_STREAM_DPRI_NONE = 0, - NGHTTP2_STREAM_DPRI_NO_ITEM = 0x01, - NGHTTP2_STREAM_DPRI_TOP = 0x02, - NGHTTP2_STREAM_DPRI_REST = 0x04 -} nghttp2_stream_dpri; - struct nghttp2_stream_roots; typedef struct nghttp2_stream_roots nghttp2_stream_roots; @@ -149,6 +142,12 @@ typedef struct nghttp2_stream nghttp2_stream; struct nghttp2_stream { /* Intrusive Map */ nghttp2_map_entry map_entry; + /* Entry for dep_prev->obq */ + nghttp2_pq_entry pq_entry; + /* Priority Queue storing direct descendant (nghttp2_stream). Only + streams which itself has some data to send, or has a descendant + which has some data to sent. */ + nghttp2_pq obq; /* Content-Length of request/response body. -1 if unknown. */ int64_t content_length; /* Received body so far */ @@ -173,9 +172,6 @@ struct nghttp2_stream { nghttp2_outbound_item *item; /* stream ID */ int32_t stream_id; - /* categorized priority of this stream. Only stream bearing - NGHTTP2_STREAM_DPRI_TOP can send item. */ - nghttp2_stream_dpri dpri; /* Current remote window size. This value is computed against the current initial window size of remote endpoint. */ int32_t remote_window_size; @@ -198,13 +194,6 @@ struct nghttp2_stream { int32_t weight; /* sum of weight of direct descendants */ int32_t sum_dep_weight; - /* sum of weight of direct descendants which have at least one - descendant with dpri == NGHTTP2_STREAM_DPRI_TOP. We use this - value to calculate effective weight. This value is only - meaningful iff dpri == NGHTTP2_STREAM_DPRI_NO_ITEM and all - streams along the path to the root stream (follow dep_prev) have - NGHTTP2_STREAM_DPRI_NO_ITEM. */ - int32_t sum_norest_weight; nghttp2_stream_state state; /* status code from remote server */ int16_t status_code; @@ -214,13 +203,24 @@ struct nghttp2_stream { uint8_t flags; /* Bitwise OR of zero or more nghttp2_shut_flag values */ uint8_t shut_flags; + /* Nonzero if this stream has been queued to stream pointed by + dep_prev. We maintain the invariant that if a stream is queued, + then its ancestors, except for root, are also queued. This + invariant may break in fatal error condition. */ + uint8_t queued; + /* Base last_cycle for direct descendent streams. */ + uint64_t descendant_last_cycle; + /* Next scheduled time to sent item */ + uint64_t cycle; + /* Last written length of frame payload */ + size_t last_writelen; }; void nghttp2_stream_init(nghttp2_stream *stream, int32_t stream_id, uint8_t flags, nghttp2_stream_state initial_state, int32_t weight, int32_t remote_initial_window_size, int32_t local_initial_window_size, - void *stream_user_data); + void *stream_user_data, nghttp2_mem *mem); void nghttp2_stream_free(nghttp2_stream *stream); @@ -243,8 +243,7 @@ void nghttp2_stream_shutdown(nghttp2_stream *stream, nghttp2_shut_flag flag); * NGHTTP2_ERR_NOMEM * Out of memory */ -int nghttp2_stream_defer_item(nghttp2_stream *stream, uint8_t flags, - nghttp2_session *session); +int nghttp2_stream_defer_item(nghttp2_stream *stream, uint8_t flags); /* * Put back deferred data in this stream to active state. The |flags| @@ -253,9 +252,14 @@ int nghttp2_stream_defer_item(nghttp2_stream *stream, uint8_t flags, * NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL and given masks are * cleared if they are set. So even if this function is called, if * one of flag is still set, data does not become active. + * + * This function returns 0 if it succeeds, or one of the following + * negative error codes: + * + * NGHTTP2_ERR_NOMEM + * Out of memory */ -int nghttp2_stream_resume_deferred_item(nghttp2_stream *stream, uint8_t flags, - nghttp2_session *session); +int nghttp2_stream_resume_deferred_item(nghttp2_stream *stream, uint8_t flags); /* * Returns nonzero if item is deferred by whatever reason. @@ -299,16 +303,10 @@ int nghttp2_stream_update_local_initial_window_size( void nghttp2_stream_promise_fulfilled(nghttp2_stream *stream); /* - * Returns the stream positioned in root of the dependency tree the - * |stream| belongs to. + * Returns nonzero if |target| is an ancestor of |stream|. */ -nghttp2_stream *nghttp2_stream_get_dep_root(nghttp2_stream *stream); - -/* - * Returns nonzero if |target| is found in subtree of |stream|. - */ -int nghttp2_stream_dep_subtree_find(nghttp2_stream *stream, - nghttp2_stream *target); +int nghttp2_stream_dep_find_ancestor(nghttp2_stream *stream, + nghttp2_stream *target); /* * Computes distributed weight of a stream of the |weight| under the @@ -317,34 +315,35 @@ int nghttp2_stream_dep_subtree_find(nghttp2_stream *stream, int32_t nghttp2_stream_dep_distributed_weight(nghttp2_stream *stream, int32_t weight); -int32_t nghttp2_stream_compute_effective_weight(nghttp2_stream *stream); - /* * Makes the |stream| depend on the |dep_stream|. This dependency is * exclusive. All existing direct descendants of |dep_stream| become * the descendants of the |stream|. This function assumes - * |stream->data| is NULL and no dpri members are changed in this - * dependency tree. + * |stream->item| is NULL. + * + * This function returns 0 if it succeeds, or one of the following + * negative error codes: + * + * NGHTTP2_ERR_NOMEM + * Out of memory */ -void nghttp2_stream_dep_insert(nghttp2_stream *dep_stream, - nghttp2_stream *stream); +int nghttp2_stream_dep_insert(nghttp2_stream *dep_stream, + nghttp2_stream *stream); /* * Makes the |stream| depend on the |dep_stream|. This dependency is - * not exclusive. This function assumes |stream->data| is NULL and no - * dpri members are changed in this dependency tree. + * not exclusive. This function assumes |stream->item| is NULL. */ void nghttp2_stream_dep_add(nghttp2_stream *dep_stream, nghttp2_stream *stream); /* * Removes the |stream| from the current dependency tree. This - * function assumes |stream->data| is NULL. + * function assumes |stream->item| is NULL. */ -void nghttp2_stream_dep_remove(nghttp2_stream *stream); +int nghttp2_stream_dep_remove(nghttp2_stream *stream); /* - * Attaches |item| to |stream|. Updates dpri members in this - * dependency tree. + * Attaches |item| to |stream|. * * This function returns 0 if it succeeds, or one of the following * negative error codes: @@ -353,13 +352,11 @@ void nghttp2_stream_dep_remove(nghttp2_stream *stream); * Out of memory */ int nghttp2_stream_attach_item(nghttp2_stream *stream, - nghttp2_outbound_item *item, - nghttp2_session *session); + nghttp2_outbound_item *item); /* - * Detaches |stream->item|. Updates dpri members in this dependency - * tree. This function does not free |stream->item|. The caller must - * free it. + * Detaches |stream->item|. This function does not free + * |stream->item|. The caller must free it. * * This function returns 0 if it succeeds, or one of the following * negative error codes: @@ -367,12 +364,11 @@ int nghttp2_stream_attach_item(nghttp2_stream *stream, * NGHTTP2_ERR_NOMEM * Out of memory */ -int nghttp2_stream_detach_item(nghttp2_stream *stream, - nghttp2_session *session); +int nghttp2_stream_detach_item(nghttp2_stream *stream); /* * Makes the |stream| depend on the |dep_stream|. This dependency is - * exclusive. Updates dpri members in this dependency tree. + * exclusive. * * This function returns 0 if it succeeds, or one of the following * negative error codes: @@ -381,12 +377,11 @@ int nghttp2_stream_detach_item(nghttp2_stream *stream, * Out of memory */ int nghttp2_stream_dep_insert_subtree(nghttp2_stream *dep_stream, - nghttp2_stream *stream, - nghttp2_session *session); + nghttp2_stream *stream); /* * Makes the |stream| depend on the |dep_stream|. This dependency is - * not exclusive. Updates dpri members in this dependency tree. + * not exclusive. * * This function returns 0 if it succeeds, or one of the following * negative error codes: @@ -395,13 +390,11 @@ int nghttp2_stream_dep_insert_subtree(nghttp2_stream *dep_stream, * Out of memory */ int nghttp2_stream_dep_add_subtree(nghttp2_stream *dep_stream, - nghttp2_stream *stream, - nghttp2_session *session); + nghttp2_stream *stream); /* - * Removes subtree whose root stream is |stream|. Removing subtree - * does not change dpri values. The effective_weight of streams in - * removed subtree is not updated. + * Removes subtree whose root stream is |stream|. The + * effective_weight of streams in removed subtree is not updated. * * This function returns 0 if it succeeds, or one of the following * negative error codes: @@ -416,4 +409,16 @@ void nghttp2_stream_dep_remove_subtree(nghttp2_stream *stream); */ int nghttp2_stream_in_dep_tree(nghttp2_stream *stream); +/* + * Schedules transmission of |stream|'s item, assuming stream->item is + * attached, and stream->last_writelen was updated. + */ +void nghttp2_stream_reschedule(nghttp2_stream *stream); + +/* + * Returns a stream which has highest priority. + */ +nghttp2_outbound_item * +nghttp2_stream_next_outbound_item(nghttp2_stream *stream); + #endif /* NGHTTP2_STREAM */ diff --git a/tests/main.c b/tests/main.c index f83a3e36..54821193 100644 --- a/tests/main.c +++ b/tests/main.c @@ -67,6 +67,8 @@ int main(int argc _U_, char *argv[] _U_) { /* add the tests to the suite */ if (!CU_add_test(pSuite, "pq", test_nghttp2_pq) || !CU_add_test(pSuite, "pq_update", test_nghttp2_pq_update) || + !CU_add_test(pSuite, "pq_remove", test_nghttp2_pq_remove) || + !CU_add_test(pSuite, "pq_increase_key", test_nghttp2_pq_increase_key) || !CU_add_test(pSuite, "map", test_nghttp2_map) || !CU_add_test(pSuite, "map_functional", test_nghttp2_map_functional) || !CU_add_test(pSuite, "map_each_free", test_nghttp2_map_each_free) || diff --git a/tests/nghttp2_pq_test.c b/tests/nghttp2_pq_test.c index a88f75b7..76278125 100644 --- a/tests/nghttp2_pq_test.c +++ b/tests/nghttp2_pq_test.c @@ -28,47 +28,89 @@ #include "nghttp2_pq.h" +typedef struct { + nghttp2_pq_entry ent; + const char *s; +} string_entry; + +static string_entry *string_entry_new(const char *s) { + nghttp2_mem *mem; + string_entry *ent; + + mem = nghttp2_mem_default(); + + ent = nghttp2_mem_malloc(mem, sizeof(string_entry)); + ent->s = s; + + return ent; +} + +static void string_entry_del(string_entry *ent) { + free(ent); +} + static int pq_less(const void *lhs, const void *rhs) { - return strcmp(lhs, rhs) < 0; + return strcmp(((string_entry *)lhs)->s, ((string_entry *)rhs)->s) < 0; } void test_nghttp2_pq(void) { int i; nghttp2_pq pq; + string_entry *top; + nghttp2_pq_init(&pq, pq_less, nghttp2_mem_default()); CU_ASSERT(nghttp2_pq_empty(&pq)); CU_ASSERT(0 == nghttp2_pq_size(&pq)); - CU_ASSERT(0 == nghttp2_pq_push(&pq, (void *)"foo")); + CU_ASSERT(0 == nghttp2_pq_push(&pq, &string_entry_new("foo")->ent)); CU_ASSERT(0 == nghttp2_pq_empty(&pq)); CU_ASSERT(1 == nghttp2_pq_size(&pq)); - CU_ASSERT(strcmp("foo", nghttp2_pq_top(&pq)) == 0); - CU_ASSERT(0 == nghttp2_pq_push(&pq, (void *)"bar")); - CU_ASSERT(strcmp("bar", nghttp2_pq_top(&pq)) == 0); - CU_ASSERT(0 == nghttp2_pq_push(&pq, (void *)"baz")); - CU_ASSERT(strcmp("bar", nghttp2_pq_top(&pq)) == 0); - CU_ASSERT(0 == nghttp2_pq_push(&pq, (void *)"C")); + top = (string_entry *)nghttp2_pq_top(&pq); + CU_ASSERT(strcmp("foo", top->s) == 0); + CU_ASSERT(0 == nghttp2_pq_push(&pq, &string_entry_new("bar")->ent)); + top = (string_entry *)nghttp2_pq_top(&pq); + CU_ASSERT(strcmp("bar", top->s) == 0); + CU_ASSERT(0 == nghttp2_pq_push(&pq, &string_entry_new("baz")->ent)); + top = (string_entry *)nghttp2_pq_top(&pq); + CU_ASSERT(strcmp("bar", top->s) == 0); + CU_ASSERT(0 == nghttp2_pq_push(&pq, &string_entry_new("C")->ent)); CU_ASSERT(4 == nghttp2_pq_size(&pq)); - CU_ASSERT(strcmp("C", nghttp2_pq_top(&pq)) == 0); + + top = (string_entry *)nghttp2_pq_top(&pq); + CU_ASSERT(strcmp("C", top->s) == 0); + string_entry_del(top); nghttp2_pq_pop(&pq); + CU_ASSERT(3 == nghttp2_pq_size(&pq)); - CU_ASSERT(strcmp("bar", nghttp2_pq_top(&pq)) == 0); + + top = (string_entry *)nghttp2_pq_top(&pq); + CU_ASSERT(strcmp("bar", top->s) == 0); nghttp2_pq_pop(&pq); - CU_ASSERT(strcmp("baz", nghttp2_pq_top(&pq)) == 0); + string_entry_del(top); + + top = (string_entry *)nghttp2_pq_top(&pq); + CU_ASSERT(strcmp("baz", top->s) == 0); nghttp2_pq_pop(&pq); - CU_ASSERT(strcmp("foo", nghttp2_pq_top(&pq)) == 0); + string_entry_del(top); + + top = (string_entry *)nghttp2_pq_top(&pq); + CU_ASSERT(strcmp("foo", top->s) == 0); nghttp2_pq_pop(&pq); + string_entry_del(top); + CU_ASSERT(nghttp2_pq_empty(&pq)); CU_ASSERT(0 == nghttp2_pq_size(&pq)); CU_ASSERT(NULL == nghttp2_pq_top(&pq)); /* Add bunch of entry to see realloc works */ for (i = 0; i < 10000; ++i) { - CU_ASSERT(0 == nghttp2_pq_push(&pq, (void *)"foo")); + CU_ASSERT(0 == nghttp2_pq_push(&pq, &string_entry_new("foo")->ent)); CU_ASSERT((size_t)(i + 1) == nghttp2_pq_size(&pq)); } for (i = 10000; i > 0; --i) { - CU_ASSERT(NULL != nghttp2_pq_top(&pq)); + top = (string_entry *)nghttp2_pq_top(&pq); + CU_ASSERT(NULL != top); nghttp2_pq_pop(&pq); + string_entry_del(top); CU_ASSERT((size_t)(i - 1) == nghttp2_pq_size(&pq)); } @@ -76,6 +118,7 @@ void test_nghttp2_pq(void) { } typedef struct { + nghttp2_pq_entry ent; int key; int val; } node; @@ -86,7 +129,7 @@ static int node_less(const void *lhs, const void *rhs) { return ln->key < rn->key; } -static int node_update(void *item, void *arg _U_) { +static int node_update(nghttp2_pq_entry *item, void *arg _U_) { node *nd = (node *)item; if ((nd->key % 2) == 0) { nd->key *= -1; @@ -108,16 +151,107 @@ void test_nghttp2_pq_update(void) { for (i = 0; i < (int)(sizeof(nodes) / sizeof(nodes[0])); ++i) { nodes[i].key = i; nodes[i].val = i; - nghttp2_pq_push(&pq, &nodes[i]); + nghttp2_pq_push(&pq, &nodes[i].ent); } nghttp2_pq_update(&pq, node_update, NULL); for (i = 0; i < (int)(sizeof(nodes) / sizeof(nodes[0])); ++i) { - nd = nghttp2_pq_top(&pq); + nd = (node *)nghttp2_pq_top(&pq); CU_ASSERT(ans[i] == nd->key); nghttp2_pq_pop(&pq); } nghttp2_pq_free(&pq); } + +static void push_nodes(nghttp2_pq *pq, node *dest, size_t n) { + size_t i; + for (i = 0; i < n; ++i) { + dest[i].key = (int)i; + dest[i].val = (int)i; + nghttp2_pq_push(pq, &dest[i].ent); + } +} + +static void check_nodes(nghttp2_pq *pq, size_t n, int *ans_key, int *ans_val) { + size_t i; + for (i = 0; i < n; ++i) { + node *nd = (node *)nghttp2_pq_top(pq); + CU_ASSERT(ans_key[i] == nd->key); + CU_ASSERT(ans_val[i] == nd->val); + nghttp2_pq_pop(pq); + } +} + +void test_nghttp2_pq_remove(void) { + nghttp2_pq pq; + node nodes[10]; + int ans_key1[] = {1, 2, 3, 4, 5}; + int ans_val1[] = {1, 2, 3, 4, 5}; + int ans_key2[] = {0, 1, 2, 4, 5}; + int ans_val2[] = {0, 1, 2, 4, 5}; + int ans_key3[] = {0, 1, 2, 3, 4}; + int ans_val3[] = {0, 1, 2, 3, 4}; + + nghttp2_pq_init(&pq, node_less, nghttp2_mem_default()); + + push_nodes(&pq, nodes, 6); + + nghttp2_pq_remove(&pq, &nodes[0].ent); + + check_nodes(&pq, 5, ans_key1, ans_val1); + + nghttp2_pq_free(&pq); + + nghttp2_pq_init(&pq, node_less, nghttp2_mem_default()); + + push_nodes(&pq, nodes, 6); + + nghttp2_pq_remove(&pq, &nodes[3].ent); + + check_nodes(&pq, 5, ans_key2, ans_val2); + + nghttp2_pq_free(&pq); + + nghttp2_pq_init(&pq, node_less, nghttp2_mem_default()); + + push_nodes(&pq, nodes, 6); + + nghttp2_pq_remove(&pq, &nodes[5].ent); + + check_nodes(&pq, 5, ans_key3, ans_val3); + + nghttp2_pq_free(&pq); +} + +void test_nghttp2_pq_increase_key(void) { + nghttp2_pq pq; + node nodes[10]; + int ans_key1[] = {1, 2, 3, 3, 4, 5}; + int ans_val1[] = {1, 2, 0, 3, 4, 5}; + int ans_key2[] = {0, 1, 2, 3, 4, 6}; + int ans_val2[] = {0, 1, 2, 3, 4, 5}; + + nghttp2_pq_init(&pq, node_less, nghttp2_mem_default()); + + push_nodes(&pq, nodes, 6); + + nodes[0].key = 3; + nghttp2_pq_increase_key(&pq, &nodes[0].ent); + + check_nodes(&pq, 6, ans_key1, ans_val1); + + nghttp2_pq_free(&pq); + + nghttp2_pq_init(&pq, node_less, nghttp2_mem_default()); + + push_nodes(&pq, nodes, 6); + + nodes[5].key = 6; + nghttp2_pq_increase_key(&pq, &nodes[5].ent); + + check_nodes(&pq, 6, ans_key2, ans_val2); + + nghttp2_pq_free(&pq); +} diff --git a/tests/nghttp2_pq_test.h b/tests/nghttp2_pq_test.h index 7818194d..7a8afe25 100644 --- a/tests/nghttp2_pq_test.h +++ b/tests/nghttp2_pq_test.h @@ -27,5 +27,7 @@ void test_nghttp2_pq(void); void test_nghttp2_pq_update(void); +void test_nghttp2_pq_remove(void); +void test_nghttp2_pq_increase_key(void); #endif /* NGHTTP2_PQ_TEST_H */ diff --git a/tests/nghttp2_session_test.c b/tests/nghttp2_session_test.c index 84765359..7815ce68 100644 --- a/tests/nghttp2_session_test.c +++ b/tests/nghttp2_session_test.c @@ -2686,7 +2686,7 @@ void test_nghttp2_session_on_window_update_received(void) { data_item = create_data_ob_item(mem); - CU_ASSERT(0 == nghttp2_stream_attach_item(stream, data_item, session)); + CU_ASSERT(0 == nghttp2_stream_attach_item(stream, data_item)); nghttp2_frame_window_update_init(&frame.window_update, NGHTTP2_FLAG_NONE, 1, 16 * 1024); @@ -2696,9 +2696,8 @@ void test_nghttp2_session_on_window_update_received(void) { CU_ASSERT(NGHTTP2_INITIAL_WINDOW_SIZE + 16 * 1024 == stream->remote_window_size); - CU_ASSERT(0 == - nghttp2_stream_defer_item( - stream, NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL, session)); + CU_ASSERT(0 == nghttp2_stream_defer_item( + stream, NGHTTP2_STREAM_FLAG_DEFERRED_FLOW_CONTROL)); CU_ASSERT(0 == nghttp2_session_on_window_update_received(session, &frame)); CU_ASSERT(2 == user_data.frame_recv_cb_called); @@ -4679,7 +4678,7 @@ void test_nghttp2_session_pop_next_ob_item(void) { stream = nghttp2_session_get_stream(session, 1); - nghttp2_stream_detach_item(stream, session); + nghttp2_stream_detach_item(stream); nghttp2_outbound_item_free(item, mem); mem->free(item, NULL); @@ -4887,7 +4886,7 @@ void test_nghttp2_session_defer_data(void) { /* Resume deferred DATA */ CU_ASSERT(0 == nghttp2_session_resume_data(session, 1)); - item = (nghttp2_outbound_item *)nghttp2_pq_top(&session->ob_da_pq); + item = stream->item; item->aux_data.data.data_prd.read_callback = fixed_length_data_source_read_callback; ud.block_count = 1; @@ -4905,7 +4904,6 @@ void test_nghttp2_session_defer_data(void) { /* Resume deferred DATA */ CU_ASSERT(0 == nghttp2_session_resume_data(session, 1)); - item = (nghttp2_outbound_item *)nghttp2_pq_top(&session->ob_da_pq); item->aux_data.data.data_prd.read_callback = fixed_length_data_source_read_callback; ud.block_count = 1; @@ -5165,7 +5163,6 @@ void test_nghttp2_session_data_read_temporal_failure(void) { CU_ASSERT(data_size - NGHTTP2_INITIAL_WINDOW_SIZE == ud.data_source_length); stream = nghttp2_session_get_stream(session, 1); - CU_ASSERT(nghttp2_stream_check_deferred_by_flow_control(stream)); CU_ASSERT(NGHTTP2_DATA == stream->item->frame.hd.type); stream->item->aux_data.data.data_prd.read_callback = @@ -5872,7 +5869,7 @@ void test_nghttp2_session_stream_dep_add_subtree(void) { */ nghttp2_stream_dep_remove_subtree(e); - nghttp2_stream_dep_add_subtree(a, e, session); + nghttp2_stream_dep_add_subtree(a, e); /* becomes * a @@ -5919,7 +5916,7 @@ void test_nghttp2_session_stream_dep_add_subtree(void) { */ nghttp2_stream_dep_remove_subtree(e); - nghttp2_stream_dep_insert_subtree(a, e, session); + nghttp2_stream_dep_insert_subtree(a, e); /* becomes * a @@ -6098,7 +6095,7 @@ void test_nghttp2_session_stream_dep_all_your_stream_are_belong_to_us(void) { */ nghttp2_stream_dep_remove_subtree(c); - CU_ASSERT(0 == nghttp2_stream_dep_insert_subtree(&session->root, c, session)); + CU_ASSERT(0 == nghttp2_stream_dep_insert_subtree(&session->root, c)); /* * c @@ -6112,9 +6109,9 @@ void test_nghttp2_session_stream_dep_all_your_stream_are_belong_to_us(void) { CU_ASSERT(NGHTTP2_DEFAULT_WEIGHT == a->sum_dep_weight); CU_ASSERT(0 == b->sum_dep_weight); - CU_ASSERT(0 == a->sum_norest_weight); - CU_ASSERT(0 == b->sum_norest_weight); - CU_ASSERT(0 == c->sum_norest_weight); + CU_ASSERT(nghttp2_pq_empty(&a->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(nghttp2_pq_empty(&c->obq)); check_stream_dep_sib(c, root, a, NULL, NULL); check_stream_dep_sib(a, c, b, NULL, NULL); @@ -6135,7 +6132,7 @@ void test_nghttp2_session_stream_dep_all_your_stream_are_belong_to_us(void) { */ nghttp2_stream_dep_remove_subtree(c); - CU_ASSERT(0 == nghttp2_stream_dep_insert_subtree(&session->root, c, session)); + CU_ASSERT(0 == nghttp2_stream_dep_insert_subtree(&session->root, c)); /* * c @@ -6147,9 +6144,9 @@ void test_nghttp2_session_stream_dep_all_your_stream_are_belong_to_us(void) { CU_ASSERT(0 == b->sum_dep_weight); CU_ASSERT(0 == a->sum_dep_weight); - CU_ASSERT(0 == a->sum_norest_weight); - CU_ASSERT(0 == b->sum_norest_weight); - CU_ASSERT(0 == c->sum_norest_weight); + CU_ASSERT(nghttp2_pq_empty(&a->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(nghttp2_pq_empty(&c->obq)); check_stream_dep_sib(c, root, b, NULL, NULL); check_stream_dep_sib(b, c, NULL, NULL, a); @@ -6173,7 +6170,7 @@ void test_nghttp2_session_stream_dep_all_your_stream_are_belong_to_us(void) { */ nghttp2_stream_dep_remove_subtree(c); - CU_ASSERT(0 == nghttp2_stream_dep_insert_subtree(&session->root, c, session)); + CU_ASSERT(0 == nghttp2_stream_dep_insert_subtree(&session->root, c)); /* * c @@ -6188,10 +6185,10 @@ void test_nghttp2_session_stream_dep_all_your_stream_are_belong_to_us(void) { CU_ASSERT(NGHTTP2_DEFAULT_WEIGHT == a->sum_dep_weight); CU_ASSERT(0 == b->sum_dep_weight); - CU_ASSERT(0 == a->sum_norest_weight); - CU_ASSERT(0 == b->sum_norest_weight); - CU_ASSERT(0 == c->sum_norest_weight); - CU_ASSERT(0 == d->sum_norest_weight); + CU_ASSERT(nghttp2_pq_empty(&a->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(nghttp2_pq_empty(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); check_stream_dep_sib(c, root, d, NULL, NULL); check_stream_dep_sib(d, c, NULL, NULL, a); @@ -6217,10 +6214,10 @@ void test_nghttp2_session_stream_dep_all_your_stream_are_belong_to_us(void) { db = create_data_ob_item(mem); - nghttp2_stream_attach_item(b, db, session); + nghttp2_stream_attach_item(b, db); nghttp2_stream_dep_remove_subtree(c); - CU_ASSERT(0 == nghttp2_stream_dep_insert_subtree(&session->root, c, session)); + CU_ASSERT(0 == nghttp2_stream_dep_insert_subtree(&session->root, c)); /* * c @@ -6229,14 +6226,15 @@ void test_nghttp2_session_stream_dep_all_your_stream_are_belong_to_us(void) { * | * b */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); - CU_ASSERT(16 == a->sum_norest_weight); - CU_ASSERT(16 == c->sum_norest_weight); - CU_ASSERT(0 == d->sum_norest_weight); + CU_ASSERT(c->queued); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(!d->queued); + + CU_ASSERT(1 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); check_stream_dep_sib(c, root, d, NULL, NULL); check_stream_dep_sib(d, c, NULL, NULL, a); @@ -6263,11 +6261,11 @@ void test_nghttp2_session_stream_dep_all_your_stream_are_belong_to_us(void) { db = create_data_ob_item(mem); dc = create_data_ob_item(mem); - nghttp2_stream_attach_item(b, db, session); - nghttp2_stream_attach_item(c, dc, session); + nghttp2_stream_attach_item(b, db); + nghttp2_stream_attach_item(c, dc); nghttp2_stream_dep_remove_subtree(c); - CU_ASSERT(0 == nghttp2_stream_dep_insert_subtree(&session->root, c, session)); + CU_ASSERT(0 == nghttp2_stream_dep_insert_subtree(&session->root, c)); /* * c @@ -6277,10 +6275,10 @@ void test_nghttp2_session_stream_dep_all_your_stream_are_belong_to_us(void) { * b */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_REST == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); + CU_ASSERT(c->queued); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(!d->queued); check_stream_dep_sib(c, root, d, NULL, NULL); check_stream_dep_sib(d, c, NULL, NULL, a); @@ -6317,94 +6315,82 @@ void test_nghttp2_session_stream_attach_item(void) { db = create_data_ob_item(mem); - nghttp2_stream_attach_item(b, db, session); + nghttp2_stream_attach_item(b, db); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(!c->queued); + CU_ASSERT(!d->queued); - CU_ASSERT(16 == nghttp2_stream_compute_effective_weight(b)); - - CU_ASSERT(16 == a->sum_norest_weight); - - CU_ASSERT(1 == db->queued); + CU_ASSERT(1 == nghttp2_pq_size(&a->obq)); + /* Attach item to c */ dc = create_data_ob_item(mem); - nghttp2_stream_attach_item(c, dc, session); + nghttp2_stream_attach_item(c, dc); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(!d->queued); - CU_ASSERT(16 * 16 / 32 == nghttp2_stream_compute_effective_weight(b)); - CU_ASSERT(16 * 16 / 32 == nghttp2_stream_compute_effective_weight(c)); - - CU_ASSERT(32 == a->sum_norest_weight); - - CU_ASSERT(1 == dc->queued); + CU_ASSERT(2 == nghttp2_pq_size(&a->obq)); + /* Attach item to a */ da = create_data_ob_item(mem); - nghttp2_stream_attach_item(a, da, session); + nghttp2_stream_attach_item(a, da); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_REST == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_REST == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(!d->queued); - CU_ASSERT(16 == nghttp2_stream_compute_effective_weight(a)); + CU_ASSERT(2 == nghttp2_pq_size(&a->obq)); - CU_ASSERT(1 == da->queued); + /* Detach item from a */ + nghttp2_stream_detach_item(a); - nghttp2_stream_detach_item(a, session); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(!d->queued); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); - - CU_ASSERT(16 * 16 / 32 == nghttp2_stream_compute_effective_weight(b)); - CU_ASSERT(16 * 16 / 32 == nghttp2_stream_compute_effective_weight(c)); + CU_ASSERT(2 == nghttp2_pq_size(&a->obq)); + /* Attach item to d */ dd = create_data_ob_item(mem); - nghttp2_stream_attach_item(d, dd, session); + nghttp2_stream_attach_item(d, dd); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_REST == d->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(d->queued); - CU_ASSERT(16 * 16 / 32 == nghttp2_stream_compute_effective_weight(b)); - CU_ASSERT(16 * 16 / 32 == nghttp2_stream_compute_effective_weight(c)); + CU_ASSERT(2 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&c->obq)); - CU_ASSERT(0 == dd->queued); + /* Detach item from c */ + nghttp2_stream_detach_item(c); - nghttp2_stream_detach_item(c, session); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(d->queued); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == d->dpri); + CU_ASSERT(2 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&c->obq)); - CU_ASSERT(16 * 16 / 32 == nghttp2_stream_compute_effective_weight(b)); - CU_ASSERT(16 * 16 / 32 == nghttp2_stream_compute_effective_weight(d)); + /* Detach item from b */ + nghttp2_stream_detach_item(b); - CU_ASSERT(1 == dd->queued); + CU_ASSERT(a->queued); + CU_ASSERT(!b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(d->queued); - nghttp2_stream_detach_item(b, session); - - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == d->dpri); - - CU_ASSERT(16 * 16 / 16 == nghttp2_stream_compute_effective_weight(d)); - - CU_ASSERT(1 == dd->queued); + CU_ASSERT(1 == nghttp2_pq_size(&a->obq)); /* exercises insertion */ e = open_stream_with_dep_excl(session, 9, a); @@ -6418,18 +6404,17 @@ void test_nghttp2_session_stream_attach_item(void) { * d */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == e->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == d->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(e->queued); + CU_ASSERT(!b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(d->queued); - CU_ASSERT(16 * 16 / 16 == nghttp2_stream_compute_effective_weight(d)); - - CU_ASSERT(16 == a->sum_norest_weight); - CU_ASSERT(16 == e->sum_norest_weight); - CU_ASSERT(16 == c->sum_norest_weight); - CU_ASSERT(0 == b->sum_norest_weight); + CU_ASSERT(1 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&e->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); /* exercises deletion */ nghttp2_stream_dep_remove(e); @@ -6441,18 +6426,28 @@ void test_nghttp2_session_stream_attach_item(void) { * d */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == d->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(!b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(d->queued); - CU_ASSERT(16 * 16 / 16 == nghttp2_stream_compute_effective_weight(d)); + CU_ASSERT(1 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); /* e's weight 16 is distributed equally among c and b, both now have weight 8 each. */ - CU_ASSERT(8 == a->sum_norest_weight); - CU_ASSERT(16 == c->sum_norest_weight); - CU_ASSERT(0 == b->sum_norest_weight); + CU_ASSERT(8 == b->weight); + CU_ASSERT(8 == c->weight); + + /* da, db, dc have been detached */ + nghttp2_outbound_item_free(da, mem); + nghttp2_outbound_item_free(db, mem); + nghttp2_outbound_item_free(dc, mem); + free(da); + free(db); + free(dc); nghttp2_session_del(session); @@ -6474,25 +6469,36 @@ void test_nghttp2_session_stream_attach_item(void) { db = create_data_ob_item(mem); dc = create_data_ob_item(mem); - nghttp2_stream_attach_item(a, da, session); - nghttp2_stream_attach_item(b, db, session); - nghttp2_stream_attach_item(c, dc, session); + nghttp2_stream_attach_item(a, da); + nghttp2_stream_attach_item(b, db); + nghttp2_stream_attach_item(c, dc); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_REST == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_REST == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(!d->queued); - /* check that all children's item get queued */ - nghttp2_stream_detach_item(a, session); + CU_ASSERT(2 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(nghttp2_pq_empty(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); + /* Detach item from a */ + nghttp2_stream_detach_item(a); - CU_ASSERT(1 == db->queued); - CU_ASSERT(1 == dc->queued); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(!d->queued); + + CU_ASSERT(2 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(nghttp2_pq_empty(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); + + /* da has been detached */ + nghttp2_outbound_item_free(da, mem); + free(da); nghttp2_session_del(session); } @@ -6528,30 +6534,30 @@ void test_nghttp2_session_stream_attach_item_subtree(void) { de = create_data_ob_item(mem); - nghttp2_stream_attach_item(e, de, session); + nghttp2_stream_attach_item(e, de); db = create_data_ob_item(mem); - nghttp2_stream_attach_item(b, db, session); + nghttp2_stream_attach_item(b, db); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == e->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == f->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(!c->queued); + CU_ASSERT(!d->queued); + CU_ASSERT(e->queued); + CU_ASSERT(!f->queued); - CU_ASSERT(16 == nghttp2_stream_compute_effective_weight(b)); - CU_ASSERT(32 == nghttp2_stream_compute_effective_weight(e)); - - CU_ASSERT(16 == a->sum_norest_weight); - CU_ASSERT(0 == c->sum_norest_weight); - CU_ASSERT(0 == d->sum_norest_weight); + CU_ASSERT(1 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(nghttp2_pq_empty(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); + CU_ASSERT(nghttp2_pq_empty(&e->obq)); + CU_ASSERT(nghttp2_pq_empty(&f->obq)); /* Insert subtree e under a */ nghttp2_stream_dep_remove_subtree(e); - nghttp2_stream_dep_insert_subtree(a, e, session); + nghttp2_stream_dep_insert_subtree(a, e); /* * a @@ -6563,22 +6569,25 @@ void test_nghttp2_session_stream_attach_item_subtree(void) { * d */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_REST == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == e->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == f->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(!c->queued); + CU_ASSERT(!d->queued); + CU_ASSERT(e->queued); + CU_ASSERT(!f->queued); - CU_ASSERT(16 == nghttp2_stream_compute_effective_weight(e)); - - CU_ASSERT(32 == a->sum_norest_weight); + CU_ASSERT(1 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(nghttp2_pq_empty(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&e->obq)); + CU_ASSERT(nghttp2_pq_empty(&f->obq)); /* Remove subtree b */ nghttp2_stream_dep_remove_subtree(b); - CU_ASSERT(0 == nghttp2_stream_dep_add_subtree(&session->root, b, session)); + CU_ASSERT(0 == nghttp2_stream_dep_add_subtree(&session->root, b)); /* * a b @@ -6590,34 +6599,45 @@ void test_nghttp2_session_stream_attach_item_subtree(void) { * d */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == e->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == f->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(!c->queued); + CU_ASSERT(!d->queued); + CU_ASSERT(e->queued); + CU_ASSERT(!f->queued); - CU_ASSERT(16 == nghttp2_stream_compute_effective_weight(b)); - CU_ASSERT(16 == nghttp2_stream_compute_effective_weight(e)); + CU_ASSERT(1 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(nghttp2_pq_empty(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); + CU_ASSERT(nghttp2_pq_empty(&e->obq)); + CU_ASSERT(nghttp2_pq_empty(&f->obq)); - /* Remove subtree a */ + /* Remove subtree a, and add it to root again */ nghttp2_stream_dep_remove_subtree(a); - CU_ASSERT(0 == nghttp2_stream_dep_add_subtree(&session->root, a, session)); + CU_ASSERT(0 == nghttp2_stream_dep_add_subtree(&session->root, a)); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == e->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == f->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(!c->queued); + CU_ASSERT(!d->queued); + CU_ASSERT(e->queued); + CU_ASSERT(!f->queued); + + CU_ASSERT(1 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(nghttp2_pq_empty(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); + CU_ASSERT(nghttp2_pq_empty(&e->obq)); + CU_ASSERT(nghttp2_pq_empty(&f->obq)); /* Remove subtree c */ nghttp2_stream_dep_remove_subtree(c); - CU_ASSERT(0 == nghttp2_stream_dep_add_subtree(&session->root, c, session)); + CU_ASSERT(0 == nghttp2_stream_dep_add_subtree(&session->root, c)); /* * a b c @@ -6627,26 +6647,28 @@ void test_nghttp2_session_stream_attach_item_subtree(void) { * f */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == d->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == e->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == f->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(!c->queued); + CU_ASSERT(!d->queued); + CU_ASSERT(e->queued); + CU_ASSERT(!f->queued); - CU_ASSERT(32 == a->sum_norest_weight); - CU_ASSERT(0 == c->sum_norest_weight); + CU_ASSERT(1 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(nghttp2_pq_empty(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); + CU_ASSERT(nghttp2_pq_empty(&e->obq)); + CU_ASSERT(nghttp2_pq_empty(&f->obq)); dd = create_data_ob_item(mem); - nghttp2_stream_attach_item(d, dd, session); - - CU_ASSERT(16 == c->sum_norest_weight); + nghttp2_stream_attach_item(d, dd); /* Add subtree c to a */ nghttp2_stream_dep_remove_subtree(c); - nghttp2_stream_dep_add_subtree(a, c, session); + nghttp2_stream_dep_add_subtree(a, c); /* * a b @@ -6656,50 +6678,53 @@ void test_nghttp2_session_stream_attach_item_subtree(void) { * d f */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == d->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == e->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == f->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(d->queued); + CU_ASSERT(e->queued); + CU_ASSERT(!f->queued); - CU_ASSERT(16 == nghttp2_stream_compute_effective_weight(b)); - CU_ASSERT(16 * 16 / 48 == nghttp2_stream_compute_effective_weight(d)); - CU_ASSERT(16 * 32 / 48 == nghttp2_stream_compute_effective_weight(e)); - - CU_ASSERT(48 == a->sum_norest_weight); - CU_ASSERT(16 == c->sum_norest_weight); + CU_ASSERT(2 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(nghttp2_pq_empty(&b->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); + CU_ASSERT(nghttp2_pq_empty(&e->obq)); + CU_ASSERT(nghttp2_pq_empty(&f->obq)); /* Insert b under a */ nghttp2_stream_dep_remove_subtree(b); - nghttp2_stream_dep_insert_subtree(a, b, session); + nghttp2_stream_dep_insert_subtree(a, b); /* * a * | * b * | - * e--c + * c--e * | | - * f d + * d f */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_REST == d->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_REST == e->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == f->dpri); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(d->queued); + CU_ASSERT(e->queued); + CU_ASSERT(!f->queued); - CU_ASSERT(16 == nghttp2_stream_compute_effective_weight(b)); - - CU_ASSERT(16 == a->sum_norest_weight); + CU_ASSERT(1 == nghttp2_pq_size(&a->obq)); + CU_ASSERT(2 == nghttp2_pq_size(&b->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); + CU_ASSERT(nghttp2_pq_empty(&e->obq)); + CU_ASSERT(nghttp2_pq_empty(&f->obq)); /* Remove subtree b */ nghttp2_stream_dep_remove_subtree(b); - CU_ASSERT(0 == nghttp2_stream_dep_add_subtree(&session->root, b, session)); + CU_ASSERT(0 == nghttp2_stream_dep_add_subtree(&session->root, b)); /* * b a @@ -6709,21 +6734,26 @@ void test_nghttp2_session_stream_attach_item_subtree(void) { * f d */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_REST == d->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_REST == e->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == f->dpri); + CU_ASSERT(!a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(d->queued); + CU_ASSERT(e->queued); + CU_ASSERT(!f->queued); - CU_ASSERT(0 == a->sum_norest_weight); + CU_ASSERT(nghttp2_pq_empty(&a->obq)); + CU_ASSERT(2 == nghttp2_pq_size(&b->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); + CU_ASSERT(nghttp2_pq_empty(&e->obq)); + CU_ASSERT(nghttp2_pq_empty(&f->obq)); /* Remove subtree c, and detach item from b, and then re-add subtree c under b */ nghttp2_stream_dep_remove_subtree(c); - nghttp2_stream_detach_item(b, session); - nghttp2_stream_dep_add_subtree(b, c, session); + nghttp2_stream_detach_item(b); + nghttp2_stream_dep_add_subtree(b, c); /* * b a @@ -6733,21 +6763,26 @@ void test_nghttp2_session_stream_attach_item_subtree(void) { * f d */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == d->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == e->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == f->dpri); + CU_ASSERT(!a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(d->queued); + CU_ASSERT(e->queued); + CU_ASSERT(!f->queued); - CU_ASSERT(48 == b->sum_norest_weight); + CU_ASSERT(nghttp2_pq_empty(&a->obq)); + CU_ASSERT(2 == nghttp2_pq_size(&b->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); + CU_ASSERT(nghttp2_pq_empty(&e->obq)); + CU_ASSERT(nghttp2_pq_empty(&f->obq)); /* Attach data to a, and add subtree a under b */ da = create_data_ob_item(mem); - nghttp2_stream_attach_item(a, da, session); + nghttp2_stream_attach_item(a, da); nghttp2_stream_dep_remove_subtree(a); - nghttp2_stream_dep_add_subtree(b, a, session); + nghttp2_stream_dep_add_subtree(b, a); /* * b @@ -6756,18 +6791,24 @@ void test_nghttp2_session_stream_attach_item_subtree(void) { * | | * f d */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == d->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == e->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == f->dpri); - CU_ASSERT(64 == b->sum_norest_weight); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(d->queued); + CU_ASSERT(e->queued); + CU_ASSERT(!f->queued); + + CU_ASSERT(nghttp2_pq_empty(&a->obq)); + CU_ASSERT(3 == nghttp2_pq_size(&b->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); + CU_ASSERT(nghttp2_pq_empty(&e->obq)); + CU_ASSERT(nghttp2_pq_empty(&f->obq)); /* Remove subtree c, and add under f */ nghttp2_stream_dep_remove_subtree(c); - nghttp2_stream_dep_insert_subtree(f, c, session); + nghttp2_stream_dep_insert_subtree(f, c); /* * b @@ -6780,14 +6821,24 @@ void test_nghttp2_session_stream_attach_item_subtree(void) { * | * d */ - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == a->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == b->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == c->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_REST == d->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_TOP == e->dpri); - CU_ASSERT(NGHTTP2_STREAM_DPRI_NO_ITEM == f->dpri); - CU_ASSERT(48 == b->sum_norest_weight); + CU_ASSERT(a->queued); + CU_ASSERT(b->queued); + CU_ASSERT(c->queued); + CU_ASSERT(d->queued); + CU_ASSERT(e->queued); + CU_ASSERT(f->queued); + + CU_ASSERT(nghttp2_pq_empty(&a->obq)); + CU_ASSERT(2 == nghttp2_pq_size(&b->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&c->obq)); + CU_ASSERT(nghttp2_pq_empty(&d->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&e->obq)); + CU_ASSERT(1 == nghttp2_pq_size(&f->obq)); + + /* db has been detached */ + nghttp2_outbound_item_free(db, mem); + free(db); nghttp2_session_del(session); } @@ -6990,7 +7041,7 @@ void test_nghttp2_session_large_dep_tree(void) { stream_id = 1; for (i = 0; i < 250; ++i) { stream = nghttp2_session_get_stream(session, stream_id); - CU_ASSERT(nghttp2_stream_dep_subtree_find(&session->root, stream)); + CU_ASSERT(nghttp2_stream_dep_find_ancestor(stream, &session->root)); CU_ASSERT(nghttp2_stream_in_dep_tree(stream)); }