diff --git a/python/cnghttp2.pxd b/python/cnghttp2.pxd index c57c788f..8f9431db 100644 --- a/python/cnghttp2.pxd +++ b/python/cnghttp2.pxd @@ -24,11 +24,215 @@ from libc.stdint cimport uint8_t, uint16_t, uint32_t, int32_t cdef extern from 'nghttp2/nghttp2.h': + const char NGHTTP2_PROTO_VERSION_ID[] + const char NGHTTP2_CLIENT_CONNECTION_HEADER[] + const size_t NGHTTP2_INITIAL_WINDOW_SIZE + + ctypedef struct nghttp2_session: + pass + + ctypedef enum nghttp2_error: + NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE + + ctypedef enum nghttp2_flag: + NGHTTP2_FLAG_NONE + NGHTTP2_FLAG_END_STREAM + NGHTTP2_FLAG_ACK + + ctypedef enum nghttp2_error_code: + NGHTTP2_NO_ERROR + NGHTTP2_PROTOCOL_ERROR + NGHTTP2_INTERNAL_ERROR + NGHTTP2_SETTINGS_TIMEOUT + + ctypedef enum nghttp2_frame_type: + NGHTTP2_DATA + NGHTTP2_HEADERS + NGHTTP2_RST_STREAM + NGHTTP2_SETTINGS + NGHTTP2_PUSH_PROMISE + NGHTTP2_GOAWAY + ctypedef struct nghttp2_nv: - uint8_t *name - uint8_t *value - uint16_t namelen - uint16_t valuelen + uint8_t *name + uint8_t *value + uint16_t namelen + uint16_t valuelen + + ctypedef enum nghttp2_settings_id: + SETTINGS_HEADER_TABLE_SIZE + NGHTTP2_SETTINGS_HEADER_TABLE_SIZE + NGHTTP2_SETTINGS_ENABLE_PUSH + NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS + NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE + + ctypedef struct nghttp2_settings_entry: + int32_t settings_id + uint32_t value + + ctypedef struct nghttp2_frame_hd: + size_t length + int32_t stream_id + uint8_t type + uint8_t flags + + ctypedef struct nghttp2_data: + nghttp2_frame_hd hd + size_t padlen + + ctypedef enum nghttp2_headers_category: + NGHTTP2_HCAT_REQUEST + NGHTTP2_HCAT_RESPONSE + NGHTTP2_HCAT_PUSH_RESPONSE + NGHTTP2_HCAT_HEADERS + + ctypedef struct nghttp2_headers: + nghttp2_frame_hd hd + size_t padlen + nghttp2_nv *nva + size_t nvlen + nghttp2_headers_category cat + int32_t pri + + ctypedef struct nghttp2_rst_stream: + nghttp2_frame_hd hd + nghttp2_error_code error_code + + + ctypedef struct nghttp2_push_promise: + nghttp2_frame_hd hd + nghttp2_nv *nva + size_t nvlen + int32_t promised_stream_id + + ctypedef struct nghttp2_goaway: + nghttp2_frame_hd hd + int32_t last_stream_id + nghttp2_error_code error_code + uint8_t *opaque_data + size_t opaque_data_len + + ctypedef union nghttp2_frame: + nghttp2_frame_hd hd + nghttp2_data data + nghttp2_headers headers + nghttp2_rst_stream rst_stream + nghttp2_push_promise push_promise + nghttp2_goaway goaway + + ctypedef ssize_t (*nghttp2_send_callback)\ + (nghttp2_session *session, const uint8_t *data, size_t length, + int flags, void *user_data) + + ctypedef int (*nghttp2_on_frame_recv_callback)\ + (nghttp2_session *session, const nghttp2_frame *frame, void *user_data) + + ctypedef int (*nghttp2_on_data_chunk_recv_callback)\ + (nghttp2_session *session, uint8_t flags, int32_t stream_id, + const uint8_t *data, size_t length, void *user_data) + + ctypedef int (*nghttp2_before_frame_send_callback)\ + (nghttp2_session *session, const nghttp2_frame *frame, void *user_data) + + ctypedef int (*nghttp2_on_stream_close_callback)\ + (nghttp2_session *session, int32_t stream_id, + nghttp2_error_code error_code, void *user_data) + + ctypedef int (*nghttp2_on_begin_headers_callback)\ + (nghttp2_session *session, const nghttp2_frame *frame, void *user_data) + + ctypedef int (*nghttp2_on_header_callback)\ + (nghttp2_session *session, + const nghttp2_frame *frame, + const uint8_t *name, size_t namelen, + const uint8_t *value, size_t valuelen, + void *user_data) + + ctypedef int (*nghttp2_on_frame_send_callback)\ + (nghttp2_session *session, const nghttp2_frame *frame, void *user_data) + + ctypedef int (*nghttp2_on_frame_not_send_callback)\ + (nghttp2_session *session, const nghttp2_frame *frame, + int lib_error_code, void *user_data) + + ctypedef struct nghttp2_session_callbacks: + nghttp2_send_callback send_callback + nghttp2_on_frame_recv_callback on_frame_recv_callback + nghttp2_on_data_chunk_recv_callback on_data_chunk_recv_callback + nghttp2_before_frame_send_callback before_frame_send_callback + nghttp2_on_frame_send_callback on_frame_send_callback + nghttp2_on_frame_not_send_callback on_frame_not_send_callback + nghttp2_on_stream_close_callback on_stream_close_callback + nghttp2_on_begin_headers_callback on_begin_headers_callback + nghttp2_on_header_callback on_header_callback + + int nghttp2_session_client_new(nghttp2_session **session_ptr, + const nghttp2_session_callbacks *callbacks, + void *user_data) + + int nghttp2_session_server_new(nghttp2_session **session_ptr, + const nghttp2_session_callbacks *callbacks, + void *user_data) + + void nghttp2_session_del(nghttp2_session *session) + + + ssize_t nghttp2_session_mem_recv(nghttp2_session *session, + const uint8_t *data, size_t datalen) + + ssize_t nghttp2_session_mem_send(nghttp2_session *session, + const uint8_t **data_ptr) + + int nghttp2_session_send(nghttp2_session *session) + + int nghttp2_session_want_read(nghttp2_session *session) + + int nghttp2_session_want_write(nghttp2_session *session) + + ctypedef union nghttp2_data_source: + int fd + void *ptr + + ctypedef ssize_t (*nghttp2_data_source_read_callback)\ + (nghttp2_session *session, int32_t stream_id, + uint8_t *buf, size_t length, int *eof, + nghttp2_data_source *source, void *user_data) + + ctypedef struct nghttp2_data_provider: + nghttp2_data_source source + nghttp2_data_source_read_callback read_callback + + int nghttp2_submit_request(nghttp2_session *session, int32_t pri, + const nghttp2_nv *nva, size_t nvlen, + const nghttp2_data_provider *data_prd, + void *stream_user_data) + + int nghttp2_submit_response(nghttp2_session *session, + int32_t stream_id, + const nghttp2_nv *nva, size_t nvlen, + const nghttp2_data_provider *data_prd) + + int nghttp2_submit_push_promise(nghttp2_session *session, uint8_t flags, + int32_t stream_id, + const nghttp2_nv *nva, size_t nvlen, + void *stream_user_data) + + int nghttp2_submit_settings(nghttp2_session *session, uint8_t flags, + const nghttp2_settings_entry *iv, size_t niv) + + int nghttp2_submit_rst_stream(nghttp2_session *session, uint8_t flags, + int32_t stream_id, + nghttp2_error_code error_code) + + void* nghttp2_session_get_stream_user_data(nghttp2_session *session, + uint32_t stream_id) + + int nghttp2_session_set_stream_user_data(nghttp2_session *session, + uint32_t stream_id, + void *stream_user_data) + + int nghttp2_session_terminate_session(nghttp2_session *session, + nghttp2_error_code error_code) const char* nghttp2_strerror(int lib_error_code) diff --git a/python/nghttp2.pyx b/python/nghttp2.pyx index 6f86a832..404f4c97 100644 --- a/python/nghttp2.pyx +++ b/python/nghttp2.pyx @@ -239,3 +239,719 @@ def print_hd_table(hdtable): 'y' if entry.ref else 'n', entry.name.decode('utf-8'), entry.value.decode('utf-8'))) + +try: + import socket + import io + import asyncio + import traceback + import sys + import email.utils + import datetime + import time +except ImportError: + pass + +cdef _get_stream_user_data(cnghttp2.nghttp2_session *session, + int32_t stream_id): + cdef void *stream_user_data + + stream_user_data = cnghttp2.nghttp2_session_get_stream_user_data\ + (session, stream_id) + if stream_user_data == NULL: + return None + + return stream_user_data + +cdef size_t _make_nva(cnghttp2.nghttp2_nv **nva_ptr, headers): + cdef cnghttp2.nghttp2_nv *nva + cdef size_t nvlen + + nvlen = len(headers) + nva = malloc(sizeof(cnghttp2.nghttp2_nv) * nvlen) + for i, (k, v) in enumerate(headers): + nva[i].name = k + nva[i].namelen = len(k) + nva[i].value = v + nva[i].valuelen = len(v) + + nva_ptr[0] = nva + + return nvlen + +cdef int server_on_header(cnghttp2.nghttp2_session *session, + const cnghttp2.nghttp2_frame *frame, + const uint8_t *name, size_t namelen, + const uint8_t *value, size_t valuelen, + void *user_data): + cdef http2 = <_HTTP2SessionCore>user_data + + handler = _get_stream_user_data(session, frame.hd.stream_id) + if not handler: + return 0 + + key = name[:namelen] + values = value[:valuelen].split(b'\x00') + if key == b':scheme': + handler.scheme = values[0] + elif key == b':method': + handler.method = values[0] + elif key == b':authority' or key == b'host': + handler.host = values[0] + elif key == b':path': + handler.path = values[0] + + if key == b'cookie': + handler.cookies.extend(values) + else: + for v in values: + handler.headers.append((key, v)) + + return 0 + +cdef int server_on_begin_request_headers(cnghttp2.nghttp2_session *session, + const cnghttp2.nghttp2_frame *frame, + void *user_data): + cdef http2 = <_HTTP2SessionCore>user_data + + handler = http2._make_handler(frame.hd.stream_id) + cnghttp2.nghttp2_session_set_stream_user_data(session, frame.hd.stream_id, + handler) + + return 0 + +cdef int server_on_begin_headers(cnghttp2.nghttp2_session *session, + const cnghttp2.nghttp2_frame *frame, + void *user_data): + if frame.hd.type == cnghttp2.NGHTTP2_HEADERS: + if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_REQUEST: + return server_on_begin_request_headers(session, frame, user_data) + + return 0 + +cdef int server_on_frame_recv(cnghttp2.nghttp2_session *session, + const cnghttp2.nghttp2_frame *frame, + void *user_data): + cdef http2 = <_HTTP2SessionCore>user_data + + if frame.hd.type == cnghttp2.NGHTTP2_DATA: + if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM: + handler = _get_stream_user_data(session, frame.hd.stream_id) + if not handler: + return 0 + try: + handler.on_request_done() + except: + sys.stderr.write(traceback.format_exc()) + return http2._rst_stream(frame.hd.stream_id) + elif frame.hd.type == cnghttp2.NGHTTP2_HEADERS: + if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_REQUEST: + handler = _get_stream_user_data(session, frame.hd.stream_id) + if not handler: + return 0 + # Check required header fields. We expect that :authority + # or host header field. + if handler.scheme is None or handler.method is None or\ + handler.host is None or handler.path is None: + return http2._rst_stream(frame.hd.stream_id, + cnghttp2.NGHTTP2_PROTOCOL_ERROR) + if handler.cookies: + handler.headers.append((b'cookie', + b'; '.join(handler.cookies))) + handler.cookies = None + try: + handler.on_headers() + if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM: + handler.on_request_done() + except: + sys.stderr.write(traceback.format_exc()) + return http2._rst_stream(frame.hd.stream_id) + elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS: + if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK): + http2._stop_settings_timer() + + return 0 + +cdef int server_on_data_chunk_recv(cnghttp2.nghttp2_session *session, + uint8_t flags, + int32_t stream_id, const uint8_t *data, + size_t length, void *user_data): + cdef http2 = <_HTTP2SessionCore>user_data + + handler = _get_stream_user_data(session, stream_id) + if not handler: + return 0 + + try: + handler.on_data(data[:length]) + except: + sys.stderr.write(traceback.format_exc()) + return http2._rst_stream(stream_id) + + return 0 + +cdef int server_on_frame_send(cnghttp2.nghttp2_session *session, + const cnghttp2.nghttp2_frame *frame, + void *user_data): + cdef http2 = <_HTTP2SessionCore>user_data + + if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE: + # For PUSH_PROMISE, send push response immediately + handler = _get_stream_user_data\ + (session, frame.push_promise.promised_stream_id) + if not handler: + return 0 + + handler.stream_id = frame.push_promise.promised_stream_id + http2.send_response(handler) + elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS: + if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK) == 0: + return 0 + http2._start_settings_timer() + +cdef int server_on_frame_not_send(cnghttp2.nghttp2_session *session, + const cnghttp2.nghttp2_frame *frame, + int lib_error_code, + void *user_data): + cdef http2 = <_HTTP2SessionCore>user_data + + if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE: + # We have to remove handler here. Without this, it is not + # removed until session is terminated. + handler = _get_stream_user_data\ + (session, frame.push_promise.promised_stream_id) + if not handler: + return 0 + http2._remove_handler(handler) + +cdef int server_on_stream_close(cnghttp2.nghttp2_session *session, + int32_t stream_id, + cnghttp2.nghttp2_error_code error_code, + void *user_data): + cdef http2 = <_HTTP2SessionCore>user_data + + handler = _get_stream_user_data(session, stream_id) + if not handler: + return 0 + + try: + handler.on_close(error_code) + except: + sys.stderr.write(traceback.format_exc()) + + http2._remove_handler(handler) + + return 0 + +cdef ssize_t server_data_source_read(cnghttp2.nghttp2_session *session, + int32_t stream_id, + uint8_t *buf, size_t length, int *eof, + cnghttp2.nghttp2_data_source *source, + void *user_data): + cdef http2 = <_HTTP2SessionCore>user_data + handler = source.ptr + + try: + data = handler.response_body.read(length) + except: + sys.stderr.write(traceback.format_exc()) + return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; + + if data: + nread = len(data) + memcpy(buf, data, nread) + return nread + + eof[0] = 1 + + return 0 + +cdef class _HTTP2SessionCore: + cdef cnghttp2.nghttp2_session *session + cdef transport + cdef handler_class + cdef handlers + cdef settings_timer + + def __cinit__(self, transport, handler_class): + cdef cnghttp2.nghttp2_session_callbacks callbacks + cdef cnghttp2.nghttp2_settings_entry iv[2] + cdef int rv + + self.session = NULL + + self.transport = transport + self.handler_class = handler_class + self.handlers = set() + self.settings_timer = None + + memset(&callbacks, 0, sizeof(callbacks)) + callbacks.on_header_callback = server_on_header + callbacks.on_begin_headers_callback = server_on_begin_headers + callbacks.on_frame_recv_callback = server_on_frame_recv + callbacks.on_stream_close_callback = server_on_stream_close + callbacks.on_frame_send_callback = server_on_frame_send + callbacks.on_frame_not_send_callback = server_on_frame_not_send + callbacks.on_data_chunk_recv_callback = server_on_data_chunk_recv + + rv = cnghttp2.nghttp2_session_server_new(&self.session, &callbacks, + self) + if rv != 0: + raise Exception('nghttp2_session_server_new failed: {}'.format\ + (_strerror(rv))) + + iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS + iv[0].value = 100 + iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE + iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE + + rv = cnghttp2.nghttp2_submit_settings(self.session, + cnghttp2.NGHTTP2_FLAG_NONE, + iv, sizeof(iv) / sizeof(iv[0])) + + if rv != 0: + raise Exception('nghttp2_submit_settings failed: {}'.format\ + (_strerror(rv))) + + def __dealloc__(self): + cnghttp2.nghttp2_session_del(self.session) + + def data_received(self, data): + cdef ssize_t rv + + rv = cnghttp2.nghttp2_session_mem_recv(self.session, data, len(data)) + if rv < 0: + raise Exception('nghttp2_session_mem_recv failed: {}'.format\ + (_strerror(rv))) + self.send_data() + + OUTBUF_MAX = 65535 + SETTINGS_TIMEOUT = 5.0 + + def send_data(self): + cdef ssize_t outbuflen + cdef const uint8_t *outbuf + + while True: + if self.transport.get_write_buffer_size() > self.OUTBUF_MAX: + break + outbuflen = cnghttp2.nghttp2_session_mem_send(self.session, &outbuf) + if outbuflen == 0: + break + if outbuflen < 0: + raise Exception('nghttp2_session_mem_send faild: {}'.format\ + (_strerror(outbuflen))) + self.transport.write(outbuf[:outbuflen]) + + if self.transport.get_write_buffer_size() == 0 and \ + cnghttp2.nghttp2_session_want_read(self.session) == 0 and \ + cnghttp2.nghttp2_session_want_write(self.session) == 0: + self.transport.close() + + def _make_handler(self, stream_id): + handler = self.handler_class(self, stream_id) + self.handlers.add(handler) + return handler + + def _remove_handler(self, handler): + self.handlers.remove(handler) + + def send_response(self, handler): + cdef cnghttp2.nghttp2_data_provider prd + cdef cnghttp2.nghttp2_data_provider *prd_ptr + cdef cnghttp2.nghttp2_nv *nva + cdef size_t nvlen + cdef int rv + + nva = NULL + nvlen = _make_nva(&nva, handler.response_headers) + + if handler.response_body: + prd.source.ptr = handler + prd.read_callback = server_data_source_read + prd_ptr = &prd + else: + prd_ptr = NULL + + rv = cnghttp2.nghttp2_submit_response(self.session, handler.stream_id, + nva, nvlen, prd_ptr) + + free(nva) + + if rv != 0: + # TODO Ignore return value + self._rst_stream(handler.stream_id) + raise Exception('nghttp2_submit_response failed: {}'.format\ + (_strerror(rv))) + + self._log_request(handler) + + def push(self, handler, promised_handler): + cdef cnghttp2.nghttp2_nv *nva + cdef size_t nvlen + + self.handlers.add(promised_handler) + + nva = NULL + nvlen = _make_nva(&nva, promised_handler.headers) + + rv = cnghttp2.nghttp2_submit_push_promise(self.session, + cnghttp2.NGHTTP2_FLAG_NONE, + handler.stream_id, + nva, nvlen, + promised_handler) + if rv != 0: + raise Exception('nghttp2_submit_push_promise failed: {}'.format\ + (_strerror(rv))) + + def _rst_stream(self, stream_id, + error_code=cnghttp2.NGHTTP2_INTERNAL_ERROR): + cdef int rv + + rv = cnghttp2.nghttp2_submit_rst_stream\ + (self.session, cnghttp2.NGHTTP2_FLAG_NONE, + stream_id, error_code) + + return rv + + def _start_settings_timer(self): + loop = asyncio.get_event_loop() + self.settings_timer = loop.call_later(self.SETTINGS_TIMEOUT, + self._settings_timeout) + + def _stop_settings_timer(self): + if self.settings_timer: + self.settings_timer.cancel() + self.settings_timer = None + + def _settings_timeout(self): + cdef int rv + + self.settings_timer = None + + rv = cnghttp2.nghttp2_session_terminate_session\ + (self.session, cnghttp2.NGHTTP2_SETTINGS_TIMEOUT) + try: + self.send_data() + except Exception as err: + sys.stderr.write(traceback.format_exc()) + self.transport.close() + return + + def _get_client_address(self): + return self.transport.get_extra_info('peername') + + def _log_request(self, handler): + now = datetime.datetime.now() + tv = time.mktime(now.timetuple()) + datestr = email.utils.formatdate(timeval=tv, localtime=False, + usegmt=True) + try: + method = handler.method.decode('utf-8') + except: + method = handler.method + try: + path = handler.path.decode('utf-8') + except: + path = handler.path + sys.stderr.write('{} - - [{}] "{} {} HTTP/2.0" {} - {}\n'.format\ + (handler.client_address[0], + datestr, method, path, handler.status, + 'P' if handler.pushed else '-')) + +class BaseRequestHandler: + + """HTTP/2 request (stream) handler base class. + + The class is used to handle the HTTP/2 stream. By default, it does + not nothing. It must be subclassed to handle each event callback + method. + + The first callback method invoked is on_headers(). It is called + when HEADERS frame, which includes request header fields, is + arrived. + + If request has request body, on_data(data) is invoked for each + chunk of received data. + + When whole request is received, on_request_done() is invoked. + + When stream is closed, on_close(error_code) is called. + + The application can send response using send_response() method. It + can be used in on_headers(), on_data() or on_request_done(). + + The application can push resource using push() method. It must be + used before send_response() call. + + The following instance variables are available: + + client_address + Contains a tuple of the form (host, port) referring to the client's + address. + + stream_id + Stream ID of this stream + + scheme + Scheme of the request URI. This is a value of :scheme header field. + + method + Method of this stream. This is a value of :method header field. + + host + This is a value of :authority or host header field. + + path + This is a value of :path header field. + + """ + + def __init__(self, http2, stream_id): + self.headers = [] + self.cookies = [] + # Stream ID. For promised stream, it is initially -1. + self.stream_id = stream_id + self.http2 = http2 + # address of the client + self.client_address = self.http2._get_client_address() + # :scheme header field in request + self.scheme = None + # :method header field in request + self.method = None + # :authority or host header field in request + self.host = None + # :path header field in request + self.path = None + # HTTP status + self.status = None + # True if this is a handler for pushed resource + self.pushed = False + + def on_headers(self): + + '''Called when request HEADERS is arrived. + + ''' + pass + + def on_data(self, data): + + '''Called when a chunk of request body is arrived. This method will be + called multiple times until all data are received. + + ''' + pass + + def on_request_done(self): + + '''Called when whole request was received + + ''' + pass + + def on_close(self, error_code): + + '''Called when stream is about to close. + + ''' + pass + + def send_response(self, status=200, headers=None, body=None): + + '''Send response. The status is HTTP status code. The headers is + additional response headers. The :status header field is + appended by the library. The body is the response body. It + could be None if response body is empty. Or it must be + instance of either str, bytes or io.IOBase. If instance of str + is specified, it is encoded using UTF-8. + + The headers is a list of tuple of the form (name, value). The + name and value are byte string. + + On error, exception was thrown. + + ''' + if self.status is not None: + raise Exception('response has already been sent') + + if not status: + raise Exception('status must not be empty') + + body = self._wrap_body(body) + + self._set_response_prop(status, headers, body) + self.http2.send_response(self) + + def push(self, path, method='GET', request_headers=None, + status=200, headers=None, body=None): + + '''Push a resource. The path is a path portion of request URI for this + resource. The method is a method to access this resource. The + request_headers is additional request headers to access this + resource. The :scheme, :method, :authority and :path are + appended by the library. The :scheme and :authority are + inherited from the request (not request_headers parameter). + + The status is HTTP status code. The headers is additional + response headers. The :status header field is appended by the + library. The body is the response body. It could be None if + response body is empty. Or it must be instance of either str, + bytes or io.IOBase. If instance of str is specified, it is + encoded using UTF-8. + + The headers and request_headers are a list of tuple of the + form (name, value). The name and value are byte string. + + On error, exception was thrown. + + ''' + if not status: + raise Exception('status must not be empty') + + if not method: + raise Exception('method must not be empty') + + if not path: + raise Exception('path must not be empty') + + body = self._wrap_body(body) + + promised_handler = self.http2._make_handler(-1) + promised_handler.pushed = True + promised_handler.scheme = self.scheme + promised_handler.method = method.encode('utf-8') + promised_handler.host = self.host + promised_handler.path = path.encode('utf-8') + promised_handler._set_response_prop(status, headers, body) + + if request_headers is None: + request_headers = [] + + request_headers.append((b':scheme', promised_handler.scheme)) + request_headers.append((b':method', promised_handler.method)) + request_headers.append((b':authority', promised_handler.host)) + request_headers.append((b':path', promised_handler.path)) + + promised_handler.headers = request_headers + + self.http2.push(self, promised_handler) + + def _set_response_prop(self, status, headers, body): + self.status = status + self.response_headers = headers + + if self.response_headers is None: + self.response_headers = [] + self.response_headers.append((b':status', str(status).encode('utf-8'))) + + if body: + self.response_body = io.BufferedReader(body) + else: + self.response_body = None + + def _wrap_body(self, body): + if body is None: + return body + elif isinstance(body, str): + return io.BytesIO(body.encode('utf-8')) + elif isinstance(body, bytes): + return io.BytesIO(body) + elif isinstance(body, io.IOBase): + return body + else: + raise Exception(('body must be None or instance of str or bytes ' + 'or io.BytesIO')) + +class _HTTP2Session(asyncio.Protocol): + + def __init__(self, RequestHandlerClass): + asyncio.Protocol.__init__(self) + self.RequestHandlerClass = RequestHandlerClass + self.http2 = None + + def connection_made(self, transport): + self.transport = transport + self.connection_header = cnghttp2.NGHTTP2_CLIENT_CONNECTION_HEADER + sock = self.transport.get_extra_info('socket') + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + ssl_ctx = self.transport.get_extra_info('sslcontext') + if ssl_ctx: + if sock.selected_npn_protocol().encode('utf-8') != \ + cnghttp2.NGHTTP2_PROTO_VERSION_ID: + self.transport.abort() + + def connection_lost(self, exc): + if self.http2: + self.http2 = None + + def data_received(self, data): + nread = min(len(data), len(self.connection_header)) + + if self.connection_header.startswith(data[:nread]): + data = data[nread:] + self.connection_header = self.connection_header[nread:] + if len(self.connection_header) == 0: + try: + self.http2 = _HTTP2SessionCore(self.transport, + self.RequestHandlerClass) + except Exception as err: + sys.stderr.write(traceback.format_exc()) + self.transport.abort() + return + + self.data_received = self.data_received2 + self.resume_writing = self.resume_writing2 + self.data_received(data) + else: + self.transport.abort() + + def data_received2(self, data): + try: + self.http2.data_received(data) + except Exception as err: + sys.stderr.write(traceback.format_exc()) + self.transport.close() + return + + def resume_writing2(self): + try: + self.http2.send_data() + except Exception as err: + sys.stderr.write(traceback.format_exc()) + self.transport.close() + return + +class HTTP2Server: + + '''HTTP/2 server. + + This class builds on top of the asyncio event loop. On + construction, RequestHandlerClass must be given, which must be a + subclass of BaseRequestHandler class. + + ''' + def __init__(self, address, RequestHandlerClass, ssl=None): + + '''address is a tuple of the listening address and port (e.g., + ('127.0.0.1', 8080)). RequestHandlerClass must be a subclass + of BaseRequestHandler class to handle a HTTP/2 stream. The + ssl can be ssl.SSLContext instance. If it is not None, the + resulting server is SSL/TLS capable. + + ''' + def session_factory(): + return _HTTP2Session(RequestHandlerClass) + + self.loop = asyncio.get_event_loop() + coro = self.loop.create_server(session_factory, + host=address[0], port=address[1], + ssl=ssl) + self.server = self.loop.run_until_complete(coro) + + def serve_forever(self): + try: + self.loop.run_forever() + finally: + self.server.close() + self.loop.close()