Pre build requests, lazy build upload data

This commit is contained in:
Matt Martz 2016-05-16 16:10:51 -05:00
parent c5f75f783e
commit 07c38d7194
1 changed files with 64 additions and 52 deletions

View File

@ -390,9 +390,9 @@ def do_nothing(*args, **kwargs):
class HTTPDownloader(threading.Thread):
"""Thread class for retrieving a URL"""
def __init__(self, i, url, start, timeout):
def __init__(self, i, request, start, timeout):
threading.Thread.__init__(self)
self.url = url
self.request = request
self.result = [0]
self.starttime = start
self.timeout = timeout
@ -401,8 +401,7 @@ class HTTPDownloader(threading.Thread):
def run(self):
try:
if (timeit.default_timer() - self.starttime) <= self.timeout:
request = build_request(self.url)
f = urlopen(request)
f = urlopen(self.request)
while (not SHUTDOWN_EVENT.isSet() and
(timeit.default_timer() - self.starttime) <=
self.timeout):
@ -424,13 +423,22 @@ class HTTPUploaderData(object):
self.start = start
self.timeout = timeout
chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
multiplier = int(round(int(length) / 36.0))
self.data = StringIO('content1=%s' %
(chars * multiplier)[0:int(length) - 9])
self._data = None
self.total = [0]
def _create_data(self):
chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
multiplier = int(round(int(self.length) / 36.0))
self._data = StringIO('content1=%s' %
(chars * multiplier)[0:int(self.length) - 9])
@property
def data(self):
if not self._data:
self._create_data()
return self._data
def read(self, n=10240):
if ((timeit.default_timer() - self.start) <= self.timeout and
not SHUTDOWN_EVENT.isSet()):
@ -447,39 +455,36 @@ class HTTPUploaderData(object):
class HTTPUploader(threading.Thread):
"""Thread class for putting a URL"""
def __init__(self, i, url, start, size, timeout):
def __init__(self, i, request, start, size, timeout):
threading.Thread.__init__(self)
self.url = url
self.data = HTTPUploaderData(size, start, timeout)
self.request = request
self.request.data.start = self.starttime = start
self.size = size
self.result = None
self.starttime = start
self.timeout = timeout
self.i = i
def run(self):
request = self.request
try:
if ((timeit.default_timer() - self.starttime) <= self.timeout and
not SHUTDOWN_EVENT.isSet()):
try:
request = build_request(self.url, data=self.data)
f = urlopen(request)
except TypeError:
# PY24 expects a string or buffer
# This also causes issues with Ctrl-C, but we will concede
# for the moment that Ctrl-C on PY24 isn't immediate
request = build_request(self.url,
data=self.data.read(self.size))
request = build_request(self.request.get_full_url(),
data=request.data.read(self.size))
f = urlopen(request)
f.read(11)
f.close()
self.result = sum(self.data.total)
self.result = sum(request.data.total)
else:
self.result = 0
except (IOError, SpeedtestUploadTimeout):
self.result = sum(self.data.total)
del self.data
self.result = sum(request.data.total)
class UnitsDataDescriptor(object):
@ -747,8 +752,6 @@ class Speedtest(object):
self.lat_lon = (float(client['lat']), float(client['lon']))
del root
del configxml
return self.config
def get_servers(self, servers=[]):
@ -831,10 +834,6 @@ class Speedtest(object):
printer(''.encode().join(serversxml), debug=True)
del root
del serversxml
del elements
break
except ServersRetrievalError:
@ -977,33 +976,34 @@ class Speedtest(object):
urls.append('%s/random%sx%s.jpg' %
(os.path.dirname(self.best['url']), size, size))
url_count = len(urls)
request_count = len(urls)
requests = []
for url in urls:
requests.append(build_request(url))
start = timeit.default_timer()
def producer(q, urls, url_count):
for i, url in enumerate(urls):
thread = HTTPDownloader(i, url, start,
def producer(q, requests, request_count):
for i, request in enumerate(requests):
thread = HTTPDownloader(i, request, start,
self.config['length']['download'])
thread.start()
q.put(thread, True)
callback(i, url_count, start=True)
callback(i, request_count, start=True)
finished = []
def consumer(q, url_count):
while len(finished) < url_count:
def consumer(q, request_count):
while len(finished) < request_count:
thread = q.get(True)
while thread.isAlive():
thread.join(timeout=0.1)
finished.append(sum(thread.result))
callback(thread.i, url_count, end=True)
del thread
callback(thread.i, request_count, end=True)
q = Queue(self.config['threads']['download'])
prod_thread = threading.Thread(target=producer,
args=(q, urls, url_count))
cons_thread = threading.Thread(target=consumer, args=(q, url_count))
args=(q, requests, request_count))
cons_thread = threading.Thread(target=consumer,
args=(q, request_count))
start = timeit.default_timer()
prod_thread.start()
cons_thread.start()
@ -1030,35 +1030,47 @@ class Speedtest(object):
for _ in range(0, self.config['counts']['upload']):
sizes.append(size)
# size_count = len(sizes)
size_count = self.config['upload_max']
# request_count = len(sizes)
request_count = self.config['upload_max']
start = timeit.default_timer()
requests = []
for size in sizes:
# We set ``0`` for ``start`` and handle setting the actual
# ``start`` in ``HTTPUploader`` to get better measurements
requests.append(
(
build_request(
self.best['url'],
HTTPUploaderData(size, 0,
self.config['length']['upload'])
),
size
)
)
def producer(q, sizes, size_count):
# for i, size in enumerate(sizes):
for i, size in enumerate(sizes[:size_count]):
thread = HTTPUploader(i, self.best['url'], start, size,
def producer(q, requests, request_count):
for i, request in enumerate(requests[:request_count]):
thread = HTTPUploader(i, request[0], start, request[1],
self.config['length']['upload'])
thread.start()
q.put(thread, True)
callback(i, size_count, start=True)
callback(i, request_count, start=True)
finished = []
def consumer(q, size_count):
while len(finished) < size_count:
def consumer(q, request_count):
while len(finished) < request_count:
thread = q.get(True)
while thread.isAlive():
thread.join(timeout=0.1)
finished.append(thread.result)
callback(thread.i, size_count, end=True)
del thread
callback(thread.i, request_count, end=True)
q = Queue(self.config['threads']['upload'])
prod_thread = threading.Thread(target=producer,
args=(q, sizes, size_count))
cons_thread = threading.Thread(target=consumer, args=(q, size_count))
args=(q, requests, request_count))
cons_thread = threading.Thread(target=consumer,
args=(q, request_count))
start = timeit.default_timer()
prod_thread.start()
cons_thread.start()