Added spdylay_reply_submit() and DATA frame handling after SYN_REPLY.

This commit is contained in:
Tatsuhiro Tsujikawa 2012-01-27 01:17:40 +09:00
parent f642bb98c7
commit 21e165f1f8
8 changed files with 346 additions and 27 deletions

View File

@ -32,6 +32,9 @@ extern "C" {
#include <stdlib.h>
#include <stdint.h>
struct spdylay_session;
typedef struct spdylay_session spdylay_session;
typedef enum {
SPDYLAY_ERR_INVALID_ARGUMENT = -501,
SPDYLAY_ERR_ZLIB = -502,
@ -60,6 +63,7 @@ typedef enum {
SPDYLAY_NOOP = 5,
SPDYLAY_PING = 6,
SPDYLAY_GOAWAY = 7,
SPDYLAY_DATA = 100,
} spdylay_frame_type;
typedef enum {
@ -106,15 +110,33 @@ typedef struct {
uint32_t status_code;
} spdylay_rst_stream;
typedef union {
int fd;
void *ptr;
} spdylay_data_source;
typedef ssize_t (*spdylay_data_source_read_callback)
(spdylay_session *session, uint8_t *buf, size_t length, int *eof,
spdylay_data_source *source, void *user_data);
typedef struct {
spdylay_data_source source;
spdylay_data_source_read_callback read_callback;
} spdylay_data_provider;
typedef struct {
int32_t stream_id;
uint8_t flags;
spdylay_data_provider data_prd;
} spdylay_data;
typedef union {
spdylay_syn_stream syn_stream;
spdylay_syn_reply syn_reply;
spdylay_rst_stream rst_stream;
spdylay_data data;
} spdylay_frame;
struct spdylay_session;
typedef struct spdylay_session spdylay_session;
typedef ssize_t (*spdylay_send_callback)
(spdylay_session *session,
const uint8_t *data, size_t length, int flags, void *user_data);
@ -163,6 +185,19 @@ int spdylay_session_want_write(spdylay_session *session);
int spdylay_req_submit(spdylay_session *session, const char *path);
/*
* Submits SYN_REPLY frame against stream |stream_id|. |nv| must
* include "status" and "version" key. "status" must be status code
* (e.g., "200" or "200 OK"). "version" is HTTP response version
* (e.g., "HTTP/1.1"). This function creates copies of all name/value
* pairs in |nv|. If |data_prd| is not NULL, it provides data which
* will be sent in subsequent DATA frames. If |data_prd| is NULL,
* SYN_REPLY will have FLAG_FIN.
*/
int spdylay_reply_submit(spdylay_session *session,
int32_t stream_id, const char **nv,
spdylay_data_provider *data_prd);
#ifdef __cplusplus
}
#endif

View File

@ -313,12 +313,32 @@ int spdylay_frame_is_ctrl_frame(uint8_t first_byte)
void spdylay_frame_nv_free(char **nv)
{
int i;
for(i = 0; nv[i]; i += 2) {
for(i = 0; nv[i]; ++i) {
free(nv[i]);
free(nv[i+1]);
}
}
char** spdylay_frame_nv_copy(const char **nv)
{
int n, i;
char **nnv;
for(n = 0;nv[n]; ++n);
nnv = malloc((n+1)*sizeof(char*));
if(nnv == NULL) {
return NULL;
}
for(i = 0; i < n; ++i) {
nnv[i] = strdup(nv[i]);
if(nnv[i] == NULL) {
spdylay_frame_nv_free(nnv[i]);
free(nnv);
return NULL;
}
}
nnv[n] = NULL;
return nnv;
}
void spdylay_frame_syn_stream_init(spdylay_syn_stream *frame, uint8_t flags,
int32_t stream_id, int32_t assoc_stream_id,
uint8_t pri, char **nv)
@ -371,12 +391,23 @@ void spdylay_frame_rst_stream_init(spdylay_rst_stream *frame,
void spdylay_frame_rst_stream_free(spdylay_rst_stream *frame)
{}
void spdylay_frame_data_init(spdylay_data *frame, int32_t stream_id,
spdylay_data_provider *data_prd)
{
memset(frame, 0, sizeof(spdylay_data));
frame->stream_id = stream_id;
frame->data_prd = *data_prd;
}
void spdylay_frame_data_free(spdylay_data *frame)
{}
ssize_t spdylay_frame_pack_syn_stream(uint8_t **buf_ptr,
spdylay_syn_stream *frame,
spdylay_zlib *deflater)
{
uint8_t *framebuf = NULL;
size_t framelen;
ssize_t framelen;
framelen = spdylay_frame_alloc_pack_nv(&framebuf, frame->nv, 18, deflater);
if(framelen < 0) {
return framelen;
@ -417,7 +448,7 @@ ssize_t spdylay_frame_pack_syn_reply(uint8_t **buf_ptr,
spdylay_zlib *deflater)
{
uint8_t *framebuf = NULL;
size_t framelen;
ssize_t framelen;
framelen = spdylay_frame_alloc_pack_nv(&framebuf, frame->nv, 14, deflater);
if(framelen < 0) {
return framelen;
@ -447,7 +478,7 @@ ssize_t spdylay_frame_pack_rst_stream(uint8_t **buf_ptr,
spdylay_rst_stream *frame)
{
uint8_t *framebuf;
size_t framelen = 16;
ssize_t framelen = 16;
framebuf = malloc(framelen);
if(framebuf == NULL) {
return SPDYLAY_ERR_NOMEM;

View File

@ -37,6 +37,8 @@
#define SPDYLAY_LENGTH_MASK 0xffffff
#define SPDYLAY_VERSION_MASK 0x7fff
#define SPDYLAY_DATA_FRAME_LENGTH 4096
/*
* Packs SYN_STREAM frame |frame| in wire frame format and store it in
* |*buf_ptr|. This function allocates enough memory to store given
@ -141,6 +143,11 @@ void spdylay_frame_rst_stream_init(spdylay_rst_stream *frame,
void spdylay_frame_rst_stream_free(spdylay_rst_stream *frame);
void spdylay_frame_data_init(spdylay_data *frame, int32_t stream_id,
spdylay_data_provider *data_prd);
void spdylay_frame_data_free(spdylay_data *frame);
/*
* Returns 1 if the first byte of this frame indicates it is a control
* frame.
@ -152,4 +159,10 @@ int spdylay_frame_is_ctrl_frame(uint8_t first_byte);
*/
void spdylay_frame_nv_free(char **nv);
/*
* Makes a deep copy of |nv| and returns the copy. This function
* returns the pointer to the copy if it succeeds, or NULL.
*/
char** spdylay_frame_nv_copy(const char **nv);
#endif /* SPDYLAY_FRAME_H */

View File

@ -32,6 +32,21 @@
#include "spdylay_helper.h"
/*
* Returns non-zero value if |stream_id| is initiated by local host.
* Otherwrise returns 0.
*/
static int spdylay_session_is_my_stream_id(spdylay_session *session,
int32_t stream_id)
{
int r;
if(stream_id == 0) {
return 0;
}
r = stream_id % 2;
return (session->server && r == 0) || r == 1;
}
spdylay_stream* spdylay_session_get_stream(spdylay_session *session,
int32_t stream_id)
{
@ -113,6 +128,12 @@ static void spdylay_outbound_item_free(spdylay_outbound_item *item)
case SPDYLAY_SYN_REPLY:
spdylay_frame_syn_reply_free(&item->frame->syn_reply);
break;
case SPDYLAY_RST_STREAM:
spdylay_frame_rst_stream_free(&item->frame->rst_stream);
break;
case SPDYLAY_DATA:
spdylay_frame_data_free(&item->frame->data);
break;
}
free(item->frame);
}
@ -170,6 +191,14 @@ int spdylay_session_add_frame(spdylay_session *session,
}
break;
}
case SPDYLAY_DATA: {
spdylay_stream *stream = spdylay_session_get_stream
(session, frame->data.stream_id);
if(stream) {
item->pri = stream->pri;
}
break;
}
};
r = spdylay_pq_push(&session->ob_pq, item);
if(r != 0) {
@ -232,6 +261,7 @@ ssize_t spdylay_session_prep_frame(spdylay_session *session,
spdylay_outbound_item *item,
uint8_t **framebuf_ptr)
{
/* TODO Get or validate stream ID here */
uint8_t *framebuf;
ssize_t framebuflen;
int r;
@ -264,6 +294,14 @@ ssize_t spdylay_session_prep_frame(spdylay_session *session,
}
break;
}
case SPDYLAY_DATA: {
framebuflen = spdylay_session_pack_data(session, &framebuf,
&item->frame->data);
if(framebuflen < 0) {
return framebuflen;
}
break;
}
default:
framebuflen = SPDYLAY_ERR_INVALID_ARGUMENT;
}
@ -280,10 +318,18 @@ static void spdylay_active_outbound_item_reset
memset(aob, 0, sizeof(spdylay_active_outbound_item));
}
static spdylay_outbound_item* spdylay_session_get_ob_pq_top
(spdylay_session *session)
{
return (spdylay_outbound_item*)spdylay_pq_top(&session->ob_pq);
}
static int spdylay_session_after_frame_sent(spdylay_session *session)
{
/* TODO handle FIN flag. */
spdylay_frame *frame = session->aob.item->frame;
switch(session->aob.item->frame_type) {
spdylay_frame_type type = session->aob.item->frame_type;
switch(type) {
case SPDYLAY_SYN_STREAM: {
spdylay_stream *stream =
spdylay_session_get_stream(session, frame->syn_stream.stream_id);
@ -302,14 +348,55 @@ static int spdylay_session_after_frame_sent(spdylay_session *session)
}
case SPDYLAY_RST_STREAM:
spdylay_session_close_stream(session, frame->rst_stream.stream_id);
break;
case SPDYLAY_DATA:
if((frame->data.flags & SPDYLAY_FLAG_FIN) &&
!spdylay_session_is_my_stream_id(session, frame->data.stream_id)) {
/* We send all data requested by peer, so close the stream. */
spdylay_session_close_stream(session, frame->data.stream_id);
}
break;
};
/* TODO If frame is data frame, we need to sent all chunk of
data.*/
if(type == SPDYLAY_DATA) {
int r;
if(frame->data.flags & SPDYLAY_FLAG_FIN) {
spdylay_active_outbound_item_reset(&session->aob);
} else if(spdylay_pq_empty(&session->ob_pq) ||
session->aob.item->pri <=
spdylay_session_get_ob_pq_top(session)->pri) {
/* If priority of this stream is higher or equal to other stream
waiting at the top of the queue, we continue to send this
data. */
/* We assume that buffer has at least
SPDYLAY_DATA_FRAME_LENGTH. */
r = spdylay_session_pack_data_overwrite(session,
session->aob.framebuf,
SPDYLAY_DATA_FRAME_LENGTH,
&frame->data);
if(r < 0) {
spdylay_active_outbound_item_reset(&session->aob);
return r;
}
session->aob.framebufoff = 0;
} else {
r = spdylay_pq_push(&session->ob_pq, session->aob.item);
if(r == 0) {
session->aob.item = NULL;
spdylay_active_outbound_item_reset(&session->aob);
} else {
spdylay_active_outbound_item_reset(&session->aob);
return r;
}
}
} else {
spdylay_active_outbound_item_reset(&session->aob);
}
return 0;
}
int spdylay_session_send(spdylay_session *session)
{
int r;
printf("session_send\n");
while(session->aob.item || !spdylay_pq_empty(&session->ob_pq)) {
const uint8_t *data;
@ -320,7 +407,6 @@ int spdylay_session_send(spdylay_session *session)
uint8_t *framebuf;
ssize_t framebuflen;
spdylay_pq_pop(&session->ob_pq);
/* TODO Get or validate stream id here */
framebuflen = spdylay_session_prep_frame(session, item, &framebuf);
if(framebuflen < 0) {
/* TODO Call error callback? */
@ -331,6 +417,7 @@ int spdylay_session_send(spdylay_session *session)
session->aob.item = item;
session->aob.framebuf = framebuf;
session->aob.framebuflen = framebuflen;
/* TODO Call before_send callback */
}
data = session->aob.framebuf + session->aob.framebufoff;
datalen = session->aob.framebuflen - session->aob.framebufoff;
@ -343,11 +430,13 @@ int spdylay_session_send(spdylay_session *session)
return sentlen;
}
} else {
printf("sent %d bytes\n", sentlen);
session->aob.framebufoff += sentlen;
if(session->aob.framebufoff == session->aob.framebuflen) {
/* Frame has completely sent */
spdylay_session_after_frame_sent(session);
r = spdylay_session_after_frame_sent(session);
if(r < 0) {
return r;
}
} else {
/* partial write */
break;
@ -437,17 +526,6 @@ static int spdylay_session_is_new_peer_stream_id(spdylay_session *session,
}
}
static int spdylay_session_is_my_stream_id(spdylay_session *session,
int32_t stream_id)
{
int r;
if(stream_id == 0) {
return 0;
}
r = stream_id % 2;
return (session->server && r == 0) || r == 1;
}
/*
* Validates SYN_STREAM frame |frame|. This function returns 0 if it
* succeeds, or -1.
@ -683,6 +761,53 @@ int spdylay_session_want_write(spdylay_session *session)
return session->aob.item != NULL || !spdylay_pq_empty(&session->ob_pq);
}
int spdylay_reply_submit(spdylay_session *session,
int32_t stream_id, const char **nv,
spdylay_data_provider *data_prd)
{
int r;
spdylay_frame *frame;
spdylay_frame *data_frame = NULL;
char **nv_copy;
uint8_t flags = 0;
frame = malloc(sizeof(spdylay_frame));
if(frame == NULL) {
return SPDYLAY_ERR_NOMEM;
}
nv_copy = spdylay_frame_nv_copy(nv);
if(nv_copy == NULL) {
free(frame);
return SPDYLAY_ERR_NOMEM;
}
if(data_prd == NULL) {
flags |= SPDYLAY_FLAG_FIN;
}
spdylay_frame_syn_reply_init(&frame->syn_reply, flags, stream_id,
nv_copy);
r = spdylay_session_add_frame(session, SPDYLAY_SYN_REPLY, frame);
if(r != 0) {
spdylay_frame_syn_reply_free(&frame->syn_reply);
free(frame);
return r;
}
if(data_prd != NULL) {
/* TODO If error is not FATAL, we should add RST_STREAM frame to
cancel this stream. */
data_frame = malloc(sizeof(spdylay_frame));
if(data_frame == NULL) {
return SPDYLAY_ERR_NOMEM;
}
spdylay_frame_data_init(&data_frame->data, stream_id, data_prd);
r = spdylay_session_add_frame(session, SPDYLAY_DATA, data_frame);
if(r != 0) {
spdylay_frame_data_free(&data_frame->data);
free(data_frame);
return r;
}
}
return 0;
}
int spdylay_req_submit(spdylay_session *session, const char *path)
{
int r;
@ -702,5 +827,48 @@ int spdylay_req_submit(spdylay_session *session, const char *path)
spdylay_frame_syn_stream_init(&frame->syn_stream,
SPDYLAY_FLAG_FIN, 0, 0, 0, nv);
r = spdylay_session_add_frame(session, SPDYLAY_SYN_STREAM, frame);
assert(r == 0);
return r;
}
ssize_t spdylay_session_pack_data(spdylay_session *session,
uint8_t **buf_ptr, spdylay_data *frame)
{
uint8_t *framebuf;
ssize_t framelen = SPDYLAY_DATA_FRAME_LENGTH;
framebuf = malloc(framelen);
if(framebuf == NULL) {
return SPDYLAY_ERR_NOMEM;
}
framelen = spdylay_session_pack_data_overwrite(session, framebuf, framelen,
frame);
if(framelen < 0) {
free(framebuf);
}
*buf_ptr = framebuf;
return framelen;
}
ssize_t spdylay_session_pack_data_overwrite(spdylay_session *session,
uint8_t *buf, size_t len,
spdylay_data *frame)
{
ssize_t r;
int eof = 0;
uint8_t flags = 0;
r = frame->data_prd.read_callback
(session, buf+8, len-8, &eof, &frame->data_prd.source, session->user_data);
if(r < 0) {
return r;
} else if(len < r) {
return SPDYLAY_ERR_CALLBACK_FAILURE;
}
memset(buf, 0, len);
spdylay_put_uint32be(&buf[0], frame->stream_id);
spdylay_put_uint32be(&buf[4], 8+r);
if(eof) {
flags |= SPDYLAY_FLAG_FIN;
}
buf[4] = flags;
frame->flags = flags;
return r+8;
}

View File

@ -147,4 +147,25 @@ int spdylay_session_on_syn_reply_received(spdylay_session *session,
spdylay_stream* spdylay_session_get_stream(spdylay_session *session,
int32_t stream_id);
/*
* Packs DATA frame |frame| in wire frame format and store it in
* |*buf_ptr|. This function always allocates
* 8+SPDYLAY_DATA_CHUNK_LENGTH bytes. It packs header in first 8
* bytes. Remaining bytes are filled using frame->data_prd. This
* function returns the size of packed frame if it succeeds, or
* negative error code.
*/
ssize_t spdylay_session_pack_data(spdylay_session *session,
uint8_t **buf_ptr, spdylay_data *frame);
/*
* Packs DATA frame |frame| in wire frame format and store it in
* |buf|. |len| must be greater than or equal to 8. This function
* returns the sizeof packed frame if it succeeds, or negative error
* code.
*/
ssize_t spdylay_session_pack_data_overwrite(spdylay_session *session,
uint8_t *buf, size_t len,
spdylay_data *frame);
#endif /* SPDYLAY_SESSION_H */

View File

@ -79,6 +79,7 @@ int main()
test_spdylay_session_send_syn_stream) ||
!CU_add_test(pSuite, "session_send_syn_reply",
test_spdylay_session_send_syn_reply) ||
!CU_add_test(pSuite, "reply_submit", test_spdylay_reply_submit) ||
!CU_add_test(pSuite, "frame_unpack_nv", test_spdylay_frame_unpack_nv) ||
!CU_add_test(pSuite, "frame_count_nv_space",
test_spdylay_frame_count_nv_space)) {

View File

@ -50,6 +50,7 @@ typedef struct {
accumulator *acc;
scripted_data_feed *df;
int valid, invalid;
size_t data_source_length;
} my_user_data;
static void scripted_data_feed_init(scripted_data_feed *df,
@ -114,6 +115,24 @@ static void on_invalid_ctrl_recv_callback(spdylay_session *session,
++ud->invalid;
}
static ssize_t fixed_length_data_source_read_callback
(spdylay_session *session, uint8_t *buf, size_t len, int *eof,
spdylay_data_source *source, void *user_data)
{
my_user_data *ud = (my_user_data*)user_data;
size_t wlen;
if(len < ud->data_source_length) {
wlen = len;
} else {
wlen = ud->data_source_length;
}
ud->data_source_length -= wlen;
if(ud->data_source_length == 0) {
*eof = 1;
}
return wlen;
}
static char** dup_nv(const char **src)
{
int i;
@ -368,3 +387,33 @@ void test_spdylay_session_send_syn_reply()
spdylay_session_del(session);
}
void test_spdylay_reply_submit()
{
spdylay_session *session;
spdylay_session_callbacks callbacks = {
null_send_callback,
NULL,
NULL,
NULL
};
const char *nv[] = { NULL };
spdylay_stream *stream;
int32_t stream_id = 2;
spdylay_data_source source;
spdylay_data_provider data_prd = {
source,
fixed_length_data_source_read_callback
};
my_user_data ud;
ud.data_source_length = 64*1024;
CU_ASSERT(0 == spdylay_session_client_new(&session, &callbacks, &ud));
spdylay_session_open_stream(session, stream_id, SPDYLAY_FLAG_NONE, 3,
SPDYLAY_STREAM_OPENING);
CU_ASSERT(0 == spdylay_reply_submit(session, stream_id, nv, &data_prd));
CU_ASSERT(0 == spdylay_session_send(session));
spdylay_session_del(session);
}

View File

@ -32,5 +32,6 @@ void test_spdylay_session_on_syn_stream_received();
void test_spdylay_session_on_syn_reply_received();
void test_spdylay_session_send_syn_stream();
void test_spdylay_session_send_syn_reply();
void test_spdylay_reply_submit();
#endif // SPDYLAY_SESSION_TEST_H