diff --git a/lib/includes/spdylay/spdylay.h b/lib/includes/spdylay/spdylay.h index 5d37308d..b5b03c7f 100644 --- a/lib/includes/spdylay/spdylay.h +++ b/lib/includes/spdylay/spdylay.h @@ -99,6 +99,12 @@ typedef enum { SPDYLAY_SETTINGS_INITIAL_WINDOW_SIZE = 7 } spdylay_settings_id; +/* Maximum ID of spdylay_settings_id. */ +#define SPDYLAY_SETTINGS_MAX 7 + +/* Default maximum concurrent streams */ +#define SPDYLAY_CONCURRENT_STREAMS_MAX 100 + typedef enum { SPDYLAY_OK = 0, SPDYLAY_PROTOCOL_ERROR = 1, diff --git a/lib/spdylay_session.c b/lib/spdylay_session.c index 26f8768a..57501fd1 100644 --- a/lib/spdylay_session.c +++ b/lib/spdylay_session.c @@ -109,6 +109,20 @@ static int spdylay_session_new(spdylay_session **session_ptr, free(*session_ptr); return r; } + r = spdylay_pq_init(&(*session_ptr)->ob_ss_pq, spdylay_outbound_item_compar); + if(r != 0) { + spdylay_pq_free(&(*session_ptr)->ob_pq); + spdylay_map_free(&(*session_ptr)->streams); + spdylay_zlib_inflate_free(&(*session_ptr)->hd_inflater); + spdylay_zlib_deflate_free(&(*session_ptr)->hd_deflater); + free(*session_ptr); + return r; + } + + memset((*session_ptr)->settings, 0, sizeof((*session_ptr)->settings)); + (*session_ptr)->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = + SPDYLAY_CONCURRENT_STREAMS_MAX; + (*session_ptr)->callbacks = *callbacks; (*session_ptr)->user_data = user_data; @@ -156,7 +170,7 @@ static void spdylay_free_streams(key_type key, void *val) free(val); } -static void spdylay_outbound_item_free(spdylay_outbound_item *item) +void spdylay_outbound_item_free(spdylay_outbound_item *item) { if(item == NULL) { return; @@ -196,18 +210,26 @@ static void spdylay_outbound_item_free(spdylay_outbound_item *item) free(item->aux_data); } -void spdylay_session_del(spdylay_session *session) +static void spdylay_session_ob_pq_free(spdylay_pq *pq) { - spdylay_map_each(&session->streams, spdylay_free_streams); - spdylay_map_free(&session->streams); - while(!spdylay_pq_empty(&session->ob_pq)) { - spdylay_outbound_item *item = (spdylay_outbound_item*) - spdylay_pq_top(&session->ob_pq); + while(!spdylay_pq_empty(pq)) { + spdylay_outbound_item *item = (spdylay_outbound_item*)spdylay_pq_top(pq); spdylay_outbound_item_free(item); free(item); - spdylay_pq_pop(&session->ob_pq); + spdylay_pq_pop(pq); } - spdylay_pq_free(&session->ob_pq); + spdylay_pq_free(pq); +} + +void spdylay_session_del(spdylay_session *session) +{ + if(session == NULL) { + return; + } + spdylay_map_each(&session->streams, spdylay_free_streams); + spdylay_map_free(&session->streams); + spdylay_session_ob_pq_free(&session->ob_pq); + spdylay_session_ob_pq_free(&session->ob_ss_pq); spdylay_zlib_deflate_free(&session->hd_deflater); spdylay_zlib_inflate_free(&session->hd_inflater); free(session->iframe.buf); @@ -279,7 +301,11 @@ int spdylay_session_add_frame(spdylay_session *session, break; } }; - r = spdylay_pq_push(&session->ob_pq, item); + if(frame_type == SPDYLAY_SYN_STREAM) { + r = spdylay_pq_push(&session->ob_ss_pq, item); + } else { + r = spdylay_pq_push(&session->ob_pq, item); + } if(r != 0) { free(item); return r; @@ -531,6 +557,86 @@ spdylay_outbound_item* spdylay_session_get_ob_pq_top return (spdylay_outbound_item*)spdylay_pq_top(&session->ob_pq); } +spdylay_outbound_item* spdylay_session_get_next_ob_item +(spdylay_session *session) +{ + if(spdylay_pq_empty(&session->ob_pq)) { + if(spdylay_pq_empty(&session->ob_ss_pq)) { + return NULL; + } else { + /* Return item only when concurrent connection limit is not + reached */ + if(session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] + > spdylay_map_size(&session->streams)) { + return spdylay_pq_top(&session->ob_ss_pq); + } else { + return NULL; + } + } + } else { + if(spdylay_pq_empty(&session->ob_ss_pq)) { + return spdylay_pq_top(&session->ob_pq); + } else { + spdylay_outbound_item *item, *syn_stream_item; + item = spdylay_pq_top(&session->ob_pq); + syn_stream_item = spdylay_pq_top(&session->ob_ss_pq); + if(session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] + <= spdylay_map_size(&session->streams) || + item->pri < syn_stream_item->pri || + (item->pri == syn_stream_item->pri && + item->seq < syn_stream_item->seq)) { + return item; + } else { + return syn_stream_item; + } + } + } +} + +spdylay_outbound_item* spdylay_session_pop_next_ob_item +(spdylay_session *session) +{ + if(spdylay_pq_empty(&session->ob_pq)) { + if(spdylay_pq_empty(&session->ob_ss_pq)) { + return NULL; + } else { + /* Pop item only when concurrent connection limit is not + reached */ + if(session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] + > spdylay_map_size(&session->streams)) { + spdylay_outbound_item *item; + item = spdylay_pq_top(&session->ob_ss_pq); + spdylay_pq_pop(&session->ob_ss_pq); + return item; + } else { + return NULL; + } + } + } else { + if(spdylay_pq_empty(&session->ob_ss_pq)) { + spdylay_outbound_item *item; + item = spdylay_pq_top(&session->ob_pq); + spdylay_pq_pop(&session->ob_pq); + return item; + } else { + spdylay_outbound_item *item, *syn_stream_item; + item = spdylay_pq_top(&session->ob_pq); + syn_stream_item = spdylay_pq_top(&session->ob_ss_pq); + if(session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] + <= spdylay_map_size(&session->streams) || + item->pri < syn_stream_item->pri || + (item->pri == syn_stream_item->pri && + item->seq < syn_stream_item->seq)) { + spdylay_pq_pop(&session->ob_pq); + return item; + } else { + spdylay_pq_pop(&session->ob_ss_pq); + return syn_stream_item; + } + } + } +} + static int spdylay_session_after_frame_sent(spdylay_session *session) { /* TODO handle FIN flag. */ @@ -645,31 +751,32 @@ static int spdylay_session_after_frame_sent(spdylay_session *session) int r; if(frame->data.flags & SPDYLAY_FLAG_FIN) { spdylay_active_outbound_item_reset(&session->aob); - } else if(spdylay_pq_empty(&session->ob_pq) || - session->aob.item->pri <= - spdylay_session_get_ob_pq_top(session)->pri) { - /* If priority of this stream is higher or equal to other stream - waiting at the top of the queue, we continue to send this - data. */ - /* We assume that buffer has at least - SPDYLAY_DATA_FRAME_LENGTH. */ - r = spdylay_session_pack_data_overwrite(session, - session->aob.framebuf, - SPDYLAY_DATA_FRAME_LENGTH, - &frame->data); - if(r < 0) { - spdylay_active_outbound_item_reset(&session->aob); - return r; - } - session->aob.framebufoff = 0; } else { - r = spdylay_pq_push(&session->ob_pq, session->aob.item); - if(r == 0) { - session->aob.item = NULL; - spdylay_active_outbound_item_reset(&session->aob); + spdylay_outbound_item* item = spdylay_session_get_next_ob_item(session); + if(item == NULL || session->aob.item->pri <= item->pri) { + /* If priority of this stream is higher or equal to other stream + waiting at the top of the queue, we continue to send this + data. */ + /* We assume that buffer has at least + SPDYLAY_DATA_FRAME_LENGTH. */ + r = spdylay_session_pack_data_overwrite(session, + session->aob.framebuf, + SPDYLAY_DATA_FRAME_LENGTH, + &frame->data); + if(r < 0) { + spdylay_active_outbound_item_reset(&session->aob); + return r; + } + session->aob.framebufoff = 0; } else { - spdylay_active_outbound_item_reset(&session->aob); - return r; + r = spdylay_pq_push(&session->ob_pq, session->aob.item); + if(r == 0) { + session->aob.item = NULL; + spdylay_active_outbound_item_reset(&session->aob); + } else { + spdylay_active_outbound_item_reset(&session->aob); + return r; + } } } } else { @@ -681,15 +788,18 @@ static int spdylay_session_after_frame_sent(spdylay_session *session) int spdylay_session_send(spdylay_session *session) { int r; - while(session->aob.item || !spdylay_pq_empty(&session->ob_pq)) { + while(1) { const uint8_t *data; size_t datalen; ssize_t sentlen; if(session->aob.item == NULL) { - spdylay_outbound_item *item = spdylay_pq_top(&session->ob_pq); + spdylay_outbound_item *item; uint8_t *framebuf; ssize_t framebuflen; - spdylay_pq_pop(&session->ob_pq); + item = spdylay_session_pop_next_ob_item(session); + if(item == NULL) { + break; + } framebuflen = spdylay_session_prep_frame(session, item, &framebuf); if(framebuflen < 0) { /* TODO Call error callback? */ @@ -1323,13 +1433,16 @@ int spdylay_session_want_write(spdylay_session *session) { /* * Unless GOAWAY is sent or received, we want to write frames if - * there is pending ones. After GOAWAY is sent or received, we want - * to write frames if there is pending ones AND there are active - * frames. + * there is pending ones. If pending frame is SYN_STREAM and + * concurrent stream limit is reached, we don't want to write + * SYN_STREAM. After GOAWAY is sent or received, we want to write + * frames if there is pending ones AND there are active frames. */ - return (session->aob.item != NULL || !spdylay_pq_empty(&session->ob_pq)) && - (!session->goaway_flags || - spdylay_map_size(&session->streams) > 0); + return (session->aob.item != NULL || !spdylay_pq_empty(&session->ob_pq) || + (!spdylay_pq_empty(&session->ob_ss_pq) && + session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] + > spdylay_map_size(&session->streams))) && + (!session->goaway_flags || spdylay_map_size(&session->streams) > 0); } int spdylay_session_add_ping(spdylay_session *session, uint32_t unique_id) diff --git a/lib/spdylay_session.h b/lib/spdylay_session.h index 9b938bc2..740374b9 100644 --- a/lib/spdylay_session.h +++ b/lib/spdylay_session.h @@ -100,7 +100,10 @@ struct spdylay_session { int64_t next_seq; spdylay_map /* */ streams; + /* Queue for outbound frames other than SYN_STREAM */ spdylay_pq /* */ ob_pq; + /* Queue for outbound SYN_STREAM frame */ + spdylay_pq /* */ ob_ss_pq; spdylay_active_outbound_item aob; @@ -119,6 +122,10 @@ struct spdylay_session { /* This is the value in GOAWAY frame sent by remote endpoint. */ int32_t last_good_stream_id; + /* Settings value store. We just use ID as index. The index = 0 is + unused. */ + uint32_t settings[SPDYLAY_SETTINGS_MAX+1]; + spdylay_session_callbacks callbacks; void *user_data; }; @@ -293,4 +300,30 @@ uint32_t spdylay_session_get_next_unique_id(spdylay_session *session); */ spdylay_outbound_item* spdylay_session_get_ob_pq_top(spdylay_session *session); +/* + * Pops and returns next item to send. If there is no such item, + * returns NULL. This function takes into account max concurrent + * streams. That means if session->ob_pq is empty but + * session->ob_ss_pq has item and max concurrent streams is reached, + * then this function returns NULL. + */ +spdylay_outbound_item* spdylay_session_pop_next_ob_item +(spdylay_session *session); + +/* + * Returns next item to send. If there is no such item, this function + * returns NULL. This function takes into account max concurrent + * streams. That means if session->ob_pq is empty but + * session->ob_ss_pq has item and max concurrent streams is reached, + * then this function returns NULL. + */ +spdylay_outbound_item* spdylay_session_get_next_ob_item +(spdylay_session *session); + +/* + * Deallocates resource for |item|. If |item| is NULL, this function + * does nothing. + */ +void spdylay_outbound_item_free(spdylay_outbound_item *item); + #endif /* SPDYLAY_SESSION_H */ diff --git a/tests/main.c b/tests/main.c index 53d3232f..006f24d9 100644 --- a/tests/main.c +++ b/tests/main.c @@ -103,6 +103,10 @@ int main(int argc, char* argv[]) test_spdylay_session_is_my_stream_id) || !CU_add_test(pSuite, "session_send_rst_stream", test_spdylay_session_send_rst_stream) || + !CU_add_test(pSuite, "session_get_next_ob_item", + test_spdylay_session_get_next_ob_item) || + !CU_add_test(pSuite, "session_pop_next_ob_item", + test_spdylay_session_pop_next_ob_item) || !CU_add_test(pSuite, "frame_unpack_nv", test_spdylay_frame_unpack_nv) || !CU_add_test(pSuite, "frame_count_nv_space", test_spdylay_frame_count_nv_space) || diff --git a/tests/spdylay_session_test.c b/tests/spdylay_session_test.c index f4b42717..a67a9003 100644 --- a/tests/spdylay_session_test.c +++ b/tests/spdylay_session_test.c @@ -209,7 +209,7 @@ void test_spdylay_session_add_frame() CU_ASSERT(0 == spdylay_session_add_frame(session, SPDYLAY_SYN_STREAM, frame, aux_data)); - CU_ASSERT(0 == spdylay_pq_empty(&session->ob_pq)); + CU_ASSERT(0 == spdylay_pq_empty(&session->ob_ss_pq)); CU_ASSERT(0 == spdylay_session_send(session)); CU_ASSERT(memcmp(hd_ans1, acc.buf, 4) == 0); /* check stream id */ @@ -790,3 +790,96 @@ void test_spdylay_session_send_rst_stream() spdylay_session_del(session); } + +void test_spdylay_session_get_next_ob_item() +{ + spdylay_session *session; + spdylay_session_callbacks callbacks; + const char *nv[] = { NULL }; + memset(&callbacks, 0, sizeof(spdylay_session_callbacks)); + callbacks.send_callback = null_send_callback; + + spdylay_session_server_new(&session, &callbacks, NULL); + session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 2; + + CU_ASSERT(NULL == spdylay_session_get_next_ob_item(session)); + spdylay_submit_ping(session); + CU_ASSERT(SPDYLAY_PING == + spdylay_session_get_next_ob_item(session)->frame_type); + + spdylay_submit_request(session, 0, nv, NULL, NULL); + CU_ASSERT(SPDYLAY_PING == + spdylay_session_get_next_ob_item(session)->frame_type); + + CU_ASSERT(0 == spdylay_session_send(session)); + CU_ASSERT(NULL == spdylay_session_get_next_ob_item(session)); + + spdylay_session_open_stream(session, 1, SPDYLAY_FLAG_NONE, + 3, SPDYLAY_STREAM_OPENING, NULL); + + spdylay_submit_request(session, 0, nv, NULL, NULL); + CU_ASSERT(NULL == spdylay_session_get_next_ob_item(session)); + + spdylay_submit_response(session, 1, nv, NULL); + CU_ASSERT(SPDYLAY_SYN_REPLY == + spdylay_session_get_next_ob_item(session)->frame_type); + + session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 3; + + CU_ASSERT(SPDYLAY_SYN_STREAM == + spdylay_session_get_next_ob_item(session)->frame_type); + + spdylay_session_del(session); +} + +void test_spdylay_session_pop_next_ob_item() +{ + spdylay_session *session; + spdylay_session_callbacks callbacks; + const char *nv[] = { NULL }; + spdylay_outbound_item *item; + memset(&callbacks, 0, sizeof(spdylay_session_callbacks)); + callbacks.send_callback = null_send_callback; + + spdylay_session_server_new(&session, &callbacks, NULL); + session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 1; + + CU_ASSERT(NULL == spdylay_session_pop_next_ob_item(session)); + spdylay_submit_ping(session); + spdylay_submit_request(session, 0, nv, NULL, NULL); + + item = spdylay_session_pop_next_ob_item(session); + CU_ASSERT(SPDYLAY_PING == item->frame_type); + spdylay_outbound_item_free(item); + free(item); + + item = spdylay_session_pop_next_ob_item(session); + CU_ASSERT(SPDYLAY_SYN_STREAM == item->frame_type); + spdylay_outbound_item_free(item); + free(item); + + CU_ASSERT(NULL == spdylay_session_pop_next_ob_item(session)); + + spdylay_session_open_stream(session, 1, SPDYLAY_FLAG_NONE, + 3, SPDYLAY_STREAM_OPENING, NULL); + + spdylay_submit_request(session, 0, nv, NULL, NULL); + spdylay_submit_response(session, 1, nv, NULL); + + item = spdylay_session_pop_next_ob_item(session); + CU_ASSERT(SPDYLAY_SYN_REPLY == item->frame_type); + spdylay_outbound_item_free(item); + free(item); + + CU_ASSERT(NULL == spdylay_session_pop_next_ob_item(session)); + + spdylay_submit_response(session, 1, nv, NULL); + session->settings[SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS] = 2; + + item = spdylay_session_pop_next_ob_item(session); + CU_ASSERT(SPDYLAY_SYN_STREAM == item->frame_type); + spdylay_outbound_item_free(item); + free(item); + + spdylay_session_del(session); +} diff --git a/tests/spdylay_session_test.h b/tests/spdylay_session_test.h index 75134524..66efd7bb 100644 --- a/tests/spdylay_session_test.h +++ b/tests/spdylay_session_test.h @@ -43,5 +43,7 @@ void test_spdylay_session_on_data_received(); void test_spdylay_session_on_rst_received(); void test_spdylay_session_is_my_stream_id(); void test_spdylay_session_send_rst_stream(); +void test_spdylay_session_get_next_ob_item(); +void test_spdylay_session_pop_next_ob_item(); #endif // SPDYLAY_SESSION_TEST_H