First pass at adding ability to test using newer socket based connection

This commit is contained in:
Matt Martz 2018-03-22 15:49:28 -05:00
parent 9c2977acfc
commit 0ef2f6b04c
1 changed files with 197 additions and 55 deletions

View File

@ -713,7 +713,7 @@ class HTTPDownloader(threading.Thread):
shutdown_event=None): shutdown_event=None):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.request = request self.request = request
self.result = [0] self.result = 0
self.starttime = start self.starttime = start
self.timeout = timeout self.timeout = timeout
self.i = i self.i = i
@ -734,14 +734,62 @@ class HTTPDownloader(threading.Thread):
while (not self._shutdown_event.isSet() and while (not self._shutdown_event.isSet() and
(timeit.default_timer() - self.starttime) <= (timeit.default_timer() - self.starttime) <=
self.timeout): self.timeout):
self.result.append(len(f.read(10240))) data = len(f.read(10240))
if self.result[-1] == 0: if data == 0:
break break
self.result += data
f.close() f.close()
except IOError: except IOError:
pass pass
class SocketDownloader(threading.Thread):
def __init__(self, i, address, size, start, timeout, shutdown_event=None,
source_address=None):
threading.Thread.__init__(self)
self.result = 0
self.starttime = start
self.timeout = timeout
self.i = i
self.size = size
self.remaining = self.size
if shutdown_event:
self._shutdown_event = shutdown_event
else:
self._shutdown_event = FakeShutdownEvent()
self.sock = create_connection(address, timeout=timeout,
source_address=source_address)
def run(self):
try:
if (timeit.default_timer() - self.starttime) <= self.timeout:
self.sock.sendall('HI\n'.encode())
self.sock.recv(1024)
while (self.remaining and not self._shutdown_event.isSet() and
(timeit.default_timer() - self.starttime) <=
self.timeout):
if self.remaining > 1000000:
ask = 1000000
else:
ask = self.remaining
down = 0
self.sock.sendall(('DOWNLOAD %d\n' % ask).encode())
while down < ask:
down += len(self.sock.recv(10240))
self.result += down
self.remaining -= down
self.sock.close()
except IOError:
pass
class HTTPUploaderData(object): class HTTPUploaderData(object):
"""File like object to improve cutting off the upload once the timeout """File like object to improve cutting off the upload once the timeout
has been reached has been reached
@ -759,7 +807,7 @@ class HTTPUploaderData(object):
self._data = None self._data = None
self.total = [0] self.total = 0
def pre_allocate(self): def pre_allocate(self):
chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ' chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
@ -787,7 +835,7 @@ class HTTPUploaderData(object):
if ((timeit.default_timer() - self.start) <= self.timeout and if ((timeit.default_timer() - self.start) <= self.timeout and
not self._shutdown_event.isSet()): not self._shutdown_event.isSet()):
chunk = self.data.read(n) chunk = self.data.read(n)
self.total.append(len(chunk)) self.total += len(chunk)
return chunk return chunk
else: else:
raise SpeedtestUploadTimeout() raise SpeedtestUploadTimeout()
@ -835,11 +883,59 @@ class HTTPUploader(threading.Thread):
f = self._opener(request) f = self._opener(request)
f.read(11) f.read(11)
f.close() f.close()
self.result = sum(self.request.data.total) self.result = self.request.data.total
else: else:
self.result = 0 self.result = 0
except (IOError, SpeedtestUploadTimeout): except (IOError, SpeedtestUploadTimeout):
self.result = sum(self.request.data.total) self.result = self.request.data.total
class SocketUploader(threading.Thread):
def __init__(self, i, address, size, start, timeout, shutdown_event=None,
source_address=None):
threading.Thread.__init__(self)
self.result = 0
self.starttime = start
self.timeout = timeout
self.i = i
self.size = size
self.remaining = self.size
if shutdown_event:
self._shutdown_event = shutdown_event
else:
self._shutdown_event = FakeShutdownEvent()
self.sock = create_connection(address, timeout=timeout,
source_address=source_address)
def run(self):
try:
if (timeit.default_timer() - self.starttime) <= self.timeout:
self.sock.sendall('HI\n'.encode())
self.sock.recv(1024)
while (self.remaining and not self._shutdown_event.isSet() and
(timeit.default_timer() - self.starttime) <=
self.timeout):
if self.remaining > 100000:
give = 100000
else:
give = self.remaining
header = ('UPLOAD %d 0\n' % give).encode()
data = '0'.encode() * (give - len(header))
self.sock.sendall(header)
self.sock.sendall(data)
self.sock.recv(24)
self.result += give
self.remaining -= give
self.sock.close()
except IOError:
pass
class SpeedtestResults(object): class SpeedtestResults(object):
@ -997,7 +1093,7 @@ class Speedtest(object):
"""Class for performing standard speedtest.net testing operations""" """Class for performing standard speedtest.net testing operations"""
def __init__(self, config=None, source_address=None, timeout=10, def __init__(self, config=None, source_address=None, timeout=10,
secure=False, shutdown_event=None): secure=False, shutdown_event=None, use_socket=False):
self.config = {} self.config = {}
self._source_address = source_address self._source_address = source_address
@ -1011,6 +1107,8 @@ class Speedtest(object):
else: else:
self._shutdown_event = FakeShutdownEvent() self._shutdown_event = FakeShutdownEvent()
self._use_socket = use_socket
self.get_config() self.get_config()
if config is not None: if config is not None:
self.config.update(config) self.config.update(config)
@ -1105,9 +1203,14 @@ class Speedtest(object):
up_sizes = [32768, 65536, 131072, 262144, 524288, 1048576, 7340032] up_sizes = [32768, 65536, 131072, 262144, 524288, 1048576, 7340032]
sizes = { sizes = {
'upload': up_sizes[ratio - 1:], 'upload': up_sizes[ratio - 1:],
'download': [350, 500, 750, 1000, 1500, 2000, 2500,
3000, 3500, 4000]
} }
if self._use_socket:
sizes['download'] = [245388, 505544, 1118012, 1986284, 4468241,
7907740, 12407926, 17816816, 24262167,
31625365]
else:
sizes['download'] = [350, 500, 750, 1000, 1500, 2000, 2500,
3000, 3500, 4000]
size_count = len(sizes['upload']) size_count = len(sizes['upload'])
@ -1252,6 +1355,9 @@ class Speedtest(object):
or int(attrib.get('id')) in exclude): or int(attrib.get('id')) in exclude):
continue continue
host, port = attrib['host'].split(':')
attrib['host'] = (host, int(port))
try: try:
d = distance(self.lat_lon, d = distance(self.lat_lon,
(float(attrib.get('lat')), (float(attrib.get('lat')),
@ -1429,29 +1535,48 @@ class Speedtest(object):
def download(self, callback=do_nothing): def download(self, callback=do_nothing):
"""Test download speed against speedtest.net""" """Test download speed against speedtest.net"""
urls = [] if self._use_socket:
for size in self.config['sizes']['download']: requests = []
for _ in range(0, self.config['counts']['download']): for size in self.config['sizes']['download']:
urls.append('%s/random%sx%s.jpg' % for _ in range(0, self.config['counts']['download']):
(os.path.dirname(self.best['url']), size, size)) requests.append(size)
request_count = len(urls) request_count = len(requests)
requests = [] else:
for i, url in enumerate(urls): urls = []
requests.append( for size in self.config['sizes']['download']:
build_request(url, bump=i, secure=self._secure) for _ in range(0, self.config['counts']['download']):
) urls.append('%s/random%sx%s.jpg' %
(os.path.dirname(self.best['url']), size, size))
request_count = len(urls)
requests = []
for i, url in enumerate(urls):
requests.append(
build_request(url, bump=i, secure=self._secure)
)
def producer(q, requests, request_count): def producer(q, requests, request_count):
for i, request in enumerate(requests): for i, request in enumerate(requests):
thread = HTTPDownloader( if self._use_socket:
i, thread = SocketDownloader(
request, i,
start, self.best['host'],
self.config['length']['download'], request,
opener=self._opener, start,
shutdown_event=self._shutdown_event self.config['length']['download'],
) shutdown_event=self._shutdown_event,
source_address=self._source_address
)
else:
thread = HTTPDownloader(
i,
request,
start,
self.config['length']['download'],
opener=self._opener,
shutdown_event=self._shutdown_event
)
thread.start() thread.start()
q.put(thread, True) q.put(thread, True)
callback(i, request_count, start=True) callback(i, request_count, start=True)
@ -1463,7 +1588,7 @@ class Speedtest(object):
thread = q.get(True) thread = q.get(True)
while thread.isAlive(): while thread.isAlive():
thread.join(timeout=0.1) thread.join(timeout=0.1)
finished.append(sum(thread.result)) finished.append(thread.result)
callback(thread.i, request_count, end=True) callback(thread.i, request_count, end=True)
q = Queue(self.config['threads']['download']) q = Queue(self.config['threads']['download'])
@ -1502,34 +1627,48 @@ class Speedtest(object):
requests = [] requests = []
for i, size in enumerate(sizes): for i, size in enumerate(sizes):
# We set ``0`` for ``start`` and handle setting the actual if self._use_socket:
# ``start`` in ``HTTPUploader`` to get better measurements requests.append(size)
data = HTTPUploaderData( else:
size, # We set ``0`` for ``start`` and handle setting the actual
0, # ``start`` in ``HTTPUploader`` to get better measurements
self.config['length']['upload'], data = HTTPUploaderData(
shutdown_event=self._shutdown_event size,
) 0,
if pre_allocate: self.config['length']['upload'],
data.pre_allocate() shutdown_event=self._shutdown_event
requests.append( )
( if pre_allocate:
build_request(self.best['url'], data, secure=self._secure), data.pre_allocate()
size requests.append(
(
build_request(self.best['url'], data, secure=self._secure),
size
)
) )
)
def producer(q, requests, request_count): def producer(q, requests, request_count):
for i, request in enumerate(requests[:request_count]): for i, request in enumerate(requests[:request_count]):
thread = HTTPUploader( if self._use_socket:
i, thread = SocketUploader(
request[0], i,
start, self.best['host'],
request[1], request,
self.config['length']['upload'], start,
opener=self._opener, self.config['length']['upload'],
shutdown_event=self._shutdown_event shutdown_event=self._shutdown_event,
) source_address=self._source_address
)
else:
thread = HTTPUploader(
i,
request[0],
start,
request[1],
self.config['length']['upload'],
opener=self._opener,
shutdown_event=self._shutdown_event
)
thread.start() thread.start()
q.put(thread, True) q.put(thread, True)
callback(i, request_count, start=True) callback(i, request_count, start=True)
@ -1652,6 +1791,8 @@ def parse_args():
parser.add_argument('--secure', action='store_true', parser.add_argument('--secure', action='store_true',
help='Use HTTPS instead of HTTP when communicating ' help='Use HTTPS instead of HTTP when communicating '
'with speedtest.net operated servers') 'with speedtest.net operated servers')
parser.add_argument('--socket', action='store_true',
help='Use socket test instead of HTTP based tests')
parser.add_argument('--no-pre-allocate', dest='pre_allocate', parser.add_argument('--no-pre-allocate', dest='pre_allocate',
action='store_const', default=True, const=False, action='store_const', default=True, const=False,
help='Do not pre allocate upload data. Pre allocation ' help='Do not pre allocate upload data. Pre allocation '
@ -1764,7 +1905,8 @@ def shell():
speedtest = Speedtest( speedtest = Speedtest(
source_address=args.source, source_address=args.source,
timeout=args.timeout, timeout=args.timeout,
secure=args.secure secure=args.secure,
use_socket=args.socket
) )
except (ConfigRetrievalError,) + HTTP_ERRORS: except (ConfigRetrievalError,) + HTTP_ERRORS:
printer('Cannot retrieve speedtest configuration', error=True) printer('Cannot retrieve speedtest configuration', error=True)