Added static option for static server list

This commit is contained in:
Nils Gerke 2020-10-30 09:51:35 +01:00
parent c58ad3367b
commit 67985db684
1 changed files with 108 additions and 98 deletions

View File

@ -17,6 +17,7 @@
import os
import re
import json
import csv
import sys
import math
@ -1224,132 +1225,139 @@ class Speedtest(object):
return self.config
def get_servers(self, servers=None, exclude=None):
def get_servers(self, servers=None, exclude=None, servers_json=None):
"""Retrieve a the list of speedtest.net servers, optionally filtered
to servers matching those specified in the ``servers`` argument
"""
if servers is None:
servers = []
if exclude is None:
exclude = []
if servers_json is None:
if servers is None:
servers = []
self.servers.clear()
if exclude is None:
exclude = []
for server_list in (servers, exclude):
for i, s in enumerate(server_list):
self.servers.clear()
for server_list in (servers, exclude):
for i, s in enumerate(server_list):
try:
server_list[i] = int(s)
except ValueError:
raise InvalidServerIDType(
'%s is an invalid server type, must be int' % s
)
urls = [
'://www.speedtest.net/speedtest-servers-static.php',
'http://c.speedtest.net/speedtest-servers-static.php',
'://www.speedtest.net/speedtest-servers.php',
'http://c.speedtest.net/speedtest-servers.php',
]
headers = {}
if gzip:
headers['Accept-Encoding'] = 'gzip'
errors = []
for url in urls:
try:
server_list[i] = int(s)
except ValueError:
raise InvalidServerIDType(
'%s is an invalid server type, must be int' % s
request = build_request(
'%s?threads=%s' % (url,
self.config['threads']['download']),
headers=headers,
secure=self._secure
)
uh, e = catch_request(request, opener=self._opener)
if e:
errors.append('%s' % e)
raise ServersRetrievalError()
urls = [
'://www.speedtest.net/speedtest-servers-static.php',
'http://c.speedtest.net/speedtest-servers-static.php',
'://www.speedtest.net/speedtest-servers.php',
'http://c.speedtest.net/speedtest-servers.php',
]
stream = get_response_stream(uh)
headers = {}
if gzip:
headers['Accept-Encoding'] = 'gzip'
serversxml_list = []
while 1:
try:
serversxml_list.append(stream.read(1024))
except (OSError, EOFError):
raise ServersRetrievalError(get_exception())
if len(serversxml_list[-1]) == 0:
break
errors = []
for url in urls:
try:
request = build_request(
'%s?threads=%s' % (url,
self.config['threads']['download']),
headers=headers,
secure=self._secure
)
uh, e = catch_request(request, opener=self._opener)
if e:
errors.append('%s' % e)
raise ServersRetrievalError()
stream.close()
uh.close()
stream = get_response_stream(uh)
if int(uh.code) != 200:
raise ServersRetrievalError()
serversxml_list = []
while 1:
try:
serversxml_list.append(stream.read(1024))
except (OSError, EOFError):
raise ServersRetrievalError(get_exception())
if len(serversxml_list[-1]) == 0:
break
serversxml = ''.encode().join(serversxml_list)
stream.close()
uh.close()
printer('Servers XML:\n%s' % serversxml, debug=True)
if int(uh.code) != 200:
raise ServersRetrievalError()
serversxml = ''.encode().join(serversxml_list)
printer('Servers XML:\n%s' % serversxml, debug=True)
try:
try:
try:
root = ET.fromstring(serversxml)
except ET.ParseError:
e = get_exception()
raise SpeedtestServersError(
'Malformed speedtest.net server list: %s' % e
)
elements = etree_iter(root, 'server')
except AttributeError:
try:
root = ET.fromstring(serversxml)
except ET.ParseError:
e = get_exception()
raise SpeedtestServersError(
'Malformed speedtest.net server list: %s' % e
)
elements = etree_iter(root, 'server')
except AttributeError:
try:
root = DOM.parseString(serversxml)
except ExpatError:
e = get_exception()
raise SpeedtestServersError(
'Malformed speedtest.net server list: %s' % e
)
elements = root.getElementsByTagName('server')
except (SyntaxError, xml.parsers.expat.ExpatError):
raise ServersRetrievalError()
for server in elements:
try:
root = DOM.parseString(serversxml)
except ExpatError:
e = get_exception()
raise SpeedtestServersError(
'Malformed speedtest.net server list: %s' % e
)
elements = root.getElementsByTagName('server')
except (SyntaxError, xml.parsers.expat.ExpatError):
raise ServersRetrievalError()
attrib = server.attrib
except AttributeError:
attrib = dict(list(server.attributes.items()))
for server in elements:
try:
attrib = server.attrib
except AttributeError:
attrib = dict(list(server.attributes.items()))
if servers and int(attrib.get('id')) not in servers:
continue
if servers and int(attrib.get('id')) not in servers:
continue
if (int(attrib.get('id')) in self.config['ignore_servers']
or int(attrib.get('id')) in exclude):
continue
if (int(attrib.get('id')) in self.config['ignore_servers']
or int(attrib.get('id')) in exclude):
continue
try:
d = distance(self.lat_lon,
(float(attrib.get('lat')),
float(attrib.get('lon'))))
except Exception:
continue
try:
d = distance(self.lat_lon,
(float(attrib.get('lat')),
float(attrib.get('lon'))))
except Exception:
continue
attrib['d'] = d
attrib['d'] = d
try:
self.servers[d].append(attrib)
except KeyError:
self.servers[d] = [attrib]
try:
self.servers[d].append(attrib)
except KeyError:
self.servers[d] = [attrib]
break
break
except ServersRetrievalError:
continue
except ServersRetrievalError:
continue
if (servers or exclude) and not self.servers:
raise NoMatchedServers()
if (servers or exclude) and not self.servers:
raise NoMatchedServers()
else:
printer('Loading Servers from:\n%s' % servers_json, debug=True)
with open(servers_json) as json_file:
self.servers = json.load(json_file)
return self.servers
def set_mini_server(self, server):
"""Instead of querying for a list of servers, set a link to a
speedtest mini server
@ -1744,6 +1752,8 @@ def parse_args():
help='Suppress verbose output, only show basic '
'information in JSON format. Speeds listed in '
'bit/s and not affected by --bytes')
parser.add_argument('--load-servers-from-json', dest='servers_json', type=PARSER_TYPE_STR,
help='Serverlist for static testing in json Format')
parser.add_argument('--list', action='store_true',
help='Display a list of speedtest.net servers '
'sorted by distance')
@ -1903,7 +1913,7 @@ def shell():
if not args.mini:
printer('Retrieving speedtest.net server list...', quiet)
try:
speedtest.get_servers(servers=args.server, exclude=args.exclude)
speedtest.get_servers(servers=args.server, exclude=args.exclude, servers_json=args.servers_json)
except NoMatchedServers:
raise SpeedtestCLIError(
'No matched servers: %s' %