diff --git a/python/spdylay_tests.py b/python/spdylay_tests.py new file mode 100644 index 00000000..1b63248e --- /dev/null +++ b/python/spdylay_tests.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python + +import unittest +import io +import collections + +import spdylay + +class BufferList: + def __init__(self): + self.buffers = collections.deque() + + def add_buffer(self, bytebuf): + self.buffers.append(bytebuf) + + def get_bytes(self, length): + while self.buffers: + first = self.buffers[0] + data = first.read(length) + if data: + return data + else: + self.buffers.popleft() + return None + +class IOBridge: + def __init__(self, inputs, outputs): + self.inputs = inputs + self.outputs = outputs + +class Streams: + def __init__(self, iob): + self.iob = iob + self.streams = {} + self.recv_frames = [] + self.recv_data = io.BytesIO() + +def recv_cb(session, length): + iob = session.user_data.iob + return iob.inputs.get_bytes(length) + +def send_cb(session, data): + iob = session.user_data.iob + iob.outputs.add_buffer(io.BytesIO(data)) + return len(data) + +def read_cb(session, stream_id, length, source): + return source.read(length) + +def on_data_chunk_recv_cb(session, flags, stream_id, data): + session.user_data.recv_data.write(data) + +def on_ctrl_recv_cb(session, frame): + session.user_data.recv_frames.append(frame) + +class SpdylayTests(unittest.TestCase): + + def setUp(self): + client_output = BufferList() + server_output = BufferList() + + client_iob = IOBridge(server_output, client_output) + server_iob = IOBridge(client_output, server_output) + + self.client_streams = Streams(client_iob) + self.server_streams = Streams(server_iob) + + self.client_session = spdylay.Session(\ + spdylay.CLIENT, + spdylay.PROTO_SPDY3, + user_data=self.client_streams, + recv_cb=recv_cb, + send_cb=send_cb, + on_ctrl_recv_cb=on_ctrl_recv_cb, + on_data_chunk_recv_cb=on_data_chunk_recv_cb) + + self.server_session = spdylay.Session(\ + spdylay.SERVER, + spdylay.PROTO_SPDY3, + user_data=self.server_streams, + recv_cb=recv_cb, + send_cb=send_cb, + on_ctrl_recv_cb=on_ctrl_recv_cb, + on_data_chunk_recv_cb=on_data_chunk_recv_cb) + + def test_submit_request_and_response(self): + data_prd = spdylay.DataProvider(io.BytesIO(b'Hello World'), read_cb) + self.client_session.submit_request(0, [(b':method', b'POST')], + data_prd=data_prd, + stream_user_data=data_prd) + self.client_session.send() + self.server_session.recv() + + self.assertEqual(1, len(self.server_streams.recv_frames)) + frame = self.server_streams.recv_frames[0] + self.assertEqual(spdylay.SYN_STREAM, frame.frame_type) + self.assertEqual(1, frame.stream_id) + self.assertEqual(0, frame.assoc_stream_id) + self.assertEqual(0, frame.pri) + self.assertEqual((b':method', b'POST'), frame.nv[0]) + + self.assertEqual(b'Hello World', + self.server_streams.recv_data.getvalue()) + + self.assertEqual(data_prd, self.client_session.get_stream_user_data(1)) + + data_prd = spdylay.DataProvider(io.BytesIO(b'Foo the bar'), read_cb) + self.server_session.submit_response(1, [(b':status', b'200 OK')], + data_prd=data_prd) + self.server_session.send() + self.client_session.recv() + + self.assertEqual(1, len(self.client_streams.recv_frames)) + frame = self.client_streams.recv_frames[0] + self.assertEqual(spdylay.SYN_REPLY, frame.frame_type) + self.assertEqual(1, frame.stream_id) + self.assertEqual((b':status', b'200 OK'), frame.nv[0]) + + self.assertEqual(b'Foo the bar', + self.client_streams.recv_data.getvalue()) + + def test_submit_syn_stream_and_syn_stream(self): + self.client_session.submit_syn_stream(spdylay.CTRL_FLAG_FIN, 0, 2, + [(b':path', b'/')], None); + self.client_session.send(); + self.server_session.recv(); + + self.assertEqual(1, len(self.server_streams.recv_frames)) + frame = self.server_streams.recv_frames[0] + self.assertEqual(spdylay.SYN_STREAM, frame.frame_type) + self.assertEqual(1, frame.stream_id) + self.assertEqual(0, frame.assoc_stream_id) + self.assertEqual(2, frame.pri) + self.assertEqual((b':path', b'/'), frame.nv[0]) + + self.server_session.submit_syn_reply(spdylay.CTRL_FLAG_FIN, 1, + [(b':version', b'HTTP/1.1')]) + self.server_session.send(); + self.client_session.recv(); + + self.assertEqual(1, len(self.client_streams.recv_frames)) + frame = self.client_streams.recv_frames[0] + self.assertEqual(spdylay.SYN_REPLY, frame.frame_type) + self.assertEqual(1, frame.stream_id) + self.assertEqual((b':version', b'HTTP/1.1'), frame.nv[0]) + + def test_submit_rst_stream(self): + self.client_session.submit_syn_stream(spdylay.CTRL_FLAG_FIN, 0, 2, + [(b':path', b'/')], None); + self.client_session.send(); + self.server_session.recv(); + + self.server_session.submit_rst_stream(1, spdylay.PROTOCOL_ERROR) + self.server_session.send(); + self.client_session.recv(); + + self.assertEqual(1, len(self.client_streams.recv_frames)) + frame = self.client_streams.recv_frames[0] + self.assertEqual(spdylay.RST_STREAM, frame.frame_type) + self.assertEqual(1, frame.stream_id) + self.assertEqual(spdylay.PROTOCOL_ERROR, frame.status_code) + + def test_submit_goaway(self): + self.client_session.submit_goaway(spdylay.GOAWAY_PROTOCOL_ERROR) + self.client_session.send(); + self.server_session.recv(); + + self.assertEqual(1, len(self.server_streams.recv_frames)) + frame = self.server_streams.recv_frames[0] + self.assertEqual(spdylay.GOAWAY, frame.frame_type) + self.assertEqual(spdylay.GOAWAY_PROTOCOL_ERROR, frame.status_code) + + def test_resume_data(self): + with self.assertRaises(spdylay.InvalidArgumentError): + self.client_session.resume_data(1) + + def test_get_pri_lowest(self): + self.assertEqual(7, self.client_session.get_pri_lowest()) + + def test_fail_session(self): + self.client_session.fail_session(spdylay.GOAWAY_PROTOCOL_ERROR) + self.client_session.send(); + self.server_session.recv(); + + self.assertEqual(1, len(self.server_streams.recv_frames)) + frame = self.server_streams.recv_frames[0] + self.assertEqual(spdylay.GOAWAY, frame.frame_type) + self.assertEqual(spdylay.GOAWAY_PROTOCOL_ERROR, frame.status_code) + + self.assertFalse(self.client_session.want_read()) + self.assertFalse(self.client_session.want_write()) + + def test_deferred_data(self): + def deferred_read_cb(session, stream_id, length, source): + return spdylay.ERR_DEFERRED + + data_prd = spdylay.DataProvider(io.BytesIO(b'Hello World'), + deferred_read_cb) + self.client_session.submit_request(0, [(b':method', b'POST')], + data_prd=data_prd, + stream_user_data=data_prd) + self.client_session.send() + self.server_session.recv() + + self.assertEqual(1, len(self.server_streams.recv_frames)) + frame = self.server_streams.recv_frames[0] + self.assertEqual(spdylay.SYN_STREAM, frame.frame_type) + self.assertEqual(1, frame.stream_id) + self.assertEqual(0, frame.assoc_stream_id) + self.assertEqual(0, frame.pri) + self.assertEqual((b':method', b'POST'), frame.nv[0]) + + self.assertEqual(b'', self.server_streams.recv_data.getvalue()) + + data_prd.read_cb = read_cb + + self.client_session.resume_data(1) + + self.client_session.send() + self.server_session.recv() + + self.assertEqual(b'Hello World', + self.server_streams.recv_data.getvalue()) + + def test_recv_cb_eof(self): + def eof_recv_cb(session, length): + raise spdylay.EOFError() + + self.client_session = spdylay.Session(\ + spdylay.CLIENT, + spdylay.PROTO_SPDY3, + user_data=self.client_streams, + recv_cb=eof_recv_cb) + + with self.assertRaises(spdylay.EOFError): + self.client_session.recv() + + def test_recv_cb_callback_failure(self): + def cbfail_recv_cb(session, length): + raise spdylay.CallbackFailureError() + + self.client_session = spdylay.Session(\ + spdylay.CLIENT, + spdylay.PROTO_SPDY3, + user_data=self.client_streams, + recv_cb=cbfail_recv_cb) + + with self.assertRaises(spdylay.CallbackFailureError): + self.client_session.recv() + + def test_send_cb_callback_failure(self): + def cbfail_send_cb(session, data): + raise spdylay.CallbackFailureError() + + self.client_session = spdylay.Session(\ + spdylay.CLIENT, + spdylay.PROTO_SPDY3, + user_data=self.client_streams, + send_cb=cbfail_send_cb) + + self.client_session.submit_goaway(spdylay.GOAWAY_OK) + + with self.assertRaises(spdylay.CallbackFailureError): + self.client_session.send() + +if __name__ == '__main__': + unittest.main()