First pass at adding some command line options, the default is not to automatically generate the share link

This commit is contained in:
Matt Martz 2013-01-24 14:40:29 -06:00
parent a2f2a46f6f
commit 54505e6edd
2 changed files with 173 additions and 95 deletions

View File

@ -31,6 +31,10 @@ try:
from hashlib import md5
except ImportError:
from md5 import md5
try:
from argparse import ArgumentParser as ArgParser
except ImportError:
from optparse import OptionParser as ArgParser
def distance(origin, destination):
@ -74,7 +78,7 @@ class FileGetter(threading.Thread):
self.result = ''
def downloadSpeed(files):
def downloadSpeed(files, quiet=False):
start = time.time()
def producer(q, files):
@ -82,8 +86,9 @@ def downloadSpeed(files):
thread = FileGetter(file, start)
thread.start()
q.put(thread, True)
sys.stdout.write('.')
sys.stdout.flush()
if not quiet:
sys.stdout.write('.')
sys.stdout.flush()
finished = []
@ -130,7 +135,7 @@ class FilePutter(threading.Thread):
self.result = ''
def uploadSpeed(url, sizes):
def uploadSpeed(url, sizes, quiet=False):
start = time.time()
def producer(q, sizes):
@ -138,8 +143,9 @@ def uploadSpeed(url, sizes):
thread = FilePutter(url, start, size)
thread.start()
q.put(thread, True)
sys.stdout.write('.')
sys.stdout.flush()
if not quiet:
sys.stdout.write('.')
sys.stdout.flush()
finished = []
@ -244,15 +250,47 @@ def getBestServer(servers):
def speedtest():
"""Run the full speedtest.net test"""
print 'Retrieving speedtest.net configuration...'
description = (
'Command line interface for testing internet bandwidth using '
'speedtest.net.\n'
'------------------------------------------------------------'
'--------------\n'
'https://github.com/sivel/speedtest-cli')
parser = ArgParser(description=description)
try:
parser.add_argument = parser.add_option
except AttributeError:
pass
parser.add_argument('--share', action='store_true',
help='Generate and provide a URL to the speedtest.net '
'share results image')
parser.add_argument('--simple', action='store_true',
help='Suppress verbose output, only show basic '
'information')
options = parser.parse_args()
if isinstance(options, tuple):
args = options[0]
else:
args = options
del options
if not args.simple:
print 'Retrieving speedtest.net configuration...'
config = getConfig()
print 'Retrieving speedtest.net server list...'
if not args.simple:
print 'Retrieving speedtest.net server list...'
servers = closestServers(config['client'])
print 'Selecting best server based on ping...'
if not args.simple:
print 'Selecting best server based on ping...'
best = getBestServer(servers)
print 'Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best
if not args.simple:
print 'Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best
else:
print 'Ping: %(latency)sms' % best
sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
urls = []
@ -260,55 +298,63 @@ def speedtest():
for i in xrange(0, 4):
urls.append('%s/random%sx%s.jpg' %
(os.path.dirname(best['url']), size, size))
print 'Testing download speed',
dlspeed = downloadSpeed(urls)
print '\nDownload: %s Mbit/s' % round((dlspeed / 1024 / 1024) * 8, 2)
if not args.simple:
print 'Testing download speed',
dlspeed = downloadSpeed(urls, args.simple)
if not args.simple:
print
print 'Download: %s Mbit/s' % round((dlspeed / 1024 / 1024) * 8, 2)
sizesizes = [int(.25 * 1024 * 1024), int(.5 * 1024 * 1024)]
sizes = []
for size in sizesizes:
for i in xrange(0, 25):
sizes.append(size)
print 'Testing upload speed',
ulspeed = uploadSpeed(best['url'], sizes)
print '\nUpload speed: %s Mbit/s' % round((ulspeed / 1024 / 1024) * 8, 2)
if not args.simple:
print 'Testing upload speed',
ulspeed = uploadSpeed(best['url'], sizes, args.simple)
if not args.simple:
print
print 'Upload: %s Mbit/s' % round((ulspeed / 1024 / 1024) * 8, 2)
dlspeedk = int(round((dlspeed / 1024) * 8, 0))
ping = int(round(best['latency'], 0))
ulspeedk = int(round((ulspeed / 1024) * 8, 0))
if args.share:
dlspeedk = int(round((dlspeed / 1024) * 8, 0))
ping = int(round(best['latency'], 0))
ulspeedk = int(round((ulspeed / 1024) * 8, 0))
apiData = [
'download=%s' % dlspeedk,
'ping=%s' % ping,
'upload=%s' % ulspeedk,
'promo=',
'startmode=%s' % 'pingselect',
'recommendedserverid=%s' % best['id'],
'accuracy=%s' % 1,
'serverid=%s' % best['id'],
'hash=%s' % md5('%s-%s-%s-%s' %
(ping, ulspeedk, dlspeedk, '297aae72')
).hexdigest()]
apiData = [
'download=%s' % dlspeedk,
'ping=%s' % ping,
'upload=%s' % ulspeedk,
'promo=',
'startmode=%s' % 'pingselect',
'recommendedserverid=%s' % best['id'],
'accuracy=%s' % 1,
'serverid=%s' % best['id'],
'hash=%s' % md5('%s-%s-%s-%s' %
(ping, ulspeedk, dlspeedk, '297aae72')
).hexdigest()]
req = urllib2.Request('http://www.speedtest.net/api/api.php',
data='&'.join(apiData))
req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
f = urllib2.urlopen(req)
response = f.read()
code = f.code
f.close()
req = urllib2.Request('http://www.speedtest.net/api/api.php',
data='&'.join(apiData))
req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
f = urllib2.urlopen(req)
response = f.read()
code = f.code
f.close()
if int(code) != 200:
print 'Could not submit results to speedtest.net'
sys.exit(1)
if int(code) != 200:
print 'Could not submit results to speedtest.net'
sys.exit(1)
qsargs = parse_qs(response)
resultid = qsargs.get('resultid')
if not resultid or len(resultid) != 1:
print 'Could not submit results to speedtest.net'
sys.exit(1)
qsargs = parse_qs(response)
resultid = qsargs.get('resultid')
if not resultid or len(resultid) != 1:
print 'Could not submit results to speedtest.net'
sys.exit(1)
print 'Share results: http://www.speedtest.net/result/%s.png' % resultid[0]
print ('Share results: http://www.speedtest.net/result/%s.png' %
resultid[0])
if __name__ == '__main__':
speedtest()

View File

@ -26,6 +26,7 @@ import sys
import hashlib
import threading
import binascii
import argparse
from queue import Queue
@ -70,7 +71,7 @@ class FileGetter(threading.Thread):
self.result = ''
def downloadSpeed(files):
def downloadSpeed(files, quiet=False):
start = time.time()
def producer(q, files):
@ -78,8 +79,9 @@ def downloadSpeed(files):
thread = FileGetter(file, start)
thread.start()
q.put(thread, True)
sys.stdout.write('.')
sys.stdout.flush()
if not quiet:
sys.stdout.write('.')
sys.stdout.flush()
finished = []
@ -127,7 +129,7 @@ class FilePutter(threading.Thread):
self.result = ''
def uploadSpeed(url, sizes):
def uploadSpeed(url, sizes, quiet=False):
start = time.time()
def producer(q, sizes):
@ -135,8 +137,9 @@ def uploadSpeed(url, sizes):
thread = FilePutter(url, start, size)
thread.start()
q.put(thread, True)
sys.stdout.write('.')
sys.stdout.flush()
if not quiet:
sys.stdout.write('.')
sys.stdout.flush()
finished = []
@ -235,15 +238,37 @@ def getBestServer(servers):
def speedtest():
"""Run the full speedtest.net test"""
print('Retrieving speedtest.net configuration...')
description = (
'Command line interface for testing internet bandwidth using '
'speedtest.net.\n'
'------------------------------------------------------------'
'--------------\n'
'https://github.com/sivel/speedtest-cli')
parser = argparse.ArgumentParser(description=description)
parser.add_argument('--share', action='store_true',
help='Generate and provide a URL to the speedtest.net '
'share results image')
parser.add_argument('--simple', action='store_true',
help='Suppress verbose output, only show basic '
'information')
args = parser.parse_args()
if not args.simple:
print('Retrieving speedtest.net configuration...')
config = getConfig()
print('Retrieving speedtest.net server list...')
if not args.simple:
print('Retrieving speedtest.net server list...')
servers = closestServers(config['client'])
print('Selecting best server based on ping...')
if not args.simple:
print('Selecting best server based on ping...')
best = getBestServer(servers)
print('Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best)
if not args.simple:
print('Hosted by %(sponsor)s (%(name)s): %(latency)sms' % best)
else:
print('Ping: %(latency)sms' % best)
sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
urls = []
@ -251,56 +276,63 @@ def speedtest():
for i in range(0, 4):
urls.append('%s/random%sx%s.jpg' % (os.path.dirname(best['url']),
size, size))
print('Testing download speed', end='')
dlspeed = downloadSpeed(urls)
print('\nDownload: %s Mbit/s' % round((dlspeed / 1024 / 1024) * 8, 2))
if not args.simple:
print('Testing download speed', end='')
dlspeed = downloadSpeed(urls, args.simple)
if not args.simple:
print()
print('Download: %s Mbit/s' % round((dlspeed / 1024 / 1024) * 8, 2))
sizesizes = [int(.25 * 1024 * 1024), int(.5 * 1024 * 1024)]
sizes = []
for size in sizesizes:
for i in range(0, 25):
sizes.append(size)
print('Testing upload speed', end='')
ulspeed = uploadSpeed(best['url'], sizes)
print('\nUpload speed: %s Mbit/s' % round((ulspeed / 1024 / 1024) * 8, 2))
if not args.simple:
print('Testing upload speed', end='')
ulspeed = uploadSpeed(best['url'], sizes, args.simple)
if not args.simple:
print()
print('Upload speed: %s Mbit/s' % round((ulspeed / 1024 / 1024) * 8, 2))
dlspeedk = int(round((dlspeed / 1024) * 8, 0))
ping = int(round(best['latency'], 0))
ulspeedk = int(round((ulspeed / 1024) * 8, 0))
if args.share:
dlspeedk = int(round((dlspeed / 1024) * 8, 0))
ping = int(round(best['latency'], 0))
ulspeedk = int(round((ulspeed / 1024) * 8, 0))
apiData = [
'download=%s' % dlspeedk,
'ping=%s' % ping,
'upload=%s' % ulspeedk,
'promo=',
'startmode=%s' % 'pingselect',
'recommendedserverid=%s' % best['id'],
'accuracy=%s' % 1,
'serverid=%s' % best['id'],
'hash=%s' % hashlib.md5(('%s-%s-%s-%s' %
(ping, ulspeedk, dlspeedk, '297aae72'))
.encode()).hexdigest()]
apiData = [
'download=%s' % dlspeedk,
'ping=%s' % ping,
'upload=%s' % ulspeedk,
'promo=',
'startmode=%s' % 'pingselect',
'recommendedserverid=%s' % best['id'],
'accuracy=%s' % 1,
'serverid=%s' % best['id'],
'hash=%s' % hashlib.md5(('%s-%s-%s-%s' %
(ping, ulspeedk, dlspeedk, '297aae72'))
.encode()).hexdigest()]
req = urllib.request.Request('http://www.speedtest.net/api/api.php',
data='&'.join(apiData).encode())
req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
f = urllib.request.urlopen(req)
response = f.read()
code = f.getcode()
f.close()
req = urllib.request.Request('http://www.speedtest.net/api/api.php',
data='&'.join(apiData).encode())
req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
f = urllib.request.urlopen(req)
response = f.read()
code = f.getcode()
f.close()
if int(code) != 200:
print('Could not submit results to speedtest.net')
sys.exit(1)
if int(code) != 200:
print('Could not submit results to speedtest.net')
sys.exit(1)
qsargs = urllib.parse.parse_qs(response)
resultid = qsargs.get(b'resultid')
if not resultid or len(resultid) != 1:
print('Could not submit results to speedtest.net')
sys.exit(1)
qsargs = urllib.parse.parse_qs(response)
resultid = qsargs.get(b'resultid')
if not resultid or len(resultid) != 1:
print('Could not submit results to speedtest.net')
sys.exit(1)
print('Share results: http://www.speedtest.net/result/%s.png' % resultid[0]
.decode())
print('Share results: http://www.speedtest.net/result/%s.png' %
resultid[0].decode())
if __name__ == '__main__':
speedtest()