#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2013 Matt Martz
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Command-line bandwidth test that talks to speedtest.net servers."""

import urllib.request
import urllib.parse
import urllib.error
import math
import xml.etree.ElementTree as ET
import time
import os
import sys
import hashlib
import threading
from queue import Queue

# A transfer whose thread only gets to run more than this many seconds after
# the measurement began is skipped and contributes zero bytes.
_TRANSFER_WINDOW_SECONDS = 10

# Form field name that prefixes every upload body.
_UPLOAD_FIELD = b'content1='

# Seconds charged for a latency probe that fails or returns unexpected data,
# so that such a server is never preferred over a responsive one.
_LATENCY_PENALTY_SECONDS = 3600


def distance(origin, destination):
    """Determine distance between 2 sets of [lat,lon] in km

    Both arguments are two-item sequences of degrees. The result is the
    great-circle (haversine) distance on a sphere of radius 6371 km.
    """

    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371  # km

    half_dlat = math.radians(lat2 - lat1) / 2
    half_dlon = math.radians(lon2 - lon1) / 2
    a = (math.sin(half_dlat) * math.sin(half_dlat)
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(half_dlon) * math.sin(half_dlon))
    # Rounding can push `a` a hair outside [0, 1] for (nearly) identical or
    # antipodal points, which would make the square roots below raise.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return radius * c


class FileGetter(threading.Thread):
    """Thread that downloads one URL and keeps the body so it can be sized.

    `start` is the time.time() value at which the whole measurement began.
    After the thread has run, get_result() returns the downloaded bytes, or
    b'' when the download was skipped (window elapsed) or failed.
    """

    def __init__(self, url, start):
        super().__init__()
        self.url = url
        self.result = None
        self.starttime = start

    def get_result(self):
        """Return the downloaded bytes (None until run() has finished)."""
        return self.result

    def run(self):
        if (time.time() - self.starttime) > _TRANSFER_WINDOW_SECONDS:
            self.result = b''
            return
        try:
            # The context manager closes the response even if read() raises.
            with urllib.request.urlopen(self.url) as response:
                self.result = response.read()
        except IOError:
            self.result = b''


class FilePutter(threading.Thread):
    """Thread that POSTs a random payload of about `size` bytes to `url`.

    The body is the form field name ``content1=`` followed by random bytes,
    `size` bytes in total (never shorter than the field name itself).
    After the thread has run, get_result() returns the bytes that were sent,
    or b'' when the upload was skipped (window elapsed) or failed.
    """

    def __init__(self, url, start, size):
        super().__init__()
        self.url = url
        padding = max(0, int(size) - len(_UPLOAD_FIELD))
        self.data = _UPLOAD_FIELD + os.urandom(padding)
        self.result = None
        self.starttime = start

    def get_result(self):
        """Return the uploaded bytes (None until run() has finished)."""
        return self.result

    def run(self):
        if (time.time() - self.starttime) > _TRANSFER_WINDOW_SECONDS:
            self.result = b''
            return
        try:
            with urllib.request.urlopen(self.url, self.data) as response:
                # Drain the reply so the request is fully completed.
                response.read()
            self.result = self.data
        except IOError:
            self.result = b''


def _run_transfers(jobs, make_thread, queue_size):
    """Run one transfer thread per job and return the rate in bytes/second.

    `make_thread(job, start)` must return an unstarted thread exposing
    get_result(). A producer starts the threads and hands them to a bounded
    queue; because put() blocks when the queue is full, `queue_size` limits
    how far the producer can run ahead of the consumer, which joins the
    threads in order. A '.' is written to stdout for every job queued.
    """
    start = time.time()
    pending = Queue(queue_size)
    finished = []

    def producer():
        for job in jobs:
            thread = make_thread(job, start)
            thread.start()
            pending.put(thread, True)
            sys.stdout.write('.')
            sys.stdout.flush()

    def consumer():
        for _ in range(len(jobs)):
            thread = pending.get(True)
            thread.join()
            finished.append(thread.get_result())

    prod_thread = threading.Thread(target=producer)
    cons_thread = threading.Thread(target=consumer)
    prod_thread.start()
    cons_thread.start()
    prod_thread.join()
    cons_thread.join()

    elapsed = time.time() - start
    # Empty and missing results (skipped or failed transfers) count as zero.
    transferred = sum(len(chunk) for chunk in finished if chunk)
    return transferred / elapsed if elapsed > 0 else 0.0


def downloadSpeed(files):
    """Download every URL in `files` and return the rate in bytes/second."""
    return _run_transfers(files, FileGetter, 3)


def uploadSpeed(url, sizes):
    """POST one payload per entry of `sizes` to `url`; return bytes/second."""

    def make_putter(size, start):
        return FilePutter(url, start, size)

    return _run_transfers(sizes, make_putter, 9)


def getConfig():
    """Download the speedtest.net configuration and return only the data
    we are interested in

    Returns a dict with the attribute mappings of the `client`, `times`,
    `download` and `upload` elements, or None on a non-200 response.
    """

    with urllib.request.urlopen(
            'http://www.speedtest.net/speedtest-config.php') as uh:
        if int(uh.getcode()) != 200:
            return None
        configxml = uh.read()

    root = ET.fromstring(configxml)
    return {
        'client': root.find('client').attrib,
        'times': root.find('times').attrib,
        'download': root.find('download').attrib,
        'upload': root.find('upload').attrib,
    }


def closestServers(client, limit=5):
    """Determine the 5 closest speedtest.net servers based on geographic
    distance

    `client` must provide 'lat' and 'lon' entries. Returns the attribute
    dicts of the `limit` nearest servers, nearest first, or None on a
    non-200 response.
    """

    with urllib.request.urlopen(
            'http://speedtest.net/speedtest-servers.php') as uh:
        if int(uh.getcode()) != 200:
            return None
        serversxml = uh.read()

    root = ET.fromstring(serversxml)
    here = [float(client['lat']), float(client['lon'])]

    # A list of pairs (rather than a dict keyed by distance) keeps servers
    # that happen to be exactly equally far away.
    ranked = []
    for server in root[0]:
        there = [float(server.get('lat')), float(server.get('lon'))]
        ranked.append((distance(here, there), server.attrib))
    ranked.sort(key=lambda pair: pair[0])

    return [attrib for _, attrib in ranked[:limit]]


def getBestServer(servers):
    """Perform a speedtest.net "ping" to determine which speedtest.net
    server has the lowest latency

    Each server's latency.txt is requested three times and the round trips
    are averaged. The winning server dict is returned with its own average,
    in milliseconds, stored under 'latency'. Returns None when `servers` is
    empty.
    """

    attempts = 3
    timed = []
    for server in servers:
        latency_url = '%s/latency.txt' % os.path.dirname(server['url'])
        cum = 0
        for _ in range(attempts):
            # Time the whole request, not just reading the buffered reply.
            start = time.time()
            try:
                with urllib.request.urlopen(latency_url) as uh:
                    text = uh.read().strip()
                    ok = uh.getcode() == 200 and text == b'test=test'
            except IOError:
                ok = False
            total = time.time() - start
            cum += total if ok else _LATENCY_PENALTY_SECONDS
        timed.append((round((cum / attempts) * 1000, 3), server))

    if not timed:
        return None

    latency, best = min(timed, key=lambda pair: pair[0])
    best['latency'] = latency
    return best


def speedtest():
    """Run the full speedtest.net test

    Prints progress and results to stdout, submits the result to
    speedtest.net and prints the share link. Exits with status 1 when the
    configuration, the server list or the result submission fails.
    """

    print('Retrieving speedtest.net configuration...')
    try:
        config = getConfig()
    except IOError:
        config = None
    if not config:
        print('Could not retrieve speedtest.net configuration')
        sys.exit(1)

    print('Retrieving speedtest.net server list...')
    try:
        servers = closestServers(config['client'])
    except IOError:
        servers = None
    if not servers:
        print('Could not retrieve speedtest.net server list')
        sys.exit(1)

    print('Selecting best server based on ping...')
    best = getBestServer(servers)
    print('Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best)

    # Four requests for each test image size.
    image_sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
    base_url = os.path.dirname(best['url'])
    urls = ['%s/random%sx%s.jpg' % (base_url, size, size)
            for size in image_sizes for _ in range(4)]
    print('Testing download speed', end='')
    dlspeed = downloadSpeed(urls)
    print('\nDownload: %s Mbit/s' % round((dlspeed/1024/1024)*8, 2))

    # Twenty-five uploads each of 0.25 MiB and 0.5 MiB.
    payload_sizes = [int(.25*1024*1024), int(.5*1024*1024)]
    sizes = [size for size in payload_sizes for _ in range(25)]
    print('Testing upload speed', end='')
    ulspeed = uploadSpeed(best['url'], sizes)
    print('\nUpload speed: %s Mbit/s' % round((ulspeed/1024/1024)*8, 2))

    dlspeedk = int(round((dlspeed/1024)*8, 0))
    ping = int(round(best['latency'], 0))
    ulspeedk = int(round((ulspeed/1024)*8, 0))

    # The md5 digest is the checksum field the results API is sent; it is
    # not used for anything security-sensitive here.
    digest = hashlib.md5(
        ('%s-%s-%s-%s' % (ping, ulspeedk, dlspeedk, '297aae72')).encode()
    ).hexdigest()
    apiData = [
        'download=%s' % dlspeedk,
        'ping=%s' % ping,
        'upload=%s' % ulspeedk,
        'promo=',
        'startmode=%s' % 'pingselect',
        'recommendedserverid=%s' % best['id'],
        'accuracy=%s' % 1,
        'serverid=%s' % best['id'],
        'hash=%s' % digest,
    ]

    req = urllib.request.Request('http://www.speedtest.net/api/api.php',
                                 data='&'.join(apiData).encode())
    req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
    try:
        with urllib.request.urlopen(req) as f:
            response = f.read()
            code = f.getcode()
    except IOError:
        # urlopen raises for HTTP error statuses and network failures.
        response, code = b'', None

    if code != 200:
        print('Could not submit results to speedtest.net')
        sys.exit(1)

    qsargs = urllib.parse.parse_qs(response)
    resultid = qsargs.get(b'resultid')
    if not resultid or len(resultid) != 1:
        print('Could not submit results to speedtest.net')
        sys.exit(1)

    print('Share results: http://www.speedtest.net/result/%s.png'
          % resultid[0].decode())


if __name__ == '__main__':
    speedtest()

# vim:ts=4:sw=4:expandtab