Added feature to select custom server from url

This commit is contained in:
edseldim 2022-07-02 17:22:57 -05:00
parent 22210ca352
commit dcaf787080
2 changed files with 82 additions and 4 deletions

3
.gitignore vendored
View File

@ -25,3 +25,6 @@ pip-log.txt
#Mr Developer
.mr.developer.cfg
#Environment
speedtest-env

View File

@ -29,6 +29,8 @@ import threading
import timeit
import xml.parsers.expat
import requests
try:
import gzip
GZIP_BASE = gzip.GzipFile
@ -1237,7 +1239,21 @@ class Speedtest(object):
return self.config
def get_servers(self, servers=None, exclude=None):
def json_to_xml(self,json_url=None):
if json_url:
r = requests.get(json_url)
if r.status_code == 200:
json_data = json.loads(r.text)[0]
message = '<?xml version="1.0" encoding="UTF-8"?>\n<settings>\n<servers>'
try:
message += f'<server url="{json_data["url"]}" lat="{json_data["lat"]}" lon="{json_data["lon"]}" name="{json_data["name"]}" country="{json_data["country"]}" cc="{json_data["cc"]}" sponsor="{json_data["sponsor"]}" id="{json_data["id"]}" host="{json_data["host"]}" />'
except (KeyError,SyntaxError) as e:
pass
message += "\n</servers>\n</settings>\n"
return message.replace("&","").encode()
def get_servers(self, servers=None, exclude=None, custom_server=None):
"""Retrieve a the list of speedtest.net servers, optionally filtered
to servers matching those specified in the ``servers`` argument
"""
@ -1270,6 +1286,65 @@ class Speedtest(object):
headers['Accept-Encoding'] = 'gzip'
errors = []
if custom_server:
serversxml = "".encode().join([self.json_to_xml(custom_server)])
try:
try:
try:
root = ET.fromstring(serversxml)
except ET.ParseError:
e = get_exception()
raise SpeedtestServersError(
'Malformed speedtest.net server list: %s' % e
)
elements = etree_iter(root, 'server')
except AttributeError:
try:
root = DOM.parseString(serversxml)
except ExpatError:
e = get_exception()
raise SpeedtestServersError(
'Malformed speedtest.net server list: %s' % e
)
elements = root.getElementsByTagName('server')
except (SyntaxError, xml.parsers.expat.ExpatError):
raise ServersRetrievalError()
for server in elements:
try:
attrib = server.attrib
except AttributeError:
attrib = dict(list(server.attributes.items()))
if servers and int(attrib.get('id')) not in servers:
continue
if (int(attrib.get('id')) in self.config['ignore_servers']
or int(attrib.get('id')) in exclude):
continue
try:
d = distance(self.lat_lon,
(float(attrib.get('lat')),
float(attrib.get('lon'))))
except Exception:
continue
attrib['d'] = d
try:
self.servers[d].append(attrib)
except KeyError:
self.servers[d] = [attrib]
except ServersRetrievalError:
pass
if (servers or exclude) and not self.servers:
raise NoMatchedServers()
return self.servers
for url in urls:
try:
request = build_request(
@ -1299,9 +1374,8 @@ class Speedtest(object):
if int(uh.code) != 200:
raise ServersRetrievalError()
serversxml = ''.encode().join(serversxml_list)
print(serversxml)
printer('Servers XML:\n%s' % serversxml, debug=True)
try:
@ -1784,6 +1858,7 @@ def parse_args():
help='Show the version number and exit')
parser.add_argument('--debug', action='store_true',
help=ARG_SUPPRESS, default=ARG_SUPPRESS)
parser.add_argument('--custom',help="Test with a custom server using its link")
options = parser.parse_args()
if isinstance(options, tuple):
@ -1916,7 +1991,7 @@ def shell():
if not args.mini:
printer('Retrieving speedtest.net server list...', quiet)
try:
speedtest.get_servers(servers=args.server, exclude=args.exclude)
speedtest.get_servers(servers=args.server, exclude=args.exclude,custom_server=args.custom)
except NoMatchedServers:
raise SpeedtestCLIError(
'No matched servers: %s' %