Ensure threads don't start before a position in the queue is available.

Fixes #628
This commit is contained in:
Matt Martz 2019-08-21 12:25:32 -05:00
parent 2658bd50b4
commit 7ebb9965dd
1 changed files with 19 additions and 5 deletions

View File

@ -1496,6 +1496,9 @@ class Speedtest(object):
build_request(url, bump=i, secure=self._secure) build_request(url, bump=i, secure=self._secure)
) )
max_threads = threads or self.config['threads']['download']
in_flight = {'threads': 0}
def producer(q, requests, request_count): def producer(q, requests, request_count):
for i, request in enumerate(requests): for i, request in enumerate(requests):
thread = HTTPDownloader( thread = HTTPDownloader(
@ -1506,8 +1509,11 @@ class Speedtest(object):
opener=self._opener, opener=self._opener,
shutdown_event=self._shutdown_event shutdown_event=self._shutdown_event
) )
while in_flight['threads'] >= max_threads:
timeit.time.sleep(0.001)
thread.start() thread.start()
q.put(thread, True) q.put(thread, True)
in_flight['threads'] += 1
callback(i, request_count, start=True) callback(i, request_count, start=True)
finished = [] finished = []
@ -1517,11 +1523,12 @@ class Speedtest(object):
while len(finished) < request_count: while len(finished) < request_count:
thread = q.get(True) thread = q.get(True)
while _is_alive(thread): while _is_alive(thread):
thread.join(timeout=0.1) thread.join(timeout=0.001)
in_flight['threads'] -= 1
finished.append(sum(thread.result)) finished.append(sum(thread.result))
callback(thread.i, request_count, end=True) callback(thread.i, request_count, end=True)
q = Queue(threads or self.config['threads']['download']) q = Queue(max_threads)
prod_thread = threading.Thread(target=producer, prod_thread = threading.Thread(target=producer,
args=(q, requests, request_count)) args=(q, requests, request_count))
cons_thread = threading.Thread(target=consumer, cons_thread = threading.Thread(target=consumer,
@ -1531,9 +1538,9 @@ class Speedtest(object):
cons_thread.start() cons_thread.start()
_is_alive = thread_is_alive _is_alive = thread_is_alive
while _is_alive(prod_thread): while _is_alive(prod_thread):
prod_thread.join(timeout=0.1) prod_thread.join(timeout=0.001)
while _is_alive(cons_thread): while _is_alive(cons_thread):
cons_thread.join(timeout=0.1) cons_thread.join(timeout=0.001)
stop = timeit.default_timer() stop = timeit.default_timer()
self.results.bytes_received = sum(finished) self.results.bytes_received = sum(finished)
@ -1582,6 +1589,9 @@ class Speedtest(object):
) )
) )
max_threads = threads or self.config['threads']['upload']
in_flight = {'threads': 0}
def producer(q, requests, request_count): def producer(q, requests, request_count):
for i, request in enumerate(requests[:request_count]): for i, request in enumerate(requests[:request_count]):
thread = HTTPUploader( thread = HTTPUploader(
@ -1593,8 +1603,11 @@ class Speedtest(object):
opener=self._opener, opener=self._opener,
shutdown_event=self._shutdown_event shutdown_event=self._shutdown_event
) )
while in_flight['threads'] >= max_threads:
timeit.time.sleep(0.001)
thread.start() thread.start()
q.put(thread, True) q.put(thread, True)
in_flight['threads'] += 1
callback(i, request_count, start=True) callback(i, request_count, start=True)
finished = [] finished = []
@ -1604,7 +1617,8 @@ class Speedtest(object):
while len(finished) < request_count: while len(finished) < request_count:
thread = q.get(True) thread = q.get(True)
while _is_alive(thread): while _is_alive(thread):
thread.join(timeout=0.1) thread.join(timeout=0.001)
in_flight['threads'] -= 1
finished.append(thread.result) finished.append(thread.result)
callback(thread.i, request_count, end=True) callback(thread.i, request_count, end=True)