From 7ebb9965ddaeb64ecb27efdf06e91d661c129301 Mon Sep 17 00:00:00 2001 From: Matt Martz Date: Wed, 21 Aug 2019 12:25:32 -0500 Subject: [PATCH] Ensure threads don't start before a position in the queue is available. Fixes #628 --- speedtest.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/speedtest.py b/speedtest.py index 7be8d65..b7a5124 100755 --- a/speedtest.py +++ b/speedtest.py @@ -1496,6 +1496,9 @@ class Speedtest(object): build_request(url, bump=i, secure=self._secure) ) + max_threads = threads or self.config['threads']['download'] + in_flight = {'threads': 0} + def producer(q, requests, request_count): for i, request in enumerate(requests): thread = HTTPDownloader( @@ -1506,8 +1509,11 @@ class Speedtest(object): opener=self._opener, shutdown_event=self._shutdown_event ) + while in_flight['threads'] >= max_threads: + timeit.time.sleep(0.001) thread.start() q.put(thread, True) + in_flight['threads'] += 1 callback(i, request_count, start=True) finished = [] @@ -1517,11 +1523,12 @@ class Speedtest(object): while len(finished) < request_count: thread = q.get(True) while _is_alive(thread): - thread.join(timeout=0.1) + thread.join(timeout=0.001) + in_flight['threads'] -= 1 finished.append(sum(thread.result)) callback(thread.i, request_count, end=True) - q = Queue(threads or self.config['threads']['download']) + q = Queue(max_threads) prod_thread = threading.Thread(target=producer, args=(q, requests, request_count)) cons_thread = threading.Thread(target=consumer, @@ -1531,9 +1538,9 @@ class Speedtest(object): cons_thread.start() _is_alive = thread_is_alive while _is_alive(prod_thread): - prod_thread.join(timeout=0.1) + prod_thread.join(timeout=0.001) while _is_alive(cons_thread): - cons_thread.join(timeout=0.1) + cons_thread.join(timeout=0.001) stop = timeit.default_timer() self.results.bytes_received = sum(finished) @@ -1582,6 +1589,9 @@ class Speedtest(object): ) ) + max_threads = threads or self.config['threads']['upload'] + in_flight = {'threads': 0} + def producer(q, requests, request_count): for i, request in enumerate(requests[:request_count]): thread = HTTPUploader( @@ -1593,8 +1603,11 @@ class Speedtest(object): opener=self._opener, shutdown_event=self._shutdown_event ) + while in_flight['threads'] >= max_threads: + timeit.time.sleep(0.001) thread.start() q.put(thread, True) + in_flight['threads'] += 1 callback(i, request_count, start=True) finished = [] @@ -1604,7 +1617,8 @@ class Speedtest(object): while len(finished) < request_count: thread = q.get(True) while _is_alive(thread): - thread.join(timeout=0.1) + thread.join(timeout=0.001) + in_flight['threads'] -= 1 finished.append(thread.result) callback(thread.i, request_count, end=True)