More explicit handling of shutdown status of read and write in each stream.

This commit is contained in:
Tatsuhiro Tsujikawa 2012-01-29 00:08:51 +09:00
parent e7489503b8
commit 971e46f563
5 changed files with 121 additions and 54 deletions

View File

@ -294,6 +294,16 @@ int spdylay_session_close_stream(spdylay_session *session, int32_t stream_id)
}
}
int spdylay_session_close_stream_if_shut_rdwr(spdylay_session *session,
spdylay_stream *stream)
{
if((stream->shut_flags & SPDYLAY_SHUT_RDWR) == SPDYLAY_SHUT_RDWR) {
return spdylay_session_close_stream(session, stream->stream_id);
} else {
return 0;
}
}
/*
* Returns non-zero value if local peer can send SYN_REPLY with stream
* ID |stream_id| at the moment, or 0.
@ -309,7 +319,7 @@ static int spdylay_session_is_reply_allowed(spdylay_session *session,
return 0;
} else {
return stream->state == SPDYLAY_STREAM_OPENING &&
(stream->flags & SPDYLAY_FLAG_UNIDIRECTIONAL) == 0;
(stream->shut_flags & SPDYLAY_SHUT_WR) == 0;
}
}
@ -321,9 +331,10 @@ static int spdylay_session_is_data_allowed(spdylay_session *session,
return 0;
}
if(spdylay_session_is_my_stream_id(session, stream_id)) {
return (stream->flags & SPDYLAY_FLAG_FIN) == 0;
return (stream->shut_flags & SPDYLAY_SHUT_WR) == 0;
} else {
return stream->state == SPDYLAY_STREAM_OPENED;
return stream->state == SPDYLAY_STREAM_OPENED &&
(stream->shut_flags & SPDYLAY_SHUT_WR) == 0;
}
}
@ -456,6 +467,13 @@ static int spdylay_session_after_frame_sent(spdylay_session *session)
spdylay_session_get_stream(session, frame->syn_stream.stream_id);
if(stream) {
stream->state = SPDYLAY_STREAM_OPENING;
if(frame->syn_stream.hd.flags & SPDYLAY_FLAG_FIN) {
spdylay_stream_shutdown(stream, SPDYLAY_SHUT_WR);
}
if(frame->syn_stream.hd.flags & SPDYLAY_FLAG_UNIDIRECTIONAL) {
spdylay_stream_shutdown(stream, SPDYLAY_SHUT_RD);
}
spdylay_session_close_stream_if_shut_rdwr(session, stream);
}
break;
}
@ -464,6 +482,10 @@ static int spdylay_session_after_frame_sent(spdylay_session *session)
spdylay_session_get_stream(session, frame->syn_reply.stream_id);
if(stream) {
stream->state = SPDYLAY_STREAM_OPENED;
if(frame->syn_reply.hd.flags & SPDYLAY_FLAG_FIN) {
spdylay_stream_shutdown(stream, SPDYLAY_SHUT_WR);
}
spdylay_session_close_stream_if_shut_rdwr(session, stream);
}
break;
}
@ -490,18 +512,11 @@ static int spdylay_session_after_frame_sent(spdylay_session *session)
abort();
case SPDYLAY_DATA:
if(frame->data.flags & SPDYLAY_FLAG_FIN) {
if(spdylay_session_is_my_stream_id(session, frame->data.stream_id)) {
/* This is the last frame of request DATA (e.g., POST in HTTP
term), so set FIN bit set in stream. This also happens when
request HEADERS frame is sent with FIN bit set. */
spdylay_stream *stream =
spdylay_session_get_stream(session, frame->data.stream_id);
if(stream) {
stream->flags |= SPDYLAY_FLAG_FIN;
}
} else {
/* We send all data requested by peer, so close the stream. */
spdylay_session_close_stream(session, frame->data.stream_id);
spdylay_stream *stream =
spdylay_session_get_stream(session, frame->data.stream_id);
if(stream) {
spdylay_stream_shutdown(stream, SPDYLAY_SHUT_WR);
spdylay_session_close_stream_if_shut_rdwr(session, stream);
}
}
break;
@ -740,6 +755,19 @@ int spdylay_session_on_syn_stream_received(spdylay_session *session,
frame->syn_stream.hd.flags,
frame->syn_stream.pri,
SPDYLAY_STREAM_OPENING);
if(r == 0) {
spdylay_stream *stream = spdylay_session_get_stream
(session, frame->syn_stream.stream_id);
if(flags & SPDYLAY_FLAG_FIN) {
spdylay_stream_shutdown(stream, SPDYLAY_SHUT_RD);
}
if(flags & SPDYLAY_FLAG_UNIDIRECTIONAL) {
spdylay_stream_shutdown(stream, SPDYLAY_SHUT_WR);
}
/* We don't call spdylay_session_close_stream_if_shut_rdwr()
here because either SPDYLAY_FLAG_FIN or
SPDYLAY_FLAG_UNIDIRECTIONAL is not set here. */
}
}
if(r == 0) {
session->last_recv_stream_id = frame->syn_stream.stream_id;
@ -758,20 +786,20 @@ int spdylay_session_on_syn_reply_received(spdylay_session *session,
{
int r = 0;
int valid = 0;
if(spdylay_session_is_my_stream_id(session, frame->syn_reply.stream_id)) {
spdylay_stream *stream = spdylay_session_get_stream
(session, frame->syn_reply.stream_id);
if(stream && (stream->flags & SPDYLAY_FLAG_UNIDIRECTIONAL) == 0) {
spdylay_stream *stream = spdylay_session_get_stream
(session, frame->syn_reply.stream_id);
if(stream && (stream->shut_flags & SPDYLAY_SHUT_RD) == 0) {
if(spdylay_session_is_my_stream_id(session, frame->syn_reply.stream_id)) {
if(stream->state == SPDYLAY_STREAM_OPENING) {
valid = 1;
stream->state = SPDYLAY_STREAM_OPENED;
spdylay_session_call_on_ctrl_frame_received(session, SPDYLAY_SYN_REPLY,
frame);
if(frame->syn_reply.hd.flags & SPDYLAY_FLAG_FIN) {
/* This is the last frame of this stream, so close the
stream. This also happens when DATA and HEADERS frame
with FIN bit set. */
spdylay_session_close_stream(session, frame->syn_reply.stream_id);
/* This is the last frame of this stream, so disallow
further receptions. */
spdylay_stream_shutdown(stream, SPDYLAY_SHUT_RD);
spdylay_session_close_stream_if_shut_rdwr(session, stream);
}
} else if(stream->state == SPDYLAY_STREAM_CLOSING) {
/* This is race condition. SPDYLAY_STREAM_CLOSING indicates
@ -853,18 +881,18 @@ int spdylay_session_on_headers_received(spdylay_session *session,
{
int r = 0;
int valid = 0;
if(spdylay_session_is_my_stream_id(session, frame->headers.stream_id)) {
spdylay_stream *stream = spdylay_session_get_stream
(session, frame->headers.stream_id);
if(stream) {
if(stream->state == SPDYLAY_STREAM_OPENED) {
spdylay_stream *stream = spdylay_session_get_stream
(session, frame->headers.stream_id);
/* First we check readability from this stream. */
if(stream && (stream->shut_flags & SPDYLAY_SHUT_RD) == 0) {
if(spdylay_session_is_my_stream_id(session, frame->headers.stream_id)) {
if(stream && stream->state == SPDYLAY_STREAM_OPENED) {
valid = 1;
spdylay_session_call_on_ctrl_frame_received(session, SPDYLAY_HEADERS,
frame);
if(frame->headers.hd.flags & SPDYLAY_FLAG_FIN) {
/* Close the stream because this is the last frame of the
stream. */
spdylay_session_close_stream(session, frame->headers.stream_id);
spdylay_stream_shutdown(stream, SPDYLAY_SHUT_RD);
spdylay_session_close_stream_if_shut_rdwr(session, stream);
}
} else if(stream->state == SPDYLAY_STREAM_CLOSING) {
/* This is race condition. SPDYLAY_STREAM_CLOSING indicates
@ -872,21 +900,15 @@ int spdylay_session_on_headers_received(spdylay_session *session,
eventually sent, so we just ignore this frame. */
valid = 1;
}
}
} else {
spdylay_stream *stream = spdylay_session_get_stream
(session, frame->headers.stream_id);
if(stream) {
if((stream->flags & SPDYLAY_FLAG_FIN) == 0) {
valid = 1;
spdylay_session_call_on_ctrl_frame_received(session, SPDYLAY_HEADERS,
frame);
if(frame->headers.hd.flags & SPDYLAY_FLAG_FIN) {
stream->flags |= SPDYLAY_FLAG_FIN;
}
if(stream->flags & SPDYLAY_FLAG_UNIDIRECTIONAL) {
spdylay_session_close_stream(session, frame->headers.stream_id);
}
} else {
/* If this is remote peer initiated stream, it is OK unless it
have sent FIN frame already. */
valid = 1;
spdylay_session_call_on_ctrl_frame_received(session, SPDYLAY_HEADERS,
frame);
if(frame->headers.hd.flags & SPDYLAY_FLAG_FIN) {
spdylay_stream_shutdown(stream, SPDYLAY_SHUT_RD);
spdylay_session_close_stream_if_shut_rdwr(session, stream);
}
}
}
@ -1005,16 +1027,20 @@ static int spdylay_session_process_data_frame(spdylay_session *session)
SPDYLAY_LENGTH_MASK;
stream = spdylay_session_get_stream(session, stream_id);
if(stream) {
if(spdylay_session_is_my_stream_id(session, stream_id)) {
if(stream->state == SPDYLAY_STREAM_OPENED) {
if((stream->shut_flags & SPDYLAY_SHUT_RD) == 0) {
if(spdylay_session_is_my_stream_id(session, stream_id)) {
if(stream->state == SPDYLAY_STREAM_OPENED) {
valid = 1;
} else if(stream->state != SPDYLAY_STREAM_CLOSING) {
status_code = SPDYLAY_PROTOCOL_ERROR;
}
} else {
/* It is OK if this is remote peer initiated stream and we did
not receive FIN. */
valid = 1;
} else if(stream->state != SPDYLAY_STREAM_CLOSING) {
status_code = SPDYLAY_PROTOCOL_ERROR;
}
} else if(stream->flags & SPDYLAY_FLAG_FIN) {
status_code = SPDYLAY_PROTOCOL_ERROR;
} else {
valid = 1;
status_code = SPDYLAY_PROTOCOL_ERROR;
}
} else {
status_code = SPDYLAY_INVALID_STREAM;

View File

@ -155,6 +155,16 @@ int spdylay_session_open_stream(spdylay_session *session, int32_t stream_id,
*/
int spdylay_session_close_stream(spdylay_session *session, int32_t stream_id);
/*
* If further receptions and transmissions over this stream are
* disallowed, close this stream. This function returns 0 if it
* succeeds, or negative error code. If either receptions or
* transmissions is allowed, this function returns 0 and the stream
* will not be closed.
*/
int spdylay_session_close_stream_if_shut_rdwr(spdylay_session *session,
spdylay_stream *stream);
/*
* Called when SYN_STREAM is received. Received frame is |frame|.
* This function does first validate received frame and then open

View File

@ -32,7 +32,13 @@ void spdylay_stream_init(spdylay_stream *stream, int32_t stream_id,
stream->flags = flags;
stream->pri = pri;
stream->state = initial_state;
stream->shut_flags = SPDYLAY_SHUT_NONE;
}
void spdylay_stream_free(spdylay_stream *stream)
{}
void spdylay_stream_shutdown(spdylay_stream *stream, spdylay_shut_flag flag)
{
stream->shut_flags |= flag;
}

View File

@ -35,12 +35,15 @@
* If local peer is stream initiator:
* SPDYLAY_STREAM_OPENING : upon sending SYN_STREAM
* SPDYLAY_STREAM_OPENED : upon receiving SYN_REPLY
* SPDYLAY_STREAM_CLOSING : upon queuing RST_STREAM
*
* If remote peer is stream initiator:
* SPDYLAY_STREAM_OPENING : upon receiving SYN_STREAM
* SPDYLAY_STREAM_OPENED : upon sending SYN_REPLY
* SPDYLAY_STREAM_CLOSING : upon queuing RST_STREAM
*/
typedef enum {
/* Initial state */
SPDYLAY_STREAM_INITIAL,
/* For stream initiator: SYN_STREAM has been sent, but SYN_REPLY is
not received yet. For receiver: SYN_STREAM has been received,
@ -54,6 +57,17 @@ typedef enum {
SPDYLAY_STREAM_CLOSING
} spdylay_stream_state;
typedef enum {
SPDYLAY_SHUT_NONE = 0,
/* Indicates further receptions will be disallowed. */
SPDYLAY_SHUT_RD = 0x01,
/* Indicates further transmissions will be disallowed. */
SPDYLAY_SHUT_WR = 0x02,
/* Indicates both further receptions and transmissions will be
disallowed. */
SPDYLAY_SHUT_RDWR = SPDYLAY_SHUT_RD | SPDYLAY_SHUT_WR
} spdylay_shut_flag;
typedef struct {
int32_t stream_id;
spdylay_stream_state state;
@ -61,6 +75,8 @@ typedef struct {
uint8_t flags;
/* Use same scheme in SYN_STREAM frame */
uint8_t pri;
/* Bitwise OR of zero or more spdylay_shut_flag values */
uint8_t shut_flags;
} spdylay_stream;
void spdylay_stream_init(spdylay_stream *stream, int32_t stream_id,
@ -69,4 +85,10 @@ void spdylay_stream_init(spdylay_stream *stream, int32_t stream_id,
void spdylay_stream_free(spdylay_stream *stream);
/*
* Disallow either further receptions or transmissions, or both.
* |flag| is bitwise OR of one or more of spdylay_shut_flag.
*/
void spdylay_stream_shutdown(spdylay_stream *stream, spdylay_shut_flag flag);
#endif /* SPDYLAY_STREAM */

View File

@ -503,6 +503,8 @@ void test_spdylay_session_on_headers_received()
spdylay_session_client_new(&session, &callbacks, &user_data);
spdylay_session_open_stream(session, 1, SPDYLAY_FLAG_NONE, 0,
SPDYLAY_STREAM_OPENED);
spdylay_stream_shutdown(spdylay_session_get_stream(session, 1),
SPDYLAY_SHUT_WR);
spdylay_frame_headers_init(&frame.headers, SPDYLAY_FLAG_NONE, 1,
dup_nv(nv));
@ -531,7 +533,8 @@ void test_spdylay_session_on_headers_received()
CU_ASSERT(3 == user_data.valid);
CU_ASSERT(SPDYLAY_STREAM_OPENING ==
spdylay_session_get_stream(session, 2)->state);
CU_ASSERT(spdylay_session_get_stream(session, 2)->flags & SPDYLAY_FLAG_FIN);
CU_ASSERT(spdylay_session_get_stream(session, 2)->shut_flags &
SPDYLAY_SHUT_RD);
CU_ASSERT(0 == spdylay_session_on_headers_received(session, &frame));
CU_ASSERT(2 == user_data.invalid);