From 1155606d5e9bf56acb7986c0bfdd9bd4b80ca6fe Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Fri, 9 Aug 2013 23:40:41 +0900 Subject: [PATCH] Rewrite re-prioritization with PRIORITY --- lib/nghttp2_outbound_item.h | 2 + lib/nghttp2_pq.c | 17 +++++ lib/nghttp2_pq.h | 10 +++ lib/nghttp2_session.c | 117 ++++++++++++++++++++++------------- lib/nghttp2_session.h | 6 ++ lib/nghttp2_submit.c | 7 +++ tests/main.c | 5 +- tests/nghttp2_pq_test.c | 50 +++++++++++++++ tests/nghttp2_pq_test.h | 1 + tests/nghttp2_session_test.c | 87 ++++++++++++++++++-------- tests/nghttp2_session_test.h | 2 +- 11 files changed, 231 insertions(+), 73 deletions(-) diff --git a/lib/nghttp2_outbound_item.h b/lib/nghttp2_outbound_item.h index 24929efb..8c367381 100644 --- a/lib/nghttp2_outbound_item.h +++ b/lib/nghttp2_outbound_item.h @@ -34,6 +34,8 @@ /* Priority for PING */ #define NGHTTP2_OB_PRI_PING -10 +/* Priority for SETTINGS */ +#define NGHTTP2_OB_PRI_SETTINGS -9 typedef struct { nghttp2_data_provider *data_prd; diff --git a/lib/nghttp2_pq.c b/lib/nghttp2_pq.c index 2a8d8938..04fc48cc 100644 --- a/lib/nghttp2_pq.c +++ b/lib/nghttp2_pq.c @@ -126,3 +126,20 @@ size_t nghttp2_pq_size(nghttp2_pq *pq) { return pq->length; } + +void nghttp2_pq_update(nghttp2_pq *pq, nghttp2_pq_item_cb fun, void *arg) +{ + size_t i; + int rv = 0; + if(pq->length == 0) { + return; + } + for(i = 0; i < pq->length; ++i) { + rv |= (*fun)(pq->q[i], arg); + } + if(rv) { + for(i = pq->length; i > 0; --i) { + bubble_down(pq, i - 1); + } + } +} diff --git a/lib/nghttp2_pq.h b/lib/nghttp2_pq.h index c574ba6d..2c8c363b 100644 --- a/lib/nghttp2_pq.h +++ b/lib/nghttp2_pq.h @@ -96,4 +96,14 @@ int nghttp2_pq_empty(nghttp2_pq *pq); */ size_t nghttp2_pq_size(nghttp2_pq *pq); +typedef int (*nghttp2_pq_item_cb)(void *item, void *arg); + +/* + * Updates each item in |pq| using function |fun| and re-construct + * priority queue. The |fun| must return non-zero if it modifies the + * item in a way that it affects ordering in the priority queue. The + * |arg| is passed to the 2nd parameter of |fun|. + */ +void nghttp2_pq_update(nghttp2_pq *pq, nghttp2_pq_item_cb fun, void *arg); + #endif /* NGHTTP2_PQ_H */ diff --git a/lib/nghttp2_session.c b/lib/nghttp2_session.c index 3bed0fe9..1b6e12fd 100644 --- a/lib/nghttp2_session.c +++ b/lib/nghttp2_session.c @@ -308,11 +308,60 @@ void nghttp2_session_del(nghttp2_session *session) free(session); } +static int outbound_item_update_pri +(nghttp2_outbound_item *item, nghttp2_stream *stream) +{ + if(item->frame_cat == NGHTTP2_CAT_CTRL) { + if(((nghttp2_frame*)item->frame)->hd.stream_id != stream->stream_id) { + return 0; + } + switch(((nghttp2_frame*)item->frame)->hd.type) { + case NGHTTP2_HEADERS: + case NGHTTP2_PUSH_PROMISE: + break; + default: + return 0; + } + } else { + if(((nghttp2_data*)item->frame)->hd.stream_id != stream->stream_id) { + return 0; + } + } + item->pri = stream->pri; + return 1; +} + +static int update_stream_pri(void *ptr, void *arg) +{ + nghttp2_outbound_item *item = (nghttp2_outbound_item*)ptr; + nghttp2_stream *stream = (nghttp2_stream*)arg; + return outbound_item_update_pri(item, stream); +} + +void nghttp2_session_reprioritize_stream +(nghttp2_session *session, nghttp2_stream *stream, int32_t pri) +{ + if(stream->pri == pri) { + return; + } + stream->pri = pri; + nghttp2_pq_update(&session->ob_pq, update_stream_pri, stream); + nghttp2_pq_update(&session->ob_ss_pq, update_stream_pri, stream); + if(stream->deferred_data) { + stream->deferred_data->pri = pri; + } + if(session->aob.item) { + outbound_item_update_pri(session->aob.item, stream); + } +} + int nghttp2_session_add_frame(nghttp2_session *session, nghttp2_frame_category frame_cat, void *abs_frame, void *aux_data) { + /* TODO Return error if stream is not found for the frame requiring + stream presence. */ int r = 0; nghttp2_outbound_item *item; item = malloc(sizeof(nghttp2_outbound_item)); @@ -342,33 +391,26 @@ int nghttp2_session_add_frame(nghttp2_session *session, } } break; - case NGHTTP2_PRIORITY: { - stream = nghttp2_session_get_stream(session, frame->hd.stream_id); - if(stream) { - item->pri = stream->pri; - } + case NGHTTP2_PRIORITY: + item->pri = -1; break; - } - case NGHTTP2_RST_STREAM: { + case NGHTTP2_RST_STREAM: stream = nghttp2_session_get_stream(session, frame->hd.stream_id); if(stream) { stream->state = NGHTTP2_STREAM_CLOSING; - item->pri = stream->pri; } - break; - } - case NGHTTP2_SETTINGS: - /* Should NGHTTP2_SETTINGS have higher priority? Yes */ item->pri = -1; break; - case NGHTTP2_PUSH_PROMISE: { + case NGHTTP2_SETTINGS: + item->pri = NGHTTP2_OB_PRI_SETTINGS; + break; + case NGHTTP2_PUSH_PROMISE: /* Use priority of associated stream */ stream = nghttp2_session_get_stream(session, frame->hd.stream_id); if(stream) { item->pri = stream->pri; } break; - } case NGHTTP2_PING: /* Ping has highest priority. */ item->pri = NGHTTP2_OB_PRI_PING; @@ -377,15 +419,7 @@ int nghttp2_session_add_frame(nghttp2_session *session, /* Should GOAWAY have higher priority? */ break; case NGHTTP2_WINDOW_UPDATE: - if(frame->hd.stream_id == 0) { - /* Connection level window update should have higher priority */ - item->pri = -1; - } else { - stream = nghttp2_session_get_stream(session, frame->hd.stream_id); - if(stream) { - item->pri = stream->pri; - } - } + item->pri = -1; break; } if(frame->hd.type == NGHTTP2_HEADERS && @@ -714,7 +748,17 @@ static int nghttp2_session_predicate_headers_send(nghttp2_session *session, static int nghttp2_session_predicate_priority_send (nghttp2_session *session, int32_t stream_id) { - return nghttp2_session_predicate_stream_frame_send(session, stream_id); + nghttp2_stream *stream; + stream = nghttp2_session_get_stream(session, stream_id); + if(stream == NULL) { + return NGHTTP2_ERR_STREAM_CLOSED; + } + /* Sending PRIORITY to reserved state is OK */ + if(stream->state != NGHTTP2_STREAM_CLOSING) { + return 0; + } else { + return NGHTTP2_ERR_STREAM_CLOSING; + } } /* @@ -1273,15 +1317,7 @@ static int nghttp2_session_after_frame_sent(nghttp2_session *session) break; } case NGHTTP2_PRIORITY: - // TODO Update priority of the stream if the stream is initiated - // by the local endpoint. The spec is not detailed about this. - if(nghttp2_session_is_my_stream_id(session, frame->hd.stream_id)) { - nghttp2_stream *stream; - stream = nghttp2_session_get_stream(session, frame->hd.stream_id); - if(stream) { - stream->pri = frame->priority.pri; - } - } + /* nothing to do */ break; case NGHTTP2_RST_STREAM: nghttp2_session_close_stream(session, frame->hd.stream_id, @@ -1804,17 +1840,12 @@ int nghttp2_session_on_priority_received(nghttp2_session *session, } stream = nghttp2_session_get_stream(session, frame->hd.stream_id); if(stream) { - if((stream->shut_flags & NGHTTP2_SHUT_RD) == 0) { - // Update the priority of the stream if the stream is initiated - // by the remote endpoint. - if(!nghttp2_session_is_my_stream_id(session, frame->hd.stream_id)) { - stream->pri = frame->priority.pri; - } - nghttp2_session_call_on_frame_received(session, frame); - } else { - return nghttp2_session_handle_invalid_stream(session, frame, - NGHTTP2_PROTOCOL_ERROR); + /* Only update priority on server side for now */ + if(session->server) { + nghttp2_session_reprioritize_stream(session, stream, + frame->priority.pri); } + nghttp2_session_call_on_frame_received(session, frame); } return 0; } diff --git a/lib/nghttp2_session.h b/lib/nghttp2_session.h index b2a7564e..163b5a7a 100644 --- a/lib/nghttp2_session.h +++ b/lib/nghttp2_session.h @@ -538,4 +538,10 @@ int nghttp2_session_update_local_settings(nghttp2_session *session, nghttp2_settings_entry *iv, size_t niv); +/* + * Re-prioritize |stream|. The new priority is |pri|. + */ +void nghttp2_session_reprioritize_stream +(nghttp2_session *session, nghttp2_stream *stream, int32_t pri); + #endif /* NGHTTP2_SESSION_H */ diff --git a/lib/nghttp2_submit.c b/lib/nghttp2_submit.c index 574c0ba6..0c97f00a 100644 --- a/lib/nghttp2_submit.c +++ b/lib/nghttp2_submit.c @@ -132,6 +132,13 @@ int nghttp2_submit_priority(nghttp2_session *session, int32_t stream_id, free(frame); return r; } + /* Only update priority if the sender is client for now */ + if(!session->server) { + nghttp2_stream *stream = nghttp2_session_get_stream(session, stream_id); + if(stream) { + nghttp2_session_reprioritize_stream(session, stream, pri); + } + } return 0; } diff --git a/tests/main.c b/tests/main.c index e25dd81f..900d74a1 100644 --- a/tests/main.c +++ b/tests/main.c @@ -67,6 +67,7 @@ int main(int argc, char* argv[]) /* add the tests to the suite */ if(!CU_add_test(pSuite, "pq", test_nghttp2_pq) || + !CU_add_test(pSuite, "pq_update", test_nghttp2_pq_update) || !CU_add_test(pSuite, "map", test_nghttp2_map) || !CU_add_test(pSuite, "map_functional", test_nghttp2_map_functional) || !CU_add_test(pSuite, "map_each_free", test_nghttp2_map_each_free) || @@ -117,8 +118,6 @@ int main(int argc, char* argv[]) test_nghttp2_session_send_headers_header_comp_error) || !CU_add_test(pSuite, "session_send_headers_push_reply", test_nghttp2_session_send_headers_push_reply) || - !CU_add_test(pSuite, "session_send_priority", - test_nghttp2_session_send_priority) || !CU_add_test(pSuite, "session_send_rst_stream", test_nghttp2_session_send_rst_stream) || !CU_add_test(pSuite, "session_send_push_promise", @@ -126,6 +125,8 @@ int main(int argc, char* argv[]) !CU_add_test(pSuite, "session_is_my_stream_id", test_nghttp2_session_is_my_stream_id) || !CU_add_test(pSuite, "session_upgrade", test_nghttp2_session_upgrade) || + !CU_add_test(pSuite, "session_reprioritize_stream", + test_nghttp2_session_reprioritize_stream) || !CU_add_test(pSuite, "submit_response", test_nghttp2_submit_response) || !CU_add_test(pSuite, "submit_response_without_data", test_nghttp2_submit_response_without_data) || diff --git a/tests/nghttp2_pq_test.c b/tests/nghttp2_pq_test.c index c03e3a0a..2d7d6a6f 100644 --- a/tests/nghttp2_pq_test.c +++ b/tests/nghttp2_pq_test.c @@ -77,3 +77,53 @@ void test_nghttp2_pq(void) nghttp2_pq_free(&pq); } +typedef struct { + int key; + int val; +} node; + +static int node_compar(const void *lhs, const void *rhs) +{ + node *ln = (node*)lhs; + node *rn = (node*)rhs; + return ln->key - rn->key; +} + +static int node_update(void *item, void *arg) +{ + node *nd = (node*)item; + if((nd->key % 2) == 0) { + nd->key *= -1; + return 1; + } else { + return 0; + } +} + +void test_nghttp2_pq_update(void) +{ + nghttp2_pq pq; + node nodes[10]; + size_t i; + node *nd; + int ans[] = {-8, -6, -4, -2, 0, 1, 3, 5, 7, 9}; + + nghttp2_pq_init(&pq, node_compar); + + for(i = 0; i < sizeof(nodes)/sizeof(nodes[0]); ++i) { + nodes[i].key = i; + nodes[i].val = i; + nghttp2_pq_push(&pq, &nodes[i]); + } + + nghttp2_pq_update(&pq, node_update, NULL); + + for(i = 0; i < sizeof(nodes)/sizeof(nodes[0]); ++i) { + nd = nghttp2_pq_top(&pq); + CU_ASSERT(ans[i] == nd->key); + nghttp2_pq_pop(&pq); + } + + nghttp2_pq_free(&pq); +} + diff --git a/tests/nghttp2_pq_test.h b/tests/nghttp2_pq_test.h index bd5cfc43..1f78cb7e 100644 --- a/tests/nghttp2_pq_test.h +++ b/tests/nghttp2_pq_test.h @@ -26,5 +26,6 @@ #define NGHTTP2_PQ_TEST_H void test_nghttp2_pq(void); +void test_nghttp2_pq_update(void); #endif /* NGHTTP2_PQ_TEST_H */ diff --git a/tests/nghttp2_session_test.c b/tests/nghttp2_session_test.c index d44a4779..a4e28a96 100644 --- a/tests/nghttp2_session_test.c +++ b/tests/nghttp2_session_test.c @@ -1344,29 +1344,6 @@ void test_nghttp2_session_send_headers_push_reply(void) nghttp2_session_del(session); } -void test_nghttp2_session_send_priority(void) -{ - nghttp2_session *session; - nghttp2_session_callbacks callbacks; - my_user_data user_data; - nghttp2_frame *frame; - memset(&callbacks, 0, sizeof(nghttp2_session_callbacks)); - callbacks.send_callback = null_send_callback; - nghttp2_session_client_new(&session, &callbacks, &user_data); - nghttp2_session_open_stream(session, 1, NGHTTP2_FLAG_NONE, - NGHTTP2_PRI_DEFAULT, - NGHTTP2_STREAM_OPENING, NULL); - - frame = malloc(sizeof(nghttp2_frame)); - nghttp2_frame_priority_init(&frame->priority, 1, 1000000007); - nghttp2_session_add_frame(session, NGHTTP2_CAT_CTRL, frame, NULL); - CU_ASSERT(0 == nghttp2_session_send(session)); - - CU_ASSERT(1000000007 == nghttp2_session_get_stream(session, 1)->pri); - - nghttp2_session_del(session); -} - void test_nghttp2_session_send_rst_stream(void) { nghttp2_session *session; @@ -1544,6 +1521,58 @@ void test_nghttp2_session_upgrade(void) nghttp2_session_del(session); } +void test_nghttp2_session_reprioritize_stream(void) +{ + nghttp2_session *session; + nghttp2_session_callbacks callbacks; + my_user_data ud; + const char *nv[] = {NULL}; + nghttp2_stream *stream; + nghttp2_outbound_item *item; + + memset(&callbacks, 0, sizeof(nghttp2_session_callbacks)); + callbacks.send_callback = block_count_send_callback; + + nghttp2_session_server_new(&session, &callbacks, &ud); + nghttp2_session_open_stream(session, 1, NGHTTP2_FLAG_NONE, + 5000, + NGHTTP2_STREAM_OPENING, NULL); + stream = nghttp2_session_open_stream(session, 3, NGHTTP2_FLAG_NONE, + NGHTTP2_PRI_DEFAULT, + NGHTTP2_STREAM_OPENING, NULL); + + CU_ASSERT(0 == nghttp2_submit_push_promise(session, + NGHTTP2_FLAG_END_PUSH_PROMISE, + 3, nv)); + ud.block_count = 0; + CU_ASSERT(0 == nghttp2_session_send(session)); + /* Now PUSH_PROMISE is in aob */ + + CU_ASSERT(0 == nghttp2_submit_response(session, 1, nv, NULL)); + CU_ASSERT(0 == nghttp2_submit_response(session, 3, nv, NULL)); + + nghttp2_session_reprioritize_stream(session, stream, 120); + + CU_ASSERT(session->aob.item != NULL); + CU_ASSERT(120 == session->aob.item->pri); + CU_ASSERT(120 == stream->pri); + CU_ASSERT(5000 == nghttp2_session_get_stream(session, 1)->pri); + item = nghttp2_session_get_next_ob_item(session); + CU_ASSERT(120 == item->pri); + CU_ASSERT(NGHTTP2_HEADERS == OB_CTRL_TYPE(item)); + CU_ASSERT(3 == OB_CTRL(item)->hd.stream_id); + + nghttp2_session_del(session); + + /* Check aob.item == NULL case */ + nghttp2_session_server_new(&session, &callbacks, &ud); + stream = nghttp2_session_open_stream(session, 1, NGHTTP2_FLAG_NONE, + NGHTTP2_PRI_DEFAULT, + NGHTTP2_STREAM_OPENING, NULL); + nghttp2_session_reprioritize_stream(session, stream, 120); + nghttp2_session_del(session); +} + void test_nghttp2_submit_response(void) { nghttp2_session *session; @@ -1841,17 +1870,21 @@ void test_nghttp2_submit_priority(void) { nghttp2_session *session; nghttp2_session_callbacks callbacks; - my_user_data ud; + nghttp2_stream *stream; memset(&callbacks, 0, sizeof(nghttp2_session_callbacks)); callbacks.send_callback = null_send_callback; callbacks.on_frame_send_callback = on_frame_send_callback; - nghttp2_session_server_new(&session, &callbacks, &ud); + nghttp2_session_client_new(&session, &callbacks, NULL); + stream = nghttp2_session_open_stream(session, 1, NGHTTP2_FLAG_NONE, + NGHTTP2_PRI_DEFAULT, + NGHTTP2_STREAM_OPENING, NULL); CU_ASSERT(NGHTTP2_ERR_INVALID_ARGUMENT == nghttp2_submit_priority(session, 1, -1)); CU_ASSERT(0 == nghttp2_submit_priority(session, 1, 1000000007)); + CU_ASSERT(1000000007 == stream->pri); nghttp2_session_del(session); } @@ -3008,7 +3041,7 @@ void test_nghttp2_session_on_ctrl_not_send(void) CU_ASSERT(0 == nghttp2_session_send(session)); CU_ASSERT(1 == user_data.frame_not_send_cb_called); CU_ASSERT(NGHTTP2_HEADERS == user_data.not_sent_frame_type); - CU_ASSERT(NGHTTP2_ERR_STREAM_CLOSING == user_data.not_sent_error); + CU_ASSERT(NGHTTP2_ERR_STREAM_CLOSED == user_data.not_sent_error); stream = nghttp2_session_open_stream(session, 3, NGHTTP2_FLAG_NONE, NGHTTP2_PRI_DEFAULT, @@ -3024,7 +3057,7 @@ void test_nghttp2_session_on_ctrl_not_send(void) CU_ASSERT(0 == nghttp2_session_send(session)); CU_ASSERT(1 == user_data.frame_not_send_cb_called); CU_ASSERT(NGHTTP2_HEADERS == user_data.not_sent_frame_type); - CU_ASSERT(NGHTTP2_ERR_STREAM_CLOSING == user_data.not_sent_error); + CU_ASSERT(NGHTTP2_ERR_STREAM_CLOSED == user_data.not_sent_error); nghttp2_session_del(session); diff --git a/tests/nghttp2_session_test.h b/tests/nghttp2_session_test.h index 3a5584cf..ed59bee8 100644 --- a/tests/nghttp2_session_test.h +++ b/tests/nghttp2_session_test.h @@ -47,11 +47,11 @@ void test_nghttp2_session_send_headers_start_stream(void); void test_nghttp2_session_send_headers_reply(void); void test_nghttp2_session_send_headers_header_comp_error(void); void test_nghttp2_session_send_headers_push_reply(void); -void test_nghttp2_session_send_priority(void); void test_nghttp2_session_send_rst_stream(void); void test_nghttp2_session_send_push_promise(void); void test_nghttp2_session_is_my_stream_id(void); void test_nghttp2_session_upgrade(void); +void test_nghttp2_session_reprioritize_stream(void); void test_nghttp2_submit_response(void); void test_nghttp2_submit_response_without_data(void); void test_nghttp2_submit_request_with_data(void);