python: add simple SPDY client function urlfetch()

This commit is contained in:
Tatsuhiro Tsujikawa 2012-08-28 00:58:45 +09:00
parent 6b64276c08
commit 33d35912a5
1 changed files with 191 additions and 0 deletions

View File

@ -1481,6 +1481,197 @@ try:
socketserver.ThreadingMixIn.process_request(self,
request, client_address)
# Simple SPDY client implementation. Since this implementation
# uses TLS NPN, Python 3.3.0 or later is required.
from urllib.parse import urlsplit
class BaseSPDYStreamHandler:
def __init__(self, uri, fetcher):
self.uri = uri
self.fetcher = fetcher
self.stream_id = None
def on_header(self, nv):
pass
def on_data(self, data):
pass
def on_close(self, status_code):
pass
class UrlFetchError(Exception):
pass
class UrlFetcher:
def __init__(self, server_address, uris, StreamHandlerClass):
self.server_address = server_address
self.handlers = [StreamHandlerClass(uri, self) for uri in uris]
self.streams = {}
self.finished = []
self.ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
self.ctx.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | \
ssl.OP_NO_COMPRESSION
self.ctx.set_npn_protocols(get_npn_protocols())
def send_cb(self, session, data):
return self.sock.send(data)
def before_ctrl_send_cb(self, session, frame):
if frame.frame_type == SYN_STREAM:
handler = session.get_stream_user_data(frame.stream_id)
if handler:
handler.stream_id = frame.stream_id
self.streams[handler.stream_id] = handler
def on_ctrl_recv_cb(self, session, frame):
if frame.frame_type == SYN_REPLY or frame.frame_type == HEADERS:
if frame.stream_id in self.streams:
handler = self.streams[frame.stream_id]
handler.on_header(frame.nv)
def on_data_chunk_recv_cb(self, session, flags, stream_id, data):
if stream_id in self.streams:
handler = self.streams[stream_id]
handler.on_data(data)
def on_stream_close_cb(self, session, stream_id, status_code):
if stream_id in self.streams:
handler = self.streams[stream_id]
handler.on_close(status_code)
del self.streams[stream_id]
self.finished.append(handler)
def connect(self, server_address):
self.sock = None
for res in socket.getaddrinfo(server_address[0], server_address[1],
socket.AF_UNSPEC,
socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
except OSError as msg:
self.sock = None
continue
try:
self.sock.connect(sa)
except OSError as msg:
self.sock.close()
self.sock = None
continue
break
else:
raise UrlFetchError('Could not connect to {}'\
.format(server_address))
def tls_handshake(self):
self.sock = self.ctx.wrap_socket(self.sock, server_side=False,
do_handshake_on_connect=False)
self.sock.do_handshake()
self.version = npn_get_version(self.sock.selected_npn_protocol())
if self.version == 0:
raise UrlFetchError('NPN failed')
def loop(self):
self.connect(self.server_address)
self.tls_handshake()
self.sock.setblocking(False)
session = Session(CLIENT,
self.version,
send_cb=self.send_cb,
on_ctrl_recv_cb=self.on_ctrl_recv_cb,
on_data_chunk_recv_cb=self.on_data_chunk_recv_cb,
before_ctrl_send_cb=self.before_ctrl_send_cb,
on_stream_close_cb=self.on_stream_close_cb)
session.submit_settings(\
FLAG_SETTINGS_NONE,
[(SETTINGS_MAX_CONCURRENT_STREAMS, ID_FLAG_SETTINGS_NONE, 100)]
)
if self.server_address[1] == 443:
hostport = self.server_address[0]
else:
hostport = '{}:{}'.format(self.server_address[0],
self.server_address[1])
for handler in self.handlers:
res = urlsplit(handler.uri)
if res.path:
path = res.path
else:
path = '/'
if res.query:
path = '?'.join([path, res.query])
session.submit_request(0,
[(':method', 'GET'),
(':scheme', 'https'),
(':path', path),
(':version', 'HTTP/1.1'),
(':host', hostport),
('accept', '*/*'),
('user-agent', 'python-spdylay')],
stream_user_data=handler)
while (session.want_read() or session.want_write()) \
and not len(self.finished) == len(self.handlers):
want_read = want_write = False
try:
data = self.sock.recv(4096)
if data:
session.recv(data)
else:
break
except ssl.SSLWantReadError:
want_read = True
except ssl.SSLWantWriteError:
want_write = True
try:
session.send()
except ssl.SSLWantReadError:
want_read = True
except ssl.SSLWantWriteError:
want_write = True
if want_read or want_write:
select.select([self.sock] if want_read else [],
[self.sock] if want_write else [],
[])
def _urlfetch_session_one(uris, StreamHandlerClass):
res = urlsplit(uris[0])
if res.scheme != 'https':
raise UrlFetchError('Unsupported scheme {}'.format(res.scheme))
hostname = res.hostname
port = res.port if res.port else 443
f = UrlFetcher((hostname, port), uris, StreamHandlerClass)
f.loop()
def urlfetch(uri_or_uris, StreamHandlerClass):
if isinstance(uri_or_uris, str):
_urlfetch_session_one([uri_or_uris], StreamHandlerClass)
else:
uris = []
prev_addr = (None, None)
for uri in uri_or_uris:
res = urlsplit(uri)
port = res.port if res.port else 443
if prev_addr != (res.hostname, port):
if uris:
_urlfetch_session_one(uris, StreamHandlerClass)
uris = []
prev_addr = (res.hostname, port)
uris.append(uri)
if uris:
_urlfetch_session_one(uris, StreamHandlerClass)
except ImportError:
# No server for 2.x because they lack TLS NPN.
pass