Commit 8f87d9a3 authored by wbond's avatar wbond

Updated tests to work with both Python 2.6 and 3.3, and made it possible to…

Updated tests to work with both Python 2.6 and 3.3, and made it possible to import them without running them for the upcoming channel repository tools package
parent 974e9a31
# Copyright (c) 2009 Raymond Hettinger
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
from UserDict import DictMixin
class OrderedDict(dict, DictMixin):
def __init__(self, *args, **kwds):
if len(args) > 1:
raise TypeError('expected at most 1 arguments, got %d' % len(args))
try:
self.__end
except AttributeError:
self.clear()
self.update(*args, **kwds)
def clear(self):
self.__end = end = []
end += [None, end, end] # sentinel node for doubly linked list
self.__map = {} # key --> [key, prev, next]
dict.clear(self)
def __setitem__(self, key, value):
if key not in self:
end = self.__end
curr = end[1]
curr[2] = end[1] = self.__map[key] = [key, curr, end]
dict.__setitem__(self, key, value)
def __delitem__(self, key):
dict.__delitem__(self, key)
key, prev, next = self.__map.pop(key)
prev[2] = next
next[1] = prev
def __iter__(self):
end = self.__end
curr = end[2]
while curr is not end:
yield curr[0]
curr = curr[2]
def __reversed__(self):
end = self.__end
curr = end[1]
while curr is not end:
yield curr[0]
curr = curr[1]
def popitem(self, last=True):
if not self:
raise KeyError('dictionary is empty')
if last:
key = reversed(self).next()
else:
key = iter(self).next()
value = self.pop(key)
return key, value
def __reduce__(self):
items = [[k, self[k]] for k in self]
tmp = self.__map, self.__end
del self.__map, self.__end
inst_dict = vars(self).copy()
self.__map, self.__end = tmp
if inst_dict:
return (self.__class__, (items,), inst_dict)
return self.__class__, (items,)
def keys(self):
return list(self)
setdefault = DictMixin.setdefault
update = DictMixin.update
pop = DictMixin.pop
values = DictMixin.values
items = DictMixin.items
iterkeys = DictMixin.iterkeys
itervalues = DictMixin.itervalues
iteritems = DictMixin.iteritems
def __repr__(self):
if not self:
return '%s()' % (self.__class__.__name__,)
return '%s(%r)' % (self.__class__.__name__, self.items())
def copy(self):
return self.__class__(self)
@classmethod
def fromkeys(cls, iterable, value=None):
d = cls()
for key in iterable:
d[key] = value
return d
def __eq__(self, other):
if isinstance(other, OrderedDict):
if len(self) != len(other):
return False
for p, q in zip(self.items(), other.items()):
if p != q:
return False
return True
return dict.__eq__(self, other)
def __ne__(self, other):
return not self == other
......@@ -18,18 +18,28 @@ import json
import sys
import unittest
from collections import OrderedDict
from functools import wraps
from urllib.request import urlopen
from urllib.error import HTTPError
from urllib.parse import urljoin
arglist = ['--test-repositories']
# Exctract used arguments form the commandline an strip them for unittest.main
userargs = [arg for arg in sys.argv if arg in arglist]
for arg in userargs:
if arg in sys.argv:
sys.argv.remove(arg)
if sys.version_info >= (3,):
from collections import OrderedDict
from urllib.request import urlopen
from urllib.error import HTTPError
from urllib.parse import urljoin
else:
from .ordereddict import OrderedDict
from urlparse import urljoin
from urllib2 import HTTPError, urlopen
if hasattr(sys, 'argv'):
arglist = ['--test-repositories']
# Exctract used arguments form the commandline an strip them for unittest.main
userargs = [arg for arg in sys.argv if arg in arglist]
for arg in userargs:
if arg in sys.argv:
sys.argv.remove(arg)
else:
userargs = []
################################################################################
......@@ -41,10 +51,19 @@ def _open(filepath, *args, **kwargs):
if not os.path.exists(filepath):
filepath = os.path.join("..", filepath)
if sys.version_info >= (3,):
encoding = 'utf-8'
errors = 'replace'
if args and args[0] in ['rb', 'wb', 'ab']:
encoding = None
errors = None
kwargs['encoding'] = encoding
kwargs['errors'] = errors
return open(filepath, *args, **kwargs)
def generator_class(cls):
def generate_test_methods(cls, stream):
"""Class decorator for classes that use test generating methods.
A class that is decorated with this function will be searched for methods
......@@ -64,7 +83,7 @@ def generator_class(cls):
raise TypeError("Generator methods must be classmethods")
# Create new methods for each `yield`
for sub_call in generator():
for sub_call in generator(stream):
method, params = sub_call
@wraps(method)
......@@ -284,6 +303,17 @@ class TestContainer(object):
r"^\*|(osx|linux|windows)(-x(32|64))?$")
def _test_error(self, msg, e=None):
"""
A generic error-returning function used the meta-programming features
of this class.
:param msg:
The error message to return
:param e:
An optional exception to include with the error message
"""
if e:
if isinstance(e, HTTPError):
self.fail("%s: %s" % (msg, e))
......@@ -293,107 +323,157 @@ class TestContainer(object):
self.fail(msg)
@classmethod
def _fail(cls, *args):
return cls._test_error, args
def _include_tests(cls, path, stream):
"""
Return a tuple of (method, args) to add to a unittest TestCase class.
A meta-programming function to expand the definition of class at run
time, based on the contents of a file or URL.
@generator_class
class ChannelTests(TestContainer, unittest.TestCase):
maxDiff = None
with _open('channel.json') as f:
j = json.load(f)
:param cls:
The class to add the methods to
def test_channel_keys(self):
keys = sorted(self.j.keys())
self.assertEqual(keys, ['repositories', 'schema_version'])
:param path:
The URL or file path to fetch the repository info from
self.assertEqual(self.j['schema_version'], '2.0')
self.assertIsInstance(self.j['repositories'], list)
:param stream:
A file-like object used for diagnostic output that provides .write()
and .flush()
"""
for repo in self.j['repositories']:
self.assertIsInstance(repo, str)
cls._write(stream, "%s ... " % path)
def test_channel_repo_order(self):
repos = self.j['repositories']
self.assertEqual(repos, sorted(repos, key=str.lower),
"Repositories must be sorted alphabetically")
@classmethod
def generate_repository_tests(cls):
if "--test-repositories" not in userargs:
# Only generate tests for all repositories (those hosted online)
# when run with "--test-repositories" parameter.
return
for repository in cls.j['repositories']:
if repository.startswith('.'):
continue
if not repository.startswith("http"):
cls._fail("Unexcpected repository url: %s" % repository)
yield from cls._include_tests(repository)
@classmethod
def _include_tests(cls, url):
print("fetching %s" % url)
# Download the repository
try:
with urlopen(url) as f:
source = f.read().decode("utf-8")
except Exception as e:
yield cls._fail("Downloading %s failed" % url, e)
return
if re.match('https?://', path, re.I) is not None:
# Download the repository
try:
with urlopen(path) as f:
source = f.read().decode("utf-8")
except Exception as e:
cls._write(stream, 'failed (%s)\n' % str(e))
yield cls._fail("Downloading %s failed" % path, e)
return
else:
try:
with _open(path, 'rb') as f:
source = f.read().decode('utf-8')
except Exception as e:
cls._write(stream, 'failed (%s)\n' % str(e))
yield cls._fail("Opening %s failed" % path, e)
return
if not source:
yield cls._fail("%s is empty" % url)
yield cls._fail("%s is empty" % path)
return
# Parse the repository
try:
data = json.loads(source)
except Exception as e:
yield cls._fail("Could not parse %s" % url, e)
yield cls._fail("Could not parse %s" % path, e)
return
# Check for the schema version first (and generator failures it's
# badly formatted)
if 'schema_version' not in data:
yield cls._fail("No schema_version found in %s" % url)
yield cls._fail("No schema_version found in %s" % path)
return
schema = float(data['schema_version'])
if schema not in (1.0, 1.1, 1.2, 2.0):
yield cls._fail("Unrecognized schema version %s in %s"
% (schema, url))
% (schema, path))
return
# Do not generate 1000 failing tests for not yet updated repos
if schema != 2.0:
print("schema version %s, skipping" % data['schema_version'])
cls._write(stream, "skipping (schema version %s)\n" % data['schema_version'])
return
# `url` is for output during tests only
yield cls._test_repository_keys, (url, data)
cls._write(stream, 'done\n')
# `path` is for output during tests only
yield cls._test_repository_keys, (path, data)
if 'packages' in data:
for package in data['packages']:
yield cls._test_package, (url, package)
yield cls._test_package, (path, package)
package_name = get_package_name(package)
if 'releases' in package:
for release in package['releases']:
(yield cls._test_release,
("%s (%s)" % (package_name, url),
("%s (%s)" % (package_name, path),
release, False))
if 'includes' in data:
for include in data['includes']:
i_url = urljoin(url, include)
yield from cls._include_tests(i_url)
i_url = urljoin(path, include)
yield from cls._include_tests(i_url, stream)
@classmethod
def _fail(cls, *args):
"""
Generates a (method, args) tuple that returns an error when called.
Allows for defering an error until the tests are actually run.
"""
return cls._test_error, args
@classmethod
def _write(cls, stream, string):
"""
Writes dianostic output to a file-like object.
@generator_class
class RepositoryTests(TestContainer, unittest.TestCase):
:param stream:
Must have the methods .write() and .flush()
:param string:
The string to write - a newline will NOT be appended
"""
stream.write(string)
stream.flush()
class DefaultChannelTests(TestContainer, unittest.TestCase):
maxDiff = None
with _open('channel.json') as f:
j = json.load(f)
def test_channel_keys(self):
keys = sorted(self.j.keys())
self.assertEqual(keys, ['repositories', 'schema_version'])
self.assertEqual(self.j['schema_version'], '2.0')
self.assertIsInstance(self.j['repositories'], list)
for repo in self.j['repositories']:
self.assertIsInstance(repo, str)
def test_channel_repo_order(self):
repos = self.j['repositories']
self.assertEqual(repos, sorted(repos, key=str.lower),
"Repositories must be sorted alphabetically")
@classmethod
def generate_repository_tests(cls, stream):
if not "--test-repositories" in userargs:
# Only generate tests for all repositories (those hosted online)
# when run with "--test-repositories" parameter.
return
cls._write(stream, "Fetching remote repositories:\n")
for repository in cls.j['repositories']:
if repository.startswith('.'):
continue
if not repository.startswith("http"):
cls._fail("Unexpected repository url: %s" % repository)
yield from cls._include_tests(repository, stream)
cls._write(stream, '\n')
class DefaultRepositoryTests(TestContainer, unittest.TestCase):
maxDiff = None
with _open('repository.json') as f:
......@@ -411,7 +491,7 @@ class RepositoryTests(TestContainer, unittest.TestCase):
self.assertIsInstance(include, str)
@classmethod
def generate_include_tests(cls):
def generate_include_tests(cls, stream):
for include in cls.j['includes']:
try:
with _open(include) as f:
......@@ -437,9 +517,22 @@ class RepositoryTests(TestContainer, unittest.TestCase):
("%s (%s)" % (package_name, include), release))
def generate_default_test_methods(stream=None):
if not stream:
stream = sys.stdout
generate_test_methods(DefaultRepositoryTests, stream)
generate_test_methods(DefaultChannelTests, stream)
################################################################################
# Main
# When included to a Sublime package, sys.argv will not be set. We need to
# generate the test methods differently in that context, so we only generate
# them if sys.argv exists.
if hasattr(sys, 'argv'):
generate_default_test_methods()
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment