Merge speedtest-cli and speedtest-cli-3

This commit is contained in:
Matt Martz 2013-07-26 16:11:10 -05:00
parent fe0940c574
commit d9cd9c8562
3 changed files with 114 additions and 434 deletions

View File

@ -4,9 +4,8 @@ Command line interface for testing internet bandwidth using speedtest.net
## Versions ## Versions
speedtest-cli is written for use with Python 2.4-2.7 speedtest-cli works with Python 2.4-3.3
speedtest-cli-3 is written for use with Python 3
## Usage ## Usage

View File

@ -15,28 +15,100 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import urllib2 try:
from urllib2 import urlopen, Request
except ImportError:
from urllib.request import urlopen, Request
import math import math
import time import time
import os import os
import sys import sys
import threading import threading
from Queue import Queue import binascii
from xml.dom import minidom as DOM from xml.dom import minidom as DOM
try:
from Queue import Queue
except ImportError:
from queue import Queue
try: try:
from urlparse import parse_qs from urlparse import parse_qs
except ImportError: except ImportError:
from cgi import parse_qs try:
from urllib.parse import parse_qs
except ImportError:
from cgi import parse_qs
try: try:
from hashlib import md5 from hashlib import md5
except ImportError: except ImportError:
from md5 import md5 from md5 import md5
try: try:
from argparse import ArgumentParser as ArgParser from argparse import ArgumentParser as ArgParser
except ImportError: except ImportError:
from optparse import OptionParser as ArgParser from optparse import OptionParser as ArgParser
try:
import builtins
except ImportError:
def print_(*args, **kwargs):
"""The new-style print function taken from
https://pypi.python.org/pypi/six/
"""
fp = kwargs.pop("file", sys.stdout)
if fp is None:
return
def write(data):
if not isinstance(data, basestring):
data = str(data)
fp.write(data)
want_unicode = False
sep = kwargs.pop("sep", None)
if sep is not None:
if isinstance(sep, unicode):
want_unicode = True
elif not isinstance(sep, str):
raise TypeError("sep must be None or a string")
end = kwargs.pop("end", None)
if end is not None:
if isinstance(end, unicode):
want_unicode = True
elif not isinstance(end, str):
raise TypeError("end must be None or a string")
if kwargs:
raise TypeError("invalid keyword arguments to print()")
if not want_unicode:
for arg in args:
if isinstance(arg, unicode):
want_unicode = True
break
if want_unicode:
newline = unicode("\n")
space = unicode(" ")
else:
newline = "\n"
space = " "
if sep is None:
sep = space
if end is None:
end = newline
for i, arg in enumerate(args):
if i:
write(sep)
write(arg)
write(end)
else:
print_ = getattr(builtins, 'print')
del builtins
def distance(origin, destination): def distance(origin, destination):
"""Determine distance between 2 sets of [lat,lon] in km""" """Determine distance between 2 sets of [lat,lon] in km"""
@ -68,7 +140,7 @@ class FileGetter(threading.Thread):
def run(self): def run(self):
try: try:
if (time.time() - self.starttime) <= 10: if (time.time() - self.starttime) <= 10:
f = urllib2.urlopen(self.url) f = urlopen(self.url)
self.result = 0 self.result = 0
while 1: while 1:
contents = f.read(10240) contents = f.read(10240)
@ -118,8 +190,8 @@ def downloadSpeed(files, quiet=False):
class FilePutter(threading.Thread): class FilePutter(threading.Thread):
def __init__(self, url, start, size): def __init__(self, url, start, size):
self.url = url self.url = url
data = os.urandom(int(size)).encode('hex') data = binascii.hexlify(os.urandom(int(size)-9)).decode()
self.data = 'content1=%s' % data[0:int(size)-9] self.data = ('content1=%s' % data[0:int(size)-9]).encode()
del data del data
self.result = None self.result = None
self.starttime = start self.starttime = start
@ -131,7 +203,7 @@ class FilePutter(threading.Thread):
def run(self): def run(self):
try: try:
if (time.time() - self.starttime) <= 10: if (time.time() - self.starttime) <= 10:
f = urllib2.urlopen(self.url, self.data) f = urlopen(self.url, self.data)
contents = f.read() contents = f.read()
f.close() f.close()
self.result = len(self.data) self.result = len(self.data)
@ -175,7 +247,7 @@ def uploadSpeed(url, sizes, quiet=False):
def getAttributesByTagName(dom, tagName): def getAttributesByTagName(dom, tagName):
elem = dom.getElementsByTagName(tagName)[0] elem = dom.getElementsByTagName(tagName)[0]
return dict(elem.attributes.items()) return dict(list(elem.attributes.items()))
def getConfig(): def getConfig():
@ -183,7 +255,7 @@ def getConfig():
we are interested in we are interested in
""" """
uh = urllib2.urlopen('http://www.speedtest.net/speedtest-config.php') uh = urlopen('http://www.speedtest.net/speedtest-config.php')
configxml = uh.read() configxml = uh.read()
if int(uh.code) != 200: if int(uh.code) != 200:
return None return None
@ -204,7 +276,7 @@ def closestServers(client, all=False):
distance distance
""" """
uh = urllib2.urlopen('http://www.speedtest.net/speedtest-servers.php') uh = urlopen('http://www.speedtest.net/speedtest-servers.php')
serversxml = uh.read() serversxml = uh.read()
if int(uh.code) != 200: if int(uh.code) != 200:
return None return None
@ -212,7 +284,7 @@ def closestServers(client, all=False):
root = DOM.parseString(serversxml) root = DOM.parseString(serversxml)
servers = {} servers = {}
for server in root.getElementsByTagName('server'): for server in root.getElementsByTagName('server'):
attrib = dict(server.attributes.items()) attrib = dict(list(server.attributes.items()))
d = distance([float(client['lat']), float(client['lon'])], d = distance([float(client['lat']), float(client['lon'])],
[float(attrib.get('lat')), float(attrib.get('lon'))]) [float(attrib.get('lat')), float(attrib.get('lon'))])
attrib['d'] = d attrib['d'] = d
@ -245,12 +317,12 @@ def getBestServer(servers):
for server in servers: for server in servers:
cum = 0 cum = 0
url = os.path.dirname(server['url']) url = os.path.dirname(server['url'])
for i in xrange(0, 3): for i in range(0, 3):
uh = urllib2.urlopen('%s/latency.txt' % url) uh = urlopen('%s/latency.txt' % url)
start = time.time() start = time.time()
text = uh.read().strip() text = uh.read().strip()
total = time.time() - start total = time.time() - start
if int(uh.code) == 200 and text == 'test=test': if int(uh.code) == 200 and text == 'test=test'.encode():
cum += total cum += total
else: else:
cum += 3600 cum += 3600
@ -299,11 +371,11 @@ def speedtest():
del options del options
if not args.simple: if not args.simple:
print 'Retrieving speedtest.net configuration...' print_('Retrieving speedtest.net configuration...')
config = getConfig() config = getConfig()
if not args.simple: if not args.simple:
print 'Retrieving speedtest.net server list...' print_('Retrieving speedtest.net server list...')
if args.list or args.server: if args.list or args.server:
servers = closestServers(config['client'], True) servers = closestServers(config['client'], True)
if args.list: if args.list:
@ -313,7 +385,7 @@ def speedtest():
'[%(d)0.2f km]' % server) '[%(d)0.2f km]' % server)
serverList.append(line) serverList.append(line)
try: try:
print '\n'.join(serverList).encode('utf-8', 'ignore') print_('\n'.join(serverList).encode('utf-8', 'ignore'))
except IOError: except IOError:
pass pass
sys.exit(0) sys.exit(0)
@ -321,50 +393,50 @@ def speedtest():
servers = closestServers(config['client']) servers = closestServers(config['client'])
if not args.simple: if not args.simple:
print 'Testing from %(isp)s (%(ip)s)...' % config['client'] print_('Testing from %(isp)s (%(ip)s)...' % config['client'])
if args.server: if args.server:
try: try:
best = getBestServer(filter(lambda x: x['id'] == args.server, best = getBestServer(filter(lambda x: x['id'] == args.server,
servers)) servers))
except IndexError: except IndexError:
print 'Invalid server ID' print_('Invalid server ID')
sys.exit(1) sys.exit(1)
else: else:
if not args.simple: if not args.simple:
print 'Selecting best server based on ping...' print_('Selecting best server based on ping...')
best = getBestServer(servers) best = getBestServer(servers)
if not args.simple: if not args.simple:
print ('Hosted by %(sponsor)s (%(name)s) [%(d)0.2f km]: ' print_('Hosted by %(sponsor)s (%(name)s) [%(d)0.2f km]: '
'%(latency)s ms' % best) '%(latency)s ms' % best)
else: else:
print 'Ping: %(latency)s ms' % best print_('Ping: %(latency)s ms' % best)
sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000] sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
urls = [] urls = []
for size in sizes: for size in sizes:
for i in xrange(0, 4): for i in range(0, 4):
urls.append('%s/random%sx%s.jpg' % urls.append('%s/random%sx%s.jpg' %
(os.path.dirname(best['url']), size, size)) (os.path.dirname(best['url']), size, size))
if not args.simple: if not args.simple:
print 'Testing download speed', print_('Testing download speed', end='')
dlspeed = downloadSpeed(urls, args.simple) dlspeed = downloadSpeed(urls, args.simple)
if not args.simple: if not args.simple:
print print_()
print 'Download: %0.2f Mbit/s' % ((dlspeed / 1000 / 1000) * 8) print_('Download: %0.2f Mbit/s' % ((dlspeed / 1000 / 1000) * 8))
sizesizes = [int(.25 * 1000 * 1000), int(.5 * 1000 * 1000)] sizesizes = [int(.25 * 1000 * 1000), int(.5 * 1000 * 1000)]
sizes = [] sizes = []
for size in sizesizes: for size in sizesizes:
for i in xrange(0, 25): for i in range(0, 25):
sizes.append(size) sizes.append(size)
if not args.simple: if not args.simple:
print 'Testing upload speed', print_('Testing upload speed', end='')
ulspeed = uploadSpeed(best['url'], sizes, args.simple) ulspeed = uploadSpeed(best['url'], sizes, args.simple)
if not args.simple: if not args.simple:
print print_()
print 'Upload: %0.2f Mbit/s' % ((ulspeed / 1000 / 1000) * 8) print_('Upload: %0.2f Mbit/s' % ((ulspeed / 1000 / 1000) * 8))
if args.share: if args.share:
dlspeedk = int(round((dlspeed / 1000) * 8, 0)) dlspeedk = int(round((dlspeed / 1000) * 8, 0))
@ -380,35 +452,35 @@ def speedtest():
'recommendedserverid=%s' % best['id'], 'recommendedserverid=%s' % best['id'],
'accuracy=%s' % 1, 'accuracy=%s' % 1,
'serverid=%s' % best['id'], 'serverid=%s' % best['id'],
'hash=%s' % md5('%s-%s-%s-%s' % 'hash=%s' % md5(('%s-%s-%s-%s' %
(ping, ulspeedk, dlspeedk, '297aae72') (ping, ulspeedk, dlspeedk, '297aae72'))
).hexdigest()] .encode()).hexdigest()]
req = urllib2.Request('http://www.speedtest.net/api/api.php', req = Request('http://www.speedtest.net/api/api.php',
data='&'.join(apiData)) data='&'.join(apiData).encode())
req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf') req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
f = urllib2.urlopen(req) f = urlopen(req)
response = f.read() response = f.read()
code = f.code code = f.code
f.close() f.close()
if int(code) != 200: if int(code) != 200:
print 'Could not submit results to speedtest.net' print_('Could not submit results to speedtest.net')
sys.exit(1) sys.exit(1)
qsargs = parse_qs(response) qsargs = parse_qs(response.decode())
resultid = qsargs.get('resultid') resultid = qsargs.get('resultid')
if not resultid or len(resultid) != 1: if not resultid or len(resultid) != 1:
print 'Could not submit results to speedtest.net' print_('Could not submit results to speedtest.net')
sys.exit(1) sys.exit(1)
print ('Share results: http://www.speedtest.net/result/%s.png' % print_('Share results: http://www.speedtest.net/result/%s.png' %
resultid[0]) resultid[0])
if __name__ == '__main__': if __name__ == '__main__':
try: try:
speedtest() speedtest()
except KeyboardInterrupt: except KeyboardInterrupt:
print '\nCancelling...' print_('\nCancelling...')
# vim:ts=4:sw=4:expandtab # vim:ts=4:sw=4:expandtab

View File

@ -1,391 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2013 Matt Martz
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import urllib.request
import urllib.parse
import urllib.error
import math
import xml.etree.ElementTree as ET
import time
import os
import sys
import hashlib
import threading
import binascii
import argparse
from queue import Queue
def distance(origin, destination):
"""Determine distance between 2 sets of [lat,lon] in km"""
lat1, lon1 = origin
lat2, lon2 = destination
radius = 6371 # km
dlat = math.radians(lat2-lat1)
dlon = math.radians(lon2-lon1)
a = (math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.radians(lat1))
* math.cos(math.radians(lat2)) * math.sin(dlon / 2)
* math.sin(dlon / 2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
d = radius * c
return d
class FileGetter(threading.Thread):
def __init__(self, url, start):
self.url = url
self.result = None
self.starttime = start
threading.Thread.__init__(self)
def get_result(self):
return self.result
def run(self):
try:
if (time.time() - self.starttime) <= 10:
f = urllib.request.urlopen(self.url)
self.result = 0
while 1:
contents = f.read(10240)
if contents:
self.result += len(contents)
else:
break
f.close()
else:
self.result = 0
except IOError:
self.result = 0
def downloadSpeed(files, quiet=False):
start = time.time()
def producer(q, files):
for file in files:
thread = FileGetter(file, start)
thread.start()
q.put(thread, True)
if not quiet:
sys.stdout.write('.')
sys.stdout.flush()
finished = []
def consumer(q, total_files):
while len(finished) < total_files:
thread = q.get(True)
thread.join()
finished.append(thread.result)
thread.result = 0
q = Queue(6)
start = time.time()
prod_thread = threading.Thread(target=producer, args=(q, files))
cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
prod_thread.start()
cons_thread.start()
prod_thread.join()
cons_thread.join()
return (sum(finished)/(time.time()-start))
class FilePutter(threading.Thread):
def __init__(self, url, start, size):
self.url = url
data = binascii.hexlify(os.urandom(int(size)-9)).decode()
self.data = 'content1=%s' % data[0:int(size)-9]
del data
self.result = None
self.starttime = start
threading.Thread.__init__(self)
def get_result(self):
return self.result
def run(self):
try:
if (time.time() - self.starttime) <= 10:
f = urllib.request.urlopen(self.url, self.data.encode())
contents = f.read()
f.close()
self.result = len(self.data)
else:
self.result = 0
except IOError:
self.result = 0
def uploadSpeed(url, sizes, quiet=False):
start = time.time()
def producer(q, sizes):
for size in sizes:
thread = FilePutter(url, start, size)
thread.start()
q.put(thread, True)
if not quiet:
sys.stdout.write('.')
sys.stdout.flush()
finished = []
def consumer(q, total_sizes):
while len(finished) < total_sizes:
thread = q.get(True)
thread.join()
finished.append(thread.result)
thread.result = 0
q = Queue(6)
start = time.time()
prod_thread = threading.Thread(target=producer, args=(q, sizes))
cons_thread = threading.Thread(target=consumer, args=(q, len(sizes)))
prod_thread.start()
cons_thread.start()
prod_thread.join()
cons_thread.join()
return (sum(finished)/(time.time()-start))
def getConfig():
"""Download the speedtest.net configuration and return only the data
we are interested in
"""
uh = urllib.request.urlopen('http://www.speedtest.net/'
'speedtest-config.php')
configxml = uh.read()
if int(uh.getcode()) != 200:
return None
uh.close()
root = ET.fromstring(configxml)
config = {
'client': root.find('client').attrib,
'times': root.find('times').attrib,
'download': root.find('download').attrib,
'upload': root.find('upload').attrib}
return config
def closestServers(client, all=False):
"""Determine the 5 closest speedtest.net servers based on geographic
distance
"""
uh = urllib.request.urlopen('http://www.speedtest.net/speedtest-servers.php')
serversxml = uh.read()
if int(uh.getcode()) != 200:
return None
uh.close()
root = ET.fromstring(serversxml)
servers = {}
for server in root[0]:
d = distance([float(client['lat']), float(client['lon'])],
[float(server.get('lat')), float(server.get('lon'))])
server.attrib['d'] = d
if d not in servers:
servers[d] = [server.attrib]
else:
servers[d].append(server.attrib)
del root
closest = []
for d in sorted(servers.keys()):
for s in servers[d]:
closest.append(s)
if len(closest) == 5 and not all:
break
else:
continue
break
del servers
return closest
def getBestServer(servers):
"""Perform a speedtest.net "ping" to determine which speedtest.net
server has the lowest latency
"""
results = {}
for server in servers:
cum = 0
url = os.path.dirname(server['url'])
for i in range(0, 3):
uh = urllib.request.urlopen('%s/latency.txt' % url)
start = time.time()
text = uh.read().strip()
total = time.time() - start
if int(uh.getcode()) == 200 and text == b'test=test':
cum += total
else:
cum += 3600
uh.close()
avg = round((cum / 3) * 1000000, 3)
results[avg] = server
fastest = sorted(results.keys())[0]
best = results[fastest]
best['latency'] = fastest
return best
def speedtest():
"""Run the full speedtest.net test"""
description = (
'Command line interface for testing internet bandwidth using '
'speedtest.net.\n'
'------------------------------------------------------------'
'--------------\n'
'https://github.com/sivel/speedtest-cli')
parser = argparse.ArgumentParser(description=description)
parser.add_argument('--share', action='store_true',
help='Generate and provide a URL to the speedtest.net '
'share results image')
parser.add_argument('--simple', action='store_true',
help='Suppress verbose output, only show basic '
'information')
parser.add_argument('--list', action='store_true',
help='Display a list of speedtest.net servers '
'sorted by distance')
parser.add_argument('--server', help='Specify a server ID to test against')
args = parser.parse_args()
if not args.simple:
print('Retrieving speedtest.net configuration...')
config = getConfig()
if not args.simple:
print('Retrieving speedtest.net server list...')
#servers = closestServers(config['client'])
if args.list or args.server:
servers = closestServers(config['client'], True)
if args.list:
serverList = []
for server in servers:
line = ('%(id)4s) %(sponsor)s (%(name)s, %(country)s) '
'[%(d)0.2f km]' % server)
serverList.append(line)
try:
print('\n'.join(serverList))
except IOError:
pass
sys.exit(0)
else:
servers = closestServers(config['client'])
if not args.simple:
print('Testing from %(isp)s (%(ip)s)...' % config['client'])
if args.server:
try:
best = getBestServer(filter(lambda x: x['id'] == args.server,
servers))
except IndexError:
print('Invalid server ID')
sys.exit(1)
else:
if not args.simple:
print('Selecting best server based on ping...')
best = getBestServer(servers)
if not args.simple:
print('Hosted by %(sponsor)s (%(name)s) [%(d)0.2f km]: '
'%(latency)s ms' % best)
else:
print('Ping: %(latency)s ms' % best)
sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
urls = []
for size in sizes:
for i in range(0, 4):
urls.append('%s/random%sx%s.jpg' % (os.path.dirname(best['url']),
size, size))
if not args.simple:
print('Testing download speed', end='')
dlspeed = downloadSpeed(urls, args.simple)
if not args.simple:
print()
print('Download: %0.2f Mbit/s' % ((dlspeed / 1000 / 1000) * 8))
sizesizes = [int(.25 * 1000 * 1000), int(.5 * 1000 * 1000)]
sizes = []
for size in sizesizes:
for i in range(0, 25):
sizes.append(size)
if not args.simple:
print('Testing upload speed', end='')
ulspeed = uploadSpeed(best['url'], sizes, args.simple)
if not args.simple:
print()
print('Upload: %0.2f Mbit/s' % ((ulspeed / 1000 / 1000) * 8))
if args.share:
dlspeedk = int(round((dlspeed / 1000) * 8, 0))
ping = int(round(best['latency'], 0))
ulspeedk = int(round((ulspeed / 1000) * 8, 0))
apiData = [
'download=%s' % dlspeedk,
'ping=%s' % ping,
'upload=%s' % ulspeedk,
'promo=',
'startmode=%s' % 'pingselect',
'recommendedserverid=%s' % best['id'],
'accuracy=%s' % 1,
'serverid=%s' % best['id'],
'hash=%s' % hashlib.md5(('%s-%s-%s-%s' %
(ping, ulspeedk, dlspeedk, '297aae72'))
.encode()).hexdigest()]
req = urllib.request.Request('http://www.speedtest.net/api/api.php',
data='&'.join(apiData).encode())
req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
f = urllib.request.urlopen(req)
response = f.read()
code = f.getcode()
f.close()
if int(code) != 200:
print('Could not submit results to speedtest.net')
sys.exit(1)
qsargs = urllib.parse.parse_qs(response)
resultid = qsargs.get(b'resultid')
if not resultid or len(resultid) != 1:
print('Could not submit results to speedtest.net')
sys.exit(1)
print('Share results: http://www.speedtest.net/result/%s.png' %
resultid[0].decode())
if __name__ == '__main__':
try:
speedtest()
except KeyboardInterrupt:
print('\nCancelling...')
# vim:ts=4:sw=4:expandtab