Make it compatible with Python 2.4-2.7 and Python 3.

This commit is contained in:
Grey Lee 2013-01-13 03:15:05 +08:00
parent 9e1a582826
commit 2b8d672812
3 changed files with 80 additions and 356 deletions

View File

@ -1,9 +1,3 @@
# speedtest-cli
Command line interface for testing internet bandwidth using speedtest.net
## Versions
speedtest-cli is written for use with Python 2.6-2.7
speedtest-cli-3 is written for use with Python 3

125
speedtest-cli Executable file → Normal file
View File

@ -15,22 +15,56 @@
# License for the specific language governing permissions and limitations
# under the License.
import urllib2
try:
from urlparse import parse_qs
except ImportError:
from cgi import parse_qs
import math
from xml.dom import minidom as DOM
import time
import os
import sys
import math
import time
from xml.dom import minidom as DOM
try:
from urllib.request import urlopen, Request
except ImportError:
from urllib2 import urlopen, Request
try:
from urllib.parse import parse_qs
except ImportError:
try:
from urllib2 import parse_qs
except ImportError:
from cgi import parse_qs
try:
from hashlib import md5
except:
except ImportError:
from md5 import md5
import threading
from Queue import Queue
try:
from queue import Queue
except ImportError:
from Queue import Queue
def print3(*objects, **kwargs):
"""print3(value, ..., sep=' ', end='\n', file=sys.stdout, flush=False)
Prints the values to a stream, or to sys.stdout by default.
Optional keyword arguments:
file: a file-like object (stream); defaults to the current sys.stdout.
sep: string inserted between values, default a space.
end: string appended after the last value, default a newline.
flush: whether to forcibly flush the stream."""
sep = kwargs.get('sep', ' ')
end = kwargs.get('end', '\n')
fp = kwargs.get('file', sys.stdout)
flush = kwargs.get('flush', False)
fp.write(sep.join([str(o) for o in objects]))
fp.write(end)
if flush:
fp.flush()
def distance(origin, destination):
@ -64,7 +98,7 @@ class FileGetter(threading.Thread):
def run(self):
try:
if (time.time() - self.starttime) <= 10:
f = urllib2.urlopen(self.url)
f = urlopen(self.url)
contents = f.read()
f.close()
self.result = contents
@ -82,8 +116,7 @@ def downloadSpeed(files):
thread = FileGetter(file, start)
thread.start()
q.put(thread, True)
sys.stdout.write('.')
sys.stdout.flush()
print3('.', end='', flush=True)
finished = []
@ -101,7 +134,7 @@ def downloadSpeed(files):
cons_thread.start()
prod_thread.join()
cons_thread.join()
return (len(''.join(finished))/(time.time()-start))
return (sum(map(len, finished))/(time.time()-start))
class FilePutter(threading.Thread):
@ -118,7 +151,9 @@ class FilePutter(threading.Thread):
def run(self):
try:
if (time.time() - self.starttime) <= 10:
f = urllib2.urlopen(self.url, self.data)
# convert URL to bytestring to avoid UnicodeDecodeError
# see http://bugs.python.org/issue12398
f = urlopen(str(self.url), self.data)
contents = f.read()
f.close()
self.result = self.data
@ -136,8 +171,7 @@ def uploadSpeed(url, sizes):
thread = FilePutter(url, start, size)
thread.start()
q.put(thread, True)
sys.stdout.write('.')
sys.stdout.flush()
print3('.', end='', flush=True)
finished = []
@ -155,12 +189,12 @@ def uploadSpeed(url, sizes):
cons_thread.start()
prod_thread.join()
cons_thread.join()
return (len(''.join(finished))/(time.time()-start))
return (sum(map(len, finished))/(time.time()-start))
def getAttributesByTagName(dom, tagName):
elem = dom.getElementsByTagName(tagName)[0]
return dict(elem.attributes.items())
return dict(list(elem.attributes.items()))
def getConfig():
@ -168,7 +202,7 @@ def getConfig():
we are interested in
"""
uh = urllib2.urlopen('http://www.speedtest.net/speedtest-config.php')
uh = urlopen('http://www.speedtest.net/speedtest-config.php')
configxml = uh.read()
if int(uh.code) != 200:
return None
@ -189,7 +223,7 @@ def closestServers(client):
distance
"""
uh = urllib2.urlopen('http://speedtest.net/speedtest-servers.php')
uh = urlopen('http://speedtest.net/speedtest-servers.php')
serversxml = uh.read()
if int(uh.code) != 200:
return None
@ -197,7 +231,7 @@ def closestServers(client):
root = DOM.parseString(serversxml)
servers = {}
for server in root.getElementsByTagName('server'):
attrib = dict(server.attributes.items())
attrib = dict(list(server.attributes.items()))
d = distance([float(client['lat']), float(client['lon'])],
[float(attrib.get('lat')), float(attrib.get('lon'))])
servers[d] = attrib
@ -220,12 +254,12 @@ def getBestServer(servers):
for server in servers:
cum = 0
url = os.path.dirname(server['url'])
for i in xrange(0, 3):
uh = urllib2.urlopen('%s/latency.txt' % url)
for i in range(0, 3):
uh = urlopen('%s/latency.txt' % url)
start = time.time()
text = uh.read().strip()
total = time.time() - start
if int(uh.code) == 200 and text == 'test=test':
if int(uh.code) == 200 and text == 'test=test'.encode():
cum += total
else:
cum += 3600
@ -242,34 +276,34 @@ def getBestServer(servers):
def speedtest():
"""Run the full speedtest.net test"""
print 'Retrieving speedtest.net configuration...'
print3('Retrieving speedtest.net configuration...')
config = getConfig()
print 'Retrieving speedtest.net server list...'
print3('Retrieving speedtest.net server list...')
servers = closestServers(config['client'])
print 'Selecting best server based on ping...'
print3('Selecting best server based on ping...')
best = getBestServer(servers)
print 'Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best
print3('Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best)
sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
urls = []
for size in sizes:
for i in xrange(0, 4):
for i in range(0, 4):
urls.append('%s/random%sx%s.jpg' %
(os.path.dirname(best['url']), size, size))
print 'Testing download speed',
print3('Testing download speed', end='')
dlspeed = downloadSpeed(urls)
print '\nDownload: %s Mbit/s' % round((dlspeed / 1024 / 1024) * 8, 2)
print3('\nDownload: %s Mbit/s' % round((dlspeed / 1024 / 1024) * 8, 2))
sizesizes = [int(.25 * 1024 * 1024), int(.5 * 1024 * 1024)]
sizes = []
for size in sizesizes:
for i in xrange(0, 25):
for i in range(0, 25):
sizes.append(size)
print 'Testing upload speed',
print3('Testing upload speed', end='')
ulspeed = uploadSpeed(best['url'], sizes)
print '\nUpload speed: %s Mbit/s' % round((ulspeed / 1024 / 1024) * 8, 2)
print3('\nUpload speed: %s Mbit/s' % round((ulspeed / 1024 / 1024) * 8, 2))
dlspeedk = int(round((dlspeed / 1024) * 8, 0))
ping = int(round(best['latency'], 0))
@ -284,29 +318,30 @@ def speedtest():
'recommendedserverid=%s' % best['id'],
'accuracy=%s' % 1,
'serverid=%s' % best['id'],
'hash=%s' % md5('%s-%s-%s-%s' %
(ping, ulspeedk, dlspeedk, '297aae72')
).hexdigest()]
'hash=%s' % md5(('%s-%s-%s-%s' %
(ping, ulspeedk, dlspeedk, '297aae72'))
.encode()).hexdigest()]
req = urllib2.Request('http://www.speedtest.net/api/api.php',
data='&'.join(apiData))
req = Request('http://www.speedtest.net/api/api.php',
data='&'.join(apiData).encode())
req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
f = urllib2.urlopen(req)
f = urlopen(req)
response = f.read()
code = f.code
f.close()
if int(code) != 200:
print 'Could not submit results to speedtest.net'
print3('Could not submit results to speedtest.net')
sys.exit(1)
qsargs = parse_qs(response)
qsargs = parse_qs(response.decode())
resultid = qsargs.get('resultid')
if not resultid or len(resultid) != 1:
print 'Could not submit results to speedtest.net'
print3('Could not submit results to speedtest.net')
sys.exit(1)
print 'Share results: http://www.speedtest.net/result/%s.png' % resultid[0]
print3('Share results: http://www.speedtest.net/result/%s.png' %
resultid[0])
if __name__ == '__main__':
speedtest()

View File

@ -1,305 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2013 Matt Martz
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import urllib.request
import urllib.parse
import urllib.error
import math
import xml.etree.ElementTree as ET
import time
import os
import sys
import hashlib
import threading
from queue import Queue
def distance(origin, destination):
"""Determine distance between 2 sets of [lat,lon] in km"""
lat1, lon1 = origin
lat2, lon2 = destination
radius = 6371 # km
dlat = math.radians(lat2-lat1)
dlon = math.radians(lon2-lon1)
a = (math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.radians(lat1))
* math.cos(math.radians(lat2)) * math.sin(dlon / 2)
* math.sin(dlon / 2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
d = radius * c
return d
class FileGetter(threading.Thread):
def __init__(self, url, start):
self.url = url
self.result = None
self.starttime = start
threading.Thread.__init__(self)
def get_result(self):
return self.result
def run(self):
try:
if (time.time() - self.starttime) <= 10:
f = urllib.request.urlopen(self.url)
contents = f.read()
f.close()
self.result = contents
else:
self.result = ''
except IOError:
self.result = ''
def downloadSpeed(files):
start = time.time()
def producer(q, files):
for file in files:
thread = FileGetter(file, start)
thread.start()
q.put(thread, True)
sys.stdout.write('.')
sys.stdout.flush()
finished = []
def consumer(q, total_files):
while len(finished) < total_files:
thread = q.get(True)
thread.join()
finished.append(thread.get_result())
q = Queue(3)
start = time.time()
prod_thread = threading.Thread(target=producer, args=(q, files))
cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
prod_thread.start()
cons_thread.start()
prod_thread.join()
cons_thread.join()
return (len(b''.join([chunk if isinstance(chunk, bytes) else chunk.encode()
for chunk in finished]))/(time.time()-start))
class FilePutter(threading.Thread):
def __init__(self, url, start, size):
self.url = url
self.data = os.urandom(int(size)-len('content1='))
self.result = None
self.starttime = start
threading.Thread.__init__(self)
def get_result(self):
return self.result
def run(self):
try:
if (time.time() - self.starttime) <= 10:
f = urllib.request.urlopen(self.url, self.data)
contents = f.read()
f.close()
self.result = self.data
else:
self.result = ''
except IOError:
self.result = ''
def uploadSpeed(url, sizes):
start = time.time()
def producer(q, sizes):
for size in sizes:
thread = FilePutter(url, start, size)
thread.start()
q.put(thread, True)
sys.stdout.write('.')
sys.stdout.flush()
finished = []
def consumer(q, total_sizes):
while len(finished) < total_sizes:
thread = q.get(True)
thread.join()
finished.append(thread.get_result())
q = Queue(9)
start = time.time()
prod_thread = threading.Thread(target=producer, args=(q, sizes))
cons_thread = threading.Thread(target=consumer, args=(q, len(sizes)))
prod_thread.start()
cons_thread.start()
prod_thread.join()
cons_thread.join()
return (len(b''.join([chunk if isinstance(chunk, bytes) else chunk.encode()
for chunk in finished]))/(time.time()-start))
def getConfig():
"""Download the speedtest.net configuration and return only the data
we are interested in
"""
uh = urllib.request.urlopen('http://www.speedtest.net/'
'speedtest-config.php')
configxml = uh.read()
if int(uh.getcode()) != 200:
return None
uh.close()
root = ET.fromstring(configxml)
config = {
'client': root.find('client').attrib,
'times': root.find('times').attrib,
'download': root.find('download').attrib,
'upload': root.find('upload').attrib}
return config
def closestServers(client):
"""Determine the 5 closest speedtest.net servers based on geographic
distance
"""
uh = urllib.request.urlopen('http://speedtest.net/speedtest-servers.php')
serversxml = uh.read()
if int(uh.getcode()) != 200:
return None
uh.close()
root = ET.fromstring(serversxml)
servers = {}
for server in root[0]:
d = distance([float(client['lat']), float(client['lon'])],
[float(server.get('lat')), float(server.get('lon'))])
servers[d] = server.attrib
closest = []
for d in sorted(servers.keys())[0:4]:
closest.append(servers[d])
del servers
del root
return closest
def getBestServer(servers):
"""Perform a speedtest.net "ping" to determine which speedtest.net
server has the lowest latency
"""
results = {}
for server in servers:
cum = 0
url = os.path.dirname(server['url'])
for i in range(0, 3):
uh = urllib.request.urlopen('%s/latency.txt' % url)
start = time.time()
text = uh.read().strip()
total = time.time() - start
if int(uh.getcode()) == 200 and text == b'test=test':
cum += total
else:
cum += 3600
uh.close()
avg = round((cum / 3) * 1000000, 3)
results[avg] = server
best = results[sorted(results.keys())[0]]
best['latency'] = avg
return best
def speedtest():
"""Run the full speedtest.net test"""
print('Retrieving speedtest.net configuration...')
config = getConfig()
print('Retrieving speedtest.net server list...')
servers = closestServers(config['client'])
print('Selecting best server based on ping...')
best = getBestServer(servers)
print('Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best)
sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
urls = []
for size in sizes:
for i in range(0, 4):
urls.append('%s/random%sx%s.jpg' % (os.path.dirname(best['url']),
size, size))
print('Testing download speed', end='')
dlspeed = downloadSpeed(urls)
print('\nDownload: %s Mbit/s' % round((dlspeed / 1024 / 1024) * 8, 2))
sizesizes = [int(.25 * 1024 * 1024), int(.5 * 1024 * 1024)]
sizes = []
for size in sizesizes:
for i in range(0, 25):
sizes.append(size)
print('Testing upload speed', end='')
ulspeed = uploadSpeed(best['url'], sizes)
print('\nUpload speed: %s Mbit/s' % round((ulspeed / 1024 / 1024) * 8, 2))
dlspeedk = int(round((dlspeed / 1024) * 8, 0))
ping = int(round(best['latency'], 0))
ulspeedk = int(round((ulspeed / 1024) * 8, 0))
apiData = [
'download=%s' % dlspeedk,
'ping=%s' % ping,
'upload=%s' % ulspeedk,
'promo=',
'startmode=%s' % 'pingselect',
'recommendedserverid=%s' % best['id'],
'accuracy=%s' % 1,
'serverid=%s' % best['id'],
'hash=%s' % hashlib.md5(('%s-%s-%s-%s' %
(ping, ulspeedk, dlspeedk, '297aae72'))
.encode()).hexdigest()]
req = urllib.request.Request('http://www.speedtest.net/api/api.php',
data='&'.join(apiData).encode())
req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
f = urllib.request.urlopen(req)
response = f.read()
code = f.getcode()
f.close()
if int(code) != 200:
print('Could not submit results to speedtest.net')
sys.exit(1)
qsargs = urllib.parse.parse_qs(response)
resultid = qsargs.get(b'resultid')
if not resultid or len(resultid) != 1:
print('Could not submit results to speedtest.net')
sys.exit(1)
print('Share results: http://www.speedtest.net/result/%s.png' % resultid[0]
.decode())
if __name__ == '__main__':
speedtest()
# vim:ts=2:sw=2:expandtab