about summary refs log tree commit diff stats
path: root/test/test_results.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_results.py')
-rw-r--r--  test/test_results.py  100
1 file changed, 53 insertions(+), 47 deletions(-)
diff --git a/test/test_results.py b/test/test_results.py
index bde3af5..6d628c3 100644
--- a/test/test_results.py
+++ b/test/test_results.py
@@ -26,8 +26,10 @@ TRAVIS_SKIP = {
# temporary issues, etc.
BROKEN = {
- "8chan",
+ "hentaifox",
+ "livedoor",
"mangapark",
+ "yaplog",
}
@@ -84,47 +86,47 @@ class TestExtractorResults(unittest.TestCase):
raise
# test archive-id uniqueness
- self.assertEqual(len(set(tjob.list_archive)), len(tjob.list_archive))
+ self.assertEqual(len(set(tjob.archive_list)), len(tjob.archive_list))
if tjob.queue:
# test '_extractor' entries
- for url, kwdict in zip(tjob.list_url, tjob.list_keyword):
+ for url, kwdict in zip(tjob.url_list, tjob.kwdict_list):
if "_extractor" in kwdict:
extr = kwdict["_extractor"].from_url(url)
self.assertIsInstance(extr, kwdict["_extractor"])
self.assertEqual(extr.url, url)
else:
# test 'extension' entries
- for kwdict in tjob.list_keyword:
+ for kwdict in tjob.kwdict_list:
self.assertIn("extension", kwdict)
# test extraction results
if "url" in result:
- self.assertEqual(result["url"], tjob.hash_url.hexdigest())
+ self.assertEqual(result["url"], tjob.url_hash.hexdigest())
if "content" in result:
- self.assertEqual(result["content"], tjob.hash_content.hexdigest())
+ self.assertEqual(result["content"], tjob.content_hash.hexdigest())
if "keyword" in result:
- keyword = result["keyword"]
- if isinstance(keyword, dict):
- for kwdict in tjob.list_keyword:
- self._test_kwdict(kwdict, keyword)
+ expected = result["keyword"]
+ if isinstance(expected, dict):
+ for kwdict in tjob.kwdict_list:
+ self._test_kwdict(kwdict, expected)
else: # assume SHA1 hash
- self.assertEqual(keyword, tjob.hash_keyword.hexdigest())
+ self.assertEqual(expected, tjob.kwdict_hash.hexdigest())
if "count" in result:
count = result["count"]
if isinstance(count, str):
self.assertRegex(count, r"^ *(==|!=|<|<=|>|>=) *\d+ *$")
- expr = "{} {}".format(len(tjob.list_url), count)
+ expr = "{} {}".format(len(tjob.url_list), count)
self.assertTrue(eval(expr), msg=expr)
else: # assume integer
- self.assertEqual(len(tjob.list_url), count)
+ self.assertEqual(len(tjob.url_list), count)
if "pattern" in result:
- self.assertGreater(len(tjob.list_url), 0)
- for url in tjob.list_url:
+ self.assertGreater(len(tjob.url_list), 0)
+ for url in tjob.url_list:
self.assertRegex(url, result["pattern"])
def _test_kwdict(self, kwdict, tests):
@@ -158,58 +160,60 @@ class ResultJob(job.DownloadJob):
job.DownloadJob.__init__(self, url, parent)
self.queue = False
self.content = content
- self.list_url = []
- self.list_keyword = []
- self.list_archive = []
- self.hash_url = hashlib.sha1()
- self.hash_keyword = hashlib.sha1()
- self.hash_archive = hashlib.sha1()
- self.hash_content = hashlib.sha1()
+
+ self.url_list = []
+ self.url_hash = hashlib.sha1()
+ self.kwdict_list = []
+ self.kwdict_hash = hashlib.sha1()
+ self.archive_list = []
+ self.archive_hash = hashlib.sha1()
+ self.content_hash = hashlib.sha1()
if content:
- self.fileobj = TestPathfmt(self.hash_content)
+ self.fileobj = TestPathfmt(self.content_hash)
self.format_directory = TestFormatter(
- "".join(self.extractor.directory_fmt))
- self.format_filename = TestFormatter(self.extractor.filename_fmt)
+ "".join(self.extractor.directory_fmt)).format_map
+ self.format_filename = TestFormatter(
+ self.extractor.filename_fmt).format_map
def run(self):
for msg in self.extractor:
self.dispatch(msg)
- def handle_url(self, url, keywords, fallback=None):
- self.update_url(url)
- self.update_keyword(keywords)
- self.update_archive(keywords)
- self.update_content(url)
- self.format_filename.format_map(keywords)
+ def handle_url(self, url, kwdict, fallback=None):
+ self._update_url(url)
+ self._update_kwdict(kwdict)
+ self._update_archive(kwdict)
+ self._update_content(url)
+ self.format_filename(kwdict)
- def handle_directory(self, keywords):
- self.update_keyword(keywords, False)
- self.format_directory.format_map(keywords)
+ def handle_directory(self, kwdict):
+ self._update_kwdict(kwdict, False)
+ self.format_directory(kwdict)
- def handle_queue(self, url, keywords):
+ def handle_queue(self, url, kwdict):
self.queue = True
- self.update_url(url)
- self.update_keyword(keywords)
+ self._update_url(url)
+ self._update_kwdict(kwdict)
- def update_url(self, url):
- self.list_url.append(url)
- self.hash_url.update(url.encode())
+ def _update_url(self, url):
+ self.url_list.append(url)
+ self.url_hash.update(url.encode())
- def update_keyword(self, kwdict, to_list=True):
+ def _update_kwdict(self, kwdict, to_list=True):
if to_list:
- self.list_keyword.append(kwdict)
+ self.kwdict_list.append(kwdict.copy())
kwdict = self._filter(kwdict)
- self.hash_keyword.update(
+ self.kwdict_hash.update(
json.dumps(kwdict, sort_keys=True, default=str).encode())
- def update_archive(self, kwdict):
+ def _update_archive(self, kwdict):
archive_id = self.extractor.archive_fmt.format_map(kwdict)
- self.list_archive.append(archive_id)
- self.hash_archive.update(archive_id.encode())
+ self.archive_list.append(archive_id)
+ self.archive_hash.update(archive_id.encode())
- def update_content(self, url):
+ def _update_content(self, url):
if self.content:
scheme = url.partition(":")[0]
self.get_downloader(scheme).download(url, self.fileobj)
@@ -285,8 +289,10 @@ def setup_test_config():
config.set(("extractor", "password"), name)
config.set(("extractor", "nijie" , "username"), email)
config.set(("extractor", "seiga" , "username"), email)
+
config.set(("extractor", "danbooru" , "username"), None)
config.set(("extractor", "instagram", "username"), None)
+ config.set(("extractor", "imgur" , "username"), None)
config.set(("extractor", "twitter" , "username"), None)
config.set(("extractor", "mangoxo" , "username"), "LiQiang3")