diff options
| author | 2025-07-31 01:22:01 -0400 | |
|---|---|---|
| committer | 2025-07-31 01:22:01 -0400 | |
| commit | a6e995c093de8aae2e91a0787281bb34c0b871eb (patch) | |
| tree | 2d79821b05300d34d8871eb6c9662b359a2de85d /test/test_results.py | |
| parent | 7672a750cb74bf31e21d76aad2776367fd476155 (diff) | |
New upstream version 1.30.2.upstream/1.30.2
Diffstat (limited to 'test/test_results.py')
| -rw-r--r-- | test/test_results.py | 176 |
1 files changed, 114 insertions, 62 deletions
diff --git a/test/test_results.py b/test/test_results.py index 6e04e1d..4b1c4c1 100644 --- a/test/test_results.py +++ b/test/test_results.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -# Copyright 2015-2023 Mike Fährmann +# Copyright 2015-2025 Mike Fährmann # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as @@ -80,9 +80,9 @@ class TestExtractorResults(unittest.TestCase): @classmethod def tearDownClass(cls): if cls._skipped: - print("\n\nSkipped tests:") - for url, exc in cls._skipped: - print('- {} ("{}")'.format(url, exc)) + sys.stdout.write("\n\nSkipped tests:\n") + for url, reason in cls._skipped: + sys.stdout.write(f'- {url} ("{reason}")\n') def assertRange(self, value, range, msg=None): if range.step > 1: @@ -91,6 +91,24 @@ class TestExtractorResults(unittest.TestCase): self.assertLessEqual(value, range.stop, msg=msg) self.assertGreaterEqual(value, range.start, msg=msg) + def assertLogEqual(self, expected, output): + if isinstance(expected, str): + expected = (expected,) + self.assertEqual(len(expected), len(output), "#log/count") + + for exp, out in zip(expected, output): + level, name, message = out.split(":", 2) + + if isinstance(exp, str): + return self.assertEqual(exp, message, "#log") + + self.assertEqual(exp[0].lower(), level.lower(), "#log/level") + if len(exp) < 3: + self.assertEqual(exp[1], message, "#log/message") + else: + self.assertEqual(exp[1], name , "#log/name") + self.assertEqual(exp[2], message, "#log/message") + def _run_test(self, result): if result.get("#fail"): del result["#fail"] @@ -145,7 +163,11 @@ class TestExtractorResults(unittest.TestCase): return try: - tjob.run() + if "#log" in result: + with self.assertLogs() as log_info: + tjob.run() + else: + tjob.run() except exception.StopExtraction: pass except exception.HttpError as exc: @@ -156,6 +178,9 @@ class TestExtractorResults(unittest.TestCase): self.skipTest(exc) raise + if "#log" in result: + self.assertLogEqual(result["#log"], log_info.output) + if result.get("#archive", True): self.assertEqual( len(set(tjob.archive_list)), @@ -220,13 +245,15 @@ class TestExtractorResults(unittest.TestCase): for url, pat in zip(tjob.url_list, pattern): self.assertRegex(url, pat, msg="#pattern") - if "#urls" in result: - expected = result["#urls"] + if "#results" in result: + expected = result["#results"] if isinstance(expected, str): - self.assertTrue(tjob.url_list, msg="#urls") - self.assertEqual(tjob.url_list[0], expected, msg="#urls") + self.assertTrue(tjob.url_list, msg="#results") + self.assertEqual( + tjob.url_list[0], expected, msg="#results") else: - self.assertSequenceEqual(tjob.url_list, expected, msg="#urls") + self.assertSequenceEqual( + tjob.url_list, expected, msg="#results") metadata = {k: v for k, v in result.items() if k[0] != "#"} if metadata: @@ -235,56 +262,74 @@ class TestExtractorResults(unittest.TestCase): def _test_kwdict(self, kwdict, tests, parent=None): for key, test in tests.items(): + if key.startswith("?"): key = key[1:] if key not in kwdict: continue + if key.endswith("[*]"): + key = key[:-3] + subtest = True + else: + subtest = False + path = "{}.{}".format(parent, key) if parent else key + if key.startswith("!"): self.assertNotIn(key[1:], kwdict, msg=path) continue + self.assertIn(key, kwdict, msg=path) value = kwdict[key] - if isinstance(test, dict): - self._test_kwdict(value, test, path) - elif isinstance(test, type): - self.assertIsInstance(value, test, msg=path) - elif isinstance(test, range): - self.assertRange(value, test, msg=path) - elif isinstance(test, set): - try: - self.assertIn(value, test, msg=path) - except AssertionError: - self.assertIn(type(value), test, msg=path) - elif isinstance(test, list): - subtest = False - for idx, item in enumerate(test): - if isinstance(item, dict): - subtest = True - subpath = "{}[{}]".format(path, idx) - self._test_kwdict(value[idx], item, subpath) - if not subtest: - self.assertEqual(test, value, msg=path) - elif isinstance(test, str): - if test.startswith("re:"): - self.assertRegex(value, test[3:], msg=path) - elif test.startswith("dt:"): - self.assertIsInstance(value, datetime.datetime, msg=path) - self.assertEqual(test[3:], str(value), msg=path) - elif test.startswith("type:"): - self.assertEqual(test[5:], type(value).__name__, msg=path) - elif test.startswith("len:"): - cls, _, length = test[4:].rpartition(":") - if cls: - self.assertEqual( - cls, type(value).__name__, msg=path + "/type") - self.assertEqual(int(length), len(value), msg=path) - else: - self.assertEqual(test, value, msg=path) + if subtest: + self.assertNotIsInstance(value, str, msg=path) + for idx, item in enumerate(value): + subpath = "{}[{}]".format(path, idx) + self._test_kwdict_value(item, test, subpath) else: + self._test_kwdict_value(value, test, path) + + def _test_kwdict_value(self, value, test, path): + if isinstance(test, dict): + self._test_kwdict(value, test, path) + elif isinstance(test, type): + self.assertIsInstance(value, test, msg=path) + elif isinstance(test, range): + self.assertRange(value, test, msg=path) + elif isinstance(test, set): + try: + self.assertIn(value, test, msg=path) + except AssertionError: + self.assertIn(type(value), test, msg=path) + elif isinstance(test, list): + subtest = False + for idx, item in enumerate(test): + if isinstance(item, dict): + subtest = True + subpath = "{}[{}]".format(path, idx) + self._test_kwdict(value[idx], item, subpath) + if not subtest: self.assertEqual(test, value, msg=path) + elif isinstance(test, str): + if test.startswith("re:"): + self.assertRegex(value, test[3:], msg=path) + elif test.startswith("dt:"): + self.assertIsInstance(value, datetime.datetime, msg=path) + self.assertEqual(test[3:], str(value), msg=path) + elif test.startswith("type:"): + self.assertEqual(test[5:], type(value).__name__, msg=path) + elif test.startswith("len:"): + cls, _, length = test[4:].rpartition(":") + if cls: + self.assertEqual( + cls, type(value).__name__, msg=path + "/type") + self.assertEqual(int(length), len(value), msg=path) + else: + self.assertEqual(test, value, msg=path) + else: + self.assertEqual(test, value, msg=path) class ResultJob(job.DownloadJob): @@ -402,27 +447,31 @@ class TestPathfmt(): class TestFormatter(formatter.StringFormatter): - @staticmethod - def _noop(_): - return "" - def _apply_simple(self, key, fmt): if key == "extension" or "_parse_optional." in repr(fmt): - return self._noop - - def wrap(obj): - return fmt(obj[key]) + def wrap(obj): + try: + return fmt(obj[key]) + except KeyError: + return "" + else: + def wrap(obj): + return fmt(obj[key]) return wrap def _apply(self, key, funcs, fmt): if key == "extension" or "_parse_optional." in repr(fmt): - return self._noop - - def wrap(obj): - obj = obj[key] - for func in funcs: - obj = func(obj) - return fmt(obj) + def wrap(obj): + obj = obj[key] if key in obj else "" + for func in funcs: + obj = func(obj) + return fmt(obj) + else: + def wrap(obj): + obj = obj[key] + for func in funcs: + obj = func(obj) + return fmt(obj) return wrap @@ -457,7 +506,10 @@ def generate_tests(): """Dynamically generate extractor unittests""" def _generate_method(result): def test(self): - print("\n" + result["#url"]) + sys.stdout.write(f"\n{result['#url']}\n") + if "#comment" in result: + sys.stdout.write(f"# {result['#comment']}\n") + try: self._run_test(result) except KeyboardInterrupt as exc: |
