aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_results.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_results.py')
-rw-r--r--test/test_results.py176
1 files changed, 114 insertions, 62 deletions
diff --git a/test/test_results.py b/test/test_results.py
index 6e04e1d..4b1c4c1 100644
--- a/test/test_results.py
+++ b/test/test_results.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-# Copyright 2015-2023 Mike Fährmann
+# Copyright 2015-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -80,9 +80,9 @@ class TestExtractorResults(unittest.TestCase):
@classmethod
def tearDownClass(cls):
if cls._skipped:
- print("\n\nSkipped tests:")
- for url, exc in cls._skipped:
- print('- {} ("{}")'.format(url, exc))
+ sys.stdout.write("\n\nSkipped tests:\n")
+ for url, reason in cls._skipped:
+ sys.stdout.write(f'- {url} ("{reason}")\n')
def assertRange(self, value, range, msg=None):
if range.step > 1:
@@ -91,6 +91,24 @@ class TestExtractorResults(unittest.TestCase):
self.assertLessEqual(value, range.stop, msg=msg)
self.assertGreaterEqual(value, range.start, msg=msg)
+ def assertLogEqual(self, expected, output):
+ if isinstance(expected, str):
+ expected = (expected,)
+ self.assertEqual(len(expected), len(output), "#log/count")
+
+ for exp, out in zip(expected, output):
+ level, name, message = out.split(":", 2)
+
+ if isinstance(exp, str):
+ return self.assertEqual(exp, message, "#log")
+
+ self.assertEqual(exp[0].lower(), level.lower(), "#log/level")
+ if len(exp) < 3:
+ self.assertEqual(exp[1], message, "#log/message")
+ else:
+ self.assertEqual(exp[1], name , "#log/name")
+ self.assertEqual(exp[2], message, "#log/message")
+
def _run_test(self, result):
if result.get("#fail"):
del result["#fail"]
@@ -145,7 +163,11 @@ class TestExtractorResults(unittest.TestCase):
return
try:
- tjob.run()
+ if "#log" in result:
+ with self.assertLogs() as log_info:
+ tjob.run()
+ else:
+ tjob.run()
except exception.StopExtraction:
pass
except exception.HttpError as exc:
@@ -156,6 +178,9 @@ class TestExtractorResults(unittest.TestCase):
self.skipTest(exc)
raise
+ if "#log" in result:
+ self.assertLogEqual(result["#log"], log_info.output)
+
if result.get("#archive", True):
self.assertEqual(
len(set(tjob.archive_list)),
@@ -220,13 +245,15 @@ class TestExtractorResults(unittest.TestCase):
for url, pat in zip(tjob.url_list, pattern):
self.assertRegex(url, pat, msg="#pattern")
- if "#urls" in result:
- expected = result["#urls"]
+ if "#results" in result:
+ expected = result["#results"]
if isinstance(expected, str):
- self.assertTrue(tjob.url_list, msg="#urls")
- self.assertEqual(tjob.url_list[0], expected, msg="#urls")
+ self.assertTrue(tjob.url_list, msg="#results")
+ self.assertEqual(
+ tjob.url_list[0], expected, msg="#results")
else:
- self.assertSequenceEqual(tjob.url_list, expected, msg="#urls")
+ self.assertSequenceEqual(
+ tjob.url_list, expected, msg="#results")
metadata = {k: v for k, v in result.items() if k[0] != "#"}
if metadata:
@@ -235,56 +262,74 @@ class TestExtractorResults(unittest.TestCase):
def _test_kwdict(self, kwdict, tests, parent=None):
for key, test in tests.items():
+
if key.startswith("?"):
key = key[1:]
if key not in kwdict:
continue
+ if key.endswith("[*]"):
+ key = key[:-3]
+ subtest = True
+ else:
+ subtest = False
+
path = "{}.{}".format(parent, key) if parent else key
+
if key.startswith("!"):
self.assertNotIn(key[1:], kwdict, msg=path)
continue
+
self.assertIn(key, kwdict, msg=path)
value = kwdict[key]
- if isinstance(test, dict):
- self._test_kwdict(value, test, path)
- elif isinstance(test, type):
- self.assertIsInstance(value, test, msg=path)
- elif isinstance(test, range):
- self.assertRange(value, test, msg=path)
- elif isinstance(test, set):
- try:
- self.assertIn(value, test, msg=path)
- except AssertionError:
- self.assertIn(type(value), test, msg=path)
- elif isinstance(test, list):
- subtest = False
- for idx, item in enumerate(test):
- if isinstance(item, dict):
- subtest = True
- subpath = "{}[{}]".format(path, idx)
- self._test_kwdict(value[idx], item, subpath)
- if not subtest:
- self.assertEqual(test, value, msg=path)
- elif isinstance(test, str):
- if test.startswith("re:"):
- self.assertRegex(value, test[3:], msg=path)
- elif test.startswith("dt:"):
- self.assertIsInstance(value, datetime.datetime, msg=path)
- self.assertEqual(test[3:], str(value), msg=path)
- elif test.startswith("type:"):
- self.assertEqual(test[5:], type(value).__name__, msg=path)
- elif test.startswith("len:"):
- cls, _, length = test[4:].rpartition(":")
- if cls:
- self.assertEqual(
- cls, type(value).__name__, msg=path + "/type")
- self.assertEqual(int(length), len(value), msg=path)
- else:
- self.assertEqual(test, value, msg=path)
+ if subtest:
+ self.assertNotIsInstance(value, str, msg=path)
+ for idx, item in enumerate(value):
+ subpath = "{}[{}]".format(path, idx)
+ self._test_kwdict_value(item, test, subpath)
else:
+ self._test_kwdict_value(value, test, path)
+
+ def _test_kwdict_value(self, value, test, path):
+ if isinstance(test, dict):
+ self._test_kwdict(value, test, path)
+ elif isinstance(test, type):
+ self.assertIsInstance(value, test, msg=path)
+ elif isinstance(test, range):
+ self.assertRange(value, test, msg=path)
+ elif isinstance(test, set):
+ try:
+ self.assertIn(value, test, msg=path)
+ except AssertionError:
+ self.assertIn(type(value), test, msg=path)
+ elif isinstance(test, list):
+ subtest = False
+ for idx, item in enumerate(test):
+ if isinstance(item, dict):
+ subtest = True
+ subpath = "{}[{}]".format(path, idx)
+ self._test_kwdict(value[idx], item, subpath)
+ if not subtest:
self.assertEqual(test, value, msg=path)
+ elif isinstance(test, str):
+ if test.startswith("re:"):
+ self.assertRegex(value, test[3:], msg=path)
+ elif test.startswith("dt:"):
+ self.assertIsInstance(value, datetime.datetime, msg=path)
+ self.assertEqual(test[3:], str(value), msg=path)
+ elif test.startswith("type:"):
+ self.assertEqual(test[5:], type(value).__name__, msg=path)
+ elif test.startswith("len:"):
+ cls, _, length = test[4:].rpartition(":")
+ if cls:
+ self.assertEqual(
+ cls, type(value).__name__, msg=path + "/type")
+ self.assertEqual(int(length), len(value), msg=path)
+ else:
+ self.assertEqual(test, value, msg=path)
+ else:
+ self.assertEqual(test, value, msg=path)
class ResultJob(job.DownloadJob):
@@ -402,27 +447,31 @@ class TestPathfmt():
class TestFormatter(formatter.StringFormatter):
- @staticmethod
- def _noop(_):
- return ""
-
def _apply_simple(self, key, fmt):
if key == "extension" or "_parse_optional." in repr(fmt):
- return self._noop
-
- def wrap(obj):
- return fmt(obj[key])
+ def wrap(obj):
+ try:
+ return fmt(obj[key])
+ except KeyError:
+ return ""
+ else:
+ def wrap(obj):
+ return fmt(obj[key])
return wrap
def _apply(self, key, funcs, fmt):
if key == "extension" or "_parse_optional." in repr(fmt):
- return self._noop
-
- def wrap(obj):
- obj = obj[key]
- for func in funcs:
- obj = func(obj)
- return fmt(obj)
+ def wrap(obj):
+ obj = obj[key] if key in obj else ""
+ for func in funcs:
+ obj = func(obj)
+ return fmt(obj)
+ else:
+ def wrap(obj):
+ obj = obj[key]
+ for func in funcs:
+ obj = func(obj)
+ return fmt(obj)
return wrap
@@ -457,7 +506,10 @@ def generate_tests():
"""Dynamically generate extractor unittests"""
def _generate_method(result):
def test(self):
- print("\n" + result["#url"])
+ sys.stdout.write(f"\n{result['#url']}\n")
+ if "#comment" in result:
+ sys.stdout.write(f"# {result['#comment']}\n")
+
try:
self._run_test(result)
except KeyboardInterrupt as exc: