diff options
Diffstat (limited to 'test/test_results.py')
| -rw-r--r-- | test/test_results.py | 344 |
1 file changed, 344 insertions, 0 deletions
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright 2015-2019 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.

"""Functional tests that run gallery-dl extractors against their
embedded test cases and compare URL/keyword/content hashes."""

import os
import sys
import re
import json
import hashlib
import unittest
from gallery_dl import extractor, util, job, config, exception


# Extractor categories that don't work on Travis CI
# (blocked hosts, required logins, etc.)
TRAVIS_SKIP = {
    "exhentai", "kissmanga", "mangafox", "dynastyscans", "nijie", "bobx",
    "archivedmoe", "archiveofsins", "thebarchive", "fireden", "4plebs",
    "sankaku", "idolcomplex", "mangahere", "readcomiconline", "mangadex",
    "sankakucomplex",
}

# Extractor categories with temporary issues, etc.
BROKEN = {
    "komikcast",
    "mangapark",
}


class TestExtractorResults(unittest.TestCase):
    """Container for dynamically generated extractor tests.

    Individual 'test_*' methods are attached to this class at import
    time by generate_tests() at the bottom of this module.
    """

    def setUp(self):
        # fresh, known configuration for every test
        setup_test_config()

    def tearDown(self):
        config.clear()

    @classmethod
    def setUpClass(cls):
        # collects (url, exception-message) pairs for skipped tests
        cls._skipped = []

    @classmethod
    def tearDownClass(cls):
        # print a summary of every skipped test after the whole run
        if cls._skipped:
            print("\n\nSkipped tests:")
            for url, exc in cls._skipped:
                print('- {} ("{}")'.format(url, exc))

    def _run_test(self, extr, url, result):
        """Run one extractor test case.

        'extr' is the expected extractor class for 'url'; 'result' is the
        test-case dict (may be falsy to only check URL-to-extractor
        matching). Recognized result keys: "options", "range", "content",
        "exception", "url", "keyword", "count", "pattern".
        """
        if result:
            # apply per-test configuration overrides
            if "options" in result:
                for key, value in result["options"]:
                    config.set(key.split("."), value)
            if "range" in result:
                config.set(("image-range",), result["range"])
                config.set(("chapter-range",), result["range"])
            content = "content" in result
        else:
            content = False

        tjob = ResultJob(url, content=content)
        # the URL must be matched by the expected extractor class
        self.assertEqual(extr, tjob.extractor.__class__)

        if not result:
            return
        if "exception" in result:
            with self.assertRaises(result["exception"]):
                tjob.run()
            return
        try:
            tjob.run()
        except exception.StopExtraction:
            pass
        except exception.HttpError as exc:
            # skip (rather than fail) on server errors and read timeouts,
            # since those are usually transient remote-side problems
            exc = str(exc)
            if re.match(r"5\d\d: ", exc) or \
                    re.search(r"\bRead timed out\b", exc):
                self._skipped.append((url, exc))
                self.skipTest(exc)
            raise

        # test archive-id uniqueness
        self.assertEqual(len(set(tjob.list_archive)), len(tjob.list_archive))

        # test '_extractor' entries
        if tjob.queue:
            for url, kwdict in zip(tjob.list_url, tjob.list_keyword):
                if "_extractor" in kwdict:
                    extr = kwdict["_extractor"].from_url(url)
                    self.assertIsInstance(extr, kwdict["_extractor"])
                    self.assertEqual(extr.url, url)

        # test extraction results
        if "url" in result:
            # SHA1 over all extracted URLs
            self.assertEqual(result["url"], tjob.hash_url.hexdigest())

        if "content" in result:
            # SHA1 over all downloaded file contents
            self.assertEqual(result["content"], tjob.hash_content.hexdigest())

        if "keyword" in result:
            keyword = result["keyword"]
            if isinstance(keyword, dict):
                for kwdict in tjob.list_keyword:
                    self._test_kwdict(kwdict, keyword)
            else:  # assume SHA1 hash
                self.assertEqual(keyword, tjob.hash_keyword.hexdigest())

        if "count" in result:
            count = result["count"]
            if isinstance(count, str):
                # string form is a comparison expression, e.g. ">= 3";
                # validated by regex before being passed to eval()
                # NOTE(review): eval() of test-supplied strings is only
                # safe because of this regex whitelist — keep them in sync
                self.assertRegex(count, r"^ *(==|!=|<|<=|>|>=) *\d+ *$")
                expr = "{} {}".format(len(tjob.list_url), count)
                self.assertTrue(eval(expr), msg=expr)
            else:  # assume integer
                self.assertEqual(len(tjob.list_url), count)

        if "pattern" in result:
            # every extracted URL must match the given pattern
            self.assertGreater(len(tjob.list_url), 0)
            for url in tjob.list_url:
                self.assertRegex(url, result["pattern"])

    def _test_kwdict(self, kwdict, tests):
        """Recursively compare an extracted keyword dict against 'tests'.

        Test values may be: a dict (recurse), a type (isinstance check),
        a "re:..." string (regex match), a "type:..." string (type-name
        check), or any other value (equality check). A key prefixed with
        '?' is optional and only checked when present.
        """
        for key, test in tests.items():
            if key.startswith("?"):
                key = key[1:]
                if key not in kwdict:
                    continue
            self.assertIn(key, kwdict)
            value = kwdict[key]

            if isinstance(test, dict):
                self._test_kwdict(value, test)
            elif isinstance(test, type):
                self.assertIsInstance(value, test, msg=key)
            elif isinstance(test, str):
                if test.startswith("re:"):
                    self.assertRegex(value, test[3:], msg=key)
                elif test.startswith("type:"):
                    self.assertEqual(type(value).__name__, test[5:], msg=key)
                else:
                    self.assertEqual(value, test, msg=key)
            else:
                self.assertEqual(value, test, msg=key)


class ResultJob(job.DownloadJob):
    """Generate test-results for extractor runs.

    Instead of writing files, this job records every extracted URL,
    keyword dict, and archive ID, and folds them into SHA1 hashes that
    the test cases compare against.
    """

    def __init__(self, url, parent=None, content=False):
        job.DownloadJob.__init__(self, url, parent)
        self.queue = False          # set to True once a queue message is seen
        self.content = content      # whether to download and hash file content
        self.list_url = []
        self.list_keyword = []
        self.list_archive = []
        self.hash_url = hashlib.sha1()
        self.hash_keyword = hashlib.sha1()
        self.hash_archive = hashlib.sha1()
        self.hash_content = hashlib.sha1()
        if content:
            self.fileobj = TestPathfmt(self.hash_content)
            # disable filename-extension adjustment of the HTTP downloader
            self.get_downloader("http")._check_extension = lambda a, b: None

        # formatters that ignore volatile fields (see TestFormatter)
        self.format_directory = TestFormatter(
            "".join(self.extractor.directory_fmt))
        self.format_filename = TestFormatter(self.extractor.filename_fmt)

    def run(self):
        # drive the extractor and dispatch its messages to the
        # handle_* methods below
        for msg in self.extractor:
            self.dispatch(msg)

    def handle_url(self, url, keywords, fallback=None):
        self.update_url(url)
        self.update_keyword(keywords)
        self.update_archive(keywords)
        self.update_content(url)
        # ensure the filename format string can be applied without error
        self.format_filename.format_map(keywords)

    def handle_directory(self, keywords):
        # directory keywords are hashed, but not added to list_keyword
        self.update_keyword(keywords, False)
        self.format_directory.format_map(keywords)

    def handle_queue(self, url, keywords):
        self.queue = True
        self.update_url(url)
        self.update_keyword(keywords)

    def update_url(self, url):
        # record URL and fold it into the URL hash
        self.list_url.append(url)
        self.hash_url.update(url.encode())

    def update_keyword(self, kwdict, to_list=True):
        # record keyword dict and fold its (filtered) JSON form
        # into the keyword hash; sort_keys makes the hash stable
        if to_list:
            self.list_keyword.append(kwdict)
        kwdict = self._filter(kwdict)
        self.hash_keyword.update(
            json.dumps(kwdict, sort_keys=True, default=str).encode())

    def update_archive(self, kwdict):
        # record the formatted archive ID; uniqueness is asserted later
        archive_id = self.extractor.archive_fmt.format_map(kwdict)
        self.list_archive.append(archive_id)
        self.hash_archive.update(archive_id.encode())

    def update_content(self, url):
        # download file content into the content hash (via TestPathfmt)
        if self.content:
            scheme = url.partition(":")[0]
            self.get_downloader(scheme).download(url, self.fileobj)
class TestPathfmt():
    """Minimal path-format/file stand-in that hashes written data.

    Mimics just enough of the real path-format object's interface
    (context manager, open/write/tell/part_size) for downloaders to
    write into it; all written bytes go into 'hashobj' instead of a file.
    """

    def __init__(self, hashobj):
        self.hashobj = hashobj
        self.path = ""
        self.size = 0
        self.has_extension = True

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass

    def open(self, mode):
        # reset the byte counter; mode is accepted but ignored
        self.size = 0
        return self

    def write(self, content):
        """Update SHA1 hash"""
        self.size += len(content)
        self.hashobj.update(content)

    def tell(self):
        return self.size

    def part_size(self):
        return 0


class TestFormatter(util.Formatter):
    """util.Formatter variant that blanks out volatile fields.

    Replaces format functions for the 'extension' key and for optional
    format specs with a no-op, so test hashes are not affected by values
    that can legitimately vary between runs.
    """

    @staticmethod
    def _noop(_):
        return ""

    def _apply_simple(self, key, fmt):
        # NOTE(review): the repr() check detects util.Formatter's
        # '_format_optional' closures by name — fragile if that internal
        # is ever renamed
        if key == "extension" or "._format_optional." in repr(fmt):
            return self._noop

        def wrap(obj):
            return fmt(obj[key])
        return wrap

    def _apply(self, key, funcs, fmt):
        if key == "extension" or "._format_optional." in repr(fmt):
            return self._noop

        def wrap(obj):
            obj = obj[key]
            for func in funcs:
                obj = func(obj)
            return fmt(obj)
        return wrap


def setup_test_config():
    """Install a known-good baseline configuration for test runs.

    Credentials below are shared, publicly known test accounts/keys
    for this project's test suite.
    """
    name = "gallerydl"
    email = "gallerydl@openaliasbox.org"

    config.clear()
    config.set(("cache", "file"), ":memory:")
    config.set(("downloader", "part"), False)
    config.set(("extractor", "timeout"), 60)
    config.set(("extractor", "username"), name)
    config.set(("extractor", "password"), name)
    config.set(("extractor", "nijie", "username"), email)
    config.set(("extractor", "seiga", "username"), email)
    config.set(("extractor", "danbooru", "username"), None)
    config.set(("extractor", "twitter" , "username"), None)
    config.set(("extractor", "mangoxo" , "password"), "VZ8DL3983u")

    config.set(("extractor", "deviantart", "client-id"), "7777")
    config.set(("extractor", "deviantart", "client-secret"),
               "ff14994c744d9208e5caeec7aab4a026")

    config.set(("extractor", "tumblr", "api-key"),
               "0cXoHfIqVzMQcc3HESZSNsVlulGxEXGDTTZCDrRrjaa0jmuTc6")
    config.set(("extractor", "tumblr", "api-secret"),
               "6wxAK2HwrXdedn7VIoZWxGqVhZ8JdYKDLjiQjL46MLqGuEtyVj")
    config.set(("extractor", "tumblr", "access-token"),
               "N613fPV6tOZQnyn0ERTuoEZn0mEqG8m2K8M3ClSJdEHZJuqFdG")
    config.set(("extractor", "tumblr", "access-token-secret"),
               "sgOA7ZTT4FBXdOGGVV331sSp0jHYp4yMDRslbhaQf7CaS71i4O")


def generate_tests():
    """Dynamically generate extractor unittests"""
    # factory function avoids the late-binding-closure pitfall:
    # each generated test captures its own (extr, tcase) pair
    def _generate_test(extr, tcase):
        def test(self):
            url, result = tcase
            print("\n", url, sep="")
            self._run_test(extr, url, result)
        return test

    # enable selective testing for direct calls:
    #   "all"    -> run everything
    #   "broken" -> run only BROKEN categories
    #   <names>  -> run only the given (base)categories
    if __name__ == '__main__' and len(sys.argv) > 1:
        if sys.argv[1].lower() == "all":
            fltr = lambda c, bc: True  # noqa: E731
        elif sys.argv[1].lower() == "broken":
            fltr = lambda c, bc: c in BROKEN  # noqa: E731
        else:
            argv = sys.argv[1:]
            fltr = lambda c, bc: c in argv or bc in argv  # noqa: E731
        # strip consumed arguments so unittest.main() doesn't see them
        del sys.argv[1:]
    else:
        skip = set(BROKEN)
        if "CI" in os.environ and "TRAVIS" in os.environ:
            skip |= set(TRAVIS_SKIP)
        if skip:
            print("skipping:", ", ".join(skip))
        fltr = lambda c, bc: c not in skip  # noqa: E731

    # filter available extractor classes
    extractors = [
        extr for extr in extractor.extractors()
        if fltr(extr.category, getattr(extr, "basecategory", None))
    ]

    # add 'test_...' methods to TestExtractorResults, one per test case
    for extr in extractors:
        name = "test_" + extr.__name__ + "_"
        for num, tcase in enumerate(extr._get_tests(), 1):
            test = _generate_test(extr, tcase)
            test.__name__ = name + str(num)
            setattr(TestExtractorResults, test.__name__, test)


# generate tests at import time so test discovery sees them
generate_tests()
if __name__ == '__main__':
    unittest.main(warnings='ignore')
