python: Add async body generation support

This commit is contained in:
Tatsuhiro Tsujikawa 2015-04-01 22:49:43 +09:00
parent 2d15dca096
commit b0123448a4
3 changed files with 206 additions and 61 deletions

View File

@ -220,6 +220,10 @@ HTTP/2 servers
This is a value of ``:path`` header field. This is a value of ``:path`` header field.
.. py:attribute:: headers
Request header fields.
A :py:class:`BaseRequestHandler` has the following methods: A :py:class:`BaseRequestHandler` has the following methods:
.. py:method:: on_headers() .. py:method:: on_headers()
@ -250,9 +254,26 @@ HTTP/2 servers
is additional response headers. The *:status* header field will is additional response headers. The *:status* header field will
be appended by the library. The *body* is the response body. be appended by the library. The *body* is the response body.
It could be ``None`` if response body is empty. Or it must be It could be ``None`` if response body is empty. Or it must be
instance of either ``str``, ``bytes`` or :py:class:`io.IOBase`. instance of either ``str``, ``bytes``, :py:class:`io.IOBase` or
If instance of ``str`` is specified, it will be encoded using callable, called body generator, which takes one parameter,
UTF-8. size. The body generator generates response body. It can pause
generation of response so that it can wait for slow backend data
generation. When invoked, it should return tuple, byte string
at most size length and flag. The flag is either ``DATA_OK``,
``DATA_EOF`` or ``DATA_DEFERRED``. For non-empty byte string
and it is not the last chunk of response, ``DATA_OK`` must be
returned as flag. If this is the last chunk of the response
(byte string could be ``None``), ``DATA_EOF`` must be returned
as flag. If there is no data available right now, but
additional data are anticipated, return tuple (``None``,
``DATA_DEFERRD``). When data arrived, call :py:meth:`resume()`
and restart response body transmission.
Only the body generator can pause response body generation;
instance of :py:class:`io.IOBase` must not block.
If instance of ``str`` is specified as *body*, it will be
encoded using UTF-8.
The *headers* is a list of tuple of the form ``(name, The *headers* is a list of tuple of the form ``(name,
value)``. The ``name`` and ``value`` can be either byte string value)``. The ``name`` and ``value`` can be either byte string
@ -273,10 +294,8 @@ HTTP/2 servers
The *status* is HTTP status code. The *headers* is additional The *status* is HTTP status code. The *headers* is additional
response headers. The ``:status`` header field is appended by response headers. The ``:status`` header field is appended by
the library. The *body* is the response body. It could be the library. The *body* is the response body. It has the same
``None`` if response body is empty. Or it must be instance of semantics of *body* parameter of :py:meth:`send_response()`.
either ``str``, ``bytes`` or ``io.IOBase``. If instance of
``str`` is specified, it is encoded using UTF-8.
The headers and request_headers are a list of tuple of the form The headers and request_headers are a list of tuple of the form
``(name, value)``. The ``name`` and ``value`` can be either byte ``(name, value)``. The ``name`` and ``value`` can be either byte
@ -288,6 +307,14 @@ HTTP/2 servers
Raises the exception if any error occurs. Raises the exception if any error occurs.
.. py:method:: resume()
Signals the restarting of response body transmission paused by
``DATA_DEFERRED`` from the body generator (see
:py:meth:`send_response()` about the body generator). It is not
an error calling this method while response body transmission is
not paused.
The following example illustrates :py:class:`HTTP2Server` and The following example illustrates :py:class:`HTTP2Server` and
:py:class:`BaseRequestHandler` usage: :py:class:`BaseRequestHandler` usage:
@ -296,6 +323,7 @@ The following example illustrates :py:class:`HTTP2Server` and
#!/usr/bin/env python #!/usr/bin/env python
import io, ssl import io, ssl
import nghttp2 import nghttp2
class Handler(nghttp2.BaseRequestHandler): class Handler(nghttp2.BaseRequestHandler):
@ -311,9 +339,85 @@ The following example illustrates :py:class:`HTTP2Server` and
body=io.BytesIO(b'nghttp2-python FTW')) body=io.BytesIO(b'nghttp2-python FTW'))
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 ctx.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
ctx.load_cert_chain('server.crt', 'server.key') ctx.load_cert_chain('server.crt', 'server.key')
# give None to ssl to make the server non-SSL/TLS # give None to ssl to make the server non-SSL/TLS
server = nghttp2.HTTP2Server(('127.0.0.1', 8443), Handler, ssl=ctx) server = nghttp2.HTTP2Server(('127.0.0.1', 8443), Handler, ssl=ctx)
server.serve_forever() server.serve_forever()
The following example illustrates HTTP/2 server using asynchronous
response body generation. This is simplified reverse proxy:
.. code-block:: python
#!/usr/bin/env python
import ssl
import os
import urllib
import asyncio
import io
import nghttp2
@asyncio.coroutine
def get_http_header(handler, url):
url = urllib.parse.urlsplit(url)
ssl = url.scheme == 'https'
if url.port == None:
if url.scheme == 'https':
port = 443
else:
port = 80
else:
port = url.port
connect = asyncio.open_connection(url.hostname, port, ssl=ssl)
reader, writer = yield from connect
req = 'GET {path} HTTP/1.0\r\n\r\n'.format(path=url.path or '/')
writer.write(req.encode('utf-8'))
# skip response header fields
while True:
line = yield from reader.readline()
line = line.rstrip()
if not line:
break
# read body
while True:
b = yield from reader.read(4096)
if not b:
break
handler.buf.write(b)
writer.close()
handler.buf.seek(0)
handler.eof = True
handler.resume()
class Body:
def __init__(self, handler):
self.handler = handler
self.handler.eof = False
self.handler.buf = io.BytesIO()
def generate(self, n):
buf = self.handler.buf
data = buf.read1(n)
if not data and not self.handler.eof:
return None, nghttp2.DATA_DEFERRED
return data, nghttp2.DATA_EOF if self.handler.eof else nghttp2.DATA_OK
class Handler(nghttp2.BaseRequestHandler):
def on_headers(self):
body = Body(self)
asyncio.async(get_http_header(
self, 'http://localhost' + self.path.decode('utf-8')))
self.send_response(status=200, body=body.generate)
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
ctx.load_cert_chain('server.crt', 'server.key')
server = nghttp2.HTTP2Server(('127.0.0.1', 8443), Handler, ssl=ctx)
server.serve_forever()

View File

@ -34,6 +34,7 @@ cdef extern from 'nghttp2/nghttp2.h':
ctypedef enum nghttp2_error: ctypedef enum nghttp2_error:
NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
NGHTTP2_ERR_DEFERRED
ctypedef enum nghttp2_flag: ctypedef enum nghttp2_flag:
NGHTTP2_FLAG_NONE NGHTTP2_FLAG_NONE
@ -282,6 +283,9 @@ cdef extern from 'nghttp2/nghttp2.h':
int nghttp2_session_terminate_session(nghttp2_session *session, int nghttp2_session_terminate_session(nghttp2_session *session,
uint32_t error_code) uint32_t error_code)
int nghttp2_session_resume_data(nghttp2_session *session,
int32_t stream_id)
const char* nghttp2_strerror(int lib_error_code) const char* nghttp2_strerror(int lib_error_code)
int nghttp2_hd_deflate_new(nghttp2_hd_deflater **deflater_ptr, int nghttp2_hd_deflate_new(nghttp2_hd_deflater **deflater_ptr,

View File

@ -258,19 +258,35 @@ try:
except ImportError: except ImportError:
asyncio = None asyncio = None
# body generator flags
DATA_OK = 0
DATA_EOF = 1
DATA_DEFERRED = 2
class _ByteIOWrapper:
def __init__(self, b):
self.b = b
def generate(self, n):
data = self.b.read1(n)
if not data:
return None, DATA_EOF
return data, DATA_OK
def wrap_body(body): def wrap_body(body):
if body is None: if body is None:
return body return body
elif isinstance(body, str): elif isinstance(body, str):
return io.BytesIO(body.encode('utf-8')) return _ByteIOWrapper(io.BytesIO(body.encode('utf-8'))).generate
elif isinstance(body, bytes): elif isinstance(body, bytes):
return io.BytesIO(body) return _ByteIOWrapper(io.BytesIO(body)).generate
elif isinstance(body, io.IOBase): elif isinstance(body, io.IOBase):
return body return _ByteIOWrapper(body).generate
else: else:
raise Exception(('body must be None or instance of str or ' # assume that callable in the form f(n) returning tuple byte
'bytes or io.IOBase')) # string and flag.
return body
cdef _get_stream_user_data(cnghttp2.nghttp2_session *session, cdef _get_stream_user_data(cnghttp2.nghttp2_session *session,
int32_t stream_id): int32_t stream_id):
@ -488,29 +504,39 @@ cdef int on_stream_close(cnghttp2.nghttp2_session *session,
return 0 return 0
cdef ssize_t server_data_source_read(cnghttp2.nghttp2_session *session, cdef ssize_t data_source_read(cnghttp2.nghttp2_session *session,
int32_t stream_id, int32_t stream_id,
uint8_t *buf, size_t length, uint8_t *buf, size_t length,
uint32_t *data_flags, uint32_t *data_flags,
cnghttp2.nghttp2_data_source *source, cnghttp2.nghttp2_data_source *source,
void *user_data): void *user_data):
cdef http2 = <_HTTP2SessionCore>user_data cdef http2 = <_HTTP2SessionCoreBase>user_data
handler = <object>source.ptr generator = <object>source.ptr
http2.enter_callback()
try: try:
data = handler.response_body.read(length) data, flag = generator(length)
except: except:
sys.stderr.write(traceback.format_exc()) sys.stderr.write(traceback.format_exc())
return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
finally:
http2.leave_callback()
if flag == DATA_DEFERRED:
return cnghttp2.NGHTTP2_ERR_DEFERRED
if data: if data:
nread = len(data) nread = len(data)
memcpy(buf, <uint8_t*>data, nread) memcpy(buf, <uint8_t*>data, nread)
return nread else:
nread = 0
if flag == DATA_EOF:
data_flags[0] = cnghttp2.NGHTTP2_DATA_FLAG_EOF data_flags[0] = cnghttp2.NGHTTP2_DATA_FLAG_EOF
elif flag != DATA_OK:
return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
return 0 return nread
cdef int client_on_begin_headers(cnghttp2.nghttp2_session *session, cdef int client_on_begin_headers(cnghttp2.nghttp2_session *session,
const cnghttp2.nghttp2_frame *frame, const cnghttp2.nghttp2_frame *frame,
@ -595,36 +621,13 @@ cdef int client_on_frame_send(cnghttp2.nghttp2_session *session,
return 0 return 0
http2._start_settings_timer() http2._start_settings_timer()
cdef ssize_t client_data_source_read(cnghttp2.nghttp2_session *session,
int32_t stream_id,
uint8_t *buf, size_t length,
uint32_t *data_flags,
cnghttp2.nghttp2_data_source *source,
void *user_data):
cdef http2 = <_HTTP2ClientSessionCore>user_data
body = <object>source.ptr
try:
data = body.read(length)
except:
sys.stderr.write(traceback.format_exc())
return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
if data:
nread = len(data)
memcpy(buf, <uint8_t*>data, nread)
return nread
data_flags[0] = cnghttp2.NGHTTP2_DATA_FLAG_EOF
return 0
cdef class _HTTP2SessionCoreBase: cdef class _HTTP2SessionCoreBase:
cdef cnghttp2.nghttp2_session *session cdef cnghttp2.nghttp2_session *session
cdef transport cdef transport
cdef handler_class cdef handler_class
cdef handlers cdef handlers
cdef settings_timer cdef settings_timer
cdef inside_callback
def __cinit__(self, transport, handler_class=None): def __cinit__(self, transport, handler_class=None):
self.session = NULL self.session = NULL
@ -632,6 +635,7 @@ cdef class _HTTP2SessionCoreBase:
self.handler_class = handler_class self.handler_class = handler_class
self.handlers = set() self.handlers = set()
self.settings_timer = None self.settings_timer = None
self.inside_callback = False
def __dealloc__(self): def __dealloc__(self):
cnghttp2.nghttp2_session_del(self.session) cnghttp2.nghttp2_session_del(self.session)
@ -668,6 +672,17 @@ cdef class _HTTP2SessionCoreBase:
cnghttp2.nghttp2_session_want_write(self.session) == 0: cnghttp2.nghttp2_session_want_write(self.session) == 0:
self.transport.close() self.transport.close()
def resume(self, stream_id):
cnghttp2.nghttp2_session_resume_data(self.session, stream_id)
if not self.inside_callback:
self.send_data()
def enter_callback(self):
self.inside_callback = True
def leave_callback(self):
self.inside_callback = False
def _make_handler(self, stream_id): def _make_handler(self, stream_id):
logging.debug('_make_handler, stream_id:%s', stream_id) logging.debug('_make_handler, stream_id:%s', stream_id)
handler = self.handler_class(self, stream_id) handler = self.handler_class(self, stream_id)
@ -815,8 +830,8 @@ cdef class _HTTP2SessionCore(_HTTP2SessionCoreBase):
nvlen = _make_nva(&nva, handler.response_headers) nvlen = _make_nva(&nva, handler.response_headers)
if handler.response_body: if handler.response_body:
prd.source.ptr = <void*>handler prd.source.ptr = <void*>handler.response_body
prd.read_callback = server_data_source_read prd.read_callback = data_source_read
prd_ptr = &prd prd_ptr = &prd
else: else:
prd_ptr = NULL prd_ptr = NULL
@ -933,7 +948,7 @@ cdef class _HTTP2ClientSessionCore(_HTTP2SessionCoreBase):
if body: if body:
prd.source.ptr = <void*>body prd.source.ptr = <void*>body
prd.read_callback = client_data_source_read prd.read_callback = data_source_read
prd_ptr = &prd prd_ptr = &prd
else: else:
prd_ptr = NULL prd_ptr = NULL
@ -1095,8 +1110,26 @@ if asyncio:
additional response headers. The :status header field is additional response headers. The :status header field is
appended by the library. The body is the response body. It appended by the library. The body is the response body. It
could be None if response body is empty. Or it must be could be None if response body is empty. Or it must be
instance of either str, bytes or io.IOBase. If instance of str instance of either str, bytes, io.IOBase or callable,
is specified, it is encoded using UTF-8. called body generator, which takes one parameter,
size. The body generator generates response body. It can
pause generation of response so that it can wait for slow
backend data generation. When invoked, it should return
tuple, byte string and flag. The flag is either DATA_OK,
DATA_EOF and DATA_DEFERRED. For non-empty byte string and
it is not the last chunk of response, DATA_OK is returned
as flag. If this is the last chunk of the response (byte
string is possibly None), DATA_EOF must be returned as
flag. If there is no data available right now, but
additional data are anticipated, return tuple (None,
DATA_DEFERRD). When data arrived, call resume() and
restart response body transmission.
Only the body generator can pause response body
generation; instance of io.IOBase must not block.
If instance of str is specified as body, it is encoded
using UTF-8.
The headers is a list of tuple of the form (name, The headers is a list of tuple of the form (name,
value). The name and value can be either unicode string or value). The name and value can be either unicode string or
@ -1129,11 +1162,9 @@ if asyncio:
request_headers parameter). request_headers parameter).
The status is HTTP status code. The headers is additional The status is HTTP status code. The headers is additional
response headers. The :status header field is appended by the response headers. The :status header field is appended by
library. The body is the response body. It could be None if the library. The body is the response body. It has the
response body is empty. Or it must be instance of either str, same semantics of body parameter of send_response().
bytes or io.IOBase. If instance of str is specified, it is
encoded using UTF-8.
The headers and request_headers are a list of tuple of the The headers and request_headers are a list of tuple of the
form (name, value). The name and value can be either form (name, value). The name and value can be either
@ -1184,6 +1215,9 @@ if asyncio:
self.response_body = body self.response_body = body
def resume(self):
self.http2.resume(self.stream_id)
def _encode_headers(headers): def _encode_headers(headers):
if not headers: if not headers:
return [] return []
@ -1430,6 +1464,9 @@ if asyncio:
''' '''
self.http2.push(push_promise, handler) self.http2.push(push_promise, handler)
def resume(self):
self.http2.resume(self.stream_id)
class _HTTP2ClientSession(asyncio.Protocol): class _HTTP2ClientSession(asyncio.Protocol):
def __init__(self, client): def __init__(self, client):