Added pri to spdylay_stream. Refactored SYN_STREAM, SYN_REPLY handling when they are received.

This commit is contained in:
Tatsuhiro Tsujikawa 2012-01-25 23:46:07 +09:00
parent 6629f35a94
commit cbb8dd6a8c
8 changed files with 249 additions and 58 deletions

View File

@ -33,14 +33,19 @@ extern "C" {
#include <stdint.h>
typedef enum {
SPDYLAY_ERR_NOMEM = -500,
SPDYLAY_ERR_INVALID_ARGUMENT = -501,
SPDYLAY_ERR_ZLIB = -502,
SPDYLAY_ERR_ZLIB_BUF = -503,
SPDYLAY_ERR_WOULDBLOCK = -504,
SPDYLAY_ERR_PROTO = -505,
SPDYLAY_ERR_CALLBACK_FAILURE = -505,
SPDYLAY_ERR_INVALID_FRAME = -506,
/* The errors < SPDYLAY_ERR_FATAL mean that the library is under
unexpected condition that it cannot process any further data
reliably (e.g., out of memory). */
SPDYLAY_ERR_FATAL = -900,
SPDYLAY_ERR_NOMEM = -901,
SPDYLAY_ERR_CALLBACK_FAILURE = -902,
} spdylay_error;
typedef enum {
@ -58,7 +63,9 @@ typedef enum {
} spdylay_frame_type;
typedef enum {
SPDYLAY_FLAG_FIN = 1
SPDYLAY_FLAG_NONE = 0,
SPDYLAY_FLAG_FIN = 1,
SPDYLAY_FLAG_UNIDIRECTIONAL = 2
} spdylay_flag;
typedef enum {

View File

@ -33,6 +33,16 @@
#include "spdylay_stream.h"
#include "spdylay_helper.h"
/*
* Returns spdylay_stream* object whose stream ID is |stream_id|. It
* could be NULL if such stream does not exist.
*/
static spdylay_stream* spdylay_session_get_stream(spdylay_session *session,
int32_t stream_id)
{
return (spdylay_stream*)spdylay_map_find(&session->streams, stream_id);
}
int spdylay_outbound_item_compar(const void *lhsx, const void *rhsx)
{
const spdylay_outbound_item *lhs, *rhs;
@ -146,6 +156,17 @@ int spdylay_session_add_frame(spdylay_session *session,
case SPDYLAY_SYN_STREAM:
item->pri = 4-frame->syn_stream.pri;
break;
case SPDYLAY_RST_STREAM: {
spdylay_stream *stream = spdylay_session_get_stream
(session, frame->rst_stream.stream_id);
if(stream) {
stream->state = SPDYLAY_STREAM_CLOSING;
item->pri = stream->pri;
} else {
item->pri = 4;
}
break;
}
default:
item->pri = 4;
};
@ -176,14 +197,15 @@ int spdylay_session_add_rst_stream(spdylay_session *session,
return 0;
}
int spdylay_session_open_stream(spdylay_session *session, int32_t stream_id)
int spdylay_session_open_stream(spdylay_session *session, int32_t stream_id,
uint8_t flags, uint8_t pri)
{
int r;
spdylay_stream *stream = malloc(sizeof(spdylay_stream));
if(stream == NULL) {
return SPDYLAY_ERR_NOMEM;
}
spdylay_stream_init(stream, stream_id);
spdylay_stream_init(stream, stream_id, flags, pri);
r = spdylay_map_insert(&session->streams, stream_id, stream);
if(r != 0) {
free(stream);
@ -209,7 +231,9 @@ ssize_t spdylay_session_prep_frame(spdylay_session *session,
return framebuflen;
}
printf("packed %d bytes\n", framebuflen);
r = spdylay_session_open_stream(session, item->frame->syn_stream.stream_id);
r = spdylay_session_open_stream(session, item->frame->syn_stream.stream_id,
item->frame->syn_stream.hd.flags,
item->frame->syn_stream.pri);
if(r != 0) {
free(framebuf);
return r;
@ -363,6 +387,17 @@ static int spdylay_session_is_new_peer_stream_id(spdylay_session *session,
}
}
static int spdylay_session_is_my_stream_id(spdylay_session *session,
int32_t stream_id)
{
int r;
if(stream_id == 0) {
return 0;
}
r = stream_id % 2;
return (session->server && r == 0) || r == 1;
}
/*
* Validates SYN_STREAM frame |frame|. This function returns 0 if it
* succeeds, or -1.
@ -378,28 +413,6 @@ static int spdylay_session_validate_syn_stream(spdylay_session *session,
}
}
static spdylay_stream* spdylay_session_get_stream(spdylay_session *session,
int32_t stream_id)
{
return (spdylay_stream*)spdylay_map_find(&session->streams, stream_id);
}
/*
* Validates SYN_REPLY frame |frame|. This function returns 0 if it
* succeeds, or -1.
*/
static int spdylay_session_validate_syn_reply(spdylay_session *session,
spdylay_syn_reply *frame)
{
spdylay_stream *stream;
stream = spdylay_session_get_stream(session, frame->stream_id);
if(stream && stream->state == SPDYLAY_STREAM_OPENING) {
return 0;
} else {
return -1;
}
}
static int spdylay_session_handle_invalid_ctrl_frame(spdylay_session *session,
int32_t stream_id,
spdylay_frame_type type,
@ -415,6 +428,56 @@ static int spdylay_session_handle_invalid_ctrl_frame(spdylay_session *session,
session->callbacks.on_invalid_ctrl_recv_callback
(session, type, frame, session->user_data);
}
return 0;
}
int spdylay_session_on_syn_stream_received(spdylay_session *session,
spdylay_frame *frame)
{
int r;
if(spdylay_session_validate_syn_stream(session, &frame->syn_stream) == 0) {
r = spdylay_session_open_stream(session, frame->syn_stream.stream_id,
frame->syn_stream.hd.flags,
frame->syn_stream.pri);
if(r == 0) {
session->last_recv_stream_id = frame->syn_stream.stream_id;
spdylay_session_call_on_ctrl_frame_received(session, SPDYLAY_SYN_STREAM,
frame);
}
} else {
r = spdylay_session_handle_invalid_ctrl_frame
(session, frame->syn_stream.stream_id, SPDYLAY_SYN_STREAM, frame);
}
return r;
}
int spdylay_session_on_syn_reply_received(spdylay_session *session,
spdylay_frame *frame)
{
int r = 0;
int valid = 0;
if(spdylay_session_is_my_stream_id(session, frame->syn_reply.stream_id)) {
spdylay_stream *stream = spdylay_session_get_stream
(session, frame->syn_reply.stream_id);
if(stream) {
if(stream->state == SPDYLAY_STREAM_OPENING) {
valid = 1;
stream->state = SPDYLAY_STREAM_OPENED;
spdylay_session_call_on_ctrl_frame_received(session, SPDYLAY_SYN_REPLY,
frame);
} else if(stream->state == SPDYLAY_STREAM_CLOSING) {
/* This is race condition. SPDYLAY_STREAM_CLOSING indicates
that we queued RST_STREAM but it has not been sent. It will
eventually sent, so we just ignore this frame. */
valid = 1;
}
}
}
if(!valid) {
r = spdylay_session_handle_invalid_ctrl_frame
(session, frame->syn_reply.stream_id, SPDYLAY_SYN_REPLY, frame);
}
return r;
}
int spdylay_session_process_ctrl_frame(spdylay_session *session)
@ -434,13 +497,7 @@ int spdylay_session_process_ctrl_frame(spdylay_session *session)
session->iframe.len,
&session->hd_inflater);
if(r == 0) {
r = spdylay_session_validate_syn_stream(session, &frame.syn_stream);
if(r == 0) {
spdylay_session_call_on_ctrl_frame_received(session, type, &frame);
} else {
r = spdylay_session_handle_invalid_ctrl_frame
(session, frame.syn_stream.stream_id, type, &frame);
}
r = spdylay_session_on_syn_stream_received(session, &frame);
spdylay_frame_syn_stream_free(&frame.syn_stream);
} else {
/* TODO if r indicates mulformed NV pairs (multiple nulls) or
@ -457,13 +514,7 @@ int spdylay_session_process_ctrl_frame(spdylay_session *session)
session->iframe.len,
&session->hd_inflater);
if(r == 0) {
r = spdylay_session_validate_syn_reply(session, &frame.syn_reply);
if(r == 0) {
spdylay_session_call_on_ctrl_frame_received(session, type, &frame);
} else {
r = spdylay_session_handle_invalid_ctrl_frame
(session, frame.syn_reply.stream_id, type, &frame);
}
r = spdylay_session_on_syn_reply_received(session, &frame);
spdylay_frame_syn_reply_free(&frame.syn_reply);
}
break;
@ -471,11 +522,12 @@ int spdylay_session_process_ctrl_frame(spdylay_session *session)
/* ignore */
printf("Received control frame type %x\n", type);
}
if(r != 0) {
if(r < SPDYLAY_ERR_FATAL) {
return r;
}
} else {
return 0;
}
}
int spdylay_session_process_data_frame(spdylay_session *session)
{
@ -555,7 +607,11 @@ int spdylay_session_recv(spdylay_session *session)
session->ibuf.mark += readlen;
if(session->iframe.len == session->iframe.off) {
if(spdylay_frame_is_ctrl_frame(session->iframe.headbuf[0])) {
spdylay_session_process_ctrl_frame(session);
r = spdylay_session_process_ctrl_frame(session);
if(r < 0) {
/* Fatal error */
return r;
}
} else {
spdylay_session_process_data_frame(session);
}

View File

@ -101,8 +101,35 @@ int spdylay_session_add_frame(spdylay_session *session,
int spdylay_session_add_rst_stream(spdylay_session *session,
int32_t stream_id, uint32_t status_code);
int spdylay_session_open_stream(spdylay_session *session, int32_t stream_id);
/*
* Creates new stream in |session| with stream ID |stream_id|,
* priority |pri| and flags |flags|. Currently, |flags| &
* SPDYLAY_FLAG_UNIDIRECTIONAL is non-zero, this stream is
* unidirectional. |flags| & SPDYLAY_FLAG_FIN is non-zero, the sender
* of SYN_STREAM will not send any further data in this stream.
*/
int spdylay_session_open_stream(spdylay_session *session, int32_t stream_id,
uint8_t flags, uint8_t pri);
int spdylay_session_close_stream(spdylay_session *session, int32_t stream_id);
/*
* Called when SYN_STREAM is received. Received frame is |frame|.
* This function does first
* validate received frame and then open stream and call callback
* functions.
*/
int spdylay_session_on_syn_stream_received(spdylay_session *session,
spdylay_frame *frame);
/*
* Called when SYN_STREAM is received. Received frame is |frame|.
* This function does first validate received frame and then open
* stream and call callback functions.
*/
int spdylay_session_on_syn_reply_received(spdylay_session *session,
spdylay_frame *frame);
#endif /* SPDYLAY_SESSION_H */

View File

@ -24,10 +24,13 @@
*/
#include "spdylay_stream.h"
void spdylay_stream_init(spdylay_stream *stream, int32_t stream_id)
void spdylay_stream_init(spdylay_stream *stream, int32_t stream_id,
uint8_t flags, uint8_t pri)
{
stream->stream_id = stream_id;
stream->state = SPDYLAY_STREAM_OPENING;
stream->flags = flags;
stream->pri = pri;
}
void spdylay_stream_free(spdylay_stream *stream)

View File

@ -31,6 +31,15 @@
#include <spdylay/spdylay.h>
/*
* If local peer is stream initiator:
* SPDYLAY_STREAM_OPENING : upon sending SYN_STREAM
* SPDYLAY_STREAM_OPENED : upon receiving SYN_REPLY
*
* If remote peer is stream initiator:
* SPDYLAY_STREAM_OPENING : upon receiving SYN_STREAM
* SPDYLAY_STREAM_OPENED : upon sending SYN_REPLY
*/
typedef enum {
/* For stream initiator: SYN_STREAM has been sent, but SYN_REPLY is
not received yet. For receiver: SYN_STREAM has been received,
@ -47,11 +56,14 @@ typedef enum {
typedef struct {
int32_t stream_id;
spdylay_stream_state state;
/* Use same value in frame */
/* Use same value in SYN_STREAM frame */
uint8_t flags;
/* Use same scheme in SYN_STREAM frame */
uint8_t pri;
} spdylay_stream;
void spdylay_stream_init(spdylay_stream *stream, int32_t stream_id);
void spdylay_stream_init(spdylay_stream *stream, int32_t stream_id,
uint8_t flags, uint8_t pri);
void spdylay_stream_free(spdylay_stream *stream);

View File

@ -71,6 +71,10 @@ int main()
test_spdylay_session_recv_invalid_stream_id) ||
!CU_add_test(pSuite, "session_add_frame",
test_spdylay_session_add_frame) ||
!CU_add_test(pSuite, "session_on_syn_stream_received",
test_spdylay_session_on_syn_stream_received) ||
!CU_add_test(pSuite, "session_on_syn_reply_received",
test_spdylay_session_on_syn_reply_received) ||
!CU_add_test(pSuite, "frame_unpack_nv", test_spdylay_frame_unpack_nv) ||
!CU_add_test(pSuite, "frame_count_nv_space",
test_spdylay_frame_count_nv_space)) {

View File

@ -31,6 +31,7 @@
#include <arpa/inet.h>
#include "spdylay_session.h"
#include "spdylay_stream.h"
typedef struct {
uint8_t buf[4096];
@ -48,7 +49,7 @@ typedef struct {
typedef struct {
accumulator *acc;
scripted_data_feed *df;
int flags;
int valid, invalid;
} my_user_data;
static void scripted_data_feed_init(scripted_data_feed *df,
@ -88,13 +89,22 @@ static ssize_t accumulator_send_callback(spdylay_session *session,
return len;
}
static void on_ctrl_recv_callback(spdylay_session *session,
spdylay_frame_type type,
spdylay_frame *frame,
void *user_data)
{
my_user_data *ud = (my_user_data*)user_data;
++ud->valid;
}
static void on_invalid_ctrl_recv_callback(spdylay_session *session,
spdylay_frame_type type,
spdylay_frame *frame,
void *user_data)
{
my_user_data *ud = (my_user_data*)user_data;
++ud->flags;
++ud->invalid;
}
static char** dup_nv(const char **src)
@ -128,7 +138,8 @@ void test_spdylay_session_recv()
user_data.df = &df;
spdylay_session_client_new(&session, &callbacks, &user_data);
spdylay_frame_syn_stream_init(&frame.syn_stream, 0, 0, 0, 3, dup_nv(nv));
spdylay_frame_syn_stream_init(&frame.syn_stream, SPDYLAY_FLAG_NONE, 0, 0, 3,
dup_nv(nv));
framelen = spdylay_frame_pack_syn_stream(&framedata, &frame.syn_stream,
&session->hd_deflater);
scripted_data_feed_init(&df, framedata, framelen);
@ -165,7 +176,8 @@ void test_spdylay_session_add_frame()
CU_ASSERT(0 == spdylay_session_client_new(&session, &callbacks, &user_data));
frame = malloc(sizeof(spdylay_frame));
spdylay_frame_syn_stream_init(&frame->syn_stream, 0, 0, 0, 3, dup_nv(nv));
spdylay_frame_syn_stream_init(&frame->syn_stream, SPDYLAY_FLAG_NONE, 0, 0, 3,
dup_nv(nv));
CU_ASSERT(0 == spdylay_session_add_frame(session, SPDYLAY_SYN_STREAM, frame));
CU_ASSERT(0 == spdylay_pq_empty(&session->ob_pq));
@ -203,9 +215,10 @@ void test_spdylay_session_recv_invalid_stream_id()
spdylay_frame frame;
user_data.df = &df;
user_data.flags = 0;
user_data.invalid = 0;
spdylay_session_client_new(&session, &callbacks, &user_data);
spdylay_frame_syn_stream_init(&frame.syn_stream, 0, 1, 0, 3, dup_nv(nv));
spdylay_frame_syn_stream_init(&frame.syn_stream, SPDYLAY_FLAG_NONE, 1, 0, 3,
dup_nv(nv));
framelen = spdylay_frame_pack_syn_stream(&framedata, &frame.syn_stream,
&session->hd_deflater);
scripted_data_feed_init(&df, framedata, framelen);
@ -213,9 +226,10 @@ void test_spdylay_session_recv_invalid_stream_id()
spdylay_frame_syn_stream_free(&frame.syn_stream);
CU_ASSERT(0 == spdylay_session_recv(session));
CU_ASSERT(1 == user_data.flags);
CU_ASSERT(1 == user_data.invalid);
spdylay_frame_syn_reply_init(&frame.syn_reply, 0, 100, dup_nv(nv));
spdylay_frame_syn_reply_init(&frame.syn_reply, SPDYLAY_FLAG_NONE, 100,
dup_nv(nv));
framelen = spdylay_frame_pack_syn_reply(&framedata, &frame.syn_reply,
&session->hd_deflater);
scripted_data_feed_init(&df, framedata, framelen);
@ -223,8 +237,74 @@ void test_spdylay_session_recv_invalid_stream_id()
spdylay_frame_syn_reply_free(&frame.syn_reply);
CU_ASSERT(0 == spdylay_session_recv(session));
CU_ASSERT(2 == user_data.flags);
CU_ASSERT(2 == user_data.invalid);
spdylay_session_del(session);
}
void test_spdylay_session_on_syn_stream_received()
{
spdylay_session *session;
spdylay_session_callbacks callbacks = {
NULL,
NULL,
on_ctrl_recv_callback,
on_invalid_ctrl_recv_callback
};
my_user_data user_data;
const char *nv[] = { NULL };
spdylay_frame frame;
user_data.valid = 0;
user_data.invalid = 0;
spdylay_session_client_new(&session, &callbacks, &user_data);
spdylay_frame_syn_stream_init(&frame.syn_stream, SPDYLAY_FLAG_NONE,
2, 0, 3, dup_nv(nv));
CU_ASSERT(0 == spdylay_session_on_syn_stream_received(session, &frame));
CU_ASSERT(1 == user_data.valid);
CU_ASSERT(SPDYLAY_STREAM_OPENING ==
((spdylay_stream*)spdylay_map_find(&session->streams, 2))->state);
CU_ASSERT(0 == spdylay_session_on_syn_stream_received(session, &frame));
CU_ASSERT(1 == user_data.invalid);
CU_ASSERT(SPDYLAY_STREAM_CLOSING ==
((spdylay_stream*)spdylay_map_find(&session->streams, 2))->state);
spdylay_frame_syn_stream_free(&frame.syn_stream);
spdylay_session_del(session);
}
void test_spdylay_session_on_syn_reply_received()
{
spdylay_session *session;
spdylay_session_callbacks callbacks = {
NULL,
NULL,
on_ctrl_recv_callback,
on_invalid_ctrl_recv_callback
};
my_user_data user_data;
const char *nv[] = { NULL };
spdylay_frame frame;
user_data.valid = 0;
user_data.invalid = 0;
spdylay_session_client_new(&session, &callbacks, &user_data);
spdylay_session_open_stream(session, 1, SPDYLAY_FLAG_NONE, 0);
spdylay_frame_syn_reply_init(&frame.syn_reply, SPDYLAY_FLAG_NONE, 1,
dup_nv(nv));
CU_ASSERT(0 == spdylay_session_on_syn_reply_received(session, &frame));
CU_ASSERT(1 == user_data.valid);
CU_ASSERT(SPDYLAY_STREAM_OPENED ==
((spdylay_stream*)spdylay_map_find(&session->streams, 1))->state);
CU_ASSERT(0 == spdylay_session_on_syn_reply_received(session, &frame));
CU_ASSERT(1 == user_data.invalid);
CU_ASSERT(SPDYLAY_STREAM_CLOSING ==
((spdylay_stream*)spdylay_map_find(&session->streams, 1))->state);
spdylay_frame_syn_reply_free(&frame.syn_reply);
spdylay_session_del(session);
}

View File

@ -28,5 +28,7 @@
void test_spdylay_session_recv();
void test_spdylay_session_recv_invalid_stream_id();
void test_spdylay_session_add_frame();
void test_spdylay_session_on_syn_stream_received();
void test_spdylay_session_on_syn_reply_received();
#endif // SPDYLAY_SESSION_TEST_H