diff --git a/README.md b/README.md index fa793c7..40b56a5 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,8 @@ Command line interface for testing internet bandwidth using speedtest.net ## Versions -speedtest-cli is written for use with Python 2.4-2.7 +speedtest-cli works with Python 2.4-3.3 -speedtest-cli-3 is written for use with Python 3 ## Usage diff --git a/speedtest-cli b/speedtest-cli index ff4ff15..cc8cdc1 100755 --- a/speedtest-cli +++ b/speedtest-cli @@ -15,28 +15,100 @@ # License for the specific language governing permissions and limitations # under the License. -import urllib2 +try: + from urllib2 import urlopen, Request +except ImportError: + from urllib.request import urlopen, Request + import math import time import os import sys import threading -from Queue import Queue +import binascii from xml.dom import minidom as DOM + +try: + from Queue import Queue +except ImportError: + from queue import Queue + try: from urlparse import parse_qs except ImportError: - from cgi import parse_qs + try: + from urllib.parse import parse_qs + except ImportError: + from cgi import parse_qs + try: from hashlib import md5 except ImportError: from md5 import md5 + try: from argparse import ArgumentParser as ArgParser except ImportError: from optparse import OptionParser as ArgParser +try: + import builtins +except ImportError: + def print_(*args, **kwargs): + """The new-style print function taken from + https://pypi.python.org/pypi/six/ + + """ + fp = kwargs.pop("file", sys.stdout) + if fp is None: + return + + def write(data): + if not isinstance(data, basestring): + data = str(data) + fp.write(data) + + want_unicode = False + sep = kwargs.pop("sep", None) + if sep is not None: + if isinstance(sep, unicode): + want_unicode = True + elif not isinstance(sep, str): + raise TypeError("sep must be None or a string") + end = kwargs.pop("end", None) + if end is not None: + if isinstance(end, unicode): + want_unicode = True + elif not isinstance(end, str): + raise TypeError("end must be None or a string") + if kwargs: + raise TypeError("invalid keyword arguments to print()") + if not want_unicode: + for arg in args: + if isinstance(arg, unicode): + want_unicode = True + break + if want_unicode: + newline = unicode("\n") + space = unicode(" ") + else: + newline = "\n" + space = " " + if sep is None: + sep = space + if end is None: + end = newline + for i, arg in enumerate(args): + if i: + write(sep) + write(arg) + write(end) +else: + print_ = getattr(builtins, 'print') + del builtins + + def distance(origin, destination): """Determine distance between 2 sets of [lat,lon] in km""" @@ -68,7 +140,7 @@ class FileGetter(threading.Thread): def run(self): try: if (time.time() - self.starttime) <= 10: - f = urllib2.urlopen(self.url) + f = urlopen(self.url) self.result = 0 while 1: contents = f.read(10240) @@ -118,8 +190,8 @@ def downloadSpeed(files, quiet=False): class FilePutter(threading.Thread): def __init__(self, url, start, size): self.url = url - data = os.urandom(int(size)).encode('hex') - self.data = 'content1=%s' % data[0:int(size)-9] + data = binascii.hexlify(os.urandom(int(size)-9)).decode() + self.data = ('content1=%s' % data[0:int(size)-9]).encode() del data self.result = None self.starttime = start @@ -131,7 +203,7 @@ class FilePutter(threading.Thread): def run(self): try: if (time.time() - self.starttime) <= 10: - f = urllib2.urlopen(self.url, self.data) + f = urlopen(self.url, self.data) contents = f.read() f.close() self.result = len(self.data) @@ -175,7 +247,7 @@ def uploadSpeed(url, sizes, quiet=False): def getAttributesByTagName(dom, tagName): elem = dom.getElementsByTagName(tagName)[0] - return dict(elem.attributes.items()) + return dict(list(elem.attributes.items())) def getConfig(): @@ -183,7 +255,7 @@ def getConfig(): we are interested in """ - uh = urllib2.urlopen('http://www.speedtest.net/speedtest-config.php') + uh = urlopen('http://www.speedtest.net/speedtest-config.php') configxml = uh.read() if int(uh.code) != 200: return None @@ -204,7 +276,7 @@ def closestServers(client, all=False): distance """ - uh = urllib2.urlopen('http://www.speedtest.net/speedtest-servers.php') + uh = urlopen('http://www.speedtest.net/speedtest-servers.php') serversxml = uh.read() if int(uh.code) != 200: return None @@ -212,7 +284,7 @@ def closestServers(client, all=False): root = DOM.parseString(serversxml) servers = {} for server in root.getElementsByTagName('server'): - attrib = dict(server.attributes.items()) + attrib = dict(list(server.attributes.items())) d = distance([float(client['lat']), float(client['lon'])], [float(attrib.get('lat')), float(attrib.get('lon'))]) attrib['d'] = d @@ -245,12 +317,12 @@ def getBestServer(servers): for server in servers: cum = 0 url = os.path.dirname(server['url']) - for i in xrange(0, 3): - uh = urllib2.urlopen('%s/latency.txt' % url) + for i in range(0, 3): + uh = urlopen('%s/latency.txt' % url) start = time.time() text = uh.read().strip() total = time.time() - start - if int(uh.code) == 200 and text == 'test=test': + if int(uh.code) == 200 and text == 'test=test'.encode(): cum += total else: cum += 3600 @@ -299,11 +371,11 @@ def speedtest(): del options if not args.simple: - print 'Retrieving speedtest.net configuration...' + print_('Retrieving speedtest.net configuration...') config = getConfig() if not args.simple: - print 'Retrieving speedtest.net server list...' + print_('Retrieving speedtest.net server list...') if args.list or args.server: servers = closestServers(config['client'], True) if args.list: @@ -313,7 +385,7 @@ def speedtest(): '[%(d)0.2f km]' % server) serverList.append(line) try: - print '\n'.join(serverList).encode('utf-8', 'ignore') + print_('\n'.join(serverList).encode('utf-8', 'ignore')) except IOError: pass sys.exit(0) @@ -321,50 +393,50 @@ def speedtest(): servers = closestServers(config['client']) if not args.simple: - print 'Testing from %(isp)s (%(ip)s)...' % config['client'] + print_('Testing from %(isp)s (%(ip)s)...' % config['client']) if args.server: try: best = getBestServer(filter(lambda x: x['id'] == args.server, servers)) except IndexError: - print 'Invalid server ID' + print_('Invalid server ID') sys.exit(1) else: if not args.simple: - print 'Selecting best server based on ping...' + print_('Selecting best server based on ping...') best = getBestServer(servers) if not args.simple: - print ('Hosted by %(sponsor)s (%(name)s) [%(d)0.2f km]: ' + print_('Hosted by %(sponsor)s (%(name)s) [%(d)0.2f km]: ' '%(latency)s ms' % best) else: - print 'Ping: %(latency)s ms' % best + print_('Ping: %(latency)s ms' % best) sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000] urls = [] for size in sizes: - for i in xrange(0, 4): + for i in range(0, 4): urls.append('%s/random%sx%s.jpg' % (os.path.dirname(best['url']), size, size)) if not args.simple: - print 'Testing download speed', + print_('Testing download speed', end='') dlspeed = downloadSpeed(urls, args.simple) if not args.simple: - print - print 'Download: %0.2f Mbit/s' % ((dlspeed / 1000 / 1000) * 8) + print_() + print_('Download: %0.2f Mbit/s' % ((dlspeed / 1000 / 1000) * 8)) sizesizes = [int(.25 * 1000 * 1000), int(.5 * 1000 * 1000)] sizes = [] for size in sizesizes: - for i in xrange(0, 25): + for i in range(0, 25): sizes.append(size) if not args.simple: - print 'Testing upload speed', + print_('Testing upload speed', end='') ulspeed = uploadSpeed(best['url'], sizes, args.simple) if not args.simple: - print - print 'Upload: %0.2f Mbit/s' % ((ulspeed / 1000 / 1000) * 8) + print_() + print_('Upload: %0.2f Mbit/s' % ((ulspeed / 1000 / 1000) * 8)) if args.share: dlspeedk = int(round((dlspeed / 1000) * 8, 0)) @@ -380,35 +452,35 @@ def speedtest(): 'recommendedserverid=%s' % best['id'], 'accuracy=%s' % 1, 'serverid=%s' % best['id'], - 'hash=%s' % md5('%s-%s-%s-%s' % - (ping, ulspeedk, dlspeedk, '297aae72') - ).hexdigest()] + 'hash=%s' % md5(('%s-%s-%s-%s' % + (ping, ulspeedk, dlspeedk, '297aae72')) + .encode()).hexdigest()] - req = urllib2.Request('http://www.speedtest.net/api/api.php', - data='&'.join(apiData)) + req = Request('http://www.speedtest.net/api/api.php', + data='&'.join(apiData).encode()) req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf') - f = urllib2.urlopen(req) + f = urlopen(req) response = f.read() code = f.code f.close() if int(code) != 200: - print 'Could not submit results to speedtest.net' + print_('Could not submit results to speedtest.net') sys.exit(1) - qsargs = parse_qs(response) + qsargs = parse_qs(response.decode()) resultid = qsargs.get('resultid') if not resultid or len(resultid) != 1: - print 'Could not submit results to speedtest.net' + print_('Could not submit results to speedtest.net') sys.exit(1) - print ('Share results: http://www.speedtest.net/result/%s.png' % + print_('Share results: http://www.speedtest.net/result/%s.png' % resultid[0]) if __name__ == '__main__': try: speedtest() except KeyboardInterrupt: - print '\nCancelling...' + print_('\nCancelling...') # vim:ts=4:sw=4:expandtab diff --git a/speedtest-cli-3 b/speedtest-cli-3 deleted file mode 100755 index 25fdfb9..0000000 --- a/speedtest-cli-3 +++ /dev/null @@ -1,391 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# Copyright 2013 Matt Martz -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import urllib.request -import urllib.parse -import urllib.error -import math -import xml.etree.ElementTree as ET -import time -import os -import sys -import hashlib -import threading -import binascii -import argparse -from queue import Queue - - -def distance(origin, destination): - """Determine distance between 2 sets of [lat,lon] in km""" - - lat1, lon1 = origin - lat2, lon2 = destination - radius = 6371 # km - - dlat = math.radians(lat2-lat1) - dlon = math.radians(lon2-lon1) - a = (math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.radians(lat1)) - * math.cos(math.radians(lat2)) * math.sin(dlon / 2) - * math.sin(dlon / 2)) - c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) - d = radius * c - - return d - - -class FileGetter(threading.Thread): - def __init__(self, url, start): - self.url = url - self.result = None - self.starttime = start - threading.Thread.__init__(self) - - def get_result(self): - return self.result - - def run(self): - try: - if (time.time() - self.starttime) <= 10: - f = urllib.request.urlopen(self.url) - self.result = 0 - while 1: - contents = f.read(10240) - if contents: - self.result += len(contents) - else: - break - f.close() - else: - self.result = 0 - except IOError: - self.result = 0 - - -def downloadSpeed(files, quiet=False): - start = time.time() - - def producer(q, files): - for file in files: - thread = FileGetter(file, start) - thread.start() - q.put(thread, True) - if not quiet: - sys.stdout.write('.') - sys.stdout.flush() - - finished = [] - - def consumer(q, total_files): - while len(finished) < total_files: - thread = q.get(True) - thread.join() - finished.append(thread.result) - thread.result = 0 - - q = Queue(6) - start = time.time() - prod_thread = threading.Thread(target=producer, args=(q, files)) - cons_thread = threading.Thread(target=consumer, args=(q, len(files))) - prod_thread.start() - cons_thread.start() - prod_thread.join() - cons_thread.join() - return (sum(finished)/(time.time()-start)) - - -class FilePutter(threading.Thread): - def __init__(self, url, start, size): - self.url = url - data = binascii.hexlify(os.urandom(int(size)-9)).decode() - self.data = 'content1=%s' % data[0:int(size)-9] - del data - self.result = None - self.starttime = start - threading.Thread.__init__(self) - - def get_result(self): - return self.result - - def run(self): - try: - if (time.time() - self.starttime) <= 10: - f = urllib.request.urlopen(self.url, self.data.encode()) - contents = f.read() - f.close() - self.result = len(self.data) - else: - self.result = 0 - except IOError: - self.result = 0 - - -def uploadSpeed(url, sizes, quiet=False): - start = time.time() - - def producer(q, sizes): - for size in sizes: - thread = FilePutter(url, start, size) - thread.start() - q.put(thread, True) - if not quiet: - sys.stdout.write('.') - sys.stdout.flush() - - finished = [] - - def consumer(q, total_sizes): - while len(finished) < total_sizes: - thread = q.get(True) - thread.join() - finished.append(thread.result) - thread.result = 0 - - q = Queue(6) - start = time.time() - prod_thread = threading.Thread(target=producer, args=(q, sizes)) - cons_thread = threading.Thread(target=consumer, args=(q, len(sizes))) - prod_thread.start() - cons_thread.start() - prod_thread.join() - cons_thread.join() - return (sum(finished)/(time.time()-start)) - - -def getConfig(): - """Download the speedtest.net configuration and return only the data - we are interested in - """ - - uh = urllib.request.urlopen('http://www.speedtest.net/' - 'speedtest-config.php') - configxml = uh.read() - if int(uh.getcode()) != 200: - return None - uh.close() - root = ET.fromstring(configxml) - config = { - 'client': root.find('client').attrib, - 'times': root.find('times').attrib, - 'download': root.find('download').attrib, - 'upload': root.find('upload').attrib} - return config - - -def closestServers(client, all=False): - """Determine the 5 closest speedtest.net servers based on geographic - distance - """ - - uh = urllib.request.urlopen('http://www.speedtest.net/speedtest-servers.php') - serversxml = uh.read() - if int(uh.getcode()) != 200: - return None - uh.close() - root = ET.fromstring(serversxml) - servers = {} - for server in root[0]: - d = distance([float(client['lat']), float(client['lon'])], - [float(server.get('lat')), float(server.get('lon'))]) - server.attrib['d'] = d - if d not in servers: - servers[d] = [server.attrib] - else: - servers[d].append(server.attrib) - del root - - closest = [] - for d in sorted(servers.keys()): - for s in servers[d]: - closest.append(s) - if len(closest) == 5 and not all: - break - else: - continue - break - - del servers - return closest - - -def getBestServer(servers): - """Perform a speedtest.net "ping" to determine which speedtest.net - server has the lowest latency - """ - - results = {} - for server in servers: - cum = 0 - url = os.path.dirname(server['url']) - for i in range(0, 3): - uh = urllib.request.urlopen('%s/latency.txt' % url) - start = time.time() - text = uh.read().strip() - total = time.time() - start - if int(uh.getcode()) == 200 and text == b'test=test': - cum += total - else: - cum += 3600 - uh.close() - avg = round((cum / 3) * 1000000, 3) - results[avg] = server - - fastest = sorted(results.keys())[0] - best = results[fastest] - best['latency'] = fastest - - return best - - -def speedtest(): - """Run the full speedtest.net test""" - - description = ( - 'Command line interface for testing internet bandwidth using ' - 'speedtest.net.\n' - '------------------------------------------------------------' - '--------------\n' - 'https://github.com/sivel/speedtest-cli') - - parser = argparse.ArgumentParser(description=description) - parser.add_argument('--share', action='store_true', - help='Generate and provide a URL to the speedtest.net ' - 'share results image') - parser.add_argument('--simple', action='store_true', - help='Suppress verbose output, only show basic ' - 'information') - parser.add_argument('--list', action='store_true', - help='Display a list of speedtest.net servers ' - 'sorted by distance') - parser.add_argument('--server', help='Specify a server ID to test against') - args = parser.parse_args() - - if not args.simple: - print('Retrieving speedtest.net configuration...') - config = getConfig() - - if not args.simple: - print('Retrieving speedtest.net server list...') - #servers = closestServers(config['client']) - if args.list or args.server: - servers = closestServers(config['client'], True) - if args.list: - serverList = [] - for server in servers: - line = ('%(id)4s) %(sponsor)s (%(name)s, %(country)s) ' - '[%(d)0.2f km]' % server) - serverList.append(line) - try: - print('\n'.join(serverList)) - except IOError: - pass - sys.exit(0) - else: - servers = closestServers(config['client']) - - if not args.simple: - print('Testing from %(isp)s (%(ip)s)...' % config['client']) - - if args.server: - try: - best = getBestServer(filter(lambda x: x['id'] == args.server, - servers)) - except IndexError: - print('Invalid server ID') - sys.exit(1) - else: - if not args.simple: - print('Selecting best server based on ping...') - best = getBestServer(servers) - - if not args.simple: - print('Hosted by %(sponsor)s (%(name)s) [%(d)0.2f km]: ' - '%(latency)s ms' % best) - else: - print('Ping: %(latency)s ms' % best) - - sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000] - urls = [] - for size in sizes: - for i in range(0, 4): - urls.append('%s/random%sx%s.jpg' % (os.path.dirname(best['url']), - size, size)) - if not args.simple: - print('Testing download speed', end='') - dlspeed = downloadSpeed(urls, args.simple) - if not args.simple: - print() - print('Download: %0.2f Mbit/s' % ((dlspeed / 1000 / 1000) * 8)) - - sizesizes = [int(.25 * 1000 * 1000), int(.5 * 1000 * 1000)] - sizes = [] - for size in sizesizes: - for i in range(0, 25): - sizes.append(size) - if not args.simple: - print('Testing upload speed', end='') - ulspeed = uploadSpeed(best['url'], sizes, args.simple) - if not args.simple: - print() - print('Upload: %0.2f Mbit/s' % ((ulspeed / 1000 / 1000) * 8)) - - if args.share: - dlspeedk = int(round((dlspeed / 1000) * 8, 0)) - ping = int(round(best['latency'], 0)) - ulspeedk = int(round((ulspeed / 1000) * 8, 0)) - - apiData = [ - 'download=%s' % dlspeedk, - 'ping=%s' % ping, - 'upload=%s' % ulspeedk, - 'promo=', - 'startmode=%s' % 'pingselect', - 'recommendedserverid=%s' % best['id'], - 'accuracy=%s' % 1, - 'serverid=%s' % best['id'], - 'hash=%s' % hashlib.md5(('%s-%s-%s-%s' % - (ping, ulspeedk, dlspeedk, '297aae72')) - .encode()).hexdigest()] - - req = urllib.request.Request('http://www.speedtest.net/api/api.php', - data='&'.join(apiData).encode()) - req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf') - f = urllib.request.urlopen(req) - response = f.read() - code = f.getcode() - f.close() - - if int(code) != 200: - print('Could not submit results to speedtest.net') - sys.exit(1) - - qsargs = urllib.parse.parse_qs(response) - resultid = qsargs.get(b'resultid') - if not resultid or len(resultid) != 1: - print('Could not submit results to speedtest.net') - sys.exit(1) - - print('Share results: http://www.speedtest.net/result/%s.png' % - resultid[0].decode()) - -if __name__ == '__main__': - try: - speedtest() - except KeyboardInterrupt: - print('\nCancelling...') - -# vim:ts=4:sw=4:expandtab