#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Command-line bandwidth test against speedtest.net servers.

The script downloads the speedtest.net configuration, picks the closest
servers by great-circle distance, selects the one with the lowest measured
latency, runs threaded download and upload tests against it and finally
submits the result to the speedtest.net API.
"""

import urllib.request
import urllib.parse
import urllib.error
import math
import xml.etree.ElementTree as ET
import time
import os
import sys
import hashlib
import threading
from queue import Queue


def distance(origin, destination):
  """Determine distance between 2 sets of [lat,lon] in km

  Uses the haversine formula with a mean earth radius of 6371 km.
  """

  lat1, lon1 = origin
  lat2, lon2 = destination
  radius = 6371  # km

  sin_dlat = math.sin(math.radians(lat2 - lat1) / 2)
  sin_dlon = math.sin(math.radians(lon2 - lon1) / 2)
  a = (sin_dlat * sin_dlat
       + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
       * sin_dlon * sin_dlon)
  # Rounding can push `a` marginally outside [0, 1], which would make
  # sqrt(1 - a) raise a math domain error for near-antipodal points.
  a = min(1.0, max(0.0, a))
  c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
  return radius * c


class FileGetter(threading.Thread):
  """Thread that downloads one URL and keeps the body as its result.

  The download is skipped when more than 10 seconds have passed since
  `start` (the time the whole test began). The result is always bytes:
  the response body on success, b'' when skipped or on an I/O error.
  """

  def __init__(self, url, start):
    threading.Thread.__init__(self)
    self.url = url
    self.result = None
    self.starttime = start

  def get_result(self):
    """Return the downloaded bytes (None until run() has finished)."""
    return self.result

  def run(self):
    try:
      if (time.time() - self.starttime) <= 10:
        with urllib.request.urlopen(self.url) as f:
          self.result = f.read()
      else:
        self.result = b''
    except IOError:
      self.result = b''


class FilePutter(threading.Thread):
  """Thread that POSTs a `size`-byte form body to one URL.

  The body is the form field ``content1=`` followed by random hex digits,
  `size` bytes in total. The upload is skipped when more than 10 seconds
  have passed since `start`. The result is always bytes: the body that was
  sent on success, b'' when skipped or on an I/O error.
  """

  def __init__(self, url, start, size):
    threading.Thread.__init__(self)
    self.url = url
    prefix = b'content1='
    fill = max(0, int(size) - len(prefix))
    # Hex digits keep the body a valid urlencoded form value; two digits
    # come out of every random byte, so generate half and trim.
    payload = os.urandom((fill + 1) // 2).hex()[:fill]
    self.data = prefix + payload.encode('ascii')
    self.result = None
    self.starttime = start

  def get_result(self):
    """Return the uploaded bytes (None until run() has finished)."""
    return self.result

  def run(self):
    try:
      if (time.time() - self.starttime) <= 10:
        with urllib.request.urlopen(self.url, self.data) as f:
          f.read()  # drain the response so the request fully completes
        self.result = self.data
      else:
        self.result = b''
    except IOError:
      self.result = b''


def _run_transfers(make_thread, items, queue_size):
  """Run one transfer thread per item and return bytes per second.

  `make_thread(item, start)` must return an unstarted thread exposing
  get_result(). A producer starts the threads and pushes them into a
  bounded queue (which throttles how many are in flight); a consumer joins
  them in order and collects their results.
  """

  items = list(items)
  finished = []
  q = Queue(queue_size)
  start = time.time()

  def producer():
    for item in items:
      thread = make_thread(item, start)
      thread.start()
      q.put(thread, True)
      sys.stdout.write('.')
      sys.stdout.flush()

  def consumer():
    for _ in items:
      thread = q.get(True)
      thread.join()
      finished.append(thread.get_result())

  prod_thread = threading.Thread(target=producer)
  cons_thread = threading.Thread(target=consumer)
  prod_thread.start()
  cons_thread.start()
  prod_thread.join()
  cons_thread.join()

  elapsed = time.time() - start
  transferred = sum(len(chunk) for chunk in finished if chunk)
  if elapsed <= 0:
    return 0.0
  return transferred / elapsed


def downloadSpeed(files):
  """Download every URL in `files` and return the rate in bytes/second."""
  return _run_transfers(FileGetter, files, 3)


def uploadSpeed(url, sizes):
  """POST one body per entry of `sizes` to `url`; return bytes/second."""

  def make_putter(size, start):
    return FilePutter(url, start, size)

  return _run_transfers(make_putter, sizes, 9)


def getConfig():
  """Download the speedtest.net configuration and return only the data
  we are interested in

  Returns a dict with the attribute maps of the `client`, `times`,
  `download` and `upload` elements, or None when the server does not
  answer with HTTP 200.
  """

  url = 'http://www.speedtest.net/speedtest-config.php'
  with urllib.request.urlopen(url) as uh:
    if int(uh.getcode()) != 200:
      return None
    configxml = uh.read()

  root = ET.fromstring(configxml)
  return {
    'client': root.find('client').attrib,
    'times': root.find('times').attrib,
    'download': root.find('download').attrib,
    'upload': root.find('upload').attrib,
  }


def closestServers(client, limit=5):
  """Determine the 5 closest speedtest.net servers based on geographic
  distance

  `client` must provide 'lat' and 'lon'. Returns a list of at most `limit`
  server attribute dicts ordered nearest first, or None when the server
  list cannot be fetched with HTTP 200.
  """

  with urllib.request.urlopen('http://speedtest.net/speedtest-servers.php') as uh:
    if int(uh.getcode()) != 200:
      return None
    serversxml = uh.read()

  root = ET.fromstring(serversxml)
  here = [float(client['lat']), float(client['lon'])]

  # A list of (distance, attributes) pairs rather than a dict keyed by
  # distance, so that equidistant servers do not overwrite each other.
  ranked = []
  for server in root[0]:
    try:
      there = [float(server.get('lat')), float(server.get('lon'))]
    except (TypeError, ValueError):
      continue  # entry without usable coordinates
    ranked.append((distance(here, there), server.attrib))

  ranked.sort(key=lambda pair: pair[0])
  return [attrib for _, attrib in ranked[:limit]]


def getBestServer(servers):
  """Perform a speedtest.net "ping" to determine which speedtest.net
  server has the lowest latency

  Each server is asked three times for latency.txt; a failed or unexpected
  response counts as 3600 seconds. The winning server dict gets its own
  average stored under 'latency' (milliseconds) and is returned. Returns
  None when `servers` is empty.
  """

  best = None
  best_latency = None
  for server in servers:
    url = os.path.dirname(server['url'])
    cum = 0
    for _ in range(3):
      # Time the whole request, not just the read of the tiny body.
      start = time.time()
      try:
        with urllib.request.urlopen('%s/latency.txt' % url) as uh:
          text = uh.read().strip()
          ok = int(uh.getcode()) == 200 and text == b'test=test'
      except IOError:
        ok = False
      total = time.time() - start
      cum += total if ok else 3600
    avg = round((cum / 3) * 1000, 3)  # seconds -> milliseconds
    if best_latency is None or avg < best_latency:
      best = server
      best_latency = avg

  if best is None:
    return None
  best['latency'] = best_latency
  return best


def speedtest():
  """Run the full speedtest.net test"""

  print('Retrieving speedtest.net configuration...')
  config = getConfig()
  if config is None:
    print('Could not retrieve speedtest.net configuration')
    sys.exit(1)

  print('Retrieving speedtest.net server list...')
  servers = closestServers(config['client'])
  if not servers:
    print('Could not retrieve speedtest.net server list')
    sys.exit(1)

  print('Selecting best server based on ping...')
  best = getBestServer(servers)
  if best is None:
    print('Could not find a usable speedtest.net server')
    sys.exit(1)
  print('Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best)

  # Four requests for each test image size.
  base = os.path.dirname(best['url'])
  urls = [
    '%s/random%sx%s.jpg' % (base, size, size)
    for size in [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
    for _ in range(4)
  ]
  print('Testing download speed', end='')
  dlspeed = downloadSpeed(urls)
  print('\nDownload: %s Mbit/s' % round((dlspeed/1024/1024)*8, 2))

  # Twenty-five uploads each of 0.25 MiB and 0.5 MiB.
  sizes = [
    size
    for size in [int(.25*1024*1024), int(.5*1024*1024)]
    for _ in range(25)
  ]
  print('Testing upload speed', end='')
  ulspeed = uploadSpeed(best['url'], sizes)
  print('\nUpload speed: %s Mbit/s' % round((ulspeed/1024/1024)*8, 2))

  dlspeedk = int(round((dlspeed/1024)*8, 0))
  ping = int(round(best['latency'], 0))
  ulspeedk = int(round((ulspeed/1024)*8, 0))

  # The hash is the MD5 of the three results plus a fixed suffix.
  digest = hashlib.md5(
    ('%s-%s-%s-%s' % (ping, ulspeedk, dlspeedk, '297aae72')).encode()
  ).hexdigest()
  apiData = [
    'download=%s' % dlspeedk,
    'ping=%s' % ping,
    'upload=%s' % ulspeedk,
    'promo=',
    'startmode=%s' % 'pingselect',
    'recommendedserverid=%s' % best['id'],
    'accuracy=%s' % 1,
    'serverid=%s' % best['id'],
    'hash=%s' % digest,
  ]

  req = urllib.request.Request('http://www.speedtest.net/api/api.php',
                               data='&'.join(apiData).encode())
  req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
  try:
    with urllib.request.urlopen(req) as f:
      response = f.read()
      code = f.getcode()
  except IOError:
    print('Could not submit results to speedtest.net')
    sys.exit(1)

  if int(code) != 200:
    print('Could not submit results to speedtest.net')
    sys.exit(1)

  # `response` is bytes, so parse_qs yields bytes keys and values.
  qsargs = urllib.parse.parse_qs(response)
  resultid = qsargs.get(b'resultid')
  if not resultid or len(resultid) != 1:
    print('Could not submit results to speedtest.net')
    sys.exit(1)

  print('Share results: http://www.speedtest.net/result/%s.png' % resultid[0].decode())


if __name__ == '__main__':
  speedtest()

# vim:ts=2:sw=2:expandtab