diff options
Diffstat (limited to 'test/test_results.py')
| -rw-r--r-- | test/test_results.py | 207 |
1 files changed, 128 insertions, 79 deletions
diff --git a/test/test_results.py b/test/test_results.py index 3c7d284..4fb22c7 100644 --- a/test/test_results.py +++ b/test/test_results.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -# Copyright 2015-2022 Mike Fährmann +# Copyright 2015-2023 Mike Fährmann # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as @@ -15,10 +15,12 @@ import re import json import hashlib import datetime +import collections sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from gallery_dl import \ extractor, util, job, config, exception, formatter # noqa E402 +from test import results # noqa E402 # temporary issues, etc. @@ -46,28 +48,40 @@ class TestExtractorResults(unittest.TestCase): for url, exc in cls._skipped: print('- {} ("{}")'.format(url, exc)) - def _run_test(self, extr, url, result): - if result: - if "options" in result: - for key, value in result["options"]: - key = key.split(".") - config.set(key[:-1], key[-1], value) - if "range" in result: - config.set((), "image-range" , result["range"]) - config.set((), "chapter-range", result["range"]) - content = "content" in result + def assertRange(self, value, range, msg=None): + if range.step > 1: + self.assertIn(value, range, msg=msg) else: + self.assertLessEqual(value, range.stop, msg=msg) + self.assertGreaterEqual(value, range.start, msg=msg) + + def _run_test(self, result): + result.pop("#comment", None) + only_matching = (len(result) <= 3) + + if only_matching: content = False + else: + if "#options" in result: + for key, value in result["#options"].items(): + key = key.split(".") + config.set(key[:-1], key[-1], value) + if "#range" in result: + config.set((), "image-range" , result["#range"]) + config.set((), "chapter-range", result["#range"]) + content = ("#sha1_content" in result) - tjob = ResultJob(url, content=content) - self.assertEqual(extr, tjob.extractor.__class__) + tjob = ResultJob(result["#url"], content=content) + self.assertEqual(result["#class"], tjob.extractor.__class__, "#class") - if not result: + if only_matching: return - if "exception" in result: - with self.assertRaises(result["exception"]): + + if "#exception" in result: + with self.assertRaises(result["#exception"], msg="#exception"): tjob.run() return + try: tjob.run() except exception.StopExtraction: @@ -76,64 +90,85 @@ class TestExtractorResults(unittest.TestCase): exc = str(exc) if re.match(r"'5\d\d ", exc) or \ re.search(r"\bRead timed out\b", exc): - self._skipped.append((url, exc)) + self._skipped.append((result["#url"], exc)) self.skipTest(exc) raise - if result.get("archive", True): + if result.get("#archive", True): self.assertEqual( len(set(tjob.archive_list)), len(tjob.archive_list), - "archive-id uniqueness", - ) + msg="archive-id uniqueness") if tjob.queue: # test '_extractor' entries for url, kwdict in zip(tjob.url_list, tjob.kwdict_list): if "_extractor" in kwdict: extr = kwdict["_extractor"].from_url(url) - if extr is None and not result.get("extractor", True): + if extr is None and not result.get("#extractor", True): continue self.assertIsInstance(extr, kwdict["_extractor"]) self.assertEqual(extr.url, url) else: # test 'extension' entries for kwdict in tjob.kwdict_list: - self.assertIn("extension", kwdict) + self.assertIn("extension", kwdict, msg="#extension") # test extraction results - if "url" in result: - self.assertEqual(result["url"], tjob.url_hash.hexdigest()) + if "#sha1_url" in result: + self.assertEqual( + result["#sha1_url"], + tjob.url_hash.hexdigest(), + msg="#sha1_url") - if "content" in result: - expected = result["content"] + if "#sha1_content" in result: + expected = result["#sha1_content"] digest = tjob.content_hash.hexdigest() if isinstance(expected, str): - self.assertEqual(digest, expected, "content") - else: # assume iterable - self.assertIn(digest, expected, "content") - - if "keyword" in result: - expected = result["keyword"] - if isinstance(expected, dict): - for kwdict in tjob.kwdict_list: - self._test_kwdict(kwdict, expected) - else: # assume SHA1 hash - self.assertEqual(expected, tjob.kwdict_hash.hexdigest()) - - if "count" in result: - count = result["count"] + self.assertEqual(expected, digest, msg="#sha1_content") + else: # iterable + self.assertIn(digest, expected, msg="#sha1_content") + + if "#sha1_metadata" in result: + self.assertEqual( + result["#sha1_metadata"], + tjob.kwdict_hash.hexdigest(), + "#sha1_metadata") + + if "#count" in result: + count = result["#count"] + len_urls = len(tjob.url_list) if isinstance(count, str): - self.assertRegex(count, r"^ *(==|!=|<|<=|>|>=) *\d+ *$") - expr = "{} {}".format(len(tjob.url_list), count) + self.assertRegex( + count, r"^ *(==|!=|<|<=|>|>=) *\d+ *$", msg="#count") + expr = "{} {}".format(len_urls, count) self.assertTrue(eval(expr), msg=expr) + elif isinstance(count, range): + self.assertRange(len_urls, count, msg="#count") else: # assume integer - self.assertEqual(len(tjob.url_list), count) + self.assertEqual(len_urls, count, msg="#count") + + if "#pattern" in result: + self.assertGreater(len(tjob.url_list), 0, msg="#pattern") + pattern = result["#pattern"] + if isinstance(pattern, str): + for url in tjob.url_list: + self.assertRegex(url, pattern, msg="#pattern") + else: + for url, pat in zip(tjob.url_list, pattern): + self.assertRegex(url, pat, msg="#pattern") - if "pattern" in result: - self.assertGreater(len(tjob.url_list), 0) - for url in tjob.url_list: - self.assertRegex(url, result["pattern"]) + if "#urls" in result: + expected = result["#urls"] + if isinstance(expected, str): + self.assertEqual(tjob.url_list[0], expected, msg="#urls") + else: + self.assertSequenceEqual(tjob.url_list, expected, msg="#urls") + + metadata = {k: v for k, v in result.items() if k[0] != "#"} + if metadata: + for kwdict in tjob.kwdict_list: + self._test_kwdict(kwdict, metadata) def _test_kwdict(self, kwdict, tests): for key, test in tests.items(): @@ -141,13 +176,15 @@ class TestExtractorResults(unittest.TestCase): key = key[1:] if key not in kwdict: continue - self.assertIn(key, kwdict) + self.assertIn(key, kwdict, msg=key) value = kwdict[key] if isinstance(test, dict): self._test_kwdict(value, test) elif isinstance(test, type): self.assertIsInstance(value, test, msg=key) + elif isinstance(test, range): + self.assertRange(value, test, msg=key) elif isinstance(test, list): subtest = False for idx, item in enumerate(test): @@ -188,6 +225,8 @@ class ResultJob(job.DownloadJob): if content: self.fileobj = TestPathfmt(self.content_hash) + else: + self._update_content = lambda url, kwdict: None self.format_directory = TestFormatter( "".join(self.extractor.directory_fmt)).format_map @@ -195,6 +234,7 @@ class ResultJob(job.DownloadJob): self.extractor.filename_fmt).format_map def run(self): + self._init() for msg in self.extractor: self.dispatch(msg) @@ -234,10 +274,17 @@ class ResultJob(job.DownloadJob): self.archive_hash.update(archive_id.encode()) def _update_content(self, url, kwdict): - if self.content: - scheme = url.partition(":")[0] - self.fileobj.kwdict = kwdict - self.get_downloader(scheme).download(url, self.fileobj) + self.fileobj.kwdict = kwdict + + downloader = self.get_downloader(url.partition(":")[0]) + if downloader.download(url, self.fileobj): + return + + for num, url in enumerate(kwdict.get("_fallback") or (), 1): + self.log.warning("Trying fallback URL #%d", num) + downloader = self.get_downloader(url.partition(":")[0]) + if downloader.download(url, self.fileobj): + return class TestPathfmt(): @@ -322,10 +369,11 @@ def setup_test_config(): config.set(("extractor", "mangoxo") , "username", "LiQiang3") config.set(("extractor", "mangoxo") , "password", "5zbQF10_5u25259Ma") - for category in ("danbooru", "atfbooru", "aibooru", "e621", "e926", + for category in ("danbooru", "atfbooru", "aibooru", "booruvar", + "e621", "e926", "e6ai", "instagram", "twitter", "subscribestar", "deviantart", "inkbunny", "tapas", "pillowfort", "mangadex", - "vipergirls", "gfycat"): + "vipergirls"): config.set(("extractor", category), "username", None) config.set(("extractor", "mastodon.social"), "access-token", @@ -351,39 +399,40 @@ def setup_test_config(): def generate_tests(): """Dynamically generate extractor unittests""" - def _generate_test(extr, tcase): + def _generate_method(result): def test(self): - url, result = tcase - print("\n", url, sep="") - self._run_test(extr, url, result) + print("\n" + result["#url"]) + self._run_test(result) return test # enable selective testing for direct calls - if __name__ == '__main__' and len(sys.argv) > 1: - categories = sys.argv[1:] - negate = False - if categories[0].lower() == "all": - categories = () - negate = True - elif categories[0].lower() == "broken": - categories = BROKEN + if __name__ == "__main__" and len(sys.argv) > 1: + category, _, subcategory = sys.argv[1].partition(":") del sys.argv[1:] + + if category.startswith("+"): + basecategory = category[1:].lower() + tests = [t for t in results.all() + if t["#category"][0].lower() == basecategory] + else: + tests = results.category(category) + + if subcategory: + tests = [t for t in tests if t["#category"][-1] == subcategory] else: - categories = BROKEN - negate = True - if categories: - print("skipping:", ", ".join(categories)) - fltr = util.build_extractor_filter(categories, negate=negate) + tests = results.all() # add 'test_...' methods - for extr in filter(fltr, extractor.extractors()): - name = "test_" + extr.__name__ + "_" - for num, tcase in enumerate(extr._get_tests(), 1): - test = _generate_test(extr, tcase) - test.__name__ = name + str(num) - setattr(TestExtractorResults, test.__name__, test) + enum = collections.defaultdict(int) + for result in tests: + name = "{1}_{2}".format(*result["#category"]) + enum[name] += 1 + + method = _generate_method(result) + method.__name__ = "test_{}_{}".format(name, enum[name]) + setattr(TestExtractorResults, method.__name__, method) generate_tests() -if __name__ == '__main__': - unittest.main(warnings='ignore') +if __name__ == "__main__": + unittest.main(warnings="ignore") |
