1657 lines
58 KiB
Cython
1657 lines
58 KiB
Cython
# nghttp2 - HTTP/2 C Library
|
|
|
|
# Copyright (c) 2013 Tatsuhiro Tsujikawa
|
|
|
|
# Permission is hereby granted, free of charge, to any person obtaining
|
|
# a copy of this software and associated documentation files (the
|
|
# "Software"), to deal in the Software without restriction, including
|
|
# without limitation the rights to use, copy, modify, merge, publish,
|
|
# distribute, sublicense, and/or sell copies of the Software, and to
|
|
# permit persons to whom the Software is furnished to do so, subject to
|
|
# the following conditions:
|
|
|
|
# The above copyright notice and this permission notice shall be
|
|
# included in all copies or substantial portions of the Software.
|
|
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
|
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
|
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
|
|
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
|
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
|
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
cimport cnghttp2
|
|
|
|
from libc.stdlib cimport malloc, free
|
|
from libc.string cimport memcpy, memset
|
|
from libc.stdint cimport uint8_t, uint16_t, uint32_t, int32_t
|
|
import logging
|
|
|
|
|
|
DEFAULT_HEADER_TABLE_SIZE = cnghttp2.NGHTTP2_DEFAULT_HEADER_TABLE_SIZE
|
|
DEFLATE_MAX_HEADER_TABLE_SIZE = 4096
|
|
|
|
HD_ENTRY_OVERHEAD = 32
|
|
|
|
class HDTableEntry:
|
|
|
|
def __init__(self, name, namelen, value, valuelen):
|
|
self.name = name
|
|
self.namelen = namelen
|
|
self.value = value
|
|
self.valuelen = valuelen
|
|
|
|
def space(self):
|
|
return self.namelen + self.valuelen + HD_ENTRY_OVERHEAD
|
|
|
|
cdef _get_pybytes(uint8_t *b, uint16_t blen):
|
|
return b[:blen]
|
|
|
|
cdef class HDDeflater:
|
|
'''Performs header compression. The constructor takes
|
|
|hd_table_bufsize_max| parameter, which limits the usage of header
|
|
table in the given amount of bytes. This is necessary because the
|
|
header compressor and decompressor share the same amount of
|
|
header table and the decompressor decides that number. The
|
|
compressor may not want to use all header table size because of
|
|
limited memory availability. In that case, the
|
|
|hd_table_bufsize_max| can be used to cap the upper limit of table
|
|
size whatever the header table size is chosen by the decompressor.
|
|
The default value of |hd_table_bufsize_max| is 4096 bytes.
|
|
|
|
The following example shows how to compress request header sets:
|
|
|
|
import binascii, nghttp2
|
|
|
|
deflater = nghttp2.HDDeflater()
|
|
res = deflater.deflate([(b'foo', b'bar'),
|
|
(b'baz', b'buz')])
|
|
print(binascii.b2a_hex(res))
|
|
|
|
'''
|
|
|
|
cdef cnghttp2.nghttp2_hd_deflater *_deflater
|
|
|
|
def __cinit__(self, hd_table_bufsize_max = DEFLATE_MAX_HEADER_TABLE_SIZE):
|
|
rv = cnghttp2.nghttp2_hd_deflate_new(&self._deflater,
|
|
hd_table_bufsize_max)
|
|
if rv != 0:
|
|
raise Exception(_strerror(rv))
|
|
|
|
def __dealloc__(self):
|
|
cnghttp2.nghttp2_hd_deflate_del(self._deflater)
|
|
|
|
def deflate(self, headers):
|
|
'''Compresses the |headers|. The |headers| must be sequence of tuple
|
|
of name/value pair, which are sequence of bytes (not unicode
|
|
string).
|
|
|
|
This function returns the encoded header block in byte string.
|
|
An exception will be raised on error.
|
|
|
|
'''
|
|
cdef cnghttp2.nghttp2_nv *nva = <cnghttp2.nghttp2_nv*>\
|
|
malloc(sizeof(cnghttp2.nghttp2_nv)*\
|
|
len(headers))
|
|
cdef cnghttp2.nghttp2_nv *nvap = nva
|
|
|
|
for k, v in headers:
|
|
nvap[0].name = k
|
|
nvap[0].namelen = len(k)
|
|
nvap[0].value = v
|
|
nvap[0].valuelen = len(v)
|
|
nvap[0].flags = cnghttp2.NGHTTP2_NV_FLAG_NONE
|
|
nvap += 1
|
|
|
|
cdef size_t outcap = 0
|
|
cdef ssize_t rv
|
|
cdef uint8_t *out
|
|
cdef size_t outlen
|
|
|
|
outlen = cnghttp2.nghttp2_hd_deflate_bound(self._deflater,
|
|
nva, len(headers))
|
|
|
|
out = <uint8_t*>malloc(outlen)
|
|
|
|
rv = cnghttp2.nghttp2_hd_deflate_hd(self._deflater, out, outlen,
|
|
nva, len(headers))
|
|
free(nva)
|
|
|
|
if rv < 0:
|
|
free(out)
|
|
|
|
raise Exception(_strerror(rv))
|
|
|
|
cdef bytes res
|
|
|
|
try:
|
|
res = out[:rv]
|
|
finally:
|
|
free(out)
|
|
|
|
return res
|
|
|
|
def change_table_size(self, hd_table_bufsize_max):
|
|
'''Changes header table size to |hd_table_bufsize_max| byte.
|
|
|
|
An exception will be raised on error.
|
|
|
|
'''
|
|
cdef int rv
|
|
rv = cnghttp2.nghttp2_hd_deflate_change_table_size(self._deflater,
|
|
hd_table_bufsize_max)
|
|
if rv != 0:
|
|
raise Exception(_strerror(rv))
|
|
|
|
def get_hd_table(self):
|
|
'''Returns copy of current dynamic header table.'''
|
|
cdef size_t length = cnghttp2.nghttp2_hd_deflate_get_num_table_entries(
|
|
self._deflater)
|
|
cdef const cnghttp2.nghttp2_nv *nv
|
|
res = []
|
|
for i in range(62, length + 1):
|
|
nv = cnghttp2.nghttp2_hd_deflate_get_table_entry(self._deflater, i)
|
|
k = _get_pybytes(nv.name, nv.namelen)
|
|
v = _get_pybytes(nv.value, nv.valuelen)
|
|
res.append(HDTableEntry(k, nv.namelen, v, nv.valuelen))
|
|
return res
|
|
|
|
cdef class HDInflater:
|
|
'''Performs header decompression.
|
|
|
|
The following example shows how to compress request header sets:
|
|
|
|
data = b'0082c5ad82bd0f000362617a0362757a'
|
|
inflater = nghttp2.HDInflater()
|
|
hdrs = inflater.inflate(data)
|
|
print(hdrs)
|
|
|
|
'''
|
|
|
|
cdef cnghttp2.nghttp2_hd_inflater *_inflater
|
|
|
|
def __cinit__(self):
|
|
rv = cnghttp2.nghttp2_hd_inflate_new(&self._inflater)
|
|
if rv != 0:
|
|
raise Exception(_strerror(rv))
|
|
|
|
def __dealloc__(self):
|
|
cnghttp2.nghttp2_hd_inflate_del(self._inflater)
|
|
|
|
def inflate(self, data):
|
|
'''Decompresses the compressed header block |data|. The |data| must be
|
|
byte string (not unicode string).
|
|
|
|
'''
|
|
cdef cnghttp2.nghttp2_nv nv
|
|
cdef int inflate_flags
|
|
cdef ssize_t rv
|
|
cdef uint8_t *buf = data
|
|
cdef size_t buflen = len(data)
|
|
res = []
|
|
while True:
|
|
inflate_flags = 0
|
|
rv = cnghttp2.nghttp2_hd_inflate_hd2(self._inflater, &nv,
|
|
&inflate_flags,
|
|
buf, buflen, 1)
|
|
if rv < 0:
|
|
raise Exception(_strerror(rv))
|
|
buf += rv
|
|
buflen -= rv
|
|
if inflate_flags & cnghttp2.NGHTTP2_HD_INFLATE_EMIT:
|
|
# may throw
|
|
res.append((nv.name[:nv.namelen], nv.value[:nv.valuelen]))
|
|
if inflate_flags & cnghttp2.NGHTTP2_HD_INFLATE_FINAL:
|
|
break
|
|
|
|
cnghttp2.nghttp2_hd_inflate_end_headers(self._inflater)
|
|
return res
|
|
|
|
def change_table_size(self, hd_table_bufsize_max):
|
|
'''Changes header table size to |hd_table_bufsize_max| byte.
|
|
|
|
An exception will be raised on error.
|
|
|
|
'''
|
|
cdef int rv
|
|
rv = cnghttp2.nghttp2_hd_inflate_change_table_size(self._inflater,
|
|
hd_table_bufsize_max)
|
|
if rv != 0:
|
|
raise Exception(_strerror(rv))
|
|
|
|
def get_hd_table(self):
|
|
'''Returns copy of current dynamic header table.'''
|
|
cdef size_t length = cnghttp2.nghttp2_hd_inflate_get_num_table_entries(
|
|
self._inflater)
|
|
cdef const cnghttp2.nghttp2_nv *nv
|
|
res = []
|
|
for i in range(62, length + 1):
|
|
nv = cnghttp2.nghttp2_hd_inflate_get_table_entry(self._inflater, i)
|
|
k = _get_pybytes(nv.name, nv.namelen)
|
|
v = _get_pybytes(nv.value, nv.valuelen)
|
|
res.append(HDTableEntry(k, nv.namelen, v, nv.valuelen))
|
|
return res
|
|
|
|
cdef _strerror(int liberror_code):
|
|
return cnghttp2.nghttp2_strerror(liberror_code).decode('utf-8')
|
|
|
|
def print_hd_table(hdtable):
|
|
'''Convenient function to print |hdtable| to the standard output. This
|
|
function does not work if header name/value cannot be decoded using
|
|
UTF-8 encoding.
|
|
|
|
s=N means the entry occupies N bytes in header table.
|
|
|
|
'''
|
|
idx = 0
|
|
for entry in hdtable:
|
|
idx += 1
|
|
print('[{}] (s={}) {}: {}'\
|
|
.format(idx, entry.space(),
|
|
entry.name.decode('utf-8'),
|
|
entry.value.decode('utf-8')))
|
|
|
|
try:
|
|
import socket
|
|
import io
|
|
import asyncio
|
|
import traceback
|
|
import sys
|
|
import email.utils
|
|
import datetime
|
|
import time
|
|
import ssl as tls
|
|
from urllib.parse import urlparse
|
|
except ImportError:
|
|
asyncio = None
|
|
|
|
# body generator flags
|
|
DATA_OK = 0
|
|
DATA_EOF = 1
|
|
DATA_DEFERRED = 2
|
|
|
|
class _ByteIOWrapper:
|
|
|
|
def __init__(self, b):
|
|
self.b = b
|
|
|
|
def generate(self, n):
|
|
data = self.b.read1(n)
|
|
if not data:
|
|
return None, DATA_EOF
|
|
return data, DATA_OK
|
|
|
|
def wrap_body(body):
|
|
if body is None:
|
|
return body
|
|
elif isinstance(body, str):
|
|
return _ByteIOWrapper(io.BytesIO(body.encode('utf-8'))).generate
|
|
elif isinstance(body, bytes):
|
|
return _ByteIOWrapper(io.BytesIO(body)).generate
|
|
elif isinstance(body, io.IOBase):
|
|
return _ByteIOWrapper(body).generate
|
|
else:
|
|
# assume that callable in the form f(n) returning tuple byte
|
|
# string and flag.
|
|
return body
|
|
|
|
def negotiated_protocol(ssl_obj):
|
|
protocol = ssl_obj.selected_alpn_protocol()
|
|
if protocol:
|
|
logging.info('alpn, protocol:%s', protocol)
|
|
return protocol
|
|
|
|
protocol = ssl_obj.selected_npn_protocol()
|
|
if protocol:
|
|
logging.info('npn, protocol:%s', protocol)
|
|
return protocol
|
|
|
|
return None
|
|
|
|
def set_application_protocol(ssl_ctx):
|
|
app_protos = [cnghttp2.NGHTTP2_PROTO_VERSION_ID.decode('utf-8')]
|
|
ssl_ctx.set_npn_protocols(app_protos)
|
|
if tls.HAS_ALPN:
|
|
ssl_ctx.set_alpn_protocols(app_protos)
|
|
|
|
cdef _get_stream_user_data(cnghttp2.nghttp2_session *session,
|
|
int32_t stream_id):
|
|
cdef void *stream_user_data
|
|
|
|
stream_user_data = cnghttp2.nghttp2_session_get_stream_user_data\
|
|
(session, stream_id)
|
|
if stream_user_data == NULL:
|
|
return None
|
|
|
|
return <object>stream_user_data
|
|
|
|
cdef size_t _make_nva(cnghttp2.nghttp2_nv **nva_ptr, headers):
|
|
cdef cnghttp2.nghttp2_nv *nva
|
|
cdef size_t nvlen
|
|
|
|
nvlen = len(headers)
|
|
nva = <cnghttp2.nghttp2_nv*>malloc(sizeof(cnghttp2.nghttp2_nv) * nvlen)
|
|
for i, (k, v) in enumerate(headers):
|
|
nva[i].name = k
|
|
nva[i].namelen = len(k)
|
|
nva[i].value = v
|
|
nva[i].valuelen = len(v)
|
|
nva[i].flags = cnghttp2.NGHTTP2_NV_FLAG_NONE
|
|
|
|
nva_ptr[0] = nva
|
|
|
|
return nvlen
|
|
|
|
cdef int server_on_header(cnghttp2.nghttp2_session *session,
|
|
const cnghttp2.nghttp2_frame *frame,
|
|
const uint8_t *name, size_t namelen,
|
|
const uint8_t *value, size_t valuelen,
|
|
uint8_t flags,
|
|
void *user_data):
|
|
cdef http2 = <_HTTP2SessionCoreBase>user_data
|
|
logging.debug('server_on_header, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
|
|
|
handler = _get_stream_user_data(session, frame.hd.stream_id)
|
|
return on_header(name, namelen, value, valuelen, flags, handler)
|
|
|
|
cdef int client_on_header(cnghttp2.nghttp2_session *session,
|
|
const cnghttp2.nghttp2_frame *frame,
|
|
const uint8_t *name, size_t namelen,
|
|
const uint8_t *value, size_t valuelen,
|
|
uint8_t flags,
|
|
void *user_data):
|
|
cdef http2 = <_HTTP2SessionCoreBase>user_data
|
|
logging.debug('client_on_header, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
|
|
|
if frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
|
|
handler = _get_stream_user_data(session, frame.hd.stream_id)
|
|
elif frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
|
|
handler = _get_stream_user_data(session, frame.push_promise.promised_stream_id)
|
|
|
|
return on_header(name, namelen, value, valuelen, flags, handler)
|
|
|
|
|
|
cdef int on_header(const uint8_t *name, size_t namelen,
|
|
const uint8_t *value, size_t valuelen,
|
|
uint8_t flags,
|
|
object handler):
|
|
if not handler:
|
|
return 0
|
|
|
|
key = name[:namelen]
|
|
values = value[:valuelen].split(b'\x00')
|
|
if key == b':scheme':
|
|
handler.scheme = values[0]
|
|
elif key == b':method':
|
|
handler.method = values[0]
|
|
elif key == b':authority' or key == b'host':
|
|
handler.host = values[0]
|
|
elif key == b':path':
|
|
handler.path = values[0]
|
|
elif key == b':status':
|
|
handler.status = values[0]
|
|
|
|
if key == b'cookie':
|
|
handler.cookies.extend(values)
|
|
else:
|
|
for v in values:
|
|
handler.headers.append((key, v))
|
|
|
|
return 0
|
|
|
|
cdef int server_on_begin_request_headers(cnghttp2.nghttp2_session *session,
|
|
const cnghttp2.nghttp2_frame *frame,
|
|
void *user_data):
|
|
cdef http2 = <_HTTP2SessionCore>user_data
|
|
|
|
handler = http2._make_handler(frame.hd.stream_id)
|
|
cnghttp2.nghttp2_session_set_stream_user_data(session, frame.hd.stream_id,
|
|
<void*>handler)
|
|
|
|
return 0
|
|
|
|
cdef int server_on_begin_headers(cnghttp2.nghttp2_session *session,
|
|
const cnghttp2.nghttp2_frame *frame,
|
|
void *user_data):
|
|
if frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
|
|
if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_REQUEST:
|
|
return server_on_begin_request_headers(session, frame, user_data)
|
|
|
|
return 0
|
|
|
|
cdef int server_on_frame_recv(cnghttp2.nghttp2_session *session,
|
|
const cnghttp2.nghttp2_frame *frame,
|
|
void *user_data):
|
|
cdef http2 = <_HTTP2SessionCore>user_data
|
|
logging.debug('server_on_frame_recv, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
|
|
|
if frame.hd.type == cnghttp2.NGHTTP2_DATA:
|
|
if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM:
|
|
handler = _get_stream_user_data(session, frame.hd.stream_id)
|
|
if not handler:
|
|
return 0
|
|
try:
|
|
handler.on_request_done()
|
|
except:
|
|
sys.stderr.write(traceback.format_exc())
|
|
return http2._rst_stream(frame.hd.stream_id)
|
|
elif frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
|
|
if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_REQUEST:
|
|
handler = _get_stream_user_data(session, frame.hd.stream_id)
|
|
if not handler:
|
|
return 0
|
|
if handler.cookies:
|
|
handler.headers.append((b'cookie',
|
|
b'; '.join(handler.cookies)))
|
|
handler.cookies = None
|
|
try:
|
|
handler.on_headers()
|
|
if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM:
|
|
handler.on_request_done()
|
|
except:
|
|
sys.stderr.write(traceback.format_exc())
|
|
return http2._rst_stream(frame.hd.stream_id)
|
|
elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS:
|
|
if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK):
|
|
http2._stop_settings_timer()
|
|
|
|
return 0
|
|
|
|
cdef int on_data_chunk_recv(cnghttp2.nghttp2_session *session,
|
|
uint8_t flags,
|
|
int32_t stream_id, const uint8_t *data,
|
|
size_t length, void *user_data):
|
|
cdef http2 = <_HTTP2SessionCoreBase>user_data
|
|
|
|
handler = _get_stream_user_data(session, stream_id)
|
|
if not handler:
|
|
return 0
|
|
|
|
try:
|
|
handler.on_data(data[:length])
|
|
except:
|
|
sys.stderr.write(traceback.format_exc())
|
|
return http2._rst_stream(stream_id)
|
|
|
|
return 0
|
|
|
|
cdef int server_on_frame_send(cnghttp2.nghttp2_session *session,
|
|
const cnghttp2.nghttp2_frame *frame,
|
|
void *user_data):
|
|
cdef http2 = <_HTTP2SessionCore>user_data
|
|
logging.debug('server_on_frame_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
|
|
|
if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
|
|
# For PUSH_PROMISE, send push response immediately
|
|
handler = _get_stream_user_data\
|
|
(session, frame.push_promise.promised_stream_id)
|
|
if not handler:
|
|
return 0
|
|
|
|
http2.send_response(handler)
|
|
elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS:
|
|
if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK) != 0:
|
|
return 0
|
|
http2._start_settings_timer()
|
|
elif frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
|
|
if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM) and \
|
|
cnghttp2.nghttp2_session_check_server_session(session):
|
|
# Send RST_STREAM if remote is not closed yet
|
|
if cnghttp2.nghttp2_session_get_stream_remote_close(
|
|
session, frame.hd.stream_id) == 0:
|
|
http2._rst_stream(frame.hd.stream_id, cnghttp2.NGHTTP2_NO_ERROR)
|
|
|
|
cdef int server_on_frame_not_send(cnghttp2.nghttp2_session *session,
|
|
const cnghttp2.nghttp2_frame *frame,
|
|
int lib_error_code,
|
|
void *user_data):
|
|
cdef http2 = <_HTTP2SessionCore>user_data
|
|
logging.debug('server_on_frame_not_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
|
|
|
if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
|
|
# We have to remove handler here. Without this, it is not
|
|
# removed until session is terminated.
|
|
handler = _get_stream_user_data\
|
|
(session, frame.push_promise.promised_stream_id)
|
|
if not handler:
|
|
return 0
|
|
http2._remove_handler(handler)
|
|
|
|
cdef int on_stream_close(cnghttp2.nghttp2_session *session,
|
|
int32_t stream_id,
|
|
uint32_t error_code,
|
|
void *user_data):
|
|
cdef http2 = <_HTTP2SessionCoreBase>user_data
|
|
logging.debug('on_stream_close, stream_id:%s', stream_id)
|
|
|
|
handler = _get_stream_user_data(session, stream_id)
|
|
if not handler:
|
|
return 0
|
|
|
|
try:
|
|
handler.on_close(error_code)
|
|
except:
|
|
sys.stderr.write(traceback.format_exc())
|
|
|
|
http2._remove_handler(handler)
|
|
|
|
return 0
|
|
|
|
cdef ssize_t data_source_read(cnghttp2.nghttp2_session *session,
|
|
int32_t stream_id,
|
|
uint8_t *buf, size_t length,
|
|
uint32_t *data_flags,
|
|
cnghttp2.nghttp2_data_source *source,
|
|
void *user_data):
|
|
cdef http2 = <_HTTP2SessionCoreBase>user_data
|
|
generator = <object>source.ptr
|
|
|
|
http2.enter_callback()
|
|
try:
|
|
data, flag = generator(length)
|
|
except:
|
|
sys.stderr.write(traceback.format_exc())
|
|
return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
|
|
finally:
|
|
http2.leave_callback()
|
|
|
|
if flag == DATA_DEFERRED:
|
|
return cnghttp2.NGHTTP2_ERR_DEFERRED
|
|
|
|
if data:
|
|
nread = len(data)
|
|
memcpy(buf, <uint8_t*>data, nread)
|
|
else:
|
|
nread = 0
|
|
|
|
if flag == DATA_EOF:
|
|
data_flags[0] = cnghttp2.NGHTTP2_DATA_FLAG_EOF
|
|
if cnghttp2.nghttp2_session_check_server_session(session):
|
|
# Send RST_STREAM if remote is not closed yet
|
|
if cnghttp2.nghttp2_session_get_stream_remote_close(
|
|
session, stream_id) == 0:
|
|
http2._rst_stream(stream_id, cnghttp2.NGHTTP2_NO_ERROR)
|
|
elif flag != DATA_OK:
|
|
return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
|
|
|
|
return nread
|
|
|
|
cdef int client_on_begin_headers(cnghttp2.nghttp2_session *session,
|
|
const cnghttp2.nghttp2_frame *frame,
|
|
void *user_data):
|
|
cdef http2 = <_HTTP2ClientSessionCore>user_data
|
|
|
|
if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
|
|
# Generate a temporary handler until the headers are all received
|
|
push_handler = BaseResponseHandler()
|
|
http2._add_handler(push_handler, frame.push_promise.promised_stream_id)
|
|
cnghttp2.nghttp2_session_set_stream_user_data(session, frame.push_promise.promised_stream_id,
|
|
<void*>push_handler)
|
|
|
|
return 0
|
|
|
|
cdef int client_on_frame_recv(cnghttp2.nghttp2_session *session,
|
|
const cnghttp2.nghttp2_frame *frame,
|
|
void *user_data):
|
|
cdef http2 = <_HTTP2ClientSessionCore>user_data
|
|
logging.debug('client_on_frame_recv, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
|
|
|
if frame.hd.type == cnghttp2.NGHTTP2_DATA:
|
|
if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM:
|
|
handler = _get_stream_user_data(session, frame.hd.stream_id)
|
|
if not handler:
|
|
return 0
|
|
try:
|
|
handler.on_response_done()
|
|
except:
|
|
sys.stderr.write(traceback.format_exc())
|
|
return http2._rst_stream(frame.hd.stream_id)
|
|
elif frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
|
|
if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_RESPONSE or frame.headers.cat == cnghttp2.NGHTTP2_HCAT_PUSH_RESPONSE:
|
|
handler = _get_stream_user_data(session, frame.hd.stream_id)
|
|
|
|
if not handler:
|
|
return 0
|
|
# TODO handle 1xx non-final response
|
|
if handler.cookies:
|
|
handler.headers.append((b'cookie',
|
|
b'; '.join(handler.cookies)))
|
|
handler.cookies = None
|
|
try:
|
|
handler.on_headers()
|
|
if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM:
|
|
handler.on_response_done()
|
|
except:
|
|
sys.stderr.write(traceback.format_exc())
|
|
return http2._rst_stream(frame.hd.stream_id)
|
|
elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS:
|
|
if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK):
|
|
http2._stop_settings_timer()
|
|
elif frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
|
|
handler = _get_stream_user_data(session, frame.hd.stream_id)
|
|
if not handler:
|
|
return 0
|
|
# Get the temporary push_handler which now should have all of the header data
|
|
push_handler = _get_stream_user_data(session, frame.push_promise.promised_stream_id)
|
|
if not push_handler:
|
|
return 0
|
|
# Remove the temporary handler
|
|
http2._remove_handler(push_handler)
|
|
cnghttp2.nghttp2_session_set_stream_user_data(session, frame.push_promise.promised_stream_id,
|
|
<void*>NULL)
|
|
|
|
try:
|
|
handler.on_push_promise(push_handler)
|
|
except:
|
|
sys.stderr.write(traceback.format_exc())
|
|
return http2._rst_stream(frame.hd.stream_id)
|
|
|
|
return 0
|
|
|
|
cdef int client_on_frame_send(cnghttp2.nghttp2_session *session,
|
|
const cnghttp2.nghttp2_frame *frame,
|
|
void *user_data):
|
|
cdef http2 = <_HTTP2ClientSessionCore>user_data
|
|
logging.debug('client_on_frame_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)
|
|
|
|
if frame.hd.type == cnghttp2.NGHTTP2_SETTINGS:
|
|
if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK) != 0:
|
|
return 0
|
|
http2._start_settings_timer()
|
|
|
|
cdef class _HTTP2SessionCoreBase:
|
|
cdef cnghttp2.nghttp2_session *session
|
|
cdef transport
|
|
cdef handler_class
|
|
cdef handlers
|
|
cdef settings_timer
|
|
cdef inside_callback
|
|
|
|
def __cinit__(self, transport, handler_class=None):
|
|
self.session = NULL
|
|
self.transport = transport
|
|
self.handler_class = handler_class
|
|
self.handlers = set()
|
|
self.settings_timer = None
|
|
self.inside_callback = False
|
|
|
|
def __dealloc__(self):
|
|
cnghttp2.nghttp2_session_del(self.session)
|
|
|
|
def data_received(self, data):
|
|
cdef ssize_t rv
|
|
|
|
rv = cnghttp2.nghttp2_session_mem_recv(self.session, data, len(data))
|
|
if rv < 0:
|
|
raise Exception('nghttp2_session_mem_recv failed: {}'.format\
|
|
(_strerror(rv)))
|
|
self.send_data()
|
|
|
|
OUTBUF_MAX = 65535
|
|
SETTINGS_TIMEOUT = 5.0
|
|
|
|
def send_data(self):
|
|
cdef ssize_t outbuflen
|
|
cdef const uint8_t *outbuf
|
|
|
|
while True:
|
|
if self.transport.get_write_buffer_size() > self.OUTBUF_MAX:
|
|
break
|
|
outbuflen = cnghttp2.nghttp2_session_mem_send(self.session, &outbuf)
|
|
if outbuflen == 0:
|
|
break
|
|
if outbuflen < 0:
|
|
raise Exception('nghttp2_session_mem_send faild: {}'.format\
|
|
(_strerror(outbuflen)))
|
|
self.transport.write(outbuf[:outbuflen])
|
|
|
|
if self.transport.get_write_buffer_size() == 0 and \
|
|
cnghttp2.nghttp2_session_want_read(self.session) == 0 and \
|
|
cnghttp2.nghttp2_session_want_write(self.session) == 0:
|
|
self.transport.close()
|
|
|
|
def resume(self, stream_id):
|
|
cnghttp2.nghttp2_session_resume_data(self.session, stream_id)
|
|
if not self.inside_callback:
|
|
self.send_data()
|
|
|
|
def enter_callback(self):
|
|
self.inside_callback = True
|
|
|
|
def leave_callback(self):
|
|
self.inside_callback = False
|
|
|
|
def _make_handler(self, stream_id):
|
|
logging.debug('_make_handler, stream_id:%s', stream_id)
|
|
handler = self.handler_class(self, stream_id)
|
|
self.handlers.add(handler)
|
|
return handler
|
|
|
|
def _remove_handler(self, handler):
|
|
logging.debug('_remove_handler, stream_id:%s', handler.stream_id)
|
|
self.handlers.remove(handler)
|
|
|
|
def _add_handler(self, handler, stream_id):
|
|
logging.debug('_add_handler, stream_id:%s', stream_id)
|
|
handler.stream_id = stream_id
|
|
handler.http2 = self
|
|
handler.remote_address = self._get_remote_address()
|
|
handler.client_certificate = self._get_client_certificate()
|
|
self.handlers.add(handler)
|
|
|
|
def _rst_stream(self, stream_id,
|
|
error_code=cnghttp2.NGHTTP2_INTERNAL_ERROR):
|
|
cdef int rv
|
|
|
|
rv = cnghttp2.nghttp2_submit_rst_stream\
|
|
(self.session, cnghttp2.NGHTTP2_FLAG_NONE,
|
|
stream_id, error_code)
|
|
|
|
return rv
|
|
|
|
def _get_remote_address(self):
|
|
return self.transport.get_extra_info('peername')
|
|
|
|
def _get_client_certificate(self):
|
|
sock = self.transport.get_extra_info('socket')
|
|
try:
|
|
return sock.getpeercert()
|
|
except AttributeError:
|
|
return None
|
|
|
|
def _start_settings_timer(self):
|
|
loop = asyncio.get_event_loop()
|
|
self.settings_timer = loop.call_later(self.SETTINGS_TIMEOUT,
|
|
self._settings_timeout)
|
|
|
|
def _stop_settings_timer(self):
|
|
if self.settings_timer:
|
|
self.settings_timer.cancel()
|
|
self.settings_timer = None
|
|
|
|
def _settings_timeout(self):
|
|
cdef int rv
|
|
|
|
logging.debug('_settings_timeout')
|
|
|
|
self.settings_timer = None
|
|
|
|
rv = cnghttp2.nghttp2_session_terminate_session\
|
|
(self.session, cnghttp2.NGHTTP2_SETTINGS_TIMEOUT)
|
|
try:
|
|
self.send_data()
|
|
except Exception as err:
|
|
sys.stderr.write(traceback.format_exc())
|
|
self.transport.close()
|
|
return
|
|
|
|
def _log_request(self, handler):
|
|
now = datetime.datetime.now()
|
|
tv = time.mktime(now.timetuple())
|
|
datestr = email.utils.formatdate(timeval=tv, localtime=False,
|
|
usegmt=True)
|
|
try:
|
|
method = handler.method.decode('utf-8')
|
|
except:
|
|
method = handler.method
|
|
try:
|
|
path = handler.path.decode('utf-8')
|
|
except:
|
|
path = handler.path
|
|
logging.info('%s - - [%s] "%s %s HTTP/2" %s - %s', handler.remote_address[0],
|
|
datestr, method, path, handler.status,
|
|
'P' if handler.pushed else '-')
|
|
|
|
def close(self):
|
|
rv = cnghttp2.nghttp2_session_terminate_session\
|
|
(self.session, cnghttp2.NGHTTP2_NO_ERROR)
|
|
try:
|
|
self.send_data()
|
|
except Exception as err:
|
|
sys.stderr.write(traceback.format_exc())
|
|
self.transport.close()
|
|
return
|
|
|
|
cdef class _HTTP2SessionCore(_HTTP2SessionCoreBase):
|
|
def __cinit__(self, *args, **kwargs):
|
|
cdef cnghttp2.nghttp2_session_callbacks *callbacks
|
|
cdef cnghttp2.nghttp2_settings_entry iv[2]
|
|
cdef int rv
|
|
|
|
super(_HTTP2SessionCore, self).__init__(*args, **kwargs)
|
|
|
|
rv = cnghttp2.nghttp2_session_callbacks_new(&callbacks)
|
|
|
|
if rv != 0:
|
|
raise Exception('nghttp2_session_callbacks_new failed: {}'.format\
|
|
(_strerror(rv)))
|
|
|
|
cnghttp2.nghttp2_session_callbacks_set_on_header_callback(
|
|
callbacks, server_on_header)
|
|
cnghttp2.nghttp2_session_callbacks_set_on_begin_headers_callback(
|
|
callbacks, server_on_begin_headers)
|
|
cnghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback(
|
|
callbacks, server_on_frame_recv)
|
|
cnghttp2.nghttp2_session_callbacks_set_on_stream_close_callback(
|
|
callbacks, on_stream_close)
|
|
cnghttp2.nghttp2_session_callbacks_set_on_frame_send_callback(
|
|
callbacks, server_on_frame_send)
|
|
cnghttp2.nghttp2_session_callbacks_set_on_frame_not_send_callback(
|
|
callbacks, server_on_frame_not_send)
|
|
cnghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
|
|
callbacks, on_data_chunk_recv)
|
|
|
|
rv = cnghttp2.nghttp2_session_server_new(&self.session, callbacks,
|
|
<void*>self)
|
|
|
|
cnghttp2.nghttp2_session_callbacks_del(callbacks)
|
|
|
|
if rv != 0:
|
|
raise Exception('nghttp2_session_server_new failed: {}'.format\
|
|
(_strerror(rv)))
|
|
|
|
iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS
|
|
iv[0].value = 100
|
|
iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE
|
|
iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE
|
|
|
|
rv = cnghttp2.nghttp2_submit_settings(self.session,
|
|
cnghttp2.NGHTTP2_FLAG_NONE,
|
|
iv, sizeof(iv) // sizeof(iv[0]))
|
|
|
|
if rv != 0:
|
|
raise Exception('nghttp2_submit_settings failed: {}'.format\
|
|
(_strerror(rv)))
|
|
|
|
def send_response(self, handler):
|
|
cdef cnghttp2.nghttp2_data_provider prd
|
|
cdef cnghttp2.nghttp2_data_provider *prd_ptr
|
|
cdef cnghttp2.nghttp2_nv *nva
|
|
cdef size_t nvlen
|
|
cdef int rv
|
|
|
|
logging.debug('send_response, stream_id:%s', handler.stream_id)
|
|
|
|
nva = NULL
|
|
nvlen = _make_nva(&nva, handler.response_headers)
|
|
|
|
if handler.response_body:
|
|
prd.source.ptr = <void*>handler.response_body
|
|
prd.read_callback = data_source_read
|
|
prd_ptr = &prd
|
|
else:
|
|
prd_ptr = NULL
|
|
|
|
rv = cnghttp2.nghttp2_submit_response(self.session, handler.stream_id,
|
|
nva, nvlen, prd_ptr)
|
|
|
|
free(nva)
|
|
|
|
if rv != 0:
|
|
# TODO Ignore return value
|
|
self._rst_stream(handler.stream_id)
|
|
raise Exception('nghttp2_submit_response failed: {}'.format\
|
|
(_strerror(rv)))
|
|
|
|
self._log_request(handler)
|
|
|
|
def push(self, handler, promised_handler):
|
|
cdef cnghttp2.nghttp2_nv *nva
|
|
cdef size_t nvlen
|
|
cdef int32_t promised_stream_id
|
|
|
|
self.handlers.add(promised_handler)
|
|
|
|
nva = NULL
|
|
nvlen = _make_nva(&nva, promised_handler.headers)
|
|
|
|
promised_stream_id = cnghttp2.nghttp2_submit_push_promise\
|
|
(self.session,
|
|
cnghttp2.NGHTTP2_FLAG_NONE,
|
|
handler.stream_id,
|
|
nva, nvlen,
|
|
<void*>promised_handler)
|
|
if promised_stream_id < 0:
|
|
raise Exception('nghttp2_submit_push_promise failed: {}'.format\
|
|
(_strerror(promised_stream_id)))
|
|
|
|
promised_handler.stream_id = promised_stream_id
|
|
|
|
logging.debug('push, stream_id:%s', promised_stream_id)
|
|
|
|
return promised_handler
|
|
|
|
def connection_lost(self):
|
|
self._stop_settings_timer()
|
|
|
|
for handler in self.handlers:
|
|
handler.on_close(cnghttp2.NGHTTP2_INTERNAL_ERROR)
|
|
self.handlers = set()
|
|
|
|
cdef class _HTTP2ClientSessionCore(_HTTP2SessionCoreBase):
|
|
def __cinit__(self, *args, **kwargs):
|
|
cdef cnghttp2.nghttp2_session_callbacks *callbacks
|
|
cdef cnghttp2.nghttp2_settings_entry iv[2]
|
|
cdef int rv
|
|
|
|
super(_HTTP2ClientSessionCore, self).__init__(*args, **kwargs)
|
|
|
|
rv = cnghttp2.nghttp2_session_callbacks_new(&callbacks)
|
|
|
|
if rv != 0:
|
|
raise Exception('nghttp2_session_callbacks_new failed: {}'.format\
|
|
(_strerror(rv)))
|
|
|
|
cnghttp2.nghttp2_session_callbacks_set_on_header_callback(
|
|
callbacks, client_on_header)
|
|
cnghttp2.nghttp2_session_callbacks_set_on_begin_headers_callback(
|
|
callbacks, client_on_begin_headers)
|
|
cnghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback(
|
|
callbacks, client_on_frame_recv)
|
|
cnghttp2.nghttp2_session_callbacks_set_on_stream_close_callback(
|
|
callbacks, on_stream_close)
|
|
cnghttp2.nghttp2_session_callbacks_set_on_frame_send_callback(
|
|
callbacks, client_on_frame_send)
|
|
cnghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
|
|
callbacks, on_data_chunk_recv)
|
|
|
|
rv = cnghttp2.nghttp2_session_client_new(&self.session, callbacks,
|
|
<void*>self)
|
|
|
|
cnghttp2.nghttp2_session_callbacks_del(callbacks)
|
|
|
|
if rv != 0:
|
|
raise Exception('nghttp2_session_client_new failed: {}'.format\
|
|
(_strerror(rv)))
|
|
|
|
iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS
|
|
iv[0].value = 100
|
|
iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE
|
|
iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE
|
|
|
|
rv = cnghttp2.nghttp2_submit_settings(self.session,
|
|
cnghttp2.NGHTTP2_FLAG_NONE,
|
|
iv, sizeof(iv) // sizeof(iv[0]))
|
|
|
|
if rv != 0:
|
|
raise Exception('nghttp2_submit_settings failed: {}'.format\
|
|
(_strerror(rv)))
|
|
|
|
def send_request(self, method, scheme, host, path, headers, body, handler):
|
|
cdef cnghttp2.nghttp2_data_provider prd
|
|
cdef cnghttp2.nghttp2_data_provider *prd_ptr
|
|
cdef cnghttp2.nghttp2_priority_spec *pri_ptr
|
|
cdef cnghttp2.nghttp2_nv *nva
|
|
cdef size_t nvlen
|
|
cdef int32_t stream_id
|
|
|
|
body = wrap_body(body)
|
|
|
|
custom_headers = _encode_headers(headers)
|
|
headers = [
|
|
(b':method', method.encode('utf-8')),
|
|
(b':scheme', scheme.encode('utf-8')),
|
|
(b':authority', host.encode('utf-8')),
|
|
(b':path', path.encode('utf-8'))
|
|
]
|
|
headers.extend(custom_headers)
|
|
|
|
nva = NULL
|
|
nvlen = _make_nva(&nva, headers)
|
|
|
|
if body:
|
|
prd.source.ptr = <void*>body
|
|
prd.read_callback = data_source_read
|
|
prd_ptr = &prd
|
|
else:
|
|
prd_ptr = NULL
|
|
|
|
# TODO: Enable priorities
|
|
pri_ptr = NULL
|
|
|
|
stream_id = cnghttp2.nghttp2_submit_request\
|
|
(self.session, pri_ptr,
|
|
nva, nvlen, prd_ptr,
|
|
<void*>handler)
|
|
free(nva)
|
|
|
|
if stream_id < 0:
|
|
raise Exception('nghttp2_submit_request failed: {}'.format\
|
|
(_strerror(stream_id)))
|
|
|
|
logging.debug('request, stream_id:%s', stream_id)
|
|
|
|
self._add_handler(handler, stream_id)
|
|
cnghttp2.nghttp2_session_set_stream_user_data(self.session, stream_id,
|
|
<void*>handler)
|
|
|
|
return handler
|
|
|
|
def push(self, push_promise, handler):
|
|
if handler:
|
|
# push_promise accepted, fill in the handler with the stored
|
|
# headers from the push_promise
|
|
handler.status = push_promise.status
|
|
handler.scheme = push_promise.scheme
|
|
handler.method = push_promise.method
|
|
handler.host = push_promise.host
|
|
handler.path = push_promise.path
|
|
handler.cookies = push_promise.cookies
|
|
handler.stream_id = push_promise.stream_id
|
|
handler.http2 = self
|
|
handler.pushed = True
|
|
|
|
self._add_handler(handler, handler.stream_id)
|
|
|
|
cnghttp2.nghttp2_session_set_stream_user_data(self.session, handler.stream_id,
|
|
<void*>handler)
|
|
else:
|
|
# push_promise rejected, reset the stream
|
|
self._rst_stream(push_promise.stream_id,
|
|
error_code=cnghttp2.NGHTTP2_NO_ERROR)
|
|
|
|
if asyncio:
|
|
|
|
class BaseRequestHandler:
|
|
|
|
"""HTTP/2 request (stream) handler base class.
|
|
|
|
The class is used to handle the HTTP/2 stream. By default, it does
|
|
not nothing. It must be subclassed to handle each event callback
|
|
method.
|
|
|
|
The first callback method invoked is on_headers(). It is called
|
|
when HEADERS frame, which includes request header fields, is
|
|
arrived.
|
|
|
|
If request has request body, on_data(data) is invoked for each
|
|
chunk of received data.
|
|
|
|
When whole request is received, on_request_done() is invoked.
|
|
|
|
When stream is closed, on_close(error_code) is called.
|
|
|
|
The application can send response using send_response() method. It
|
|
can be used in on_headers(), on_data() or on_request_done().
|
|
|
|
The application can push resource using push() method. It must be
|
|
used before send_response() call.
|
|
|
|
The following instance variables are available:
|
|
|
|
client_address
|
|
Contains a tuple of the form (host, port) referring to the client's
|
|
address.
|
|
|
|
client_certificate
|
|
May contain the client certifcate in its non-binary form
|
|
|
|
stream_id
|
|
Stream ID of this stream
|
|
|
|
scheme
|
|
Scheme of the request URI. This is a value of :scheme header field.
|
|
|
|
method
|
|
Method of this stream. This is a value of :method header field.
|
|
|
|
host
|
|
This is a value of :authority or host header field.
|
|
|
|
path
|
|
This is a value of :path header field.
|
|
|
|
headers
|
|
Request header fields
|
|
|
|
"""
|
|
|
|
def __init__(self, http2, stream_id):
|
|
self.headers = []
|
|
self.cookies = []
|
|
# Stream ID. For promised stream, it is initially -1.
|
|
self.stream_id = stream_id
|
|
self.http2 = http2
|
|
# address of the client
|
|
self.remote_address = self.http2._get_remote_address()
|
|
# certificate of the client
|
|
self._client_certificate = self.http2._get_client_certificate()
|
|
# :scheme header field in request
|
|
self.scheme = None
|
|
# :method header field in request
|
|
self.method = None
|
|
# :authority or host header field in request
|
|
self.host = None
|
|
# :path header field in request
|
|
self.path = None
|
|
# HTTP status
|
|
self.status = None
|
|
# True if this is a handler for pushed resource
|
|
self.pushed = False
|
|
|
|
@property
|
|
def client_address(self):
|
|
return self.remote_address
|
|
|
|
@property
|
|
def client_certificate(self):
|
|
return self._client_certificate
|
|
|
|
def on_headers(self):
|
|
|
|
'''Called when request HEADERS is arrived.
|
|
|
|
'''
|
|
pass
|
|
|
|
def on_data(self, data):
|
|
|
|
'''Called when a chunk of request body is arrived. This method
|
|
will be called multiple times until all data are received.
|
|
|
|
'''
|
|
pass
|
|
|
|
def on_request_done(self):
|
|
|
|
'''Called when whole request was received
|
|
|
|
'''
|
|
pass
|
|
|
|
def on_close(self, error_code):
|
|
|
|
'''Called when stream is about to close.
|
|
|
|
'''
|
|
pass
|
|
|
|
def send_response(self, status=200, headers=None, body=None):
|
|
|
|
'''Send response. The status is HTTP status code. The headers is
|
|
additional response headers. The :status header field is
|
|
appended by the library. The body is the response body. It
|
|
could be None if response body is empty. Or it must be
|
|
instance of either str, bytes, io.IOBase or callable,
|
|
called body generator, which takes one parameter,
|
|
size. The body generator generates response body. It can
|
|
pause generation of response so that it can wait for slow
|
|
backend data generation. When invoked, it should return
|
|
tuple, byte string and flag. The flag is either DATA_OK,
|
|
DATA_EOF and DATA_DEFERRED. For non-empty byte string and
|
|
it is not the last chunk of response, DATA_OK is returned
|
|
as flag. If this is the last chunk of the response (byte
|
|
string is possibly None), DATA_EOF must be returned as
|
|
flag. If there is no data available right now, but
|
|
additional data are anticipated, return tuple (None,
|
|
DATA_DEFERRD). When data arrived, call resume() and
|
|
restart response body transmission.
|
|
|
|
Only the body generator can pause response body
|
|
generation; instance of io.IOBase must not block.
|
|
|
|
If instance of str is specified as body, it is encoded
|
|
using UTF-8.
|
|
|
|
The headers is a list of tuple of the form (name,
|
|
value). The name and value can be either unicode string or
|
|
byte string.
|
|
|
|
On error, exception will be thrown.
|
|
|
|
'''
|
|
if self.status is not None:
|
|
raise Exception('response has already been sent')
|
|
|
|
if not status:
|
|
raise Exception('status must not be empty')
|
|
|
|
body = wrap_body(body)
|
|
|
|
self._set_response_prop(status, headers, body)
|
|
self.http2.send_response(self)
|
|
|
|
def push(self, path, method='GET', request_headers=None,
|
|
status=200, headers=None, body=None):
|
|
|
|
'''Push a resource. The path is a path portion of request URI
|
|
for this
|
|
resource. The method is a method to access this
|
|
resource. The request_headers is additional request
|
|
headers to access this resource. The :scheme, :method,
|
|
:authority and :path are appended by the library. The
|
|
:scheme and :authority are inherited from the request (not
|
|
request_headers parameter).
|
|
|
|
The status is HTTP status code. The headers is additional
|
|
response headers. The :status header field is appended by
|
|
the library. The body is the response body. It has the
|
|
same semantics of body parameter of send_response().
|
|
|
|
The headers and request_headers are a list of tuple of the
|
|
form (name, value). The name and value can be either
|
|
unicode string or byte string.
|
|
|
|
On error, exception will be thrown.
|
|
|
|
'''
|
|
if not status:
|
|
raise Exception('status must not be empty')
|
|
|
|
if not method:
|
|
raise Exception('method must not be empty')
|
|
|
|
if not path:
|
|
raise Exception('path must not be empty')
|
|
|
|
body = wrap_body(body)
|
|
|
|
promised_handler = self.http2._make_handler(-1)
|
|
promised_handler.pushed = True
|
|
promised_handler.scheme = self.scheme
|
|
promised_handler.method = method.encode('utf-8')
|
|
promised_handler.host = self.host
|
|
promised_handler.path = path.encode('utf-8')
|
|
promised_handler._set_response_prop(status, headers, body)
|
|
|
|
headers = [
|
|
(b':method', promised_handler.method),
|
|
(b':scheme', promised_handler.scheme),
|
|
(b':authority', promised_handler.host),
|
|
(b':path', promised_handler.path)
|
|
]
|
|
headers.extend(_encode_headers(request_headers))
|
|
|
|
promised_handler.headers = headers
|
|
|
|
return self.http2.push(self, promised_handler)
|
|
|
|
def _set_response_prop(self, status, headers, body):
|
|
self.status = status
|
|
|
|
if headers is None:
|
|
headers = []
|
|
|
|
self.response_headers = [(b':status', str(status).encode('utf-8'))]
|
|
self.response_headers.extend(_encode_headers(headers))
|
|
|
|
self.response_body = body
|
|
|
|
def resume(self):
|
|
self.http2.resume(self.stream_id)
|
|
|
|
def _encode_headers(headers):
|
|
if not headers:
|
|
return []
|
|
return [(k if isinstance(k, bytes) else k.encode('utf-8'),
|
|
v if isinstance(v, bytes) else v.encode('utf-8')) \
|
|
for k, v in headers]
|
|
|
|
class _HTTP2Session(asyncio.Protocol):
|
|
|
|
def __init__(self, RequestHandlerClass):
|
|
asyncio.Protocol.__init__(self)
|
|
self.RequestHandlerClass = RequestHandlerClass
|
|
self.http2 = None
|
|
|
|
def connection_made(self, transport):
|
|
address = transport.get_extra_info('peername')
|
|
logging.info('connection_made, address:%s, port:%s', address[0], address[1])
|
|
|
|
self.transport = transport
|
|
sock = self.transport.get_extra_info('socket')
|
|
try:
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
except OSError as e:
|
|
logging.info('failed to set tcp-nodelay: %s', str(e))
|
|
ssl_ctx = self.transport.get_extra_info('sslcontext')
|
|
if ssl_ctx:
|
|
ssl_obj = self.transport.get_extra_info('ssl_object')
|
|
protocol = negotiated_protocol(ssl_obj)
|
|
if protocol is None or protocol.encode('utf-8') != \
|
|
cnghttp2.NGHTTP2_PROTO_VERSION_ID:
|
|
self.transport.abort()
|
|
return
|
|
try:
|
|
self.http2 = _HTTP2SessionCore\
|
|
(self.transport,
|
|
self.RequestHandlerClass)
|
|
except Exception as err:
|
|
sys.stderr.write(traceback.format_exc())
|
|
self.transport.abort()
|
|
return
|
|
|
|
|
|
def connection_lost(self, exc):
|
|
logging.info('connection_lost')
|
|
if self.http2:
|
|
self.http2.connection_lost()
|
|
self.http2 = None
|
|
|
|
def data_received(self, data):
|
|
try:
|
|
self.http2.data_received(data)
|
|
except Exception as err:
|
|
sys.stderr.write(traceback.format_exc())
|
|
self.transport.close()
|
|
return
|
|
|
|
def resume_writing(self):
|
|
try:
|
|
self.http2.send_data()
|
|
except Exception as err:
|
|
sys.stderr.write(traceback.format_exc())
|
|
self.transport.close()
|
|
return
|
|
|
|
class HTTP2Server:
|
|
|
|
'''HTTP/2 server.
|
|
|
|
This class builds on top of the asyncio event loop. On
|
|
construction, RequestHandlerClass must be given, which must be a
|
|
subclass of BaseRequestHandler class.
|
|
|
|
'''
|
|
def __init__(self, address, RequestHandlerClass, ssl=None):
|
|
|
|
'''address is a tuple of the listening address and port (e.g.,
|
|
('127.0.0.1', 8080)). RequestHandlerClass must be a subclass
|
|
of BaseRequestHandler class to handle a HTTP/2 stream. The
|
|
ssl can be ssl.SSLContext instance. If it is not None, the
|
|
resulting server is SSL/TLS capable.
|
|
|
|
'''
|
|
def session_factory():
|
|
return _HTTP2Session(RequestHandlerClass)
|
|
|
|
self.loop = asyncio.get_event_loop()
|
|
|
|
if ssl:
|
|
set_application_protocol(ssl)
|
|
|
|
coro = self.loop.create_server(session_factory,
|
|
host=address[0], port=address[1],
|
|
ssl=ssl)
|
|
self.server = self.loop.run_until_complete(coro)
|
|
logging.info('listen, address:%s, port:%s', address[0], address[1])
|
|
|
|
def serve_forever(self):
|
|
try:
|
|
self.loop.run_forever()
|
|
finally:
|
|
self.server.close()
|
|
self.loop.close()
|
|
|
|
|
|
|
|
class BaseResponseHandler:
|
|
|
|
"""HTTP/2 response (stream) handler base class.
|
|
|
|
The class is used to handle the HTTP/2 stream. By default, it does
|
|
not nothing. It must be subclassed to handle each event callback
|
|
method.
|
|
|
|
The first callback method invoked is on_headers(). It is called
|
|
when HEADERS frame, which includes response header fields, is
|
|
arrived.
|
|
|
|
If response has a body, on_data(data) is invoked for each
|
|
chunk of received data.
|
|
|
|
When whole response is received, on_response_done() is invoked.
|
|
|
|
When stream is closed or underlying connection is lost,
|
|
on_close(error_code) is called.
|
|
|
|
The application can send follow up requests using HTTP2Client.send_request() method.
|
|
|
|
The application can handle push resource using on_push_promise() method.
|
|
|
|
The following instance variables are available:
|
|
|
|
server_address
|
|
Contains a tuple of the form (host, port) referring to the server's
|
|
address.
|
|
|
|
stream_id
|
|
Stream ID of this stream
|
|
|
|
scheme
|
|
Scheme of the request URI. This is a value of :scheme header field.
|
|
|
|
method
|
|
Method of this stream. This is a value of :method header field.
|
|
|
|
host
|
|
This is a value of :authority or host header field.
|
|
|
|
path
|
|
This is a value of :path header field.
|
|
|
|
headers
|
|
Response header fields. There is a special exception. If this
|
|
object is passed to push_promise(), this instance variable contains
|
|
pushed request header fields.
|
|
|
|
"""
|
|
|
|
def __init__(self, http2=None, stream_id=-1):
|
|
self.headers = []
|
|
self.cookies = []
|
|
# Stream ID. For promised stream, it is initially -1.
|
|
self.stream_id = stream_id
|
|
self.http2 = http2
|
|
# address of the server
|
|
self.remote_address = None
|
|
# :scheme header field in request
|
|
self.scheme = None
|
|
# :method header field in request
|
|
self.method = None
|
|
# :authority or host header field in request
|
|
self.host = None
|
|
# :path header field in request
|
|
self.path = None
|
|
# HTTP status
|
|
self.status = None
|
|
# True if this is a handler for pushed resource
|
|
self.pushed = False
|
|
|
|
@property
|
|
def server_address(self):
|
|
return self.remote_address
|
|
|
|
def on_headers(self):
|
|
|
|
'''Called when response HEADERS is arrived.
|
|
|
|
'''
|
|
pass
|
|
|
|
def on_data(self, data):
|
|
|
|
'''Called when a chunk of response body is arrived. This method
|
|
will be called multiple times until all data are received.
|
|
|
|
'''
|
|
pass
|
|
|
|
def on_response_done(self):
|
|
|
|
'''Called when whole response was received
|
|
|
|
'''
|
|
pass
|
|
|
|
def on_close(self, error_code):
|
|
|
|
'''Called when stream is about to close.
|
|
|
|
'''
|
|
pass
|
|
|
|
def on_push_promise(self, push_promise):
|
|
|
|
'''Called when a push is promised. Default behavior is to
|
|
cancel the push. If application overrides this method,
|
|
it should call either accept_push or reject_push.
|
|
|
|
'''
|
|
self.reject_push(push_promise)
|
|
|
|
def reject_push(self, push_promise):
|
|
|
|
'''Convenience method equivalent to calling accept_push
|
|
with a falsy value.
|
|
|
|
'''
|
|
self.http2.push(push_promise, None)
|
|
|
|
def accept_push(self, push_promise, handler=None):
|
|
|
|
'''Accept a push_promise and provider a handler for the
|
|
new stream. If a falsy value is supplied for the handler,
|
|
the push is rejected.
|
|
|
|
'''
|
|
self.http2.push(push_promise, handler)
|
|
|
|
def resume(self):
|
|
self.http2.resume(self.stream_id)
|
|
|
|
class _HTTP2ClientSession(asyncio.Protocol):
|
|
|
|
def __init__(self, client):
|
|
asyncio.Protocol.__init__(self)
|
|
self.http2 = None
|
|
self.pending = []
|
|
self.client = client
|
|
|
|
def connection_made(self, transport):
|
|
address = transport.get_extra_info('peername')
|
|
logging.info('connection_made, address:%s, port:%s', address[0], address[1])
|
|
|
|
self.transport = transport
|
|
sock = self.transport.get_extra_info('socket')
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
ssl_ctx = self.transport.get_extra_info('sslcontext')
|
|
if ssl_ctx:
|
|
ssl_obj = self.transport.get_extra_info('ssl_object')
|
|
protocol = negotiated_protocol(ssl_obj)
|
|
if protocol is None or protocol.encode('utf-8') != \
|
|
cnghttp2.NGHTTP2_PROTO_VERSION_ID:
|
|
self.transport.abort()
|
|
|
|
self.http2 = _HTTP2ClientSessionCore(self.transport)
|
|
|
|
# Clear pending requests
|
|
send_pending = self.pending
|
|
self.pending = []
|
|
for method,scheme,host,path,headers,body,handler in send_pending:
|
|
self.send_request(method=method, scheme=scheme, host=host, path=path,\
|
|
headers=headers, body=body, handler=handler)
|
|
self.http2.send_data()
|
|
|
|
def connection_lost(self, exc):
|
|
logging.info('connection_lost')
|
|
if self.http2:
|
|
self.http2 = None
|
|
self.client.close()
|
|
|
|
def data_received(self, data):
|
|
try:
|
|
self.http2.data_received(data)
|
|
except Exception as err:
|
|
sys.stderr.write(traceback.format_exc())
|
|
self.transport.close()
|
|
return
|
|
|
|
def resume_writing(self):
|
|
try:
|
|
self.http2.send_data()
|
|
except Exception as err:
|
|
sys.stderr.write(traceback.format_exc())
|
|
self.transport.close()
|
|
return
|
|
|
|
def send_request(self, method, scheme, host, path, headers, body, handler):
|
|
try:
|
|
# Waiting until connection established
|
|
if not self.http2:
|
|
self.pending.append([method, scheme, host, path, headers, body, handler])
|
|
return
|
|
|
|
self.http2.send_request(method=method, scheme=scheme, host=host, path=path,\
|
|
headers=headers, body=body, handler=handler)
|
|
self.http2.send_data()
|
|
except Exception as err:
|
|
sys.stderr.write(traceback.format_exc())
|
|
self.transport.close()
|
|
return
|
|
|
|
def close(self):
|
|
if self.http2:
|
|
self.http2.close()
|
|
|
|
|
|
class HTTP2Client:
|
|
|
|
'''HTTP/2 client.
|
|
|
|
This class builds on top of the asyncio event loop.
|
|
|
|
'''
|
|
def __init__(self, address, loop=None, ssl=None):
|
|
|
|
'''address is a tuple of the connect address and port (e.g.,
|
|
('127.0.0.1', 8080)). The ssl can be ssl.SSLContext instance.
|
|
If it is not None, the resulting client is SSL/TLS capable.
|
|
'''
|
|
|
|
self.address = address
|
|
self.session = _HTTP2ClientSession(self)
|
|
def session_factory():
|
|
return self.session
|
|
|
|
if ssl:
|
|
set_application_protocol(ssl)
|
|
|
|
self.loop = loop
|
|
if not self.loop:
|
|
self.loop = asyncio.get_event_loop()
|
|
|
|
coro = self.loop.create_connection(session_factory,
|
|
host=address[0], port=address[1],
|
|
ssl=ssl)
|
|
|
|
if ssl:
|
|
self.scheme = 'https'
|
|
else:
|
|
self.scheme = 'http'
|
|
|
|
self.transport,_ = self.loop.run_until_complete(coro)
|
|
logging.info('connect, address:%s, port:%s', self.address[0], self.address[1])
|
|
|
|
@property
|
|
def io_loop(self):
|
|
return self.loop
|
|
|
|
def close(self):
|
|
self.session.close()
|
|
|
|
def send_request(self, method='GET', url='/', headers=None, body=None, handler=None):
|
|
url = urlparse(url)
|
|
scheme = url.scheme if url.scheme else self.scheme
|
|
host = url.netloc if url.netloc else self.address[0]+':'+str(self.address[1])
|
|
path = url.path
|
|
if url.params:
|
|
path += ';'+url.params
|
|
if url.query:
|
|
path += '?'+url.query
|
|
if url.fragment:
|
|
path += '#'+url.fragment
|
|
|
|
self.session.send_request(method=method, scheme=scheme, host=host, path=path,\
|
|
headers=headers, body=body, handler=handler)
|