speedtest-cli/speedtest-cli-3

358 lines
11 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
2013-01-07 19:29:42 +01:00
# Copyright 2013 Matt Martz
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import urllib.request
import urllib.parse
import urllib.error
import math
import xml.etree.ElementTree as ET
import time
import os
import sys
import hashlib
import threading
import binascii
import argparse
from queue import Queue
def distance(origin, destination):
2013-01-07 20:01:14 +01:00
"""Determine distance between 2 sets of [lat,lon] in km"""
2013-01-07 20:01:14 +01:00
lat1, lon1 = origin
lat2, lon2 = destination
radius = 6371 # km
2013-01-07 20:01:14 +01:00
dlat = math.radians(lat2-lat1)
dlon = math.radians(lon2-lon1)
a = (math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.radians(lat1))
* math.cos(math.radians(lat2)) * math.sin(dlon / 2)
* math.sin(dlon / 2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
d = radius * c
2013-01-07 20:01:14 +01:00
return d
class FileGetter(threading.Thread):
2013-01-07 20:01:14 +01:00
def __init__(self, url, start):
self.url = url
self.result = None
self.starttime = start
threading.Thread.__init__(self)
def get_result(self):
return self.result
def run(self):
try:
if (time.time() - self.starttime) <= 10:
f = urllib.request.urlopen(self.url)
contents = f.read()
f.close()
self.result = contents
else:
self.result = ''
except IOError:
self.result = ''
def downloadSpeed(files, quiet=False):
2013-01-07 20:01:14 +01:00
start = time.time()
def producer(q, files):
for file in files:
thread = FileGetter(file, start)
thread.start()
q.put(thread, True)
if not quiet:
sys.stdout.write('.')
sys.stdout.flush()
2013-01-07 20:01:14 +01:00
finished = []
def consumer(q, total_files):
while len(finished) < total_files:
thread = q.get(True)
thread.join()
finished.append(thread.get_result())
q = Queue(3)
start = time.time()
prod_thread = threading.Thread(target=producer, args=(q, files))
cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
prod_thread.start()
cons_thread.start()
prod_thread.join()
cons_thread.join()
return (len(b''.join([chunk if isinstance(chunk, bytes) else chunk.encode()
for chunk in finished]))/(time.time()-start))
class FilePutter(threading.Thread):
2013-01-07 20:01:14 +01:00
def __init__(self, url, start, size):
self.url = url
data = binascii.hexlify(os.urandom(int(size)-9)).decode()
self.data = 'content1=%s' % data[0:int(size)-9]
del data
2013-01-07 20:01:14 +01:00
self.result = None
self.starttime = start
threading.Thread.__init__(self)
def get_result(self):
return self.result
def run(self):
try:
if (time.time() - self.starttime) <= 10:
f = urllib.request.urlopen(self.url, self.data.encode())
2013-01-07 20:01:14 +01:00
contents = f.read()
f.close()
self.result = self.data
else:
self.result = ''
except IOError:
self.result = ''
def uploadSpeed(url, sizes, quiet=False):
2013-01-07 20:01:14 +01:00
start = time.time()
def producer(q, sizes):
for size in sizes:
thread = FilePutter(url, start, size)
thread.start()
q.put(thread, True)
if not quiet:
sys.stdout.write('.')
sys.stdout.flush()
2013-01-07 20:01:14 +01:00
finished = []
def consumer(q, total_sizes):
while len(finished) < total_sizes:
thread = q.get(True)
thread.join()
finished.append(thread.get_result())
q = Queue(9)
start = time.time()
prod_thread = threading.Thread(target=producer, args=(q, sizes))
cons_thread = threading.Thread(target=consumer, args=(q, len(sizes)))
prod_thread.start()
cons_thread.start()
prod_thread.join()
cons_thread.join()
return (len(b''.join([chunk if isinstance(chunk, bytes) else chunk.encode()
for chunk in finished]))/(time.time()-start))
def getConfig():
2013-01-07 20:01:14 +01:00
"""Download the speedtest.net configuration and return only the data
we are interested in
"""
uh = urllib.request.urlopen('http://www.speedtest.net/'
'speedtest-config.php')
configxml = uh.read()
if int(uh.getcode()) != 200:
return None
uh.close()
root = ET.fromstring(configxml)
config = {
'client': root.find('client').attrib,
'times': root.find('times').attrib,
'download': root.find('download').attrib,
'upload': root.find('upload').attrib}
return config
def closestServers(client):
2013-01-07 20:01:14 +01:00
"""Determine the 5 closest speedtest.net servers based on geographic
distance
"""
uh = urllib.request.urlopen('http://speedtest.net/speedtest-servers.php')
serversxml = uh.read()
if int(uh.getcode()) != 200:
return None
uh.close()
root = ET.fromstring(serversxml)
servers = {}
for server in root[0]:
d = distance([float(client['lat']), float(client['lon'])],
[float(server.get('lat')), float(server.get('lon'))])
if d not in servers:
servers[d] = [server.attrib]
else:
servers[d].append(server.attrib)
2013-01-07 20:01:14 +01:00
closest = []
for d in sorted(servers.keys()):
for s in servers[d]:
closest.append(s)
if(len(closest) == 5):
break
else:
continue
break
2013-01-07 20:01:14 +01:00
del servers
del root
return closest
2013-01-07 20:01:14 +01:00
def getBestServer(servers):
"""Perform a speedtest.net "ping" to determine which speedtest.net
server has the lowest latency
"""
results = {}
for server in servers:
cum = 0
url = os.path.dirname(server['url'])
for i in range(0, 3):
uh = urllib.request.urlopen('%s/latency.txt' % url)
start = time.time()
text = uh.read().strip()
total = time.time() - start
if int(uh.getcode()) == 200 and text == b'test=test':
cum += total
else:
cum += 3600
uh.close()
avg = round((cum / 3) * 1000000, 3)
results[avg] = server
fastest = sorted(results.keys())[0]
best = results[fastest]
best['latency'] = fastest
2013-01-07 20:01:14 +01:00
return best
2013-01-07 20:01:14 +01:00
def speedtest():
"""Run the full speedtest.net test"""
description = (
'Command line interface for testing internet bandwidth using '
'speedtest.net.\n'
'------------------------------------------------------------'
'--------------\n'
'https://github.com/sivel/speedtest-cli')
parser = argparse.ArgumentParser(description=description)
parser.add_argument('--share', action='store_true',
help='Generate and provide a URL to the speedtest.net '
'share results image')
parser.add_argument('--simple', action='store_true',
help='Suppress verbose output, only show basic '
'information')
args = parser.parse_args()
if not args.simple:
print('Retrieving speedtest.net configuration...')
2013-01-07 20:01:14 +01:00
config = getConfig()
if not args.simple:
print('Retrieving speedtest.net server list...')
2013-01-07 20:01:14 +01:00
servers = closestServers(config['client'])
2013-01-25 18:53:45 +01:00
if not args.simple:
client = dict(config['client'].items())
print('Testing From: %(isp)s - %(ip)s' % client)
if not args.simple:
print('Selecting best server based on ping...')
2013-01-07 20:01:14 +01:00
best = getBestServer(servers)
if not args.simple:
print('Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best)
else:
print('Ping: %(latency)sms' % best)
2013-01-07 20:01:14 +01:00
sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
urls = []
for size in sizes:
for i in range(0, 4):
urls.append('%s/random%sx%s.jpg' % (os.path.dirname(best['url']),
size, size))
if not args.simple:
print('Testing download speed', end='')
dlspeed = downloadSpeed(urls, args.simple)
if not args.simple:
print()
print('Download: %s Mbit/s' % round((dlspeed / 1024 / 1024) * 8, 2))
2013-01-07 20:01:14 +01:00
sizesizes = [int(.25 * 1024 * 1024), int(.5 * 1024 * 1024)]
sizes = []
for size in sizesizes:
for i in range(0, 25):
sizes.append(size)
if not args.simple:
print('Testing upload speed', end='')
ulspeed = uploadSpeed(best['url'], sizes, args.simple)
if not args.simple:
print()
print('Upload speed: %s Mbit/s' % round((ulspeed / 1024 / 1024) * 8, 2))
if args.share:
dlspeedk = int(round((dlspeed / 1024) * 8, 0))
ping = int(round(best['latency'], 0))
ulspeedk = int(round((ulspeed / 1024) * 8, 0))
apiData = [
'download=%s' % dlspeedk,
'ping=%s' % ping,
'upload=%s' % ulspeedk,
'promo=',
'startmode=%s' % 'pingselect',
'recommendedserverid=%s' % best['id'],
'accuracy=%s' % 1,
'serverid=%s' % best['id'],
'hash=%s' % hashlib.md5(('%s-%s-%s-%s' %
(ping, ulspeedk, dlspeedk, '297aae72'))
.encode()).hexdigest()]
req = urllib.request.Request('http://www.speedtest.net/api/api.php',
data='&'.join(apiData).encode())
req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
f = urllib.request.urlopen(req)
response = f.read()
code = f.getcode()
f.close()
if int(code) != 200:
print('Could not submit results to speedtest.net')
sys.exit(1)
qsargs = urllib.parse.parse_qs(response)
resultid = qsargs.get(b'resultid')
if not resultid or len(resultid) != 1:
print('Could not submit results to speedtest.net')
sys.exit(1)
print('Share results: http://www.speedtest.net/result/%s.png' %
resultid[0].decode())
if __name__ == '__main__':
2013-01-25 18:53:57 +01:00
try:
speedtest()
except KeyboardInterrupt:
print('\nCancelling...')
# vim:ts=2:sw=2:expandtab