diff --git a/python/cnghttp2.pxd b/python/cnghttp2.pxd index 31218382..e3ac95c3 100644 --- a/python/cnghttp2.pxd +++ b/python/cnghttp2.pxd @@ -245,7 +245,12 @@ cdef extern from 'nghttp2/nghttp2.h': nghttp2_data_source source nghttp2_data_source_read_callback read_callback - int nghttp2_submit_request(nghttp2_session *session, int32_t pri, + ctypedef struct nghttp2_priority_spec: + int32_t stream_id + int32_t weight + uint8_t exclusive + + int nghttp2_submit_request(nghttp2_session *session, const nghttp2_priority_spec *pri_spec, const nghttp2_nv *nva, size_t nvlen, const nghttp2_data_provider *data_prd, void *stream_user_data) diff --git a/python/nghttp2.pyx b/python/nghttp2.pyx index 22fc7532..f9beda3f 100644 --- a/python/nghttp2.pyx +++ b/python/nghttp2.pyx @@ -25,6 +25,8 @@ cimport cnghttp2 from libc.stdlib cimport malloc, free from libc.string cimport memcpy, memset from libc.stdint cimport uint8_t, uint16_t, uint32_t, int32_t +import logging + DEFAULT_HEADER_TABLE_SIZE = cnghttp2.NGHTTP2_DEFAULT_HEADER_TABLE_SIZE DEFLATE_MAX_HEADER_TABLE_SIZE = 4096 @@ -252,9 +254,24 @@ try: import email.utils import datetime import time + from urllib.parse import urlparse except ImportError: asyncio = None +def wrap_body(body): + if body is None: + return body + elif isinstance(body, str): + return io.BytesIO(body.encode('utf-8')) + elif isinstance(body, bytes): + return io.BytesIO(body) + elif isinstance(body, io.IOBase): + return body + else: + raise Exception(('body must be None or instance of str or ' + 'bytes or io.IOBase')) + + cdef _get_stream_user_data(cnghttp2.nghttp2_session *session, int32_t stream_id): cdef void *stream_user_data @@ -289,9 +306,35 @@ cdef int server_on_header(cnghttp2.nghttp2_session *session, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data): - cdef http2 = <_HTTP2SessionCore>user_data + cdef http2 = <_HTTP2SessionCoreBase>user_data + logging.debug('server_on_header, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) handler = _get_stream_user_data(session, frame.hd.stream_id) + return on_header(name, namelen, value, valuelen, flags, handler) + +cdef int client_on_header(cnghttp2.nghttp2_session *session, + const cnghttp2.nghttp2_frame *frame, + const uint8_t *name, size_t namelen, + const uint8_t *value, size_t valuelen, + uint8_t flags, + void *user_data): + cdef http2 = <_HTTP2SessionCoreBase>user_data + logging.debug('client_on_header, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) + + if frame.hd.type == cnghttp2.NGHTTP2_HEADERS: + if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_REQUEST: + return 0 + handler = _get_stream_user_data(session, frame.hd.stream_id) + elif frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE: + handler = _get_stream_user_data(session, frame.push_promise.promised_stream_id) + + return on_header(name, namelen, value, valuelen, flags, handler) + + +cdef int on_header(const uint8_t *name, size_t namelen, + const uint8_t *value, size_t valuelen, + uint8_t flags, + object handler): if not handler: return 0 @@ -305,6 +348,8 @@ cdef int server_on_header(cnghttp2.nghttp2_session *session, handler.host = values[0] elif key == b':path': handler.path = values[0] + elif key == b':status': + handler.status = values[0] if key == b'cookie': handler.cookies.extend(values) @@ -338,6 +383,7 @@ cdef int server_on_frame_recv(cnghttp2.nghttp2_session *session, const cnghttp2.nghttp2_frame *frame, void *user_data): cdef http2 = <_HTTP2SessionCore>user_data + logging.debug('server_on_frame_recv, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) if frame.hd.type == cnghttp2.NGHTTP2_DATA: if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM: @@ -377,11 +423,11 @@ cdef int server_on_frame_recv(cnghttp2.nghttp2_session *session, return 0 -cdef int server_on_data_chunk_recv(cnghttp2.nghttp2_session *session, +cdef int on_data_chunk_recv(cnghttp2.nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t length, void *user_data): - cdef http2 = <_HTTP2SessionCore>user_data + cdef http2 = <_HTTP2SessionCoreBase>user_data handler = _get_stream_user_data(session, stream_id) if not handler: @@ -399,6 +445,7 @@ cdef int server_on_frame_send(cnghttp2.nghttp2_session *session, const cnghttp2.nghttp2_frame *frame, void *user_data): cdef http2 = <_HTTP2SessionCore>user_data + logging.debug('server_on_frame_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE: # For PUSH_PROMISE, send push response immediately @@ -418,6 +465,7 @@ cdef int server_on_frame_not_send(cnghttp2.nghttp2_session *session, int lib_error_code, void *user_data): cdef http2 = <_HTTP2SessionCore>user_data + logging.debug('server_on_frame_not_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE: # We have to remove handler here. Without this, it is not @@ -428,11 +476,12 @@ cdef int server_on_frame_not_send(cnghttp2.nghttp2_session *session, return 0 http2._remove_handler(handler) -cdef int server_on_stream_close(cnghttp2.nghttp2_session *session, +cdef int on_stream_close(cnghttp2.nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data): - cdef http2 = <_HTTP2SessionCore>user_data + cdef http2 = <_HTTP2SessionCoreBase>user_data + logging.debug('on_stream_close, stream_id:%s', stream_id) handler = _get_stream_user_data(session, stream_id) if not handler: @@ -471,68 +520,134 @@ cdef ssize_t server_data_source_read(cnghttp2.nghttp2_session *session, return 0 -cdef class _HTTP2SessionCore: +cdef int client_on_begin_headers(cnghttp2.nghttp2_session *session, + const cnghttp2.nghttp2_frame *frame, + void *user_data): + cdef http2 = <_HTTP2ClientSessionCore>user_data + + if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE: + # Generate a temporary handler until the headers are all received + push_handler = BaseResponseHandler() + http2._add_handler(push_handler, frame.push_promise.promised_stream_id) + cnghttp2.nghttp2_session_set_stream_user_data(session, frame.push_promise.promised_stream_id, + push_handler) + + return 0 + +cdef int client_on_frame_recv(cnghttp2.nghttp2_session *session, + const cnghttp2.nghttp2_frame *frame, + void *user_data): + cdef http2 = <_HTTP2ClientSessionCore>user_data + logging.debug('client_on_frame_recv, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) + + if frame.hd.type == cnghttp2.NGHTTP2_DATA: + if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM: + handler = _get_stream_user_data(session, frame.hd.stream_id) + if not handler: + return 0 + try: + handler.on_response_done() + except: + sys.stderr.write(traceback.format_exc()) + return http2._rst_stream(frame.hd.stream_id) + elif frame.hd.type == cnghttp2.NGHTTP2_HEADERS: + if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_RESPONSE: + handler = _get_stream_user_data(session, frame.hd.stream_id) + + if not handler: + return 0 + # Check required header fields. We expect a status. + if handler.status is None: + return http2._rst_stream(frame.hd.stream_id, + cnghttp2.NGHTTP2_PROTOCOL_ERROR) + if handler.cookies: + handler.headers.append((b'cookie', + b'; '.join(handler.cookies))) + handler.cookies = None + try: + handler.on_headers() + if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM: + handler.on_response_done() + except: + sys.stderr.write(traceback.format_exc()) + return http2._rst_stream(frame.hd.stream_id) + elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS: + if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK): + http2._stop_settings_timer() + elif frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE: + handler = _get_stream_user_data(session, frame.hd.stream_id) + if not handler: + return 0 + # Get the temporary push_handler which now should have all of the header data + push_handler = _get_stream_user_data(session, frame.push_promise.promised_stream_id) + if not push_handler: + return 0 + # Remove the temporary handler + http2._remove_handler(push_handler) + cnghttp2.nghttp2_session_set_stream_user_data(session, frame.push_promise.promised_stream_id, + NULL) + + if push_handler.scheme is None or push_handler.method is None or\ + push_handler.host is None or push_handler.path is None: + return http2._rst_stream(frame.push_promise.promised_stream_id, + cnghttp2.NGHTTP2_PROTOCOL_ERROR) + try: + handler.on_push_promise(push_handler) + except: + sys.stderr.write(traceback.format_exc()) + return http2._rst_stream(frame.hd.stream_id) + + return 0 + +cdef int client_on_frame_send(cnghttp2.nghttp2_session *session, + const cnghttp2.nghttp2_frame *frame, + void *user_data): + cdef http2 = <_HTTP2ClientSessionCore>user_data + logging.debug('client_on_frame_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) + + if frame.hd.type == cnghttp2.NGHTTP2_SETTINGS: + if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK) == 0: + return 0 + http2._start_settings_timer() + +cdef ssize_t client_data_source_read(cnghttp2.nghttp2_session *session, + int32_t stream_id, + uint8_t *buf, size_t length, + uint32_t *data_flags, + cnghttp2.nghttp2_data_source *source, + void *user_data): + cdef http2 = <_HTTP2ClientSessionCore>user_data + body = source.ptr + + try: + data = body.read(length) + except: + sys.stderr.write(traceback.format_exc()) + return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; + + if data: + nread = len(data) + memcpy(buf, data, nread) + return nread + + data_flags[0] = cnghttp2.NGHTTP2_DATA_FLAG_EOF + + return 0 + +cdef class _HTTP2SessionCoreBase: cdef cnghttp2.nghttp2_session *session cdef transport cdef handler_class cdef handlers cdef settings_timer - def __cinit__(self, transport, handler_class): - cdef cnghttp2.nghttp2_session_callbacks *callbacks - cdef cnghttp2.nghttp2_settings_entry iv[2] - cdef int rv - + def __cinit__(self, transport, handler_class=None): self.session = NULL - self.transport = transport self.handler_class = handler_class self.handlers = set() self.settings_timer = None - rv = cnghttp2.nghttp2_session_callbacks_new(&callbacks) - - if rv != 0: - raise Exception('nghttp2_session_callbacks_new failed: {}'.format\ - (_strerror(rv))) - - cnghttp2.nghttp2_session_callbacks_set_on_header_callback( - callbacks, server_on_header) - cnghttp2.nghttp2_session_callbacks_set_on_begin_headers_callback( - callbacks, server_on_begin_headers) - cnghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback( - callbacks, server_on_frame_recv) - cnghttp2.nghttp2_session_callbacks_set_on_stream_close_callback( - callbacks, server_on_stream_close) - cnghttp2.nghttp2_session_callbacks_set_on_frame_send_callback( - callbacks, server_on_frame_send) - cnghttp2.nghttp2_session_callbacks_set_on_frame_not_send_callback( - callbacks, server_on_frame_not_send) - cnghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback( - callbacks, server_on_data_chunk_recv) - - rv = cnghttp2.nghttp2_session_server_new(&self.session, callbacks, - self) - - cnghttp2.nghttp2_session_callbacks_del(callbacks) - - if rv != 0: - raise Exception('nghttp2_session_server_new failed: {}'.format\ - (_strerror(rv))) - - iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS - iv[0].value = 100 - iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE - iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE - - rv = cnghttp2.nghttp2_submit_settings(self.session, - cnghttp2.NGHTTP2_FLAG_NONE, - iv, sizeof(iv) / sizeof(iv[0])) - - if rv != 0: - raise Exception('nghttp2_submit_settings failed: {}'.format\ - (_strerror(rv))) - def __dealloc__(self): cnghttp2.nghttp2_session_del(self.session) @@ -569,13 +684,139 @@ cdef class _HTTP2SessionCore: self.transport.close() def _make_handler(self, stream_id): + logging.debug('_make_handler, stream_id:%s', stream_id) handler = self.handler_class(self, stream_id) self.handlers.add(handler) return handler def _remove_handler(self, handler): + logging.debug('_remove_handler, stream_id:%s', handler.stream_id) self.handlers.remove(handler) + def _add_handler(self, handler, stream_id): + logging.debug('_add_handler, stream_id:%s', stream_id) + handler.stream_id = stream_id + handler.http2 = self + handler.remote_address = self._get_remote_address() + self.handlers.add(handler) + + def _rst_stream(self, stream_id, + error_code=cnghttp2.NGHTTP2_INTERNAL_ERROR): + cdef int rv + + rv = cnghttp2.nghttp2_submit_rst_stream\ + (self.session, cnghttp2.NGHTTP2_FLAG_NONE, + stream_id, error_code) + + return rv + + def _get_remote_address(self): + return self.transport.get_extra_info('peername') + + def _start_settings_timer(self): + loop = asyncio.get_event_loop() + self.settings_timer = loop.call_later(self.SETTINGS_TIMEOUT, + self._settings_timeout) + + def _stop_settings_timer(self): + if self.settings_timer: + self.settings_timer.cancel() + self.settings_timer = None + + def _settings_timeout(self): + cdef int rv + + logging.debug('_settings_timeout') + + self.settings_timer = None + + rv = cnghttp2.nghttp2_session_terminate_session\ + (self.session, cnghttp2.NGHTTP2_SETTINGS_TIMEOUT) + try: + self.send_data() + except Exception as err: + sys.stderr.write(traceback.format_exc()) + self.transport.close() + return + + def _log_request(self, handler): + now = datetime.datetime.now() + tv = time.mktime(now.timetuple()) + datestr = email.utils.formatdate(timeval=tv, localtime=False, + usegmt=True) + try: + method = handler.method.decode('utf-8') + except: + method = handler.method + try: + path = handler.path.decode('utf-8') + except: + path = handler.path + logging.info('%s - - [%s] "%s %s HTTP/2" %s - %s', handler.remote_address[0], + datestr, method, path, handler.status, + 'P' if handler.pushed else '-') + + def close(self): + rv = cnghttp2.nghttp2_session_terminate_session\ + (self.session, cnghttp2.NGHTTP2_NO_ERROR) + try: + self.send_data() + except Exception as err: + sys.stderr.write(traceback.format_exc()) + self.transport.close() + return + +cdef class _HTTP2SessionCore(_HTTP2SessionCoreBase): + def __cinit__(self, *args, **kwargs): + cdef cnghttp2.nghttp2_session_callbacks *callbacks + cdef cnghttp2.nghttp2_settings_entry iv[2] + cdef int rv + + super(_HTTP2SessionCore, self).__init__(*args, **kwargs) + + rv = cnghttp2.nghttp2_session_callbacks_new(&callbacks) + + if rv != 0: + raise Exception('nghttp2_session_callbacks_new failed: {}'.format\ + (_strerror(rv))) + + cnghttp2.nghttp2_session_callbacks_set_on_header_callback( + callbacks, server_on_header) + cnghttp2.nghttp2_session_callbacks_set_on_begin_headers_callback( + callbacks, server_on_begin_headers) + cnghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback( + callbacks, server_on_frame_recv) + cnghttp2.nghttp2_session_callbacks_set_on_stream_close_callback( + callbacks, on_stream_close) + cnghttp2.nghttp2_session_callbacks_set_on_frame_send_callback( + callbacks, server_on_frame_send) + cnghttp2.nghttp2_session_callbacks_set_on_frame_not_send_callback( + callbacks, server_on_frame_not_send) + cnghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback( + callbacks, on_data_chunk_recv) + + rv = cnghttp2.nghttp2_session_server_new(&self.session, callbacks, + self) + + cnghttp2.nghttp2_session_callbacks_del(callbacks) + + if rv != 0: + raise Exception('nghttp2_session_server_new failed: {}'.format\ + (_strerror(rv))) + + iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS + iv[0].value = 100 + iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE + iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE + + rv = cnghttp2.nghttp2_submit_settings(self.session, + cnghttp2.NGHTTP2_FLAG_NONE, + iv, sizeof(iv) / sizeof(iv[0])) + + if rv != 0: + raise Exception('nghttp2_submit_settings failed: {}'.format\ + (_strerror(rv))) + def send_response(self, handler): cdef cnghttp2.nghttp2_data_provider prd cdef cnghttp2.nghttp2_data_provider *prd_ptr @@ -583,6 +824,8 @@ cdef class _HTTP2SessionCore: cdef size_t nvlen cdef int rv + logging.debug('send_response, stream_id:%s', handler.stream_id) + nva = NULL nvlen = _make_nva(&nva, handler.response_headers) @@ -628,62 +871,132 @@ cdef class _HTTP2SessionCore: promised_handler.stream_id = promised_stream_id + logging.debug('push, stream_id:%s', promised_stream_id) + return promised_handler - def _rst_stream(self, stream_id, - error_code=cnghttp2.NGHTTP2_INTERNAL_ERROR): +cdef class _HTTP2ClientSessionCore(_HTTP2SessionCoreBase): + def __cinit__(self, *args, **kwargs): + cdef cnghttp2.nghttp2_session_callbacks *callbacks + cdef cnghttp2.nghttp2_settings_entry iv[2] cdef int rv - rv = cnghttp2.nghttp2_submit_rst_stream\ - (self.session, cnghttp2.NGHTTP2_FLAG_NONE, - stream_id, error_code) + super(_HTTP2ClientSessionCore, self).__init__(*args, **kwargs) - return rv + rv = cnghttp2.nghttp2_session_callbacks_new(&callbacks) - def _start_settings_timer(self): - loop = asyncio.get_event_loop() - self.settings_timer = loop.call_later(self.SETTINGS_TIMEOUT, - self._settings_timeout) + if rv != 0: + raise Exception('nghttp2_session_callbacks_new failed: {}'.format\ + (_strerror(rv))) - def _stop_settings_timer(self): - if self.settings_timer: - self.settings_timer.cancel() - self.settings_timer = None + cnghttp2.nghttp2_session_callbacks_set_on_header_callback( + callbacks, client_on_header) + cnghttp2.nghttp2_session_callbacks_set_on_begin_headers_callback( + callbacks, client_on_begin_headers) + cnghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback( + callbacks, client_on_frame_recv) + cnghttp2.nghttp2_session_callbacks_set_on_stream_close_callback( + callbacks, on_stream_close) + cnghttp2.nghttp2_session_callbacks_set_on_frame_send_callback( + callbacks, client_on_frame_send) + cnghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback( + callbacks, on_data_chunk_recv) - def _settings_timeout(self): - cdef int rv + rv = cnghttp2.nghttp2_session_client_new(&self.session, callbacks, + self) - self.settings_timer = None + cnghttp2.nghttp2_session_callbacks_del(callbacks) - rv = cnghttp2.nghttp2_session_terminate_session\ - (self.session, cnghttp2.NGHTTP2_SETTINGS_TIMEOUT) - try: - self.send_data() - except Exception as err: - sys.stderr.write(traceback.format_exc()) - self.transport.close() - return + if rv != 0: + raise Exception('nghttp2_session_client_new failed: {}'.format\ + (_strerror(rv))) - def _get_client_address(self): - return self.transport.get_extra_info('peername') + iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS + iv[0].value = 100 + iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE + iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE - def _log_request(self, handler): - now = datetime.datetime.now() - tv = time.mktime(now.timetuple()) - datestr = email.utils.formatdate(timeval=tv, localtime=False, - usegmt=True) - try: - method = handler.method.decode('utf-8') - except: - method = handler.method - try: - path = handler.path.decode('utf-8') - except: - path = handler.path - sys.stderr.write('{} - - [{}] "{} {} HTTP/2" {} - {}\n'.format\ - (handler.client_address[0], - datestr, method, path, handler.status, - 'P' if handler.pushed else '-')) + rv = cnghttp2.nghttp2_submit_settings(self.session, + cnghttp2.NGHTTP2_FLAG_NONE, + iv, sizeof(iv) / sizeof(iv[0])) + + if rv != 0: + raise Exception('nghttp2_submit_settings failed: {}'.format\ + (_strerror(rv))) + + def send_request(self, method, scheme, host, path, headers, body, handler): + cdef cnghttp2.nghttp2_data_provider prd + cdef cnghttp2.nghttp2_data_provider *prd_ptr + cdef cnghttp2.nghttp2_priority_spec *pri_ptr + cdef cnghttp2.nghttp2_nv *nva + cdef size_t nvlen + cdef int32_t stream_id + + body = wrap_body(body) + + if headers is None: + headers = [] + + headers = _encode_headers(headers) + headers.append((b':scheme', scheme.encode('utf-8'))) + headers.append((b':method', method.encode('utf-8'))) + headers.append((b':authority', host.encode('utf-8'))) + headers.append((b':path', path.encode('utf-8'))) + + nva = NULL + nvlen = _make_nva(&nva, headers) + + if body: + prd.source.ptr = body + prd.read_callback = client_data_source_read + prd_ptr = &prd + else: + prd_ptr = NULL + + # TODO: Enable priorities + pri_ptr = NULL + + stream_id = cnghttp2.nghttp2_submit_request\ + (self.session, pri_ptr, + nva, nvlen, prd_ptr, + handler) + free(nva) + + if stream_id < 0: + raise Exception('nghttp2_submit_request failed: {}'.format\ + (_strerror(stream_id))) + + logging.debug('request, stream_id:%s', stream_id) + + self._add_handler(handler, stream_id) + cnghttp2.nghttp2_session_set_stream_user_data(self.session, stream_id, + handler) + + return handler + + def push(self, push_promise, handler): + if handler: + # push_promise accepted, fill in the handler with the stored + # headers from the push_promise + handler.status = push_promise.status + handler.scheme = push_promise.scheme + handler.method = push_promise.method + handler.host = push_promise.host + handler.path = push_promise.path + handler.headers = push_promise.headers + handler.cookies = push_promise.cookies + handler.stream_id = push_promise.stream_id + handler.http2 = self + handler.pushed = True + + self._add_handler(handler, handler.stream_id) + + cnghttp2.nghttp2_session_set_stream_user_data(self.session, handler.stream_id, + handler) + else: + # push_promise rejected, reset the stream + self._rst_stream(push_promise.stream_id, + error_code=cnghttp2.NGHTTP2_NO_ERROR) if asyncio: @@ -742,7 +1055,7 @@ if asyncio: self.stream_id = stream_id self.http2 = http2 # address of the client - self.client_address = self.http2._get_client_address() + self.remote_address = self.http2._get_remote_address() # :scheme header field in request self.scheme = None # :method header field in request @@ -756,6 +1069,10 @@ if asyncio: # True if this is a handler for pushed resource self.pushed = False + @property + def client_address(self): + return self.remote_address + def on_headers(self): '''Called when request HEADERS is arrived. @@ -807,7 +1124,7 @@ if asyncio: if not status: raise Exception('status must not be empty') - body = self._wrap_body(body) + body = wrap_body(body) self._set_response_prop(status, headers, body) self.http2.send_response(self) @@ -847,7 +1164,7 @@ if asyncio: if not path: raise Exception('path must not be empty') - body = self._wrap_body(body) + body = wrap_body(body) promised_handler = self.http2._make_handler(-1) promised_handler.pushed = True @@ -882,19 +1199,6 @@ if asyncio: self.response_body = body - def _wrap_body(self, body): - if body is None: - return body - elif isinstance(body, str): - return io.BytesIO(body.encode('utf-8')) - elif isinstance(body, bytes): - return io.BytesIO(body) - elif isinstance(body, io.IOBase): - return body - else: - raise Exception(('body must be None or instance of str or ' - 'bytes or io.IOBase')) - def _encode_headers(headers): return [(k if isinstance(k, bytes) else k.encode('utf-8'), v if isinstance(v, bytes) else v.encode('utf-8')) \ @@ -908,17 +1212,23 @@ if asyncio: self.http2 = None def connection_made(self, transport): + address = transport.get_extra_info('peername') + logging.info('connection_made, address:%s, port:%s', address[0], address[1]) + self.transport = transport self.connection_header = cnghttp2.NGHTTP2_CLIENT_CONNECTION_PREFACE sock = self.transport.get_extra_info('socket') sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) ssl_ctx = self.transport.get_extra_info('sslcontext') if ssl_ctx: - if sock.selected_npn_protocol().encode('utf-8') != \ + protocol = sock.selected_npn_protocol() + logging.info('npn, protocol:%s', protocol) + if protocol.encode('utf-8') != \ cnghttp2.NGHTTP2_PROTO_VERSION_ID: self.transport.abort() def connection_lost(self, exc): + logging.info('connection_lost') if self.http2: self.http2 = None @@ -991,6 +1301,7 @@ if asyncio: host=address[0], port=address[1], ssl=ssl) self.server = self.loop.run_until_complete(coro) + logging.info('listen, address:%s, port:%s', address[0], address[1]) def serve_forever(self): try: @@ -998,3 +1309,268 @@ if asyncio: finally: self.server.close() self.loop.close() + + + + class BaseResponseHandler: + + """HTTP/2 response (stream) handler base class. + + The class is used to handle the HTTP/2 stream. By default, it does + not nothing. It must be subclassed to handle each event callback + method. + + The first callback method invoked is on_headers(). It is called + when HEADERS frame, which includes response header fields, is + arrived. + + If response has a body, on_data(data) is invoked for each + chunk of received data. + + When whole response is received, on_response_done() is invoked. + + When stream is closed, on_close(error_code) is called. + + The application can send follow up requests using HTTP2Client.send_request() method. + + The application can handle push resource using on_push_promise() method. + + The following instance variables are available: + + server_address + Contains a tuple of the form (host, port) referring to the server's + address. + + stream_id + Stream ID of this stream + + scheme + Scheme of the request URI. This is a value of :scheme header field. + + method + Method of this stream. This is a value of :method header field. + + host + This is a value of :authority or host header field. + + path + This is a value of :path header field. + + """ + + def __init__(self, http2=None, stream_id=-1): + self.headers = [] + self.cookies = [] + # Stream ID. For promised stream, it is initially -1. + self.stream_id = stream_id + self.http2 = http2 + # address of the server + self.remote_address = None + # :scheme header field in request + self.scheme = None + # :method header field in request + self.method = None + # :authority or host header field in request + self.host = None + # :path header field in request + self.path = None + # HTTP status + self.status = None + # True if this is a handler for pushed resource + self.pushed = False + + @property + def server_address(self): + return self.remote_address + + def on_headers(self): + + '''Called when response HEADERS is arrived. + + ''' + pass + + def on_data(self, data): + + '''Called when a chunk of response body is arrived. This method + will be called multiple times until all data are received. + + ''' + pass + + def on_response_done(self): + + '''Called when whole response was received + + ''' + pass + + def on_close(self, error_code): + + '''Called when stream is about to close. + + ''' + pass + + def on_push_promise(self, push_promise): + + '''Called when a push is promised. Default behavior is to + cancel the push. If application overrides this method, + it should call either accept_push or reject_push. + + ''' + self.reject_push(push_promise) + + def reject_push(self, push_promise): + + '''Convenience method equivalent to calling accept_push + with a falsy value. + + ''' + self.http2.push(push_promise, None) + + def accept_push(self, push_promise, handler=None): + + '''Accept a push_promise and provider a handler for the + new stream. If a falsy value is supplied for the handler, + the push is rejected. + + ''' + self.http2.push(push_promise, handler) + + class _HTTP2ClientSession(asyncio.Protocol): + + def __init__(self, client): + asyncio.Protocol.__init__(self) + self.http2 = None + self.pending = [] + self.client = client + + def connection_made(self, transport): + address = transport.get_extra_info('peername') + logging.info('connection_made, address:%s, port:%s', address[0], address[1]) + + self.transport = transport + sock = self.transport.get_extra_info('socket') + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + ssl_ctx = self.transport.get_extra_info('sslcontext') + if ssl_ctx: + protocol = sock.selected_npn_protocol() + logging.info('npn, protocol:%s', protocol) + if protocol is None or protocol.encode('utf-8') != \ + cnghttp2.NGHTTP2_PROTO_VERSION_ID: + self.transport.abort() + + # Send preamble + self.transport.write(cnghttp2.NGHTTP2_CLIENT_CONNECTION_PREFACE) + self.http2 = _HTTP2ClientSessionCore(self.transport) + + # Clear pending requests + send_pending = self.pending + self.pending = [] + for method,scheme,host,path,headers,body,handler in send_pending: + self.send_request(method=method, scheme=scheme, host=host, path=path,\ + headers=headers, body=body, handler=handler) + + + def connection_lost(self, exc): + logging.info('connection_lost') + if self.http2: + self.http2 = None + self.client.close() + + def data_received(self, data): + try: + self.http2.data_received(data) + except Exception as err: + sys.stderr.write(traceback.format_exc()) + self.transport.close() + return + + def resume_writing(self): + try: + self.http2.send_data() + except Exception as err: + sys.stderr.write(traceback.format_exc()) + self.transport.close() + return + + def send_request(self, method, scheme, host, path, headers, body, handler): + try: + # Waiting until connection established + if not self.http2: + self.pending.append([method, scheme, host, path, headers, body, handler]) + return + + self.http2.send_request(method=method, scheme=scheme, host=host, path=path,\ + headers=headers, body=body, handler=handler) + except Exception as err: + sys.stderr.write(traceback.format_exc()) + self.transport.close() + return + + def close(self): + if self.http2: + self.http2.close() + + + class HTTP2Client: + + '''HTTP/2 client. + + This class builds on top of the asyncio event loop. + + ''' + def __init__(self, address, loop=None, ssl=None): + + '''address is a tuple of the connect address and port (e.g., + ('127.0.0.1', 8080)). The ssl can be ssl.SSLContext instance. + If it is not None, the resulting client is SSL/TLS capable. + ''' + + self.address = address + self.session = _HTTP2ClientSession(self) + def session_factory(): + return self.session + + if ssl: + ssl.set_npn_protocols([cnghttp2.NGHTTP2_PROTO_VERSION_ID\ + .decode('utf-8')]) + + self.loop = loop + if not self.loop: + self.loop = asyncio.get_event_loop() + + coro = self.loop.create_connection(session_factory, + host=address[0], port=address[1], + ssl=ssl) + + if ssl: + self.scheme = 'https' + else: + self.scheme = 'http' + + self.transport,_ = self.loop.run_until_complete(coro) + logging.info('connect, address:%s, port:%s', self.address[0], self.address[1]) + + @property + def io_loop(self): + return self.loop + + def close(self): + self.session.close() + + def send_request(self, method='GET', url='/', headers=None, body=None, handler=None): + url = urlparse(url) + scheme = url.scheme if url.scheme else self.scheme + host = url.netloc if url.netloc else self.address[0]+':'+str(self.address[1]) + path = url.path + if url.params: + path += ';'+url.params + if url.query: + path += '?'+url.query + if url.fragment: + path += '#'+url.fragment + + self.session.send_request(method=method, scheme=scheme, host=host, path=path,\ + headers=headers, body=body, handler=handler)