Merge branch 'scoky-master'
This commit is contained in:
commit
8e5b5d00e1
|
@ -245,7 +245,12 @@ cdef extern from 'nghttp2/nghttp2.h':
|
|||
nghttp2_data_source source
|
||||
nghttp2_data_source_read_callback read_callback
|
||||
|
||||
int nghttp2_submit_request(nghttp2_session *session, int32_t pri,
|
||||
ctypedef struct nghttp2_priority_spec:
|
||||
int32_t stream_id
|
||||
int32_t weight
|
||||
uint8_t exclusive
|
||||
|
||||
int nghttp2_submit_request(nghttp2_session *session, const nghttp2_priority_spec *pri_spec,
|
||||
const nghttp2_nv *nva, size_t nvlen,
|
||||
const nghttp2_data_provider *data_prd,
|
||||
void *stream_user_data)
|
||||
|
|
|
@ -25,6 +25,8 @@ cimport cnghttp2
|
|||
from libc.stdlib cimport malloc, free
|
||||
from libc.string cimport memcpy, memset
|
||||
from libc.stdint cimport uint8_t, uint16_t, uint32_t, int32_t
|
||||
import logging
|
||||
|
||||
|
||||
DEFAULT_HEADER_TABLE_SIZE = cnghttp2.NGHTTP2_DEFAULT_HEADER_TABLE_SIZE
|
||||
DEFLATE_MAX_HEADER_TABLE_SIZE = 4096
|
||||
|
@ -252,9 +254,24 @@ try:
|
|||
import email.utils
|
||||
import datetime
|
||||
import time
|
||||
from urllib.parse import urlparse
|
||||
except ImportError:
|
||||
asyncio = None
|
||||
|
||||
def wrap_body(body):
|
||||
if body is None:
|
||||
return body
|
||||
elif isinstance(body, str):
|
||||
return io.BytesIO(body.encode('utf-8'))
|
||||
elif isinstance(body, bytes):
|
||||
return io.BytesIO(body)
|
||||
elif isinstance(body, io.IOBase):
|
||||
return body
|
||||
else:
|
||||
raise Exception(('body must be None or instance of str or '
|
||||
'bytes or io.IOBase'))
|
||||
|
||||
|
||||
cdef _get_stream_user_data(cnghttp2.nghttp2_session *session,
|
||||
int32_t stream_id):
|
||||
cdef void *stream_user_data
|
||||
|
@ -289,9 +306,35 @@ cdef int server_on_header(cnghttp2.nghttp2_session *session,
|
|||
const uint8_t *value, size_t valuelen,
|
||||
uint8_t flags,
|
||||
void *user_data):
|
||||
cdef http2 = <_HTTP2SessionCore>user_data
|
||||
cdef http2 = <_HTTP2SessionCoreBase>user_data
|
||||
logging.debug('server_on_header, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
||||
|
||||
handler = _get_stream_user_data(session, frame.hd.stream_id)
|
||||
return on_header(name, namelen, value, valuelen, flags, handler)
|
||||
|
||||
cdef int client_on_header(cnghttp2.nghttp2_session *session,
|
||||
const cnghttp2.nghttp2_frame *frame,
|
||||
const uint8_t *name, size_t namelen,
|
||||
const uint8_t *value, size_t valuelen,
|
||||
uint8_t flags,
|
||||
void *user_data):
|
||||
cdef http2 = <_HTTP2SessionCoreBase>user_data
|
||||
logging.debug('client_on_header, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
||||
|
||||
if frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
|
||||
if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_REQUEST:
|
||||
return 0
|
||||
handler = _get_stream_user_data(session, frame.hd.stream_id)
|
||||
elif frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
|
||||
handler = _get_stream_user_data(session, frame.push_promise.promised_stream_id)
|
||||
|
||||
return on_header(name, namelen, value, valuelen, flags, handler)
|
||||
|
||||
|
||||
cdef int on_header(const uint8_t *name, size_t namelen,
|
||||
const uint8_t *value, size_t valuelen,
|
||||
uint8_t flags,
|
||||
object handler):
|
||||
if not handler:
|
||||
return 0
|
||||
|
||||
|
@ -305,6 +348,8 @@ cdef int server_on_header(cnghttp2.nghttp2_session *session,
|
|||
handler.host = values[0]
|
||||
elif key == b':path':
|
||||
handler.path = values[0]
|
||||
elif key == b':status':
|
||||
handler.status = values[0]
|
||||
|
||||
if key == b'cookie':
|
||||
handler.cookies.extend(values)
|
||||
|
@ -338,6 +383,7 @@ cdef int server_on_frame_recv(cnghttp2.nghttp2_session *session,
|
|||
const cnghttp2.nghttp2_frame *frame,
|
||||
void *user_data):
|
||||
cdef http2 = <_HTTP2SessionCore>user_data
|
||||
logging.debug('server_on_frame_recv, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
||||
|
||||
if frame.hd.type == cnghttp2.NGHTTP2_DATA:
|
||||
if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM:
|
||||
|
@ -377,11 +423,11 @@ cdef int server_on_frame_recv(cnghttp2.nghttp2_session *session,
|
|||
|
||||
return 0
|
||||
|
||||
cdef int server_on_data_chunk_recv(cnghttp2.nghttp2_session *session,
|
||||
cdef int on_data_chunk_recv(cnghttp2.nghttp2_session *session,
|
||||
uint8_t flags,
|
||||
int32_t stream_id, const uint8_t *data,
|
||||
size_t length, void *user_data):
|
||||
cdef http2 = <_HTTP2SessionCore>user_data
|
||||
cdef http2 = <_HTTP2SessionCoreBase>user_data
|
||||
|
||||
handler = _get_stream_user_data(session, stream_id)
|
||||
if not handler:
|
||||
|
@ -399,6 +445,7 @@ cdef int server_on_frame_send(cnghttp2.nghttp2_session *session,
|
|||
const cnghttp2.nghttp2_frame *frame,
|
||||
void *user_data):
|
||||
cdef http2 = <_HTTP2SessionCore>user_data
|
||||
logging.debug('server_on_frame_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
||||
|
||||
if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
|
||||
# For PUSH_PROMISE, send push response immediately
|
||||
|
@ -418,6 +465,7 @@ cdef int server_on_frame_not_send(cnghttp2.nghttp2_session *session,
|
|||
int lib_error_code,
|
||||
void *user_data):
|
||||
cdef http2 = <_HTTP2SessionCore>user_data
|
||||
logging.debug('server_on_frame_not_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
||||
|
||||
if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
|
||||
# We have to remove handler here. Without this, it is not
|
||||
|
@ -428,11 +476,12 @@ cdef int server_on_frame_not_send(cnghttp2.nghttp2_session *session,
|
|||
return 0
|
||||
http2._remove_handler(handler)
|
||||
|
||||
cdef int server_on_stream_close(cnghttp2.nghttp2_session *session,
|
||||
cdef int on_stream_close(cnghttp2.nghttp2_session *session,
|
||||
int32_t stream_id,
|
||||
uint32_t error_code,
|
||||
void *user_data):
|
||||
cdef http2 = <_HTTP2SessionCore>user_data
|
||||
cdef http2 = <_HTTP2SessionCoreBase>user_data
|
||||
logging.debug('on_stream_close, stream_id:%s', stream_id)
|
||||
|
||||
handler = _get_stream_user_data(session, stream_id)
|
||||
if not handler:
|
||||
|
@ -471,68 +520,134 @@ cdef ssize_t server_data_source_read(cnghttp2.nghttp2_session *session,
|
|||
|
||||
return 0
|
||||
|
||||
cdef class _HTTP2SessionCore:
|
||||
cdef int client_on_begin_headers(cnghttp2.nghttp2_session *session,
|
||||
const cnghttp2.nghttp2_frame *frame,
|
||||
void *user_data):
|
||||
cdef http2 = <_HTTP2ClientSessionCore>user_data
|
||||
|
||||
if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
|
||||
# Generate a temporary handler until the headers are all received
|
||||
push_handler = BaseResponseHandler()
|
||||
http2._add_handler(push_handler, frame.push_promise.promised_stream_id)
|
||||
cnghttp2.nghttp2_session_set_stream_user_data(session, frame.push_promise.promised_stream_id,
|
||||
<void*>push_handler)
|
||||
|
||||
return 0
|
||||
|
||||
cdef int client_on_frame_recv(cnghttp2.nghttp2_session *session,
|
||||
const cnghttp2.nghttp2_frame *frame,
|
||||
void *user_data):
|
||||
cdef http2 = <_HTTP2ClientSessionCore>user_data
|
||||
logging.debug('client_on_frame_recv, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
||||
|
||||
if frame.hd.type == cnghttp2.NGHTTP2_DATA:
|
||||
if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM:
|
||||
handler = _get_stream_user_data(session, frame.hd.stream_id)
|
||||
if not handler:
|
||||
return 0
|
||||
try:
|
||||
handler.on_response_done()
|
||||
except:
|
||||
sys.stderr.write(traceback.format_exc())
|
||||
return http2._rst_stream(frame.hd.stream_id)
|
||||
elif frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
|
||||
if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_RESPONSE:
|
||||
handler = _get_stream_user_data(session, frame.hd.stream_id)
|
||||
|
||||
if not handler:
|
||||
return 0
|
||||
# Check required header fields. We expect a status.
|
||||
if handler.status is None:
|
||||
return http2._rst_stream(frame.hd.stream_id,
|
||||
cnghttp2.NGHTTP2_PROTOCOL_ERROR)
|
||||
if handler.cookies:
|
||||
handler.headers.append((b'cookie',
|
||||
b'; '.join(handler.cookies)))
|
||||
handler.cookies = None
|
||||
try:
|
||||
handler.on_headers()
|
||||
if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM:
|
||||
handler.on_response_done()
|
||||
except:
|
||||
sys.stderr.write(traceback.format_exc())
|
||||
return http2._rst_stream(frame.hd.stream_id)
|
||||
elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS:
|
||||
if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK):
|
||||
http2._stop_settings_timer()
|
||||
elif frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
|
||||
handler = _get_stream_user_data(session, frame.hd.stream_id)
|
||||
if not handler:
|
||||
return 0
|
||||
# Get the temporary push_handler which now should have all of the header data
|
||||
push_handler = _get_stream_user_data(session, frame.push_promise.promised_stream_id)
|
||||
if not push_handler:
|
||||
return 0
|
||||
# Remove the temporary handler
|
||||
http2._remove_handler(push_handler)
|
||||
cnghttp2.nghttp2_session_set_stream_user_data(session, frame.push_promise.promised_stream_id,
|
||||
<void*>NULL)
|
||||
|
||||
if push_handler.scheme is None or push_handler.method is None or\
|
||||
push_handler.host is None or push_handler.path is None:
|
||||
return http2._rst_stream(frame.push_promise.promised_stream_id,
|
||||
cnghttp2.NGHTTP2_PROTOCOL_ERROR)
|
||||
try:
|
||||
handler.on_push_promise(push_handler)
|
||||
except:
|
||||
sys.stderr.write(traceback.format_exc())
|
||||
return http2._rst_stream(frame.hd.stream_id)
|
||||
|
||||
return 0
|
||||
|
||||
cdef int client_on_frame_send(cnghttp2.nghttp2_session *session,
|
||||
const cnghttp2.nghttp2_frame *frame,
|
||||
void *user_data):
|
||||
cdef http2 = <_HTTP2ClientSessionCore>user_data
|
||||
logging.debug('client_on_frame_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
||||
|
||||
if frame.hd.type == cnghttp2.NGHTTP2_SETTINGS:
|
||||
if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK) == 0:
|
||||
return 0
|
||||
http2._start_settings_timer()
|
||||
|
||||
cdef ssize_t client_data_source_read(cnghttp2.nghttp2_session *session,
|
||||
int32_t stream_id,
|
||||
uint8_t *buf, size_t length,
|
||||
uint32_t *data_flags,
|
||||
cnghttp2.nghttp2_data_source *source,
|
||||
void *user_data):
|
||||
cdef http2 = <_HTTP2ClientSessionCore>user_data
|
||||
body = <object>source.ptr
|
||||
|
||||
try:
|
||||
data = body.read(length)
|
||||
except:
|
||||
sys.stderr.write(traceback.format_exc())
|
||||
return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
|
||||
|
||||
if data:
|
||||
nread = len(data)
|
||||
memcpy(buf, <uint8_t*>data, nread)
|
||||
return nread
|
||||
|
||||
data_flags[0] = cnghttp2.NGHTTP2_DATA_FLAG_EOF
|
||||
|
||||
return 0
|
||||
|
||||
cdef class _HTTP2SessionCoreBase:
|
||||
cdef cnghttp2.nghttp2_session *session
|
||||
cdef transport
|
||||
cdef handler_class
|
||||
cdef handlers
|
||||
cdef settings_timer
|
||||
|
||||
def __cinit__(self, transport, handler_class):
|
||||
cdef cnghttp2.nghttp2_session_callbacks *callbacks
|
||||
cdef cnghttp2.nghttp2_settings_entry iv[2]
|
||||
cdef int rv
|
||||
|
||||
def __cinit__(self, transport, handler_class=None):
|
||||
self.session = NULL
|
||||
|
||||
self.transport = transport
|
||||
self.handler_class = handler_class
|
||||
self.handlers = set()
|
||||
self.settings_timer = None
|
||||
|
||||
rv = cnghttp2.nghttp2_session_callbacks_new(&callbacks)
|
||||
|
||||
if rv != 0:
|
||||
raise Exception('nghttp2_session_callbacks_new failed: {}'.format\
|
||||
(_strerror(rv)))
|
||||
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_header_callback(
|
||||
callbacks, server_on_header)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_begin_headers_callback(
|
||||
callbacks, server_on_begin_headers)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback(
|
||||
callbacks, server_on_frame_recv)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_stream_close_callback(
|
||||
callbacks, server_on_stream_close)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_frame_send_callback(
|
||||
callbacks, server_on_frame_send)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_frame_not_send_callback(
|
||||
callbacks, server_on_frame_not_send)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
|
||||
callbacks, server_on_data_chunk_recv)
|
||||
|
||||
rv = cnghttp2.nghttp2_session_server_new(&self.session, callbacks,
|
||||
<void*>self)
|
||||
|
||||
cnghttp2.nghttp2_session_callbacks_del(callbacks)
|
||||
|
||||
if rv != 0:
|
||||
raise Exception('nghttp2_session_server_new failed: {}'.format\
|
||||
(_strerror(rv)))
|
||||
|
||||
iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS
|
||||
iv[0].value = 100
|
||||
iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE
|
||||
iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE
|
||||
|
||||
rv = cnghttp2.nghttp2_submit_settings(self.session,
|
||||
cnghttp2.NGHTTP2_FLAG_NONE,
|
||||
iv, sizeof(iv) / sizeof(iv[0]))
|
||||
|
||||
if rv != 0:
|
||||
raise Exception('nghttp2_submit_settings failed: {}'.format\
|
||||
(_strerror(rv)))
|
||||
|
||||
def __dealloc__(self):
|
||||
cnghttp2.nghttp2_session_del(self.session)
|
||||
|
||||
|
@ -569,13 +684,139 @@ cdef class _HTTP2SessionCore:
|
|||
self.transport.close()
|
||||
|
||||
def _make_handler(self, stream_id):
|
||||
logging.debug('_make_handler, stream_id:%s', stream_id)
|
||||
handler = self.handler_class(self, stream_id)
|
||||
self.handlers.add(handler)
|
||||
return handler
|
||||
|
||||
def _remove_handler(self, handler):
|
||||
logging.debug('_remove_handler, stream_id:%s', handler.stream_id)
|
||||
self.handlers.remove(handler)
|
||||
|
||||
def _add_handler(self, handler, stream_id):
|
||||
logging.debug('_add_handler, stream_id:%s', stream_id)
|
||||
handler.stream_id = stream_id
|
||||
handler.http2 = self
|
||||
handler.remote_address = self._get_remote_address()
|
||||
self.handlers.add(handler)
|
||||
|
||||
def _rst_stream(self, stream_id,
|
||||
error_code=cnghttp2.NGHTTP2_INTERNAL_ERROR):
|
||||
cdef int rv
|
||||
|
||||
rv = cnghttp2.nghttp2_submit_rst_stream\
|
||||
(self.session, cnghttp2.NGHTTP2_FLAG_NONE,
|
||||
stream_id, error_code)
|
||||
|
||||
return rv
|
||||
|
||||
def _get_remote_address(self):
|
||||
return self.transport.get_extra_info('peername')
|
||||
|
||||
def _start_settings_timer(self):
|
||||
loop = asyncio.get_event_loop()
|
||||
self.settings_timer = loop.call_later(self.SETTINGS_TIMEOUT,
|
||||
self._settings_timeout)
|
||||
|
||||
def _stop_settings_timer(self):
|
||||
if self.settings_timer:
|
||||
self.settings_timer.cancel()
|
||||
self.settings_timer = None
|
||||
|
||||
def _settings_timeout(self):
|
||||
cdef int rv
|
||||
|
||||
logging.debug('_settings_timeout')
|
||||
|
||||
self.settings_timer = None
|
||||
|
||||
rv = cnghttp2.nghttp2_session_terminate_session\
|
||||
(self.session, cnghttp2.NGHTTP2_SETTINGS_TIMEOUT)
|
||||
try:
|
||||
self.send_data()
|
||||
except Exception as err:
|
||||
sys.stderr.write(traceback.format_exc())
|
||||
self.transport.close()
|
||||
return
|
||||
|
||||
def _log_request(self, handler):
|
||||
now = datetime.datetime.now()
|
||||
tv = time.mktime(now.timetuple())
|
||||
datestr = email.utils.formatdate(timeval=tv, localtime=False,
|
||||
usegmt=True)
|
||||
try:
|
||||
method = handler.method.decode('utf-8')
|
||||
except:
|
||||
method = handler.method
|
||||
try:
|
||||
path = handler.path.decode('utf-8')
|
||||
except:
|
||||
path = handler.path
|
||||
logging.info('%s - - [%s] "%s %s HTTP/2" %s - %s', handler.remote_address[0],
|
||||
datestr, method, path, handler.status,
|
||||
'P' if handler.pushed else '-')
|
||||
|
||||
def close(self):
|
||||
rv = cnghttp2.nghttp2_session_terminate_session\
|
||||
(self.session, cnghttp2.NGHTTP2_NO_ERROR)
|
||||
try:
|
||||
self.send_data()
|
||||
except Exception as err:
|
||||
sys.stderr.write(traceback.format_exc())
|
||||
self.transport.close()
|
||||
return
|
||||
|
||||
cdef class _HTTP2SessionCore(_HTTP2SessionCoreBase):
|
||||
def __cinit__(self, *args, **kwargs):
|
||||
cdef cnghttp2.nghttp2_session_callbacks *callbacks
|
||||
cdef cnghttp2.nghttp2_settings_entry iv[2]
|
||||
cdef int rv
|
||||
|
||||
super(_HTTP2SessionCore, self).__init__(*args, **kwargs)
|
||||
|
||||
rv = cnghttp2.nghttp2_session_callbacks_new(&callbacks)
|
||||
|
||||
if rv != 0:
|
||||
raise Exception('nghttp2_session_callbacks_new failed: {}'.format\
|
||||
(_strerror(rv)))
|
||||
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_header_callback(
|
||||
callbacks, server_on_header)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_begin_headers_callback(
|
||||
callbacks, server_on_begin_headers)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback(
|
||||
callbacks, server_on_frame_recv)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_stream_close_callback(
|
||||
callbacks, on_stream_close)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_frame_send_callback(
|
||||
callbacks, server_on_frame_send)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_frame_not_send_callback(
|
||||
callbacks, server_on_frame_not_send)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
|
||||
callbacks, on_data_chunk_recv)
|
||||
|
||||
rv = cnghttp2.nghttp2_session_server_new(&self.session, callbacks,
|
||||
<void*>self)
|
||||
|
||||
cnghttp2.nghttp2_session_callbacks_del(callbacks)
|
||||
|
||||
if rv != 0:
|
||||
raise Exception('nghttp2_session_server_new failed: {}'.format\
|
||||
(_strerror(rv)))
|
||||
|
||||
iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS
|
||||
iv[0].value = 100
|
||||
iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE
|
||||
iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE
|
||||
|
||||
rv = cnghttp2.nghttp2_submit_settings(self.session,
|
||||
cnghttp2.NGHTTP2_FLAG_NONE,
|
||||
iv, sizeof(iv) / sizeof(iv[0]))
|
||||
|
||||
if rv != 0:
|
||||
raise Exception('nghttp2_submit_settings failed: {}'.format\
|
||||
(_strerror(rv)))
|
||||
|
||||
def send_response(self, handler):
|
||||
cdef cnghttp2.nghttp2_data_provider prd
|
||||
cdef cnghttp2.nghttp2_data_provider *prd_ptr
|
||||
|
@ -583,6 +824,8 @@ cdef class _HTTP2SessionCore:
|
|||
cdef size_t nvlen
|
||||
cdef int rv
|
||||
|
||||
logging.debug('send_response, stream_id:%s', handler.stream_id)
|
||||
|
||||
nva = NULL
|
||||
nvlen = _make_nva(&nva, handler.response_headers)
|
||||
|
||||
|
@ -628,62 +871,132 @@ cdef class _HTTP2SessionCore:
|
|||
|
||||
promised_handler.stream_id = promised_stream_id
|
||||
|
||||
logging.debug('push, stream_id:%s', promised_stream_id)
|
||||
|
||||
return promised_handler
|
||||
|
||||
def _rst_stream(self, stream_id,
|
||||
error_code=cnghttp2.NGHTTP2_INTERNAL_ERROR):
|
||||
cdef class _HTTP2ClientSessionCore(_HTTP2SessionCoreBase):
|
||||
def __cinit__(self, *args, **kwargs):
|
||||
cdef cnghttp2.nghttp2_session_callbacks *callbacks
|
||||
cdef cnghttp2.nghttp2_settings_entry iv[2]
|
||||
cdef int rv
|
||||
|
||||
rv = cnghttp2.nghttp2_submit_rst_stream\
|
||||
(self.session, cnghttp2.NGHTTP2_FLAG_NONE,
|
||||
stream_id, error_code)
|
||||
super(_HTTP2ClientSessionCore, self).__init__(*args, **kwargs)
|
||||
|
||||
return rv
|
||||
rv = cnghttp2.nghttp2_session_callbacks_new(&callbacks)
|
||||
|
||||
def _start_settings_timer(self):
|
||||
loop = asyncio.get_event_loop()
|
||||
self.settings_timer = loop.call_later(self.SETTINGS_TIMEOUT,
|
||||
self._settings_timeout)
|
||||
if rv != 0:
|
||||
raise Exception('nghttp2_session_callbacks_new failed: {}'.format\
|
||||
(_strerror(rv)))
|
||||
|
||||
def _stop_settings_timer(self):
|
||||
if self.settings_timer:
|
||||
self.settings_timer.cancel()
|
||||
self.settings_timer = None
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_header_callback(
|
||||
callbacks, client_on_header)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_begin_headers_callback(
|
||||
callbacks, client_on_begin_headers)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback(
|
||||
callbacks, client_on_frame_recv)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_stream_close_callback(
|
||||
callbacks, on_stream_close)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_frame_send_callback(
|
||||
callbacks, client_on_frame_send)
|
||||
cnghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
|
||||
callbacks, on_data_chunk_recv)
|
||||
|
||||
def _settings_timeout(self):
|
||||
cdef int rv
|
||||
rv = cnghttp2.nghttp2_session_client_new(&self.session, callbacks,
|
||||
<void*>self)
|
||||
|
||||
self.settings_timer = None
|
||||
cnghttp2.nghttp2_session_callbacks_del(callbacks)
|
||||
|
||||
rv = cnghttp2.nghttp2_session_terminate_session\
|
||||
(self.session, cnghttp2.NGHTTP2_SETTINGS_TIMEOUT)
|
||||
try:
|
||||
self.send_data()
|
||||
except Exception as err:
|
||||
sys.stderr.write(traceback.format_exc())
|
||||
self.transport.close()
|
||||
return
|
||||
if rv != 0:
|
||||
raise Exception('nghttp2_session_client_new failed: {}'.format\
|
||||
(_strerror(rv)))
|
||||
|
||||
def _get_client_address(self):
|
||||
return self.transport.get_extra_info('peername')
|
||||
iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS
|
||||
iv[0].value = 100
|
||||
iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE
|
||||
iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE
|
||||
|
||||
def _log_request(self, handler):
|
||||
now = datetime.datetime.now()
|
||||
tv = time.mktime(now.timetuple())
|
||||
datestr = email.utils.formatdate(timeval=tv, localtime=False,
|
||||
usegmt=True)
|
||||
try:
|
||||
method = handler.method.decode('utf-8')
|
||||
except:
|
||||
method = handler.method
|
||||
try:
|
||||
path = handler.path.decode('utf-8')
|
||||
except:
|
||||
path = handler.path
|
||||
sys.stderr.write('{} - - [{}] "{} {} HTTP/2" {} - {}\n'.format\
|
||||
(handler.client_address[0],
|
||||
datestr, method, path, handler.status,
|
||||
'P' if handler.pushed else '-'))
|
||||
rv = cnghttp2.nghttp2_submit_settings(self.session,
|
||||
cnghttp2.NGHTTP2_FLAG_NONE,
|
||||
iv, sizeof(iv) / sizeof(iv[0]))
|
||||
|
||||
if rv != 0:
|
||||
raise Exception('nghttp2_submit_settings failed: {}'.format\
|
||||
(_strerror(rv)))
|
||||
|
||||
def send_request(self, method, scheme, host, path, headers, body, handler):
|
||||
cdef cnghttp2.nghttp2_data_provider prd
|
||||
cdef cnghttp2.nghttp2_data_provider *prd_ptr
|
||||
cdef cnghttp2.nghttp2_priority_spec *pri_ptr
|
||||
cdef cnghttp2.nghttp2_nv *nva
|
||||
cdef size_t nvlen
|
||||
cdef int32_t stream_id
|
||||
|
||||
body = wrap_body(body)
|
||||
|
||||
if headers is None:
|
||||
headers = []
|
||||
|
||||
headers = _encode_headers(headers)
|
||||
headers.append((b':scheme', scheme.encode('utf-8')))
|
||||
headers.append((b':method', method.encode('utf-8')))
|
||||
headers.append((b':authority', host.encode('utf-8')))
|
||||
headers.append((b':path', path.encode('utf-8')))
|
||||
|
||||
nva = NULL
|
||||
nvlen = _make_nva(&nva, headers)
|
||||
|
||||
if body:
|
||||
prd.source.ptr = <void*>body
|
||||
prd.read_callback = client_data_source_read
|
||||
prd_ptr = &prd
|
||||
else:
|
||||
prd_ptr = NULL
|
||||
|
||||
# TODO: Enable priorities
|
||||
pri_ptr = NULL
|
||||
|
||||
stream_id = cnghttp2.nghttp2_submit_request\
|
||||
(self.session, pri_ptr,
|
||||
nva, nvlen, prd_ptr,
|
||||
<void*>handler)
|
||||
free(nva)
|
||||
|
||||
if stream_id < 0:
|
||||
raise Exception('nghttp2_submit_request failed: {}'.format\
|
||||
(_strerror(stream_id)))
|
||||
|
||||
logging.debug('request, stream_id:%s', stream_id)
|
||||
|
||||
self._add_handler(handler, stream_id)
|
||||
cnghttp2.nghttp2_session_set_stream_user_data(self.session, stream_id,
|
||||
<void*>handler)
|
||||
|
||||
return handler
|
||||
|
||||
def push(self, push_promise, handler):
|
||||
if handler:
|
||||
# push_promise accepted, fill in the handler with the stored
|
||||
# headers from the push_promise
|
||||
handler.status = push_promise.status
|
||||
handler.scheme = push_promise.scheme
|
||||
handler.method = push_promise.method
|
||||
handler.host = push_promise.host
|
||||
handler.path = push_promise.path
|
||||
handler.headers = push_promise.headers
|
||||
handler.cookies = push_promise.cookies
|
||||
handler.stream_id = push_promise.stream_id
|
||||
handler.http2 = self
|
||||
handler.pushed = True
|
||||
|
||||
self._add_handler(handler, handler.stream_id)
|
||||
|
||||
cnghttp2.nghttp2_session_set_stream_user_data(self.session, handler.stream_id,
|
||||
<void*>handler)
|
||||
else:
|
||||
# push_promise rejected, reset the stream
|
||||
self._rst_stream(push_promise.stream_id,
|
||||
error_code=cnghttp2.NGHTTP2_NO_ERROR)
|
||||
|
||||
if asyncio:
|
||||
|
||||
|
@ -742,7 +1055,7 @@ if asyncio:
|
|||
self.stream_id = stream_id
|
||||
self.http2 = http2
|
||||
# address of the client
|
||||
self.client_address = self.http2._get_client_address()
|
||||
self.remote_address = self.http2._get_remote_address()
|
||||
# :scheme header field in request
|
||||
self.scheme = None
|
||||
# :method header field in request
|
||||
|
@ -756,6 +1069,10 @@ if asyncio:
|
|||
# True if this is a handler for pushed resource
|
||||
self.pushed = False
|
||||
|
||||
@property
|
||||
def client_address(self):
|
||||
return self.remote_address
|
||||
|
||||
def on_headers(self):
|
||||
|
||||
'''Called when request HEADERS is arrived.
|
||||
|
@ -807,7 +1124,7 @@ if asyncio:
|
|||
if not status:
|
||||
raise Exception('status must not be empty')
|
||||
|
||||
body = self._wrap_body(body)
|
||||
body = wrap_body(body)
|
||||
|
||||
self._set_response_prop(status, headers, body)
|
||||
self.http2.send_response(self)
|
||||
|
@ -847,7 +1164,7 @@ if asyncio:
|
|||
if not path:
|
||||
raise Exception('path must not be empty')
|
||||
|
||||
body = self._wrap_body(body)
|
||||
body = wrap_body(body)
|
||||
|
||||
promised_handler = self.http2._make_handler(-1)
|
||||
promised_handler.pushed = True
|
||||
|
@ -882,19 +1199,6 @@ if asyncio:
|
|||
|
||||
self.response_body = body
|
||||
|
||||
def _wrap_body(self, body):
|
||||
if body is None:
|
||||
return body
|
||||
elif isinstance(body, str):
|
||||
return io.BytesIO(body.encode('utf-8'))
|
||||
elif isinstance(body, bytes):
|
||||
return io.BytesIO(body)
|
||||
elif isinstance(body, io.IOBase):
|
||||
return body
|
||||
else:
|
||||
raise Exception(('body must be None or instance of str or '
|
||||
'bytes or io.IOBase'))
|
||||
|
||||
def _encode_headers(headers):
|
||||
return [(k if isinstance(k, bytes) else k.encode('utf-8'),
|
||||
v if isinstance(v, bytes) else v.encode('utf-8')) \
|
||||
|
@ -908,17 +1212,23 @@ if asyncio:
|
|||
self.http2 = None
|
||||
|
||||
def connection_made(self, transport):
|
||||
address = transport.get_extra_info('peername')
|
||||
logging.info('connection_made, address:%s, port:%s', address[0], address[1])
|
||||
|
||||
self.transport = transport
|
||||
self.connection_header = cnghttp2.NGHTTP2_CLIENT_CONNECTION_PREFACE
|
||||
sock = self.transport.get_extra_info('socket')
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
ssl_ctx = self.transport.get_extra_info('sslcontext')
|
||||
if ssl_ctx:
|
||||
if sock.selected_npn_protocol().encode('utf-8') != \
|
||||
protocol = sock.selected_npn_protocol()
|
||||
logging.info('npn, protocol:%s', protocol)
|
||||
if protocol.encode('utf-8') != \
|
||||
cnghttp2.NGHTTP2_PROTO_VERSION_ID:
|
||||
self.transport.abort()
|
||||
|
||||
def connection_lost(self, exc):
|
||||
logging.info('connection_lost')
|
||||
if self.http2:
|
||||
self.http2 = None
|
||||
|
||||
|
@ -991,6 +1301,7 @@ if asyncio:
|
|||
host=address[0], port=address[1],
|
||||
ssl=ssl)
|
||||
self.server = self.loop.run_until_complete(coro)
|
||||
logging.info('listen, address:%s, port:%s', address[0], address[1])
|
||||
|
||||
def serve_forever(self):
|
||||
try:
|
||||
|
@ -998,3 +1309,268 @@ if asyncio:
|
|||
finally:
|
||||
self.server.close()
|
||||
self.loop.close()
|
||||
|
||||
|
||||
|
||||
class BaseResponseHandler:
|
||||
|
||||
"""HTTP/2 response (stream) handler base class.
|
||||
|
||||
The class is used to handle the HTTP/2 stream. By default, it does
|
||||
not nothing. It must be subclassed to handle each event callback
|
||||
method.
|
||||
|
||||
The first callback method invoked is on_headers(). It is called
|
||||
when HEADERS frame, which includes response header fields, is
|
||||
arrived.
|
||||
|
||||
If response has a body, on_data(data) is invoked for each
|
||||
chunk of received data.
|
||||
|
||||
When whole response is received, on_response_done() is invoked.
|
||||
|
||||
When stream is closed, on_close(error_code) is called.
|
||||
|
||||
The application can send follow up requests using HTTP2Client.send_request() method.
|
||||
|
||||
The application can handle push resource using on_push_promise() method.
|
||||
|
||||
The following instance variables are available:
|
||||
|
||||
server_address
|
||||
Contains a tuple of the form (host, port) referring to the server's
|
||||
address.
|
||||
|
||||
stream_id
|
||||
Stream ID of this stream
|
||||
|
||||
scheme
|
||||
Scheme of the request URI. This is a value of :scheme header field.
|
||||
|
||||
method
|
||||
Method of this stream. This is a value of :method header field.
|
||||
|
||||
host
|
||||
This is a value of :authority or host header field.
|
||||
|
||||
path
|
||||
This is a value of :path header field.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, http2=None, stream_id=-1):
|
||||
self.headers = []
|
||||
self.cookies = []
|
||||
# Stream ID. For promised stream, it is initially -1.
|
||||
self.stream_id = stream_id
|
||||
self.http2 = http2
|
||||
# address of the server
|
||||
self.remote_address = None
|
||||
# :scheme header field in request
|
||||
self.scheme = None
|
||||
# :method header field in request
|
||||
self.method = None
|
||||
# :authority or host header field in request
|
||||
self.host = None
|
||||
# :path header field in request
|
||||
self.path = None
|
||||
# HTTP status
|
||||
self.status = None
|
||||
# True if this is a handler for pushed resource
|
||||
self.pushed = False
|
||||
|
||||
@property
|
||||
def server_address(self):
|
||||
return self.remote_address
|
||||
|
||||
def on_headers(self):
|
||||
|
||||
'''Called when response HEADERS is arrived.
|
||||
|
||||
'''
|
||||
pass
|
||||
|
||||
def on_data(self, data):
|
||||
|
||||
'''Called when a chunk of response body is arrived. This method
|
||||
will be called multiple times until all data are received.
|
||||
|
||||
'''
|
||||
pass
|
||||
|
||||
def on_response_done(self):
|
||||
|
||||
'''Called when whole response was received
|
||||
|
||||
'''
|
||||
pass
|
||||
|
||||
def on_close(self, error_code):
|
||||
|
||||
'''Called when stream is about to close.
|
||||
|
||||
'''
|
||||
pass
|
||||
|
||||
def on_push_promise(self, push_promise):
|
||||
|
||||
'''Called when a push is promised. Default behavior is to
|
||||
cancel the push. If application overrides this method,
|
||||
it should call either accept_push or reject_push.
|
||||
|
||||
'''
|
||||
self.reject_push(push_promise)
|
||||
|
||||
def reject_push(self, push_promise):
|
||||
|
||||
'''Convenience method equivalent to calling accept_push
|
||||
with a falsy value.
|
||||
|
||||
'''
|
||||
self.http2.push(push_promise, None)
|
||||
|
||||
def accept_push(self, push_promise, handler=None):
|
||||
|
||||
'''Accept a push_promise and provider a handler for the
|
||||
new stream. If a falsy value is supplied for the handler,
|
||||
the push is rejected.
|
||||
|
||||
'''
|
||||
self.http2.push(push_promise, handler)
|
||||
|
||||
class _HTTP2ClientSession(asyncio.Protocol):
|
||||
|
||||
def __init__(self, client):
|
||||
asyncio.Protocol.__init__(self)
|
||||
self.http2 = None
|
||||
self.pending = []
|
||||
self.client = client
|
||||
|
||||
def connection_made(self, transport):
|
||||
address = transport.get_extra_info('peername')
|
||||
logging.info('connection_made, address:%s, port:%s', address[0], address[1])
|
||||
|
||||
self.transport = transport
|
||||
sock = self.transport.get_extra_info('socket')
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
ssl_ctx = self.transport.get_extra_info('sslcontext')
|
||||
if ssl_ctx:
|
||||
protocol = sock.selected_npn_protocol()
|
||||
logging.info('npn, protocol:%s', protocol)
|
||||
if protocol is None or protocol.encode('utf-8') != \
|
||||
cnghttp2.NGHTTP2_PROTO_VERSION_ID:
|
||||
self.transport.abort()
|
||||
|
||||
# Send preamble
|
||||
self.transport.write(cnghttp2.NGHTTP2_CLIENT_CONNECTION_PREFACE)
|
||||
self.http2 = _HTTP2ClientSessionCore(self.transport)
|
||||
|
||||
# Clear pending requests
|
||||
send_pending = self.pending
|
||||
self.pending = []
|
||||
for method,scheme,host,path,headers,body,handler in send_pending:
|
||||
self.send_request(method=method, scheme=scheme, host=host, path=path,\
|
||||
headers=headers, body=body, handler=handler)
|
||||
|
||||
|
||||
def connection_lost(self, exc):
|
||||
logging.info('connection_lost')
|
||||
if self.http2:
|
||||
self.http2 = None
|
||||
self.client.close()
|
||||
|
||||
def data_received(self, data):
|
||||
try:
|
||||
self.http2.data_received(data)
|
||||
except Exception as err:
|
||||
sys.stderr.write(traceback.format_exc())
|
||||
self.transport.close()
|
||||
return
|
||||
|
||||
def resume_writing(self):
|
||||
try:
|
||||
self.http2.send_data()
|
||||
except Exception as err:
|
||||
sys.stderr.write(traceback.format_exc())
|
||||
self.transport.close()
|
||||
return
|
||||
|
||||
def send_request(self, method, scheme, host, path, headers, body, handler):
|
||||
try:
|
||||
# Waiting until connection established
|
||||
if not self.http2:
|
||||
self.pending.append([method, scheme, host, path, headers, body, handler])
|
||||
return
|
||||
|
||||
self.http2.send_request(method=method, scheme=scheme, host=host, path=path,\
|
||||
headers=headers, body=body, handler=handler)
|
||||
except Exception as err:
|
||||
sys.stderr.write(traceback.format_exc())
|
||||
self.transport.close()
|
||||
return
|
||||
|
||||
def close(self):
|
||||
if self.http2:
|
||||
self.http2.close()
|
||||
|
||||
|
||||
class HTTP2Client:
|
||||
|
||||
'''HTTP/2 client.
|
||||
|
||||
This class builds on top of the asyncio event loop.
|
||||
|
||||
'''
|
||||
def __init__(self, address, loop=None, ssl=None):
|
||||
|
||||
'''address is a tuple of the connect address and port (e.g.,
|
||||
('127.0.0.1', 8080)). The ssl can be ssl.SSLContext instance.
|
||||
If it is not None, the resulting client is SSL/TLS capable.
|
||||
'''
|
||||
|
||||
self.address = address
|
||||
self.session = _HTTP2ClientSession(self)
|
||||
def session_factory():
|
||||
return self.session
|
||||
|
||||
if ssl:
|
||||
ssl.set_npn_protocols([cnghttp2.NGHTTP2_PROTO_VERSION_ID\
|
||||
.decode('utf-8')])
|
||||
|
||||
self.loop = loop
|
||||
if not self.loop:
|
||||
self.loop = asyncio.get_event_loop()
|
||||
|
||||
coro = self.loop.create_connection(session_factory,
|
||||
host=address[0], port=address[1],
|
||||
ssl=ssl)
|
||||
|
||||
if ssl:
|
||||
self.scheme = 'https'
|
||||
else:
|
||||
self.scheme = 'http'
|
||||
|
||||
self.transport,_ = self.loop.run_until_complete(coro)
|
||||
logging.info('connect, address:%s, port:%s', self.address[0], self.address[1])
|
||||
|
||||
@property
|
||||
def io_loop(self):
|
||||
return self.loop
|
||||
|
||||
def close(self):
|
||||
self.session.close()
|
||||
|
||||
def send_request(self, method='GET', url='/', headers=None, body=None, handler=None):
|
||||
url = urlparse(url)
|
||||
scheme = url.scheme if url.scheme else self.scheme
|
||||
host = url.netloc if url.netloc else self.address[0]+':'+str(self.address[1])
|
||||
path = url.path
|
||||
if url.params:
|
||||
path += ';'+url.params
|
||||
if url.query:
|
||||
path += '?'+url.query
|
||||
if url.fragment:
|
||||
path += '#'+url.fragment
|
||||
|
||||
self.session.send_request(method=method, scheme=scheme, host=host, path=path,\
|
||||
headers=headers, body=body, handler=handler)
|
||||
|
|
Loading…
Reference in New Issue