From 3a2260d6af7e51e1d5f1da4a259df665582875fc Mon Sep 17 00:00:00 2001 From: Matt Martz Date: Mon, 7 Jan 2013 13:01:14 -0600 Subject: [PATCH] pep8 formatting changes --- speedtest-cli | 414 ++++++++++++++++++++++++----------------------- speedtest-cli-3 | 416 +++++++++++++++++++++++++----------------------- 2 files changed, 431 insertions(+), 399 deletions(-) diff --git a/speedtest-cli b/speedtest-cli index 7408cae..96ec88f 100755 --- a/speedtest-cli +++ b/speedtest-cli @@ -29,259 +29,273 @@ from Queue import Queue def distance(origin, destination): - """Determine distance between 2 sets of [lat,lon] in km""" + """Determine distance between 2 sets of [lat,lon] in km""" - lat1, lon1 = origin - lat2, lon2 = destination - radius = 6371 # km + lat1, lon1 = origin + lat2, lon2 = destination + radius = 6371 # km - dlat = math.radians(lat2-lat1) - dlon = math.radians(lon2-lon1) - a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \ - * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2) - c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) - d = radius * c + dlat = math.radians(lat2-lat1) + dlon = math.radians(lon2-lon1) + a = (math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.radians(lat1)) + * math.cos(math.radians(lat2)) * math.sin(dlon / 2) + * math.sin(dlon / 2)) + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + d = radius * c - return d + return d class FileGetter(threading.Thread): - def __init__(self, url, start): - self.url = url - self.result = None - self.starttime = start - threading.Thread.__init__(self) + def __init__(self, url, start): + self.url = url + self.result = None + self.starttime = start + threading.Thread.__init__(self) - def get_result(self): - return self.result + def get_result(self): + return self.result - def run(self): - try: - if ( time.time() - self.starttime ) <= 10: - f = urllib.urlopen(self.url) - contents = f.read() - f.close() - self.result = contents - else: - self.result = '' - except IOError: - self.result = '' + def run(self): + try: + if (time.time() - self.starttime) <= 10: + f = urllib.urlopen(self.url) + contents = f.read() + f.close() + self.result = contents + else: + self.result = '' + except IOError: + self.result = '' def downloadSpeed(files): - start = time.time() - def producer(q, files): - for file in files: - thread = FileGetter(file, start) - thread.start() - q.put(thread, True) - sys.stdout.write('.') - sys.stdout.flush() + start = time.time() - finished = [] - def consumer(q, total_files): - while len(finished) < total_files: - thread = q.get(True) - thread.join() - finished.append(thread.get_result()) + def producer(q, files): + for file in files: + thread = FileGetter(file, start) + thread.start() + q.put(thread, True) + sys.stdout.write('.') + sys.stdout.flush() - q = Queue(3) - start = time.time() - prod_thread = threading.Thread(target=producer, args=(q, files)) - cons_thread = threading.Thread(target=consumer, args=(q, len(files))) - prod_thread.start() - cons_thread.start() - prod_thread.join() - cons_thread.join() - return (len(''.join(finished))/(time.time()-start)) + finished = [] + + def consumer(q, total_files): + while len(finished) < total_files: + thread = q.get(True) + thread.join() + finished.append(thread.get_result()) + + q = Queue(3) + start = time.time() + prod_thread = threading.Thread(target=producer, args=(q, files)) + cons_thread = threading.Thread(target=consumer, args=(q, len(files))) + prod_thread.start() + cons_thread.start() + prod_thread.join() + cons_thread.join() + return (len(''.join(finished))/(time.time()-start)) class FilePutter(threading.Thread): - def __init__(self, url, start, size): - self.url = url - self.data = os.urandom(int(size)-len('content1=')) - self.result = None - self.starttime = start - threading.Thread.__init__(self) + def __init__(self, url, start, size): + self.url = url + self.data = os.urandom(int(size)-len('content1=')) + self.result = None + self.starttime = start + threading.Thread.__init__(self) - def get_result(self): - return self.result + def get_result(self): + return self.result - def run(self): - try: - if ( time.time() - self.starttime ) <= 10: - f = urllib.urlopen(self.url, self.data) - contents = f.read() - f.close() - self.result = self.data - else: - self.result = '' - except IOError: - self.result = '' + def run(self): + try: + if (time.time() - self.starttime) <= 10: + f = urllib.urlopen(self.url, self.data) + contents = f.read() + f.close() + self.result = self.data + else: + self.result = '' + except IOError: + self.result = '' def uploadSpeed(url, sizes): - start = time.time() - def producer(q, sizes): - for size in sizes: - thread = FilePutter(url, start, size) - thread.start() - q.put(thread, True) - sys.stdout.write('.') - sys.stdout.flush() + start = time.time() - finished = [] - def consumer(q, total_sizes): - while len(finished) < total_sizes: - thread = q.get(True) - thread.join() - finished.append(thread.get_result()) + def producer(q, sizes): + for size in sizes: + thread = FilePutter(url, start, size) + thread.start() + q.put(thread, True) + sys.stdout.write('.') + sys.stdout.flush() - q = Queue(9) - start = time.time() - prod_thread = threading.Thread(target=producer, args=(q, sizes)) - cons_thread = threading.Thread(target=consumer, args=(q, len(sizes))) - prod_thread.start() - cons_thread.start() - prod_thread.join() - cons_thread.join() - return (len(''.join(finished))/(time.time()-start)) + finished = [] + + def consumer(q, total_sizes): + while len(finished) < total_sizes: + thread = q.get(True) + thread.join() + finished.append(thread.get_result()) + + q = Queue(9) + start = time.time() + prod_thread = threading.Thread(target=producer, args=(q, sizes)) + cons_thread = threading.Thread(target=consumer, args=(q, len(sizes))) + prod_thread.start() + cons_thread.start() + prod_thread.join() + cons_thread.join() + return (len(''.join(finished))/(time.time()-start)) def getConfig(): - """Download the speedtest.net configuration and return only the data we are interested in""" + """Download the speedtest.net configuration and return only the data + we are interested in + """ - uh = urllib.urlopen('http://www.speedtest.net/speedtest-config.php') - configxml = uh.read() - if int(uh.getcode()) != 200: - return None - uh.close() - root = ET.fromstring(configxml) - config = { - 'client': root.find('client').attrib, - 'times': root.find('times').attrib, - 'download': root.find('download').attrib, - 'upload': root.find('upload').attrib - } - return config + uh = urllib.urlopen('http://www.speedtest.net/speedtest-config.php') + configxml = uh.read() + if int(uh.getcode()) != 200: + return None + uh.close() + root = ET.fromstring(configxml) + config = { + 'client': root.find('client').attrib, + 'times': root.find('times').attrib, + 'download': root.find('download').attrib, + 'upload': root.find('upload').attrib} + return config def closestServers(client): - """Determine the 5 closest speedtest.net servers based on geographic distance""" + """Determine the 5 closest speedtest.net servers based on geographic + distance + """ - uh = urllib.urlopen('http://speedtest.net/speedtest-servers.php') - serversxml = uh.read() - if int(uh.getcode()) != 200: - return None - uh.close() - root = ET.fromstring(serversxml) - servers = {} - for server in root[0]: - d = distance([float(client['lat']), float(client['lon'])], [float(server.get('lat')), float(server.get('lon'))]) - servers[d] = server.attrib + uh = urllib.urlopen('http://speedtest.net/speedtest-servers.php') + serversxml = uh.read() + if int(uh.getcode()) != 200: + return None + uh.close() + root = ET.fromstring(serversxml) + servers = {} + for server in root[0]: + d = distance([float(client['lat']), float(client['lon'])], + [float(server.get('lat')), float(server.get('lon'))]) + servers[d] = server.attrib - closest = [] - for d in sorted(servers.keys())[0:4]: - closest.append(servers[d]) + closest = [] + for d in sorted(servers.keys())[0:4]: + closest.append(servers[d]) - del servers - del root - return closest + del servers + del root + return closest def getBestServer(servers): - """Perform a speedtest.net "ping" to determine which speedtest.net server has the lowest latency""" + """Perform a speedtest.net "ping" to determine which speedtest.net + server has the lowest latency + """ - results = {} - for server in servers: - cum = 0 - url = os.path.dirname(server['url']) - for i in xrange(0,3): - uh = urllib.urlopen('%s/latency.txt' % url) - start = time.time() - text = uh.read().strip() - total = time.time() - start - if int(uh.getcode()) == 200 and text == 'test=test': - cum += total - else: - cum += 3600 - uh.close() - avg = round((cum/3)*1000000,3) - results[avg] = server + results = {} + for server in servers: + cum = 0 + url = os.path.dirname(server['url']) + for i in xrange(0, 3): + uh = urllib.urlopen('%s/latency.txt' % url) + start = time.time() + text = uh.read().strip() + total = time.time() - start + if int(uh.getcode()) == 200 and text == 'test=test': + cum += total + else: + cum += 3600 + uh.close() + avg = round((cum / 3) * 1000000, 3) + results[avg] = server - best = results[sorted(results.keys())[0]] - best['latency'] = avg + best = results[sorted(results.keys())[0]] + best['latency'] = avg - return best + return best def speedtest(): - """Run the full speedtest.net test""" + """Run the full speedtest.net test""" - print 'Retrieving speedtest.net configuration...' - config = getConfig() + print 'Retrieving speedtest.net configuration...' + config = getConfig() - print 'Retrieving speedtest.net server list...' - servers = closestServers(config['client']) + print 'Retrieving speedtest.net server list...' + servers = closestServers(config['client']) - print 'Selecting best server based on ping...' - best = getBestServer(servers) - print 'Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best + print 'Selecting best server based on ping...' + best = getBestServer(servers) + print 'Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best - sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000] - urls = [] - for size in sizes: - for i in xrange(0,4): - urls.append('%s/random%sx%s.jpg' % (os.path.dirname(best['url']), size, size)) - print 'Testing download speed', - dlspeed = downloadSpeed(urls) - print '\nDownload: %s Mbit/s' % round((dlspeed/1024/1024)*8,2) + sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000] + urls = [] + for size in sizes: + for i in xrange(0, 4): + urls.append('%s/random%sx%s.jpg' % + (os.path.dirname(best['url']), size, size)) + print 'Testing download speed', + dlspeed = downloadSpeed(urls) + print '\nDownload: %s Mbit/s' % round((dlspeed / 1024 / 1024) * 8, 2) - sizesizes = [int(.25*1024*1024), int(.5*1024*1024)] - sizes = [] - for size in sizesizes: - for i in xrange(0,25): - sizes.append(size) - print 'Testing upload speed', - ulspeed = uploadSpeed(best['url'], sizes) - print '\nUpload speed: %s Mbit/s' % round((ulspeed/1024/1024)*8,2) + sizesizes = [int(.25 * 1024 * 1024), int(.5 * 1024 * 1024)] + sizes = [] + for size in sizesizes: + for i in xrange(0, 25): + sizes.append(size) + print 'Testing upload speed', + ulspeed = uploadSpeed(best['url'], sizes) + print '\nUpload speed: %s Mbit/s' % round((ulspeed / 1024 / 1024) * 8, 2) - dlspeedk = int(round((dlspeed/1024)*8, 0)) - ping = int(round(best['latency'], 0)) - ulspeedk = int(round((ulspeed/1024)*8, 0)) + dlspeedk = int(round((dlspeed / 1024) * 8, 0)) + ping = int(round(best['latency'], 0)) + ulspeedk = int(round((ulspeed / 1024) * 8, 0)) - apiData = [ - 'download=%s' % dlspeedk, - 'ping=%s' % ping, - 'upload=%s' % ulspeedk, - 'promo=', - 'startmode=%s' % 'pingselect', - 'recommendedserverid=%s' % best['id'], - 'accuracy=%s' % 1, - 'serverid=%s' % best['id'], - 'hash=%s' % hashlib.md5('%s-%s-%s-%s' % (ping, ulspeedk, dlspeedk, '297aae72')).hexdigest() - ] + apiData = [ + 'download=%s' % dlspeedk, + 'ping=%s' % ping, + 'upload=%s' % ulspeedk, + 'promo=', + 'startmode=%s' % 'pingselect', + 'recommendedserverid=%s' % best['id'], + 'accuracy=%s' % 1, + 'serverid=%s' % best['id'], + 'hash=%s' % hashlib.md5('%s-%s-%s-%s' % + (ping, ulspeedk, dlspeedk, '297aae72') + ).hexdigest()] - req = urllib2.Request('http://www.speedtest.net/api/api.php', data='&'.join(apiData)) - req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf') - f = urllib2.urlopen(req) - response = f.read() - code = f.getcode() - f.close() + req = urllib2.Request('http://www.speedtest.net/api/api.php', + data='&'.join(apiData)) + req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf') + f = urllib2.urlopen(req) + response = f.read() + code = f.getcode() + f.close() - if int(code) != 200: - print 'Could not submit results to speedtest.net' - sys.exit(1) + if int(code) != 200: + print 'Could not submit results to speedtest.net' + sys.exit(1) - qsargs = urlparse.parse_qs(response) - resultid = qsargs.get('resultid') - if not resultid or len(resultid) != 1: - print 'Could not submit results to speedtest.net' - sys.exit(1) + qsargs = urlparse.parse_qs(response) + resultid = qsargs.get('resultid') + if not resultid or len(resultid) != 1: + print 'Could not submit results to speedtest.net' + sys.exit(1) - print 'Share results: http://www.speedtest.net/result/%s.png' % resultid[0] + print 'Share results: http://www.speedtest.net/result/%s.png' % resultid[0] if __name__ == '__main__': - speedtest() + speedtest() -# vim:ts=2:sw=2:expandtab +# vim:ts=4:sw=4:expandtab diff --git a/speedtest-cli-3 b/speedtest-cli-3 index 03fe729..f90312d 100755 --- a/speedtest-cli-3 +++ b/speedtest-cli-3 @@ -29,259 +29,277 @@ from queue import Queue def distance(origin, destination): - """Determine distance between 2 sets of [lat,lon] in km""" + """Determine distance between 2 sets of [lat,lon] in km""" - lat1, lon1 = origin - lat2, lon2 = destination - radius = 6371 # km + lat1, lon1 = origin + lat2, lon2 = destination + radius = 6371 # km - dlat = math.radians(lat2-lat1) - dlon = math.radians(lon2-lon1) - a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \ - * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2) - c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) - d = radius * c + dlat = math.radians(lat2-lat1) + dlon = math.radians(lon2-lon1) + a = (math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.radians(lat1)) + * math.cos(math.radians(lat2)) * math.sin(dlon / 2) + * math.sin(dlon / 2)) + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + d = radius * c - return d + return d class FileGetter(threading.Thread): - def __init__(self, url, start): - self.url = url - self.result = None - self.starttime = start - threading.Thread.__init__(self) + def __init__(self, url, start): + self.url = url + self.result = None + self.starttime = start + threading.Thread.__init__(self) - def get_result(self): - return self.result + def get_result(self): + return self.result - def run(self): - try: - if ( time.time() - self.starttime ) <= 10: - f = urllib.request.urlopen(self.url) - contents = f.read() - f.close() - self.result = contents - else: - self.result = '' - except IOError: - self.result = '' + def run(self): + try: + if (time.time() - self.starttime) <= 10: + f = urllib.request.urlopen(self.url) + contents = f.read() + f.close() + self.result = contents + else: + self.result = '' + except IOError: + self.result = '' def downloadSpeed(files): - start = time.time() - def producer(q, files): - for file in files: - thread = FileGetter(file, start) - thread.start() - q.put(thread, True) - sys.stdout.write('.') - sys.stdout.flush() + start = time.time() - finished = [] - def consumer(q, total_files): - while len(finished) < total_files: - thread = q.get(True) - thread.join() - finished.append(thread.get_result()) + def producer(q, files): + for file in files: + thread = FileGetter(file, start) + thread.start() + q.put(thread, True) + sys.stdout.write('.') + sys.stdout.flush() - q = Queue(3) - start = time.time() - prod_thread = threading.Thread(target=producer, args=(q, files)) - cons_thread = threading.Thread(target=consumer, args=(q, len(files))) - prod_thread.start() - cons_thread.start() - prod_thread.join() - cons_thread.join() - return (len(b''.join([chunk if isinstance(chunk, bytes) else chunk.encode() for chunk in finished]))/(time.time()-start)) + finished = [] + + def consumer(q, total_files): + while len(finished) < total_files: + thread = q.get(True) + thread.join() + finished.append(thread.get_result()) + + q = Queue(3) + start = time.time() + prod_thread = threading.Thread(target=producer, args=(q, files)) + cons_thread = threading.Thread(target=consumer, args=(q, len(files))) + prod_thread.start() + cons_thread.start() + prod_thread.join() + cons_thread.join() + return (len(b''.join([chunk if isinstance(chunk, bytes) else chunk.encode() + for chunk in finished]))/(time.time()-start)) class FilePutter(threading.Thread): - def __init__(self, url, start, size): - self.url = url - self.data = os.urandom(int(size)-len('content1=')) - self.result = None - self.starttime = start - threading.Thread.__init__(self) + def __init__(self, url, start, size): + self.url = url + self.data = os.urandom(int(size)-len('content1=')) + self.result = None + self.starttime = start + threading.Thread.__init__(self) - def get_result(self): - return self.result + def get_result(self): + return self.result - def run(self): - try: - if ( time.time() - self.starttime ) <= 10: - f = urllib.request.urlopen(self.url, self.data) - contents = f.read() - f.close() - self.result = self.data - else: - self.result = '' - except IOError: - self.result = '' + def run(self): + try: + if (time.time() - self.starttime) <= 10: + f = urllib.request.urlopen(self.url, self.data) + contents = f.read() + f.close() + self.result = self.data + else: + self.result = '' + except IOError: + self.result = '' def uploadSpeed(url, sizes): - start = time.time() - def producer(q, sizes): - for size in sizes: - thread = FilePutter(url, start, size) - thread.start() - q.put(thread, True) - sys.stdout.write('.') - sys.stdout.flush() + start = time.time() - finished = [] - def consumer(q, total_sizes): - while len(finished) < total_sizes: - thread = q.get(True) - thread.join() - finished.append(thread.get_result()) + def producer(q, sizes): + for size in sizes: + thread = FilePutter(url, start, size) + thread.start() + q.put(thread, True) + sys.stdout.write('.') + sys.stdout.flush() - q = Queue(9) - start = time.time() - prod_thread = threading.Thread(target=producer, args=(q, sizes)) - cons_thread = threading.Thread(target=consumer, args=(q, len(sizes))) - prod_thread.start() - cons_thread.start() - prod_thread.join() - cons_thread.join() - return (len(b''.join([chunk if isinstance(chunk, bytes) else chunk.encode() for chunk in finished]))/(time.time()-start)) + finished = [] + + def consumer(q, total_sizes): + while len(finished) < total_sizes: + thread = q.get(True) + thread.join() + finished.append(thread.get_result()) + + q = Queue(9) + start = time.time() + prod_thread = threading.Thread(target=producer, args=(q, sizes)) + cons_thread = threading.Thread(target=consumer, args=(q, len(sizes))) + prod_thread.start() + cons_thread.start() + prod_thread.join() + cons_thread.join() + return (len(b''.join([chunk if isinstance(chunk, bytes) else chunk.encode() + for chunk in finished]))/(time.time()-start)) def getConfig(): - """Download the speedtest.net configuration and return only the data we are interested in""" + """Download the speedtest.net configuration and return only the data + we are interested in + """ - uh = urllib.request.urlopen('http://www.speedtest.net/speedtest-config.php') - configxml = uh.read() - if int(uh.getcode()) != 200: - return None - uh.close() - root = ET.fromstring(configxml) - config = { - 'client': root.find('client').attrib, - 'times': root.find('times').attrib, - 'download': root.find('download').attrib, - 'upload': root.find('upload').attrib - } - return config + uh = urllib.request.urlopen('http://www.speedtest.net/' + 'speedtest-config.php') + configxml = uh.read() + if int(uh.getcode()) != 200: + return None + uh.close() + root = ET.fromstring(configxml) + config = { + 'client': root.find('client').attrib, + 'times': root.find('times').attrib, + 'download': root.find('download').attrib, + 'upload': root.find('upload').attrib} + return config def closestServers(client): - """Determine the 5 closest speedtest.net servers based on geographic distance""" + """Determine the 5 closest speedtest.net servers based on geographic + distance + """ - uh = urllib.request.urlopen('http://speedtest.net/speedtest-servers.php') - serversxml = uh.read() - if int(uh.getcode()) != 200: - return None - uh.close() - root = ET.fromstring(serversxml) - servers = {} - for server in root[0]: - d = distance([float(client['lat']), float(client['lon'])], [float(server.get('lat')), float(server.get('lon'))]) - servers[d] = server.attrib + uh = urllib.request.urlopen('http://speedtest.net/speedtest-servers.php') + serversxml = uh.read() + if int(uh.getcode()) != 200: + return None + uh.close() + root = ET.fromstring(serversxml) + servers = {} + for server in root[0]: + d = distance([float(client['lat']), float(client['lon'])], + [float(server.get('lat')), float(server.get('lon'))]) + servers[d] = server.attrib - closest = [] - for d in sorted(servers.keys())[0:4]: - closest.append(servers[d]) + closest = [] + for d in sorted(servers.keys())[0:4]: + closest.append(servers[d]) - del servers - del root - return closest + del servers + del root + return closest def getBestServer(servers): - """Perform a speedtest.net "ping" to determine which speedtest.net server has the lowest latency""" + """Perform a speedtest.net "ping" to determine which speedtest.net + server has the lowest latency + """ - results = {} - for server in servers: - cum = 0 - url = os.path.dirname(server['url']) - for i in range(0,3): - uh = urllib.request.urlopen('%s/latency.txt' % url) - start = time.time() - text = uh.read().strip() - total = time.time() - start - if int(uh.getcode()) == 200 and text == b'test=test': - cum += total - else: - cum += 3600 - uh.close() - avg = round((cum/3)*1000000,3) - results[avg] = server + results = {} + for server in servers: + cum = 0 + url = os.path.dirname(server['url']) + for i in range(0, 3): + uh = urllib.request.urlopen('%s/latency.txt' % url) + start = time.time() + text = uh.read().strip() + total = time.time() - start + if int(uh.getcode()) == 200 and text == b'test=test': + cum += total + else: + cum += 3600 + uh.close() + avg = round((cum / 3) * 1000000, 3) + results[avg] = server - best = results[sorted(results.keys())[0]] - best['latency'] = avg + best = results[sorted(results.keys())[0]] + best['latency'] = avg - return best + return best def speedtest(): - """Run the full speedtest.net test""" + """Run the full speedtest.net test""" - print('Retrieving speedtest.net configuration...') - config = getConfig() + print('Retrieving speedtest.net configuration...') + config = getConfig() - print('Retrieving speedtest.net server list...') - servers = closestServers(config['client']) + print('Retrieving speedtest.net server list...') + servers = closestServers(config['client']) - print('Selecting best server based on ping...') - best = getBestServer(servers) - print('Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best) + print('Selecting best server based on ping...') + best = getBestServer(servers) + print('Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best) - sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000] - urls = [] - for size in sizes: - for i in range(0,4): - urls.append('%s/random%sx%s.jpg' % (os.path.dirname(best['url']), size, size)) - print('Testing download speed', end='') - dlspeed = downloadSpeed(urls) - print('\nDownload: %s Mbit/s' % round((dlspeed/1024/1024)*8,2)) + sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000] + urls = [] + for size in sizes: + for i in range(0, 4): + urls.append('%s/random%sx%s.jpg' % (os.path.dirname(best['url']), + size, size)) + print('Testing download speed', end='') + dlspeed = downloadSpeed(urls) + print('\nDownload: %s Mbit/s' % round((dlspeed / 1024 / 1024) * 8, 2)) - sizesizes = [int(.25*1024*1024), int(.5*1024*1024)] - sizes = [] - for size in sizesizes: - for i in range(0,25): - sizes.append(size) - print('Testing upload speed', end='') - ulspeed = uploadSpeed(best['url'], sizes) - print('\nUpload speed: %s Mbit/s' % round((ulspeed/1024/1024)*8,2)) + sizesizes = [int(.25 * 1024 * 1024), int(.5 * 1024 * 1024)] + sizes = [] + for size in sizesizes: + for i in range(0, 25): + sizes.append(size) + print('Testing upload speed', end='') + ulspeed = uploadSpeed(best['url'], sizes) + print('\nUpload speed: %s Mbit/s' % round((ulspeed / 1024 / 1024) * 8, 2)) - dlspeedk = int(round((dlspeed/1024)*8, 0)) - ping = int(round(best['latency'], 0)) - ulspeedk = int(round((ulspeed/1024)*8, 0)) + dlspeedk = int(round((dlspeed / 1024) * 8, 0)) + ping = int(round(best['latency'], 0)) + ulspeedk = int(round((ulspeed / 1024) * 8, 0)) - apiData = [ - 'download=%s' % dlspeedk, - 'ping=%s' % ping, - 'upload=%s' % ulspeedk, - 'promo=', - 'startmode=%s' % 'pingselect', - 'recommendedserverid=%s' % best['id'], - 'accuracy=%s' % 1, - 'serverid=%s' % best['id'], - 'hash=%s' % hashlib.md5(('%s-%s-%s-%s' % (ping, ulspeedk, dlspeedk, '297aae72')).encode()).hexdigest() - ] + apiData = [ + 'download=%s' % dlspeedk, + 'ping=%s' % ping, + 'upload=%s' % ulspeedk, + 'promo=', + 'startmode=%s' % 'pingselect', + 'recommendedserverid=%s' % best['id'], + 'accuracy=%s' % 1, + 'serverid=%s' % best['id'], + 'hash=%s' % hashlib.md5(('%s-%s-%s-%s' % + (ping, ulspeedk, dlspeedk, '297aae72')) + .encode()).hexdigest()] - req = urllib.request.Request('http://www.speedtest.net/api/api.php', data='&'.join(apiData).encode()) - req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf') - f = urllib.request.urlopen(req) - response = f.read() - code = f.getcode() - f.close() + req = urllib.request.Request('http://www.speedtest.net/api/api.php', + data='&'.join(apiData).encode()) + req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf') + f = urllib.request.urlopen(req) + response = f.read() + code = f.getcode() + f.close() - if int(code) != 200: - print('Could not submit results to speedtest.net') - sys.exit(1) + if int(code) != 200: + print('Could not submit results to speedtest.net') + sys.exit(1) - qsargs = urllib.parse.parse_qs(response) - resultid = qsargs.get(b'resultid') - if not resultid or len(resultid) != 1: - print('Could not submit results to speedtest.net') - sys.exit(1) + qsargs = urllib.parse.parse_qs(response) + resultid = qsargs.get(b'resultid') + if not resultid or len(resultid) != 1: + print('Could not submit results to speedtest.net') + sys.exit(1) - print('Share results: http://www.speedtest.net/result/%s.png' % resultid[0].decode()) + print('Share results: http://www.speedtest.net/result/%s.png' % resultid[0] + .decode()) if __name__ == '__main__': - speedtest() + speedtest() # vim:ts=2:sw=2:expandtab