From b0123448a42eab95ed521e25ca1898bc82de3c7d Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Wed, 1 Apr 2015 22:49:43 +0900 Subject: [PATCH] python: Add async body generation support --- doc/sources/python-apiref.rst | 122 ++++++++++++++++++++++++++--- python/cnghttp2.pxd | 4 + python/nghttp2.pyx | 141 +++++++++++++++++++++------------- 3 files changed, 206 insertions(+), 61 deletions(-) diff --git a/doc/sources/python-apiref.rst b/doc/sources/python-apiref.rst index 2996149e..cfe73a38 100644 --- a/doc/sources/python-apiref.rst +++ b/doc/sources/python-apiref.rst @@ -220,6 +220,10 @@ HTTP/2 servers This is a value of ``:path`` header field. + .. py:attribute:: headers + + Request header fields. + A :py:class:`BaseRequestHandler` has the following methods: .. py:method:: on_headers() @@ -249,10 +253,27 @@ HTTP/2 servers Send response. The *status* is HTTP status code. The *headers* is additional response headers. The *:status* header field will be appended by the library. The *body* is the response body. - It could be ``None`` if response body is empty. Or it must be - instance of either ``str``, ``bytes`` or :py:class:`io.IOBase`. - If instance of ``str`` is specified, it will be encoded using - UTF-8. + It could be ``None`` if response body is empty. Or it must be + instance of either ``str``, ``bytes``, :py:class:`io.IOBase` or + callable, called body generator, which takes one parameter, + size. The body generator generates response body. It can pause + generation of response so that it can wait for slow backend data + generation. When invoked, it should return tuple, byte string + at most size length and flag. The flag is either ``DATA_OK``, + ``DATA_EOF`` or ``DATA_DEFERRED``. For non-empty byte string + and it is not the last chunk of response, ``DATA_OK`` must be + returned as flag. If this is the last chunk of the response + (byte string could be ``None``), ``DATA_EOF`` must be returned + as flag. 
If there is no data available right now, but + additional data are anticipated, return tuple (``None``, + ``DATA_DEFERRED``). When data arrived, call :py:meth:`resume()` + and restart response body transmission. + + Only the body generator can pause response body generation; + instance of :py:class:`io.IOBase` must not block. + + If instance of ``str`` is specified as *body*, it will be + encoded using UTF-8. The *headers* is a list of tuple of the form ``(name, value)``. The ``name`` and ``value`` can be either byte string @@ -273,10 +294,8 @@ HTTP/2 servers The *status* is HTTP status code. The *headers* is additional response headers. The ``:status`` header field is appended by - the library. The *body* is the response body. It could be - ``None`` if response body is empty. Or it must be instance of - either ``str``, ``bytes`` or ``io.IOBase``. If instance of - ``str`` is specified, it is encoded using UTF-8. + the library. The *body* is the response body. It has the same + semantics of *body* parameter of :py:meth:`send_response()`. The headers and request_headers are a list of tuple of the form ``(name, value)``. The ``name`` and ``value`` can be either byte @@ -288,6 +307,14 @@ HTTP/2 servers Raises the exception if any error occurs. + .. py:method:: resume() + + Signals the restarting of response body transmission paused by + ``DATA_DEFERRED`` from the body generator (see + :py:meth:`send_response()` about the body generator). It is not + an error calling this method while response body transmission is + not paused. 
+ The following example illustrates :py:class:`HTTP2Server` and :py:class:`BaseRequestHandler` usage: @@ -296,6 +323,7 @@ The following example illustrates :py:class:`HTTP2Server` and #!/usr/bin/env python import io, ssl + import nghttp2 class Handler(nghttp2.BaseRequestHandler): @@ -311,9 +339,85 @@ The following example illustrates :py:class:`HTTP2Server` and body=io.BytesIO(b'nghttp2-python FTW')) ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 + ctx.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 ctx.load_cert_chain('server.crt', 'server.key') # give None to ssl to make the server non-SSL/TLS server = nghttp2.HTTP2Server(('127.0.0.1', 8443), Handler, ssl=ctx) server.serve_forever() + +The following example illustrates HTTP/2 server using asynchronous +response body generation. This is simplified reverse proxy: + +.. code-block:: python + + #!/usr/bin/env python + + import ssl + import os + import urllib + import asyncio + import io + + import nghttp2 + + @asyncio.coroutine + def get_http_header(handler, url): + url = urllib.parse.urlsplit(url) + ssl = url.scheme == 'https' + if url.port == None: + if url.scheme == 'https': + port = 443 + else: + port = 80 + else: + port = url.port + + connect = asyncio.open_connection(url.hostname, port, ssl=ssl) + reader, writer = yield from connect + req = 'GET {path} HTTP/1.0\r\n\r\n'.format(path=url.path or '/') + writer.write(req.encode('utf-8')) + # skip response header fields + while True: + line = yield from reader.readline() + line = line.rstrip() + if not line: + break + # read body + while True: + b = yield from reader.read(4096) + if not b: + break + handler.buf.write(b) + writer.close() + handler.buf.seek(0) + handler.eof = True + handler.resume() + + class Body: + def __init__(self, handler): + self.handler = handler + self.handler.eof = False + self.handler.buf = io.BytesIO() + + def generate(self, n): + buf = self.handler.buf + data = buf.read1(n) + if not data and 
not self.handler.eof: + return None, nghttp2.DATA_DEFERRED + return data, nghttp2.DATA_EOF if self.handler.eof else nghttp2.DATA_OK + + class Handler(nghttp2.BaseRequestHandler): + + def on_headers(self): + body = Body(self) + asyncio.async(get_http_header( + self, 'http://localhost' + self.path.decode('utf-8'))) + self.send_response(status=200, body=body.generate) + + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 + ctx.load_cert_chain('server.crt', 'server.key') + + server = nghttp2.HTTP2Server(('127.0.0.1', 8443), Handler, ssl=ctx) + server.serve_forever() diff --git a/python/cnghttp2.pxd b/python/cnghttp2.pxd index a5d23259..2718304e 100644 --- a/python/cnghttp2.pxd +++ b/python/cnghttp2.pxd @@ -34,6 +34,7 @@ cdef extern from 'nghttp2/nghttp2.h': ctypedef enum nghttp2_error: NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE + NGHTTP2_ERR_DEFERRED ctypedef enum nghttp2_flag: NGHTTP2_FLAG_NONE @@ -282,6 +283,9 @@ cdef extern from 'nghttp2/nghttp2.h': int nghttp2_session_terminate_session(nghttp2_session *session, uint32_t error_code) + int nghttp2_session_resume_data(nghttp2_session *session, + int32_t stream_id) + const char* nghttp2_strerror(int lib_error_code) int nghttp2_hd_deflate_new(nghttp2_hd_deflater **deflater_ptr, diff --git a/python/nghttp2.pyx b/python/nghttp2.pyx index 8a948126..ad1ebb82 100644 --- a/python/nghttp2.pyx +++ b/python/nghttp2.pyx @@ -258,19 +258,35 @@ try: except ImportError: asyncio = None +# body generator flags +DATA_OK = 0 +DATA_EOF = 1 +DATA_DEFERRED = 2 + +class _ByteIOWrapper: + + def __init__(self, b): + self.b = b + + def generate(self, n): + data = self.b.read1(n) + if not data: + return None, DATA_EOF + return data, DATA_OK + def wrap_body(body): if body is None: return body elif isinstance(body, str): - return io.BytesIO(body.encode('utf-8')) + return _ByteIOWrapper(io.BytesIO(body.encode('utf-8'))).generate elif isinstance(body, bytes): - return io.BytesIO(body) + return 
_ByteIOWrapper(io.BytesIO(body)).generate elif isinstance(body, io.IOBase): - return body + return _ByteIOWrapper(body).generate else: - raise Exception(('body must be None or instance of str or ' - 'bytes or io.IOBase')) - + # assume that callable in the form f(n) returning tuple byte + # string and flag. + return body cdef _get_stream_user_data(cnghttp2.nghttp2_session *session, int32_t stream_id): @@ -488,29 +504,39 @@ cdef int on_stream_close(cnghttp2.nghttp2_session *session, return 0 -cdef ssize_t server_data_source_read(cnghttp2.nghttp2_session *session, - int32_t stream_id, - uint8_t *buf, size_t length, - uint32_t *data_flags, - cnghttp2.nghttp2_data_source *source, - void *user_data): - cdef http2 = <_HTTP2SessionCore>user_data - handler = source.ptr +cdef ssize_t data_source_read(cnghttp2.nghttp2_session *session, + int32_t stream_id, + uint8_t *buf, size_t length, + uint32_t *data_flags, + cnghttp2.nghttp2_data_source *source, + void *user_data): + cdef http2 = <_HTTP2SessionCoreBase>user_data + generator = source.ptr + http2.enter_callback() try: - data = handler.response_body.read(length) + data, flag = generator(length) except: sys.stderr.write(traceback.format_exc()) return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; + finally: + http2.leave_callback() + + if flag == DATA_DEFERRED: + return cnghttp2.NGHTTP2_ERR_DEFERRED if data: nread = len(data) memcpy(buf, data, nread) - return nread + else: + nread = 0 - data_flags[0] = cnghttp2.NGHTTP2_DATA_FLAG_EOF + if flag == DATA_EOF: + data_flags[0] = cnghttp2.NGHTTP2_DATA_FLAG_EOF + elif flag != DATA_OK: + return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - return 0 + return nread cdef int client_on_begin_headers(cnghttp2.nghttp2_session *session, const cnghttp2.nghttp2_frame *frame, @@ -595,36 +621,13 @@ cdef int client_on_frame_send(cnghttp2.nghttp2_session *session, return 0 http2._start_settings_timer() -cdef ssize_t client_data_source_read(cnghttp2.nghttp2_session *session, - int32_t 
stream_id, - uint8_t *buf, size_t length, - uint32_t *data_flags, - cnghttp2.nghttp2_data_source *source, - void *user_data): - cdef http2 = <_HTTP2ClientSessionCore>user_data - body = source.ptr - - try: - data = body.read(length) - except: - sys.stderr.write(traceback.format_exc()) - return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; - - if data: - nread = len(data) - memcpy(buf, data, nread) - return nread - - data_flags[0] = cnghttp2.NGHTTP2_DATA_FLAG_EOF - - return 0 - cdef class _HTTP2SessionCoreBase: cdef cnghttp2.nghttp2_session *session cdef transport cdef handler_class cdef handlers cdef settings_timer + cdef inside_callback def __cinit__(self, transport, handler_class=None): self.session = NULL @@ -632,6 +635,7 @@ cdef class _HTTP2SessionCoreBase: self.handler_class = handler_class self.handlers = set() self.settings_timer = None + self.inside_callback = False def __dealloc__(self): cnghttp2.nghttp2_session_del(self.session) @@ -668,6 +672,17 @@ cdef class _HTTP2SessionCoreBase: cnghttp2.nghttp2_session_want_write(self.session) == 0: self.transport.close() + def resume(self, stream_id): + cnghttp2.nghttp2_session_resume_data(self.session, stream_id) + if not self.inside_callback: + self.send_data() + + def enter_callback(self): + self.inside_callback = True + + def leave_callback(self): + self.inside_callback = False + def _make_handler(self, stream_id): logging.debug('_make_handler, stream_id:%s', stream_id) handler = self.handler_class(self, stream_id) @@ -815,8 +830,8 @@ cdef class _HTTP2SessionCore(_HTTP2SessionCoreBase): nvlen = _make_nva(&nva, handler.response_headers) if handler.response_body: - prd.source.ptr = handler - prd.read_callback = server_data_source_read + prd.source.ptr = handler.response_body + prd.read_callback = data_source_read prd_ptr = &prd else: prd_ptr = NULL @@ -933,7 +948,7 @@ cdef class _HTTP2ClientSessionCore(_HTTP2SessionCoreBase): if body: prd.source.ptr = body - prd.read_callback = client_data_source_read + 
prd.read_callback = data_source_read prd_ptr = &prd else: prd_ptr = NULL @@ -1095,8 +1110,26 @@ if asyncio: additional response headers. The :status header field is appended by the library. The body is the response body. It could be None if response body is empty. Or it must be - instance of either str, bytes or io.IOBase. If instance of str - is specified, it is encoded using UTF-8. + instance of either str, bytes, io.IOBase or callable, + called body generator, which takes one parameter, + size. The body generator generates response body. It can + pause generation of response so that it can wait for slow + backend data generation. When invoked, it should return + tuple, byte string and flag. The flag is either DATA_OK, + DATA_EOF or DATA_DEFERRED. For non-empty byte string and + it is not the last chunk of response, DATA_OK is returned + as flag. If this is the last chunk of the response (byte + string is possibly None), DATA_EOF must be returned as + flag. If there is no data available right now, but + additional data are anticipated, return tuple (None, + DATA_DEFERRED). When data arrived, call resume() and + restart response body transmission. + + Only the body generator can pause response body + generation; instance of io.IOBase must not block. + + If instance of str is specified as body, it is encoded + using UTF-8. The headers is a list of tuple of the form (name, value). The name and value can be either unicode string or @@ -1129,11 +1162,9 @@ if asyncio: request_headers parameter). The status is HTTP status code. The headers is additional - response headers. The :status header field is appended by the - library. The body is the response body. It could be None if - response body is empty. Or it must be instance of either str, - bytes or io.IOBase. If instance of str is specified, it is - encoded using UTF-8. + response headers. The :status header field is appended by + the library. The body is the response body. 
It has the + same semantics of body parameter of send_response(). The headers and request_headers are a list of tuple of the form (name, value). The name and value can be either @@ -1184,6 +1215,9 @@ if asyncio: self.response_body = body + def resume(self): + self.http2.resume(self.stream_id) + def _encode_headers(headers): if not headers: return [] @@ -1430,6 +1464,9 @@ if asyncio: ''' self.http2.push(push_promise, handler) + def resume(self): + self.http2.resume(self.stream_id) + class _HTTP2ClientSession(asyncio.Protocol): def __init__(self, client):