Rewrite re-prioritization with PRIORITY

This commit is contained in:
Tatsuhiro Tsujikawa 2013-08-09 23:40:41 +09:00
parent 0dec04921d
commit 1155606d5e
11 changed files with 231 additions and 73 deletions

View File

@ -34,6 +34,8 @@
/* Priority for PING */
#define NGHTTP2_OB_PRI_PING -10
/* Priority for SETTINGS */
#define NGHTTP2_OB_PRI_SETTINGS -9
typedef struct {
nghttp2_data_provider *data_prd;

View File

@ -126,3 +126,20 @@ size_t nghttp2_pq_size(nghttp2_pq *pq)
{
return pq->length;
}
void nghttp2_pq_update(nghttp2_pq *pq, nghttp2_pq_item_cb fun, void *arg)
{
size_t i;
int rv = 0;
if(pq->length == 0) {
return;
}
for(i = 0; i < pq->length; ++i) {
rv |= (*fun)(pq->q[i], arg);
}
if(rv) {
for(i = pq->length; i > 0; --i) {
bubble_down(pq, i - 1);
}
}
}

View File

@ -96,4 +96,14 @@ int nghttp2_pq_empty(nghttp2_pq *pq);
*/
size_t nghttp2_pq_size(nghttp2_pq *pq);
typedef int (*nghttp2_pq_item_cb)(void *item, void *arg);
/*
* Updates each item in |pq| using function |fun| and re-construct
* priority queue. The |fun| must return non-zero if it modifies the
* item in a way that it affects ordering in the priority queue. The
* |arg| is passed to the 2nd parameter of |fun|.
*/
void nghttp2_pq_update(nghttp2_pq *pq, nghttp2_pq_item_cb fun, void *arg);
#endif /* NGHTTP2_PQ_H */

View File

@ -308,11 +308,60 @@ void nghttp2_session_del(nghttp2_session *session)
free(session);
}
static int outbound_item_update_pri
(nghttp2_outbound_item *item, nghttp2_stream *stream)
{
if(item->frame_cat == NGHTTP2_CAT_CTRL) {
if(((nghttp2_frame*)item->frame)->hd.stream_id != stream->stream_id) {
return 0;
}
switch(((nghttp2_frame*)item->frame)->hd.type) {
case NGHTTP2_HEADERS:
case NGHTTP2_PUSH_PROMISE:
break;
default:
return 0;
}
} else {
if(((nghttp2_data*)item->frame)->hd.stream_id != stream->stream_id) {
return 0;
}
}
item->pri = stream->pri;
return 1;
}
static int update_stream_pri(void *ptr, void *arg)
{
nghttp2_outbound_item *item = (nghttp2_outbound_item*)ptr;
nghttp2_stream *stream = (nghttp2_stream*)arg;
return outbound_item_update_pri(item, stream);
}
void nghttp2_session_reprioritize_stream
(nghttp2_session *session, nghttp2_stream *stream, int32_t pri)
{
if(stream->pri == pri) {
return;
}
stream->pri = pri;
nghttp2_pq_update(&session->ob_pq, update_stream_pri, stream);
nghttp2_pq_update(&session->ob_ss_pq, update_stream_pri, stream);
if(stream->deferred_data) {
stream->deferred_data->pri = pri;
}
if(session->aob.item) {
outbound_item_update_pri(session->aob.item, stream);
}
}
int nghttp2_session_add_frame(nghttp2_session *session,
nghttp2_frame_category frame_cat,
void *abs_frame,
void *aux_data)
{
/* TODO Return error if stream is not found for the frame requiring
stream presence. */
int r = 0;
nghttp2_outbound_item *item;
item = malloc(sizeof(nghttp2_outbound_item));
@ -342,33 +391,26 @@ int nghttp2_session_add_frame(nghttp2_session *session,
}
}
break;
case NGHTTP2_PRIORITY: {
stream = nghttp2_session_get_stream(session, frame->hd.stream_id);
if(stream) {
item->pri = stream->pri;
}
case NGHTTP2_PRIORITY:
item->pri = -1;
break;
}
case NGHTTP2_RST_STREAM: {
case NGHTTP2_RST_STREAM:
stream = nghttp2_session_get_stream(session, frame->hd.stream_id);
if(stream) {
stream->state = NGHTTP2_STREAM_CLOSING;
item->pri = stream->pri;
}
break;
}
case NGHTTP2_SETTINGS:
/* Should NGHTTP2_SETTINGS have higher priority? Yes */
item->pri = -1;
break;
case NGHTTP2_PUSH_PROMISE: {
case NGHTTP2_SETTINGS:
item->pri = NGHTTP2_OB_PRI_SETTINGS;
break;
case NGHTTP2_PUSH_PROMISE:
/* Use priority of associated stream */
stream = nghttp2_session_get_stream(session, frame->hd.stream_id);
if(stream) {
item->pri = stream->pri;
}
break;
}
case NGHTTP2_PING:
/* Ping has highest priority. */
item->pri = NGHTTP2_OB_PRI_PING;
@ -377,15 +419,7 @@ int nghttp2_session_add_frame(nghttp2_session *session,
/* Should GOAWAY have higher priority? */
break;
case NGHTTP2_WINDOW_UPDATE:
if(frame->hd.stream_id == 0) {
/* Connection level window update should have higher priority */
item->pri = -1;
} else {
stream = nghttp2_session_get_stream(session, frame->hd.stream_id);
if(stream) {
item->pri = stream->pri;
}
}
item->pri = -1;
break;
}
if(frame->hd.type == NGHTTP2_HEADERS &&
@ -714,7 +748,17 @@ static int nghttp2_session_predicate_headers_send(nghttp2_session *session,
static int nghttp2_session_predicate_priority_send
(nghttp2_session *session, int32_t stream_id)
{
return nghttp2_session_predicate_stream_frame_send(session, stream_id);
nghttp2_stream *stream;
stream = nghttp2_session_get_stream(session, stream_id);
if(stream == NULL) {
return NGHTTP2_ERR_STREAM_CLOSED;
}
/* Sending PRIORITY to reserved state is OK */
if(stream->state != NGHTTP2_STREAM_CLOSING) {
return 0;
} else {
return NGHTTP2_ERR_STREAM_CLOSING;
}
}
/*
@ -1273,15 +1317,7 @@ static int nghttp2_session_after_frame_sent(nghttp2_session *session)
break;
}
case NGHTTP2_PRIORITY:
// TODO Update priority of the stream if the stream is initiated
// by the local endpoint. The spec is not detailed about this.
if(nghttp2_session_is_my_stream_id(session, frame->hd.stream_id)) {
nghttp2_stream *stream;
stream = nghttp2_session_get_stream(session, frame->hd.stream_id);
if(stream) {
stream->pri = frame->priority.pri;
}
}
/* nothing to do */
break;
case NGHTTP2_RST_STREAM:
nghttp2_session_close_stream(session, frame->hd.stream_id,
@ -1804,17 +1840,12 @@ int nghttp2_session_on_priority_received(nghttp2_session *session,
}
stream = nghttp2_session_get_stream(session, frame->hd.stream_id);
if(stream) {
if((stream->shut_flags & NGHTTP2_SHUT_RD) == 0) {
// Update the priority of the stream if the stream is initiated
// by the remote endpoint.
if(!nghttp2_session_is_my_stream_id(session, frame->hd.stream_id)) {
stream->pri = frame->priority.pri;
}
nghttp2_session_call_on_frame_received(session, frame);
} else {
return nghttp2_session_handle_invalid_stream(session, frame,
NGHTTP2_PROTOCOL_ERROR);
/* Only update priority on server side for now */
if(session->server) {
nghttp2_session_reprioritize_stream(session, stream,
frame->priority.pri);
}
nghttp2_session_call_on_frame_received(session, frame);
}
return 0;
}

View File

@ -538,4 +538,10 @@ int nghttp2_session_update_local_settings(nghttp2_session *session,
nghttp2_settings_entry *iv,
size_t niv);
/*
* Re-prioritize |stream|. The new priority is |pri|.
*/
void nghttp2_session_reprioritize_stream
(nghttp2_session *session, nghttp2_stream *stream, int32_t pri);
#endif /* NGHTTP2_SESSION_H */

View File

@ -132,6 +132,13 @@ int nghttp2_submit_priority(nghttp2_session *session, int32_t stream_id,
free(frame);
return r;
}
/* Only update priority if the sender is client for now */
if(!session->server) {
nghttp2_stream *stream = nghttp2_session_get_stream(session, stream_id);
if(stream) {
nghttp2_session_reprioritize_stream(session, stream, pri);
}
}
return 0;
}

View File

@ -67,6 +67,7 @@ int main(int argc, char* argv[])
/* add the tests to the suite */
if(!CU_add_test(pSuite, "pq", test_nghttp2_pq) ||
!CU_add_test(pSuite, "pq_update", test_nghttp2_pq_update) ||
!CU_add_test(pSuite, "map", test_nghttp2_map) ||
!CU_add_test(pSuite, "map_functional", test_nghttp2_map_functional) ||
!CU_add_test(pSuite, "map_each_free", test_nghttp2_map_each_free) ||
@ -117,8 +118,6 @@ int main(int argc, char* argv[])
test_nghttp2_session_send_headers_header_comp_error) ||
!CU_add_test(pSuite, "session_send_headers_push_reply",
test_nghttp2_session_send_headers_push_reply) ||
!CU_add_test(pSuite, "session_send_priority",
test_nghttp2_session_send_priority) ||
!CU_add_test(pSuite, "session_send_rst_stream",
test_nghttp2_session_send_rst_stream) ||
!CU_add_test(pSuite, "session_send_push_promise",
@ -126,6 +125,8 @@ int main(int argc, char* argv[])
!CU_add_test(pSuite, "session_is_my_stream_id",
test_nghttp2_session_is_my_stream_id) ||
!CU_add_test(pSuite, "session_upgrade", test_nghttp2_session_upgrade) ||
!CU_add_test(pSuite, "session_reprioritize_stream",
test_nghttp2_session_reprioritize_stream) ||
!CU_add_test(pSuite, "submit_response", test_nghttp2_submit_response) ||
!CU_add_test(pSuite, "submit_response_without_data",
test_nghttp2_submit_response_without_data) ||

View File

@ -77,3 +77,53 @@ void test_nghttp2_pq(void)
nghttp2_pq_free(&pq);
}
typedef struct {
int key;
int val;
} node;
static int node_compar(const void *lhs, const void *rhs)
{
node *ln = (node*)lhs;
node *rn = (node*)rhs;
return ln->key - rn->key;
}
static int node_update(void *item, void *arg)
{
node *nd = (node*)item;
if((nd->key % 2) == 0) {
nd->key *= -1;
return 1;
} else {
return 0;
}
}
void test_nghttp2_pq_update(void)
{
nghttp2_pq pq;
node nodes[10];
size_t i;
node *nd;
int ans[] = {-8, -6, -4, -2, 0, 1, 3, 5, 7, 9};
nghttp2_pq_init(&pq, node_compar);
for(i = 0; i < sizeof(nodes)/sizeof(nodes[0]); ++i) {
nodes[i].key = i;
nodes[i].val = i;
nghttp2_pq_push(&pq, &nodes[i]);
}
nghttp2_pq_update(&pq, node_update, NULL);
for(i = 0; i < sizeof(nodes)/sizeof(nodes[0]); ++i) {
nd = nghttp2_pq_top(&pq);
CU_ASSERT(ans[i] == nd->key);
nghttp2_pq_pop(&pq);
}
nghttp2_pq_free(&pq);
}

View File

@ -26,5 +26,6 @@
#define NGHTTP2_PQ_TEST_H
void test_nghttp2_pq(void);
void test_nghttp2_pq_update(void);
#endif /* NGHTTP2_PQ_TEST_H */

View File

@ -1344,29 +1344,6 @@ void test_nghttp2_session_send_headers_push_reply(void)
nghttp2_session_del(session);
}
void test_nghttp2_session_send_priority(void)
{
nghttp2_session *session;
nghttp2_session_callbacks callbacks;
my_user_data user_data;
nghttp2_frame *frame;
memset(&callbacks, 0, sizeof(nghttp2_session_callbacks));
callbacks.send_callback = null_send_callback;
nghttp2_session_client_new(&session, &callbacks, &user_data);
nghttp2_session_open_stream(session, 1, NGHTTP2_FLAG_NONE,
NGHTTP2_PRI_DEFAULT,
NGHTTP2_STREAM_OPENING, NULL);
frame = malloc(sizeof(nghttp2_frame));
nghttp2_frame_priority_init(&frame->priority, 1, 1000000007);
nghttp2_session_add_frame(session, NGHTTP2_CAT_CTRL, frame, NULL);
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(1000000007 == nghttp2_session_get_stream(session, 1)->pri);
nghttp2_session_del(session);
}
void test_nghttp2_session_send_rst_stream(void)
{
nghttp2_session *session;
@ -1544,6 +1521,58 @@ void test_nghttp2_session_upgrade(void)
nghttp2_session_del(session);
}
void test_nghttp2_session_reprioritize_stream(void)
{
nghttp2_session *session;
nghttp2_session_callbacks callbacks;
my_user_data ud;
const char *nv[] = {NULL};
nghttp2_stream *stream;
nghttp2_outbound_item *item;
memset(&callbacks, 0, sizeof(nghttp2_session_callbacks));
callbacks.send_callback = block_count_send_callback;
nghttp2_session_server_new(&session, &callbacks, &ud);
nghttp2_session_open_stream(session, 1, NGHTTP2_FLAG_NONE,
5000,
NGHTTP2_STREAM_OPENING, NULL);
stream = nghttp2_session_open_stream(session, 3, NGHTTP2_FLAG_NONE,
NGHTTP2_PRI_DEFAULT,
NGHTTP2_STREAM_OPENING, NULL);
CU_ASSERT(0 == nghttp2_submit_push_promise(session,
NGHTTP2_FLAG_END_PUSH_PROMISE,
3, nv));
ud.block_count = 0;
CU_ASSERT(0 == nghttp2_session_send(session));
/* Now PUSH_PROMISE is in aob */
CU_ASSERT(0 == nghttp2_submit_response(session, 1, nv, NULL));
CU_ASSERT(0 == nghttp2_submit_response(session, 3, nv, NULL));
nghttp2_session_reprioritize_stream(session, stream, 120);
CU_ASSERT(session->aob.item != NULL);
CU_ASSERT(120 == session->aob.item->pri);
CU_ASSERT(120 == stream->pri);
CU_ASSERT(5000 == nghttp2_session_get_stream(session, 1)->pri);
item = nghttp2_session_get_next_ob_item(session);
CU_ASSERT(120 == item->pri);
CU_ASSERT(NGHTTP2_HEADERS == OB_CTRL_TYPE(item));
CU_ASSERT(3 == OB_CTRL(item)->hd.stream_id);
nghttp2_session_del(session);
/* Check aob.item == NULL case */
nghttp2_session_server_new(&session, &callbacks, &ud);
stream = nghttp2_session_open_stream(session, 1, NGHTTP2_FLAG_NONE,
NGHTTP2_PRI_DEFAULT,
NGHTTP2_STREAM_OPENING, NULL);
nghttp2_session_reprioritize_stream(session, stream, 120);
nghttp2_session_del(session);
}
void test_nghttp2_submit_response(void)
{
nghttp2_session *session;
@ -1841,17 +1870,21 @@ void test_nghttp2_submit_priority(void)
{
nghttp2_session *session;
nghttp2_session_callbacks callbacks;
my_user_data ud;
nghttp2_stream *stream;
memset(&callbacks, 0, sizeof(nghttp2_session_callbacks));
callbacks.send_callback = null_send_callback;
callbacks.on_frame_send_callback = on_frame_send_callback;
nghttp2_session_server_new(&session, &callbacks, &ud);
nghttp2_session_client_new(&session, &callbacks, NULL);
stream = nghttp2_session_open_stream(session, 1, NGHTTP2_FLAG_NONE,
NGHTTP2_PRI_DEFAULT,
NGHTTP2_STREAM_OPENING, NULL);
CU_ASSERT(NGHTTP2_ERR_INVALID_ARGUMENT ==
nghttp2_submit_priority(session, 1, -1));
CU_ASSERT(0 == nghttp2_submit_priority(session, 1, 1000000007));
CU_ASSERT(1000000007 == stream->pri);
nghttp2_session_del(session);
}
@ -3008,7 +3041,7 @@ void test_nghttp2_session_on_ctrl_not_send(void)
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(1 == user_data.frame_not_send_cb_called);
CU_ASSERT(NGHTTP2_HEADERS == user_data.not_sent_frame_type);
CU_ASSERT(NGHTTP2_ERR_STREAM_CLOSING == user_data.not_sent_error);
CU_ASSERT(NGHTTP2_ERR_STREAM_CLOSED == user_data.not_sent_error);
stream = nghttp2_session_open_stream(session, 3, NGHTTP2_FLAG_NONE,
NGHTTP2_PRI_DEFAULT,
@ -3024,7 +3057,7 @@ void test_nghttp2_session_on_ctrl_not_send(void)
CU_ASSERT(0 == nghttp2_session_send(session));
CU_ASSERT(1 == user_data.frame_not_send_cb_called);
CU_ASSERT(NGHTTP2_HEADERS == user_data.not_sent_frame_type);
CU_ASSERT(NGHTTP2_ERR_STREAM_CLOSING == user_data.not_sent_error);
CU_ASSERT(NGHTTP2_ERR_STREAM_CLOSED == user_data.not_sent_error);
nghttp2_session_del(session);

View File

@ -47,11 +47,11 @@ void test_nghttp2_session_send_headers_start_stream(void);
void test_nghttp2_session_send_headers_reply(void);
void test_nghttp2_session_send_headers_header_comp_error(void);
void test_nghttp2_session_send_headers_push_reply(void);
void test_nghttp2_session_send_priority(void);
void test_nghttp2_session_send_rst_stream(void);
void test_nghttp2_session_send_push_promise(void);
void test_nghttp2_session_is_my_stream_id(void);
void test_nghttp2_session_upgrade(void);
void test_nghttp2_session_reprioritize_stream(void);
void test_nghttp2_submit_response(void);
void test_nghttp2_submit_response_without_data(void);
void test_nghttp2_submit_request_with_data(void);