Replace priority queue with linear queue where possible
After reviewing codebase, only queue for DATA frames requires priorities. Other frames can be replaced multiple linear queues. Replacing priority queue with linear queue allows us to simplify codebase a bit; for example, now nghttp2_session.next_seq is gone.
This commit is contained in:
parent
eb05777d88
commit
f2cf2b625c
|
@ -39,7 +39,8 @@
|
|||
} while (0)
|
||||
#endif
|
||||
|
||||
typedef int (*nghttp2_compar)(const void *lhs, const void *rhs);
|
||||
/* "less" function, return nonzero if |lhs| is less than |rhs|. */
|
||||
typedef int (*nghttp2_less)(const void *lhs, const void *rhs);
|
||||
|
||||
/* Internal error code. They must be in the range [-499, -100],
|
||||
inclusive. */
|
||||
|
|
|
@ -65,3 +65,32 @@ void nghttp2_outbound_item_free(nghttp2_outbound_item *item, nghttp2_mem *mem) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void nghttp2_outbound_queue_init(nghttp2_outbound_queue *q) {
|
||||
q->head = q->tail = NULL;
|
||||
q->n = 0;
|
||||
}
|
||||
|
||||
void nghttp2_outbound_queue_push(nghttp2_outbound_queue *q,
|
||||
nghttp2_outbound_item *item) {
|
||||
if (q->tail) {
|
||||
q->tail = q->tail->qnext = item;
|
||||
} else {
|
||||
q->head = q->tail = item;
|
||||
}
|
||||
++q->n;
|
||||
}
|
||||
|
||||
void nghttp2_outbound_queue_pop(nghttp2_outbound_queue *q) {
|
||||
nghttp2_outbound_item *item;
|
||||
if (!q->head) {
|
||||
return;
|
||||
}
|
||||
item = q->head;
|
||||
q->head = q->head->qnext;
|
||||
item->qnext = NULL;
|
||||
if (!q->head) {
|
||||
q->tail = NULL;
|
||||
}
|
||||
--q->n;
|
||||
}
|
||||
|
|
|
@ -33,13 +33,6 @@
|
|||
#include "nghttp2_frame.h"
|
||||
#include "nghttp2_mem.h"
|
||||
|
||||
/* A bit higher priority for non-DATA frames */
|
||||
#define NGHTTP2_OB_EX_CYCLE 2
|
||||
/* Even more higher priority for SETTINGS frame */
|
||||
#define NGHTTP2_OB_SETTINGS_CYCLE 1
|
||||
/* Highest priority for PING frame */
|
||||
#define NGHTTP2_OB_PING_CYCLE 0
|
||||
|
||||
/* struct used for HEADERS and PUSH_PROMISE frame */
|
||||
typedef struct {
|
||||
nghttp2_data_provider data_prd;
|
||||
|
@ -104,10 +97,12 @@ typedef union {
|
|||
nghttp2_goaway_aux_data goaway;
|
||||
} nghttp2_aux_data;
|
||||
|
||||
typedef struct {
|
||||
struct nghttp2_outbound_item;
|
||||
typedef struct nghttp2_outbound_item nghttp2_outbound_item;
|
||||
|
||||
struct nghttp2_outbound_item {
|
||||
nghttp2_frame frame;
|
||||
nghttp2_aux_data aux_data;
|
||||
int64_t seq;
|
||||
/* The priority used in priority comparion. Smaller is served
|
||||
ealier. For PING, SETTINGS and non-DATA frames (excluding
|
||||
response HEADERS frame) have dedicated cycle value defined above.
|
||||
|
@ -116,9 +111,10 @@ typedef struct {
|
|||
that the amount of transmission is distributed across streams
|
||||
proportional to effective weight (inside a tree). */
|
||||
uint64_t cycle;
|
||||
nghttp2_outbound_item *qnext;
|
||||
/* nonzero if this object is queued. */
|
||||
uint8_t queued;
|
||||
} nghttp2_outbound_item;
|
||||
};
|
||||
|
||||
/*
|
||||
* Deallocates resource for |item|. If |item| is NULL, this function
|
||||
|
@ -126,4 +122,29 @@ typedef struct {
|
|||
*/
|
||||
void nghttp2_outbound_item_free(nghttp2_outbound_item *item, nghttp2_mem *mem);
|
||||
|
||||
/*
|
||||
* queue for nghttp2_outbound_item.
|
||||
*/
|
||||
typedef struct {
|
||||
nghttp2_outbound_item *head, *tail;
|
||||
/* number of items in this queue. */
|
||||
size_t n;
|
||||
} nghttp2_outbound_queue;
|
||||
|
||||
void nghttp2_outbound_queue_init(nghttp2_outbound_queue *q);
|
||||
|
||||
/* Pushes |item| into |q| */
|
||||
void nghttp2_outbound_queue_push(nghttp2_outbound_queue *q,
|
||||
nghttp2_outbound_item *item);
|
||||
|
||||
/* Pops |item| at the top from |q|. If |q| is empty, nothing
|
||||
happens. */
|
||||
void nghttp2_outbound_queue_pop(nghttp2_outbound_queue *q);
|
||||
|
||||
/* Returns the top item. */
|
||||
#define nghttp2_outbound_queue_top(Q) ((Q)->head)
|
||||
|
||||
/* Returns the size of the queue */
|
||||
#define nghttp2_outbound_queue_size(Q) ((Q)->n)
|
||||
|
||||
#endif /* NGHTTP2_OUTBOUND_ITEM_H */
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
*/
|
||||
#include "nghttp2_pq.h"
|
||||
|
||||
int nghttp2_pq_init(nghttp2_pq *pq, nghttp2_compar compar, nghttp2_mem *mem) {
|
||||
int nghttp2_pq_init(nghttp2_pq *pq, nghttp2_less less, nghttp2_mem *mem) {
|
||||
pq->mem = mem;
|
||||
pq->capacity = 128;
|
||||
pq->q = nghttp2_mem_malloc(mem, pq->capacity * sizeof(void *));
|
||||
|
@ -32,7 +32,7 @@ int nghttp2_pq_init(nghttp2_pq *pq, nghttp2_compar compar, nghttp2_mem *mem) {
|
|||
return NGHTTP2_ERR_NOMEM;
|
||||
}
|
||||
pq->length = 0;
|
||||
pq->compar = compar;
|
||||
pq->less = less;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -52,7 +52,7 @@ static void bubble_up(nghttp2_pq *pq, size_t index) {
|
|||
return;
|
||||
} else {
|
||||
size_t parent = (index - 1) / 2;
|
||||
if (pq->compar(pq->q[parent], pq->q[index]) > 0) {
|
||||
if (pq->less(pq->q[index], pq->q[parent])) {
|
||||
swap(pq, parent, index);
|
||||
bubble_up(pq, parent);
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ static void bubble_down(nghttp2_pq *pq, size_t index) {
|
|||
if (j >= pq->length) {
|
||||
break;
|
||||
}
|
||||
if (pq->compar(pq->q[minindex], pq->q[j]) > 0) {
|
||||
if (pq->less(pq->q[j], pq->q[minindex])) {
|
||||
minindex = j;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,8 +45,8 @@ typedef struct {
|
|||
/* The maximum number of items this pq can store. This is
|
||||
automatically extended when length is reached to this value. */
|
||||
size_t capacity;
|
||||
/* The compare function between items */
|
||||
nghttp2_compar compar;
|
||||
/* The less function between items */
|
||||
nghttp2_less less;
|
||||
} nghttp2_pq;
|
||||
|
||||
/*
|
||||
|
@ -58,7 +58,7 @@ typedef struct {
|
|||
* NGHTTP2_ERR_NOMEM
|
||||
* Out of memory.
|
||||
*/
|
||||
int nghttp2_pq_init(nghttp2_pq *pq, nghttp2_compar cmp, nghttp2_mem *mem);
|
||||
int nghttp2_pq_init(nghttp2_pq *pq, nghttp2_less less, nghttp2_mem *mem);
|
||||
|
||||
/*
|
||||
* Deallocates any resources allocated for |pq|. The stored items are
|
||||
|
|
|
@ -221,17 +221,13 @@ nghttp2_stream *nghttp2_session_get_stream_raw(nghttp2_session *session,
|
|||
return (nghttp2_stream *)nghttp2_map_find(&session->streams, stream_id);
|
||||
}
|
||||
|
||||
static int outbound_item_compar(const void *lhsx, const void *rhsx) {
|
||||
static int outbound_item_less(const void *lhsx, const void *rhsx) {
|
||||
const nghttp2_outbound_item *lhs, *rhs;
|
||||
|
||||
lhs = (const nghttp2_outbound_item *)lhsx;
|
||||
rhs = (const nghttp2_outbound_item *)rhsx;
|
||||
|
||||
if (lhs->cycle == rhs->cycle) {
|
||||
return (lhs->seq < rhs->seq) ? -1 : ((lhs->seq > rhs->seq) ? 1 : 0);
|
||||
}
|
||||
|
||||
return (lhs->cycle < rhs->cycle) ? -1 : 1;
|
||||
return (lhs->cycle < rhs->cycle) ? 1 : 0;
|
||||
}
|
||||
|
||||
static void session_inbound_frame_reset(nghttp2_session *session) {
|
||||
|
@ -333,15 +329,7 @@ static int session_new(nghttp2_session **session_ptr,
|
|||
/* next_stream_id is initialized in either
|
||||
nghttp2_session_client_new2 or nghttp2_session_server_new2 */
|
||||
|
||||
rv = nghttp2_pq_init(&(*session_ptr)->ob_pq, outbound_item_compar, mem);
|
||||
if (rv != 0) {
|
||||
goto fail_ob_pq;
|
||||
}
|
||||
rv = nghttp2_pq_init(&(*session_ptr)->ob_ss_pq, outbound_item_compar, mem);
|
||||
if (rv != 0) {
|
||||
goto fail_ob_ss_pq;
|
||||
}
|
||||
rv = nghttp2_pq_init(&(*session_ptr)->ob_da_pq, outbound_item_compar, mem);
|
||||
rv = nghttp2_pq_init(&(*session_ptr)->ob_da_pq, outbound_item_less, mem);
|
||||
if (rv != 0) {
|
||||
goto fail_ob_da_pq;
|
||||
}
|
||||
|
@ -361,11 +349,6 @@ static int session_new(nghttp2_session **session_ptr,
|
|||
|
||||
nghttp2_stream_roots_init(&(*session_ptr)->roots);
|
||||
|
||||
(*session_ptr)->next_seq = 0;
|
||||
/* Do +1 so that any HEADERS/DATA frames are scheduled after urgent
|
||||
frames. */
|
||||
(*session_ptr)->last_cycle = NGHTTP2_OB_EX_CYCLE + 1;
|
||||
|
||||
(*session_ptr)->remote_window_size = NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE;
|
||||
(*session_ptr)->recv_window_size = 0;
|
||||
(*session_ptr)->consumed_size = 0;
|
||||
|
@ -454,10 +437,6 @@ fail_hd_inflater:
|
|||
fail_hd_deflater:
|
||||
nghttp2_pq_free(&(*session_ptr)->ob_da_pq);
|
||||
fail_ob_da_pq:
|
||||
nghttp2_pq_free(&(*session_ptr)->ob_ss_pq);
|
||||
fail_ob_ss_pq:
|
||||
nghttp2_pq_free(&(*session_ptr)->ob_pq);
|
||||
fail_ob_pq:
|
||||
nghttp2_mem_free(mem, *session_ptr);
|
||||
fail_session:
|
||||
return rv;
|
||||
|
@ -563,6 +542,16 @@ static void ob_pq_free(nghttp2_pq *pq, nghttp2_mem *mem) {
|
|||
nghttp2_pq_free(pq);
|
||||
}
|
||||
|
||||
static void ob_q_free(nghttp2_outbound_queue *q, nghttp2_mem *mem) {
|
||||
nghttp2_outbound_item *item, *next;
|
||||
for (item = q->head; item;) {
|
||||
next = item->qnext;
|
||||
nghttp2_outbound_item_free(item, mem);
|
||||
nghttp2_mem_free(mem, item);
|
||||
item = next;
|
||||
}
|
||||
}
|
||||
|
||||
void nghttp2_session_del(nghttp2_session *session) {
|
||||
nghttp2_mem *mem;
|
||||
|
||||
|
@ -581,8 +570,9 @@ void nghttp2_session_del(nghttp2_session *session) {
|
|||
nghttp2_map_each_free(&session->streams, free_streams, session);
|
||||
nghttp2_map_free(&session->streams);
|
||||
|
||||
ob_pq_free(&session->ob_pq, mem);
|
||||
ob_pq_free(&session->ob_ss_pq, mem);
|
||||
ob_q_free(&session->ob_urgent, mem);
|
||||
ob_q_free(&session->ob_reg, mem);
|
||||
ob_q_free(&session->ob_syn, mem);
|
||||
ob_pq_free(&session->ob_da_pq, mem);
|
||||
active_outbound_item_reset(&session->aob, mem);
|
||||
session_inbound_frame_reset(session);
|
||||
|
@ -690,8 +680,8 @@ nghttp2_session_reprioritize_stream(nghttp2_session *session,
|
|||
|
||||
void nghttp2_session_outbound_item_init(nghttp2_session *session,
|
||||
nghttp2_outbound_item *item) {
|
||||
item->seq = session->next_seq++;
|
||||
item->cycle = NGHTTP2_OB_EX_CYCLE;
|
||||
item->cycle = 0;
|
||||
item->qnext = NULL;
|
||||
item->queued = 0;
|
||||
|
||||
memset(&item->aux_data, 0, sizeof(nghttp2_aux_data));
|
||||
|
@ -711,41 +701,20 @@ int nghttp2_session_add_item(nghttp2_session *session,
|
|||
if (frame->hd.type != NGHTTP2_DATA) {
|
||||
|
||||
switch (frame->hd.type) {
|
||||
case NGHTTP2_RST_STREAM:
|
||||
if (stream) {
|
||||
stream->state = NGHTTP2_STREAM_CLOSING;
|
||||
}
|
||||
|
||||
break;
|
||||
case NGHTTP2_SETTINGS:
|
||||
item->cycle = NGHTTP2_OB_SETTINGS_CYCLE;
|
||||
|
||||
break;
|
||||
case NGHTTP2_PING:
|
||||
/* Ping has highest priority. */
|
||||
item->cycle = NGHTTP2_OB_PING_CYCLE;
|
||||
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (frame->hd.type == NGHTTP2_HEADERS) {
|
||||
case NGHTTP2_HEADERS:
|
||||
/* We push request HEADERS and push response HEADERS to
|
||||
dedicated queue because their transmission is affected by
|
||||
SETTINGS_MAX_CONCURRENT_STREAMS */
|
||||
/* TODO If 2 HEADERS are submitted for reserved stream, then
|
||||
both of them are queued into ob_ss_pq, which is not
|
||||
both of them are queued into ob_syn, which is not
|
||||
desirable. */
|
||||
if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
|
||||
rv = nghttp2_pq_push(&session->ob_ss_pq, item);
|
||||
|
||||
if (rv != 0) {
|
||||
return rv;
|
||||
nghttp2_outbound_queue_push(&session->ob_syn, item);
|
||||
item->queued = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
item->queued = 1;
|
||||
} else if (stream && (stream->state == NGHTTP2_STREAM_RESERVED ||
|
||||
if (stream && (stream->state == NGHTTP2_STREAM_RESERVED ||
|
||||
item->aux_data.headers.attach_stream)) {
|
||||
item->cycle = session->last_cycle;
|
||||
|
||||
|
@ -754,22 +723,25 @@ int nghttp2_session_add_item(nghttp2_session *session,
|
|||
if (rv != 0) {
|
||||
return rv;
|
||||
}
|
||||
} else {
|
||||
rv = nghttp2_pq_push(&session->ob_pq, item);
|
||||
|
||||
if (rv != 0) {
|
||||
return rv;
|
||||
break;
|
||||
}
|
||||
|
||||
nghttp2_outbound_queue_push(&session->ob_reg, item);
|
||||
item->queued = 1;
|
||||
break;
|
||||
case NGHTTP2_SETTINGS:
|
||||
case NGHTTP2_PING:
|
||||
nghttp2_outbound_queue_push(&session->ob_urgent, item);
|
||||
item->queued = 1;
|
||||
break;
|
||||
case NGHTTP2_RST_STREAM:
|
||||
if (stream) {
|
||||
stream->state = NGHTTP2_STREAM_CLOSING;
|
||||
}
|
||||
} else {
|
||||
rv = nghttp2_pq_push(&session->ob_pq, item);
|
||||
|
||||
if (rv != 0) {
|
||||
return rv;
|
||||
}
|
||||
|
||||
/* fall through */
|
||||
default:
|
||||
nghttp2_outbound_queue_push(&session->ob_reg, item);
|
||||
item->queued = 1;
|
||||
}
|
||||
|
||||
|
@ -795,30 +767,6 @@ int nghttp2_session_add_item(nghttp2_session *session,
|
|||
return 0;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int32_t stream_id;
|
||||
uint32_t error_code;
|
||||
} nghttp2_rst_target;
|
||||
|
||||
static int cancel_pending_request(void *pq_item, void *arg) {
|
||||
nghttp2_outbound_item *item;
|
||||
nghttp2_rst_target *t;
|
||||
nghttp2_headers_aux_data *aux_data;
|
||||
|
||||
item = pq_item;
|
||||
t = arg;
|
||||
aux_data = &item->aux_data.headers;
|
||||
|
||||
if (item->frame.hd.stream_id != t->stream_id || aux_data->canceled) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
aux_data->error_code = t->error_code;
|
||||
aux_data->canceled = 1;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int nghttp2_session_add_rst_stream(nghttp2_session *session, int32_t stream_id,
|
||||
uint32_t error_code) {
|
||||
int rv;
|
||||
|
@ -826,7 +774,6 @@ int nghttp2_session_add_rst_stream(nghttp2_session *session, int32_t stream_id,
|
|||
nghttp2_frame *frame;
|
||||
nghttp2_stream *stream;
|
||||
nghttp2_mem *mem;
|
||||
nghttp2_rst_target t = {stream_id, error_code};
|
||||
|
||||
mem = &session->mem;
|
||||
stream = nghttp2_session_get_stream(session, stream_id);
|
||||
|
@ -834,21 +781,35 @@ int nghttp2_session_add_rst_stream(nghttp2_session *session, int32_t stream_id,
|
|||
return 0;
|
||||
}
|
||||
|
||||
/* Cancel pending request HEADERS in ob_ss_pq if this RST_STREAM
|
||||
/* Cancel pending request HEADERS in ob_syn if this RST_STREAM
|
||||
refers to that stream. */
|
||||
if (!session->server && nghttp2_session_is_my_stream_id(session, stream_id) &&
|
||||
nghttp2_pq_top(&session->ob_ss_pq)) {
|
||||
nghttp2_outbound_item *top;
|
||||
nghttp2_outbound_queue_top(&session->ob_syn)) {
|
||||
nghttp2_headers_aux_data *aux_data;
|
||||
nghttp2_frame *headers_frame;
|
||||
|
||||
top = nghttp2_pq_top(&session->ob_ss_pq);
|
||||
headers_frame = &top->frame;
|
||||
|
||||
headers_frame = &nghttp2_outbound_queue_top(&session->ob_syn)->frame;
|
||||
assert(headers_frame->hd.type == NGHTTP2_HEADERS);
|
||||
|
||||
if (headers_frame->hd.stream_id <= stream_id &&
|
||||
(uint32_t)stream_id < session->next_stream_id) {
|
||||
if (nghttp2_pq_each(&session->ob_ss_pq, cancel_pending_request, &t)) {
|
||||
|
||||
for (item = session->ob_syn.head; item; item = item->qnext) {
|
||||
aux_data = &item->aux_data.headers;
|
||||
|
||||
if (item->frame.hd.stream_id < stream_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
/* stream_id in ob_syn queue must be strictly increasing. If
|
||||
we found larger ID, then we can break here. */
|
||||
if (item->frame.hd.stream_id > stream_id || aux_data->canceled) {
|
||||
break;
|
||||
}
|
||||
|
||||
aux_data->error_code = error_code;
|
||||
aux_data->canceled = 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -2022,125 +1983,68 @@ static int session_prep_frame(nghttp2_session *session,
|
|||
}
|
||||
}
|
||||
|
||||
/* Used only for tests */
|
||||
nghttp2_outbound_item *nghttp2_session_get_ob_pq_top(nghttp2_session *session) {
|
||||
return (nghttp2_outbound_item *)nghttp2_pq_top(&session->ob_pq);
|
||||
}
|
||||
|
||||
nghttp2_outbound_item *
|
||||
nghttp2_session_get_next_ob_item(nghttp2_session *session) {
|
||||
nghttp2_outbound_item *item, *headers_item;
|
||||
|
||||
if (nghttp2_pq_empty(&session->ob_pq)) {
|
||||
if (nghttp2_pq_empty(&session->ob_ss_pq)) {
|
||||
if (session->remote_window_size == 0 ||
|
||||
nghttp2_pq_empty(&session->ob_da_pq)) {
|
||||
return NULL;
|
||||
if (nghttp2_outbound_queue_top(&session->ob_urgent)) {
|
||||
return nghttp2_outbound_queue_top(&session->ob_urgent);
|
||||
}
|
||||
|
||||
if (nghttp2_outbound_queue_top(&session->ob_reg)) {
|
||||
return nghttp2_outbound_queue_top(&session->ob_reg);
|
||||
}
|
||||
|
||||
if (!session_is_outgoing_concurrent_streams_max(session)) {
|
||||
if (nghttp2_outbound_queue_top(&session->ob_syn)) {
|
||||
return nghttp2_outbound_queue_top(&session->ob_syn);
|
||||
}
|
||||
}
|
||||
|
||||
if (session->remote_window_size > 0 &&
|
||||
!nghttp2_pq_empty(&session->ob_da_pq)) {
|
||||
return nghttp2_pq_top(&session->ob_da_pq);
|
||||
}
|
||||
|
||||
/* Return item only when concurrent connection limit is not
|
||||
reached */
|
||||
if (session_is_outgoing_concurrent_streams_max(session)) {
|
||||
if (session->remote_window_size == 0 ||
|
||||
nghttp2_pq_empty(&session->ob_da_pq)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return nghttp2_pq_top(&session->ob_da_pq);
|
||||
}
|
||||
|
||||
return nghttp2_pq_top(&session->ob_ss_pq);
|
||||
}
|
||||
|
||||
if (nghttp2_pq_empty(&session->ob_ss_pq)) {
|
||||
return nghttp2_pq_top(&session->ob_pq);
|
||||
}
|
||||
|
||||
item = nghttp2_pq_top(&session->ob_pq);
|
||||
headers_item = nghttp2_pq_top(&session->ob_ss_pq);
|
||||
|
||||
if (session_is_outgoing_concurrent_streams_max(session) ||
|
||||
outbound_item_compar(item, headers_item) < 0) {
|
||||
return item;
|
||||
}
|
||||
|
||||
return headers_item;
|
||||
}
|
||||
|
||||
nghttp2_outbound_item *
|
||||
nghttp2_session_pop_next_ob_item(nghttp2_session *session) {
|
||||
nghttp2_outbound_item *item, *headers_item;
|
||||
nghttp2_outbound_item *item;
|
||||
|
||||
if (nghttp2_pq_empty(&session->ob_pq)) {
|
||||
if (nghttp2_pq_empty(&session->ob_ss_pq)) {
|
||||
if (session->remote_window_size == 0 ||
|
||||
nghttp2_pq_empty(&session->ob_da_pq)) {
|
||||
return NULL;
|
||||
item = nghttp2_outbound_queue_top(&session->ob_urgent);
|
||||
if (item) {
|
||||
nghttp2_outbound_queue_pop(&session->ob_urgent);
|
||||
item->queued = 0;
|
||||
return item;
|
||||
}
|
||||
|
||||
item = nghttp2_outbound_queue_top(&session->ob_reg);
|
||||
if (item) {
|
||||
nghttp2_outbound_queue_pop(&session->ob_reg);
|
||||
item->queued = 0;
|
||||
return item;
|
||||
}
|
||||
|
||||
if (!session_is_outgoing_concurrent_streams_max(session)) {
|
||||
item = nghttp2_outbound_queue_top(&session->ob_syn);
|
||||
if (item) {
|
||||
nghttp2_outbound_queue_pop(&session->ob_syn);
|
||||
item->queued = 0;
|
||||
return item;
|
||||
}
|
||||
}
|
||||
|
||||
if (session->remote_window_size > 0 &&
|
||||
!nghttp2_pq_empty(&session->ob_da_pq)) {
|
||||
item = nghttp2_pq_top(&session->ob_da_pq);
|
||||
nghttp2_pq_pop(&session->ob_da_pq);
|
||||
|
||||
item->queued = 0;
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
/* Pop item only when concurrent connection limit is not
|
||||
reached */
|
||||
if (session_is_outgoing_concurrent_streams_max(session)) {
|
||||
if (session->remote_window_size == 0 ||
|
||||
nghttp2_pq_empty(&session->ob_da_pq)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
item = nghttp2_pq_top(&session->ob_da_pq);
|
||||
nghttp2_pq_pop(&session->ob_da_pq);
|
||||
|
||||
item->queued = 0;
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
item = nghttp2_pq_top(&session->ob_ss_pq);
|
||||
nghttp2_pq_pop(&session->ob_ss_pq);
|
||||
|
||||
item->queued = 0;
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
if (nghttp2_pq_empty(&session->ob_ss_pq)) {
|
||||
item = nghttp2_pq_top(&session->ob_pq);
|
||||
nghttp2_pq_pop(&session->ob_pq);
|
||||
|
||||
item->queued = 0;
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
item = nghttp2_pq_top(&session->ob_pq);
|
||||
headers_item = nghttp2_pq_top(&session->ob_ss_pq);
|
||||
|
||||
if (session_is_outgoing_concurrent_streams_max(session) ||
|
||||
outbound_item_compar(item, headers_item) < 0) {
|
||||
nghttp2_pq_pop(&session->ob_pq);
|
||||
|
||||
item->queued = 0;
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
nghttp2_pq_pop(&session->ob_ss_pq);
|
||||
|
||||
headers_item->queued = 0;
|
||||
|
||||
return headers_item;
|
||||
}
|
||||
|
||||
static int session_call_before_frame_send(nghttp2_session *session,
|
||||
nghttp2_frame *frame) {
|
||||
int rv;
|
||||
|
@ -2579,7 +2483,8 @@ static int session_after_frame_sent2(nghttp2_session *session) {
|
|||
waiting at the top of the queue, we continue to send this
|
||||
data. */
|
||||
if (stream->dpri == NGHTTP2_STREAM_DPRI_TOP &&
|
||||
(next_item == NULL || outbound_item_compar(item, next_item) < 0)) {
|
||||
(next_item == NULL || (next_item->frame.hd.type == NGHTTP2_DATA &&
|
||||
outbound_item_less(item, next_item)))) {
|
||||
size_t next_readmax;
|
||||
|
||||
next_readmax = nghttp2_session_next_data_read(session, stream);
|
||||
|
@ -6021,10 +5926,12 @@ int nghttp2_session_want_write(nghttp2_session *session) {
|
|||
* want to write them.
|
||||
*/
|
||||
|
||||
if (session->aob.item == NULL && nghttp2_pq_empty(&session->ob_pq) &&
|
||||
if (session->aob.item == NULL &&
|
||||
nghttp2_outbound_queue_top(&session->ob_urgent) == NULL &&
|
||||
nghttp2_outbound_queue_top(&session->ob_reg) == NULL &&
|
||||
(nghttp2_pq_empty(&session->ob_da_pq) ||
|
||||
session->remote_window_size == 0) &&
|
||||
(nghttp2_pq_empty(&session->ob_ss_pq) ||
|
||||
(nghttp2_outbound_queue_top(&session->ob_syn) == NULL ||
|
||||
session_is_outgoing_concurrent_streams_max(session))) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -6425,8 +6332,9 @@ int nghttp2_session_resume_data(nghttp2_session *session, int32_t stream_id) {
|
|||
}
|
||||
|
||||
size_t nghttp2_session_get_outbound_queue_size(nghttp2_session *session) {
|
||||
return nghttp2_pq_size(&session->ob_pq) +
|
||||
nghttp2_pq_size(&session->ob_ss_pq) +
|
||||
return nghttp2_outbound_queue_size(&session->ob_urgent) +
|
||||
nghttp2_outbound_queue_size(&session->ob_reg) +
|
||||
nghttp2_outbound_queue_size(&session->ob_syn) +
|
||||
nghttp2_pq_size(&session->ob_da_pq);
|
||||
}
|
||||
|
||||
|
|
|
@ -142,12 +142,15 @@ typedef enum {
|
|||
struct nghttp2_session {
|
||||
nghttp2_map /* <nghttp2_stream*> */ streams;
|
||||
nghttp2_stream_roots roots;
|
||||
/* Queue for outbound frames other than stream-creating HEADERS and
|
||||
DATA */
|
||||
nghttp2_pq /* <nghttp2_outbound_item*> */ ob_pq;
|
||||
/* Queue for outbound stream-creating HEADERS frame */
|
||||
nghttp2_pq /* <nghttp2_outbound_item*> */ ob_ss_pq;
|
||||
/* QUeue for DATA frame */
|
||||
/* Queue for outbound urgent frames (PING and SETTINGS) */
|
||||
nghttp2_outbound_queue ob_urgent;
|
||||
/* Queue for non-DATA frames */
|
||||
nghttp2_outbound_queue ob_reg;
|
||||
/* Queue for outbound stream-creating HEADERS (request or push
|
||||
response) frame, which are subject to
|
||||
SETTINGS_MAX_CONCURRENT_STREAMS limit. */
|
||||
nghttp2_outbound_queue ob_syn;
|
||||
/* Queue for DATA frame */
|
||||
nghttp2_pq /* <nghttp2_outbound_item*> */ ob_da_pq;
|
||||
nghttp2_active_outbound_item aob;
|
||||
nghttp2_inbound_frame iframe;
|
||||
|
@ -156,9 +159,6 @@ struct nghttp2_session {
|
|||
nghttp2_session_callbacks callbacks;
|
||||
/* Memory allocator */
|
||||
nghttp2_mem mem;
|
||||
/* Sequence number of outbound frame to maintain the order of
|
||||
enqueue if priority is equal. */
|
||||
int64_t next_seq;
|
||||
/* Reset count of nghttp2_outbound_item's weight. We decrements
|
||||
weight each time DATA is sent to simulate resource sharing. We
|
||||
use priority queue and larger weight has the precedence. If
|
||||
|
@ -707,12 +707,6 @@ int nghttp2_session_pack_data(nghttp2_session *session, nghttp2_bufs *bufs,
|
|||
nghttp2_data_aux_data *aux_data,
|
||||
nghttp2_stream *stream);
|
||||
|
||||
/*
|
||||
* Returns top of outbound frame queue. This function returns NULL if
|
||||
* queue is empty.
|
||||
*/
|
||||
nghttp2_outbound_item *nghttp2_session_get_ob_pq_top(nghttp2_session *session);
|
||||
|
||||
/*
|
||||
* Pops and returns next item to send. If there is no such item,
|
||||
* returns NULL. This function takes into account max concurrent
|
||||
|
|
|
@ -101,22 +101,26 @@ static int stream_push_item(nghttp2_stream *stream, nghttp2_session *session) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
switch (item->frame.hd.type) {
|
||||
case NGHTTP2_DATA:
|
||||
/* Penalize item by delaying scheduling according to effective
|
||||
weight. This will delay low priority stream, which is good.
|
||||
OTOH, this may incur delay for high priority item. Will see. */
|
||||
OTOH, this may incur delay for high priority item. Will
|
||||
see. */
|
||||
item->cycle =
|
||||
session->last_cycle +
|
||||
NGHTTP2_DATA_PAYLOADLEN * NGHTTP2_MAX_WEIGHT / stream->effective_weight;
|
||||
|
||||
switch (item->frame.hd.type) {
|
||||
case NGHTTP2_DATA:
|
||||
rv = nghttp2_pq_push(&session->ob_da_pq, item);
|
||||
if (rv != 0) {
|
||||
return rv;
|
||||
}
|
||||
break;
|
||||
case NGHTTP2_HEADERS:
|
||||
if (stream->state == NGHTTP2_STREAM_RESERVED) {
|
||||
rv = nghttp2_pq_push(&session->ob_ss_pq, item);
|
||||
nghttp2_outbound_queue_push(&session->ob_syn, item);
|
||||
} else {
|
||||
rv = nghttp2_pq_push(&session->ob_pq, item);
|
||||
nghttp2_outbound_queue_push(&session->ob_reg, item);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
@ -124,10 +128,6 @@ static int stream_push_item(nghttp2_stream *stream, nghttp2_session *session) {
|
|||
assert(0);
|
||||
}
|
||||
|
||||
if (rv != 0) {
|
||||
return rv;
|
||||
}
|
||||
|
||||
item->queued = 1;
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -28,14 +28,14 @@
|
|||
|
||||
#include "nghttp2_pq.h"
|
||||
|
||||
static int pq_compar(const void *lhs, const void *rhs) {
|
||||
return strcmp(lhs, rhs);
|
||||
static int pq_less(const void *lhs, const void *rhs) {
|
||||
return strcmp(lhs, rhs) < 0;
|
||||
}
|
||||
|
||||
void test_nghttp2_pq(void) {
|
||||
int i;
|
||||
nghttp2_pq pq;
|
||||
nghttp2_pq_init(&pq, pq_compar, nghttp2_mem_default());
|
||||
nghttp2_pq_init(&pq, pq_less, nghttp2_mem_default());
|
||||
CU_ASSERT(nghttp2_pq_empty(&pq));
|
||||
CU_ASSERT(0 == nghttp2_pq_size(&pq));
|
||||
CU_ASSERT(0 == nghttp2_pq_push(&pq, (void *)"foo"));
|
||||
|
@ -80,10 +80,10 @@ typedef struct {
|
|||
int val;
|
||||
} node;
|
||||
|
||||
static int node_compar(const void *lhs, const void *rhs) {
|
||||
static int node_less(const void *lhs, const void *rhs) {
|
||||
node *ln = (node *)lhs;
|
||||
node *rn = (node *)rhs;
|
||||
return ln->key - rn->key;
|
||||
return ln->key < rn->key;
|
||||
}
|
||||
|
||||
static int node_update(void *item, void *arg _U_) {
|
||||
|
@ -103,7 +103,7 @@ void test_nghttp2_pq_update(void) {
|
|||
node *nd;
|
||||
int ans[] = {-8, -6, -4, -2, 0, 1, 3, 5, 7, 9};
|
||||
|
||||
nghttp2_pq_init(&pq, node_compar, nghttp2_mem_default());
|
||||
nghttp2_pq_init(&pq, node_less, nghttp2_mem_default());
|
||||
|
||||
for (i = 0; i < (int)(sizeof(nodes) / sizeof(nodes[0])); ++i) {
|
||||
nodes[i].key = i;
|
||||
|
|
|
@ -1641,7 +1641,7 @@ void test_nghttp2_session_add_frame(void) {
|
|||
session->next_stream_id += 2;
|
||||
|
||||
CU_ASSERT(0 == nghttp2_session_add_item(session, item));
|
||||
CU_ASSERT(0 == nghttp2_pq_empty(&session->ob_ss_pq));
|
||||
CU_ASSERT(NULL != nghttp2_outbound_queue_top(&session->ob_syn));
|
||||
CU_ASSERT(0 == nghttp2_session_send(session));
|
||||
CU_ASSERT(NGHTTP2_HEADERS == acc.buf[3]);
|
||||
CU_ASSERT((NGHTTP2_FLAG_END_HEADERS | NGHTTP2_FLAG_PRIORITY) == acc.buf[4]);
|
||||
|
@ -2484,16 +2484,16 @@ void test_nghttp2_session_on_ping_received(void) {
|
|||
CU_ASSERT(0 == nghttp2_session_on_ping_received(session, &frame));
|
||||
CU_ASSERT(1 == user_data.frame_recv_cb_called);
|
||||
|
||||
/* Since this ping frame has PONG flag set, no further action is
|
||||
/* Since this ping frame has ACK flag set, no further action is
|
||||
performed. */
|
||||
CU_ASSERT(NULL == nghttp2_session_get_ob_pq_top(session));
|
||||
CU_ASSERT(NULL == nghttp2_outbound_queue_top(&session->ob_urgent));
|
||||
|
||||
/* Clear the flag, and receive it again */
|
||||
frame.hd.flags = NGHTTP2_FLAG_NONE;
|
||||
|
||||
CU_ASSERT(0 == nghttp2_session_on_ping_received(session, &frame));
|
||||
CU_ASSERT(2 == user_data.frame_recv_cb_called);
|
||||
top = nghttp2_session_get_ob_pq_top(session);
|
||||
top = nghttp2_outbound_queue_top(&session->ob_urgent);
|
||||
CU_ASSERT(NGHTTP2_PING == top->frame.hd.type);
|
||||
CU_ASSERT(NGHTTP2_FLAG_ACK == top->frame.hd.flags);
|
||||
CU_ASSERT(memcmp(opaque_data, top->frame.ping.opaque_data, 8) == 0);
|
||||
|
@ -2664,7 +2664,7 @@ void test_nghttp2_session_on_data_received(void) {
|
|||
frame.hd.stream_id = 4;
|
||||
|
||||
CU_ASSERT(0 == nghttp2_session_on_data_received(session, &frame));
|
||||
CU_ASSERT(NULL == nghttp2_session_get_ob_pq_top(session));
|
||||
CU_ASSERT(NULL == nghttp2_outbound_queue_top(&session->ob_reg));
|
||||
|
||||
/* Check INVALID_STREAM case: DATA frame with stream ID which does
|
||||
not exist. */
|
||||
|
@ -2672,7 +2672,7 @@ void test_nghttp2_session_on_data_received(void) {
|
|||
frame.hd.stream_id = 6;
|
||||
|
||||
CU_ASSERT(0 == nghttp2_session_on_data_received(session, &frame));
|
||||
top = nghttp2_session_get_ob_pq_top(session);
|
||||
top = nghttp2_outbound_queue_top(&session->ob_reg);
|
||||
/* DATA against nonexistent stream is just ignored for now */
|
||||
CU_ASSERT(top == NULL);
|
||||
/* CU_ASSERT(NGHTTP2_RST_STREAM == top->frame.hd.type); */
|
||||
|
@ -4513,7 +4513,7 @@ void test_nghttp2_session_pop_next_ob_item(void) {
|
|||
CU_ASSERT(0 == nghttp2_submit_headers(session, NGHTTP2_FLAG_END_STREAM, 2,
|
||||
NULL, NULL, 0, NULL));
|
||||
CU_ASSERT(NULL == nghttp2_session_pop_next_ob_item(session));
|
||||
CU_ASSERT(1 == nghttp2_pq_size(&session->ob_ss_pq));
|
||||
CU_ASSERT(1 == nghttp2_outbound_queue_size(&session->ob_syn));
|
||||
nghttp2_session_del(session);
|
||||
}
|
||||
|
||||
|
@ -4559,7 +4559,7 @@ void test_nghttp2_session_max_concurrent_streams(void) {
|
|||
CU_ASSERT(NGHTTP2_ERR_IGN_HEADER_BLOCK ==
|
||||
nghttp2_session_on_request_headers_received(session, &frame));
|
||||
|
||||
item = nghttp2_session_get_ob_pq_top(session);
|
||||
item = nghttp2_outbound_queue_top(&session->ob_reg);
|
||||
CU_ASSERT(NGHTTP2_RST_STREAM == item->frame.hd.type);
|
||||
CU_ASSERT(NGHTTP2_REFUSED_STREAM == item->frame.rst_stream.error_code);
|
||||
|
||||
|
@ -4572,7 +4572,7 @@ void test_nghttp2_session_max_concurrent_streams(void) {
|
|||
CU_ASSERT(NGHTTP2_ERR_IGN_HEADER_BLOCK ==
|
||||
nghttp2_session_on_request_headers_received(session, &frame));
|
||||
|
||||
item = nghttp2_session_get_ob_pq_top(session);
|
||||
item = nghttp2_outbound_queue_top(&session->ob_reg);
|
||||
CU_ASSERT(NGHTTP2_GOAWAY == item->frame.hd.type);
|
||||
CU_ASSERT(NGHTTP2_PROTOCOL_ERROR == item->frame.goaway.error_code);
|
||||
|
||||
|
@ -6937,7 +6937,7 @@ void test_nghttp2_session_cancel_reserved_remote(void) {
|
|||
|
||||
/* No RST_STREAM or GOAWAY is generated since stream should be in
|
||||
NGHTTP2_STREAM_CLOSING and push response should be ignored. */
|
||||
CU_ASSERT(0 == nghttp2_pq_size(&session->ob_pq));
|
||||
CU_ASSERT(0 == nghttp2_outbound_queue_size(&session->ob_reg));
|
||||
|
||||
/* Check that we can receive push response HEADERS while RST_STREAM
|
||||
is just queued. */
|
||||
|
@ -6960,7 +6960,7 @@ void test_nghttp2_session_cancel_reserved_remote(void) {
|
|||
|
||||
CU_ASSERT(nghttp2_buf_len(&bufs.head->buf) == rv);
|
||||
|
||||
CU_ASSERT(1 == nghttp2_pq_size(&session->ob_pq));
|
||||
CU_ASSERT(1 == nghttp2_outbound_queue_size(&session->ob_reg));
|
||||
|
||||
nghttp2_frame_headers_free(&frame.headers, mem);
|
||||
|
||||
|
|
Loading…
Reference in New Issue