summaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorLibravatarUnit 193 <unit193@unit193.net>2025-07-31 01:22:01 -0400
committerLibravatarUnit 193 <unit193@unit193.net>2025-07-31 01:22:01 -0400
commita6e995c093de8aae2e91a0787281bb34c0b871eb (patch)
tree2d79821b05300d34d8871eb6c9662b359a2de85d /test
parent7672a750cb74bf31e21d76aad2776367fd476155 (diff)
New upstream version 1.30.2.upstream/1.30.2
Diffstat (limited to 'test')
-rw-r--r--test/test_config.py5
-rw-r--r--test/test_cookies.py25
-rw-r--r--test/test_downloader.py8
-rw-r--r--test/test_extractor.py59
-rw-r--r--test/test_formatter.py159
-rw-r--r--test/test_job.py11
-rw-r--r--test/test_postprocessor.py115
-rw-r--r--test/test_results.py176
-rw-r--r--test/test_text.py76
-rw-r--r--test/test_util.py234
-rw-r--r--test/test_ytdl.py11
11 files changed, 662 insertions, 217 deletions
diff --git a/test/test_config.py b/test/test_config.py
index be58456..5c94b1b 100644
--- a/test/test_config.py
+++ b/test/test_config.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-# Copyright 2015-2023 Mike Fährmann
+# Copyright 2015-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -223,8 +223,7 @@ class TestConfigFiles(unittest.TestCase):
self.assertIsInstance(cfg, dict)
self.assertTrue(cfg)
- @staticmethod
- def _load(name):
+ def _load(self, name):
path = os.path.join(ROOTDIR, "docs", name)
try:
with open(path) as fp:
diff --git a/test/test_cookies.py b/test/test_cookies.py
index 9ba562c..5900473 100644
--- a/test/test_cookies.py
+++ b/test/test_cookies.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-# Copyright 2017-2023 Mike Fährmann
+# Copyright 2017-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -14,6 +14,7 @@ from unittest import mock
import time
import logging
+import datetime
import tempfile
from os.path import join
@@ -70,8 +71,7 @@ class TestCookiejar(unittest.TestCase):
self.assertEqual(len(cookies), 0)
self.assertEqual(mock_warning.call_count, 1)
- self.assertEqual(mock_warning.call_args[0][0], "cookies: %s")
- self.assertIsInstance(mock_warning.call_args[0][1], exc)
+ self.assertIsInstance(mock_warning.call_args[0][-1], exc)
class TestCookiedict(unittest.TestCase):
@@ -205,27 +205,32 @@ class TestCookieUtils(unittest.TestCase):
now = int(time.time())
log = logging.getLogger("generic")
- extr.cookies.set("a", "1", expires=now-100)
+ extr.cookies.set("a", "1", expires=now-100, domain=".example.org")
with mock.patch.object(log, "warning") as mw:
self.assertFalse(extr.cookies_check(("a",)))
self.assertEqual(mw.call_count, 1)
- self.assertEqual(mw.call_args[0], ("Cookie '%s' has expired", "a"))
+ self.assertEqual(mw.call_args[0], (
+ "cookies: %s/%s expired at %s", "example.org", "a",
+ datetime.datetime.fromtimestamp(now-100)))
- extr.cookies.set("a", "1", expires=now+100)
+ extr.cookies.set("a", "1", expires=now+100, domain=".example.org")
with mock.patch.object(log, "warning") as mw:
self.assertTrue(extr.cookies_check(("a",)))
self.assertEqual(mw.call_count, 1)
self.assertEqual(mw.call_args[0], (
- "Cookie '%s' will expire in less than %s hour%s", "a", 1, ""))
+ "cookies: %s/%s will expire in less than %s hour%s",
+ "example.org", "a", 1, ""))
- extr.cookies.set("a", "1", expires=now+100+7200)
+ extr.cookies.set("a", "1", expires=now+100+7200, domain=".example.org")
with mock.patch.object(log, "warning") as mw:
self.assertTrue(extr.cookies_check(("a",)))
self.assertEqual(mw.call_count, 1)
self.assertEqual(mw.call_args[0], (
- "Cookie '%s' will expire in less than %s hour%s", "a", 3, "s"))
+ "cookies: %s/%s will expire in less than %s hour%s",
+ "example.org", "a", 3, "s"))
- extr.cookies.set("a", "1", expires=now+100+24*3600)
+ extr.cookies.set(
+ "a", "1", expires=now+100+24*3600, domain=".example.org")
with mock.patch.object(log, "warning") as mw:
self.assertTrue(extr.cookies_check(("a",)))
self.assertEqual(mw.call_count, 0)
diff --git a/test/test_downloader.py b/test/test_downloader.py
index 5a9a20b..3e5bf84 100644
--- a/test/test_downloader.py
+++ b/test/test_downloader.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-# Copyright 2018-2022 Mike Fährmann
+# Copyright 2018-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -163,7 +163,7 @@ class TestDownloaderConfig(unittest.TestCase):
self.assertEqual(dl.timeout, 10)
self.assertEqual(dl.verify, False)
self.assertEqual(dl.mtime, False)
- self.assertEqual(dl.rate, 42)
+ self.assertEqual(dl.rate(), 42)
self.assertEqual(dl.part, False)
@@ -332,7 +332,7 @@ class HttpRequestHandler(http.server.BaseHTTPRequestHandler):
status = 206
match = re.match(r"bytes=(\d+)-", self.headers["Range"])
- start = int(match.group(1))
+ start = int(match[1])
headers["Content-Range"] = "bytes {}-{}/{}".format(
start, len(output)-1, len(output))
@@ -369,6 +369,8 @@ SAMPLES = {
("heic", b"????ftypheis"),
("heic", b"????ftypheix"),
("svg" , b"<?xml"),
+ ("html", b"<!DOCTYPE html><html>...</html>"),
+ ("html", b" \n \n\r\t\n <!DOCTYPE html><html>...</html>"),
("ico" , b"\x00\x00\x01\x00"),
("cur" , b"\x00\x00\x02\x00"),
("psd" , b"8BPS"),
diff --git a/test/test_extractor.py b/test/test_extractor.py
index dfc5ff8..bf4aa07 100644
--- a/test/test_extractor.py
+++ b/test/test_extractor.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-# Copyright 2018-2023 Mike Fährmann
+# Copyright 2018-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -122,8 +122,8 @@ class TestExtractorModule(unittest.TestCase):
extr = cls.from_url(url)
except ImportError as exc:
if exc.name in ("youtube_dl", "yt_dlp"):
- print("Skipping '{}' category checks".format(cls.category))
- return
+ return sys.stdout.write(
+ f"Skipping '{cls.category}' category checks\n")
raise
self.assertTrue(extr, url)
@@ -138,46 +138,8 @@ class TestExtractorModule(unittest.TestCase):
self.assertEqual(extr.subcategory, sub, url)
self.assertEqual(extr.basecategory, base, url)
- @unittest.skipIf(not results, "no test data")
- def test_unique_pattern_matches(self):
- # collect testcase URLs
- test_urls = []
- append = test_urls.append
-
- for result in results.all():
- if not result.get("#fail"):
- append((result["#url"], result["#class"]))
-
- # iterate over all testcase URLs
- for url, extr1 in test_urls:
- matches = []
-
- # ... and apply all regex patterns to each one
- for extr2 in _list_classes():
-
- # skip DirectlinkExtractor pattern if it isn't tested
- if extr1 != DirectlinkExtractor and \
- extr2 == DirectlinkExtractor:
- continue
-
- match = extr2.pattern.match(url)
- if match:
- matches.append((match, extr2))
-
- # fail if more or less than 1 match happened
- if len(matches) > 1:
- msg = "'{}' gets matched by more than one pattern:".format(url)
- for match, extr in matches:
- msg += "\n\n- {}:\n{}".format(
- extr.__name__, match.re.pattern)
- self.fail(msg)
-
- elif len(matches) < 1:
- msg = "'{}' isn't matched by any pattern".format(url)
- self.fail(msg)
-
- else:
- self.assertIs(extr1, matches[0][1], url)
+ if base not in ("reactor", "wikimedia"):
+ self.assertEqual(extr._cfgpath, ("extractor", cat, sub), url)
def test_init(self):
"""Test for exceptions in Extractor.initialize() and .finalize()"""
@@ -188,14 +150,16 @@ class TestExtractorModule(unittest.TestCase):
if cls.category == "ytdl":
continue
extr = cls.from_url(cls.example)
- if not extr and cls.basecategory and not cls.instances:
- continue
+ if not extr:
+ if cls.basecategory and not cls.instances:
+ continue
+ self.fail(f"{cls.__name__} pattern does not match "
+ f"example URL '{cls.example}'")
extr.request = fail_request
extr.initialize()
extr.finalize()
- @unittest.skipIf(sys.hexversion < 0x3060000, "test fails in CI")
def test_init_ytdl(self):
try:
extr = extractor.find("ytdl:")
@@ -293,8 +257,7 @@ class TestExtractorWait(unittest.TestCase):
u = self._isotime_to_seconds(until.time().isoformat()[:8])
self.assertLessEqual(o-u, 1.0)
- @staticmethod
- def _isotime_to_seconds(isotime):
+ def _isotime_to_seconds(self, isotime):
parts = isotime.split(":")
return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
diff --git a/test/test_formatter.py b/test/test_formatter.py
index 646f179..3305983 100644
--- a/test/test_formatter.py
+++ b/test/test_formatter.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-# Copyright 2021-2023 Mike Fährmann
+# Copyright 2021-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -15,11 +15,19 @@ import datetime
import tempfile
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-from gallery_dl import formatter, text, util # noqa E402
+from gallery_dl import formatter, text, util, config # noqa E402
+
+try:
+ import jinja2
+except ImportError:
+ jinja2 = None
class TestFormatter(unittest.TestCase):
+ def tearDown(self):
+ config.clear()
+
kwdict = {
"a": "hElLo wOrLd",
"b": "äöü",
@@ -27,16 +35,23 @@ class TestFormatter(unittest.TestCase):
"d": {"a": "foo", "b": 0, "c": None},
"i": 2,
"l": ["a", "b", "c"],
+ "L": [
+ {"name": "John Doe" , "age": 42, "email": "jd@example.org"},
+ {"name": "Jane Smith" , "age": 24, "email": None},
+ {"name": "Max Mustermann", "age": False},
+ ],
"n": None,
"s": " \n\r\tSPACE ",
+ "S": " \n\r\tS P A\tC\nE ",
"h": "<p>foo </p> &amp; bar <p> </p>",
"u": "&#x27;&lt; / &gt;&#x27;",
"t": 1262304000,
- "ds": "2010-01-01T01:00:00+0100",
+ "ds": "2010-01-01T01:00:00+01:00",
"dt": datetime.datetime(2010, 1, 1),
"dt_dst": datetime.datetime(2010, 6, 1),
"i_str": "12345",
"f_str": "12.45",
+ "lang": "en",
"name": "Name",
"title1": "Title",
"title2": "",
@@ -50,6 +65,7 @@ class TestFormatter(unittest.TestCase):
self._run_test("{a!c}", "Hello world")
self._run_test("{a!C}", "Hello World")
self._run_test("{s!t}", "SPACE")
+ self._run_test("{S!t}", "S P A\tC\nE")
self._run_test("{a!U}", self.kwdict["a"])
self._run_test("{u!U}", "'< / >'")
self._run_test("{a!H}", self.kwdict["a"])
@@ -65,13 +81,22 @@ class TestFormatter(unittest.TestCase):
self._run_test("{n!S}", "")
self._run_test("{t!d}", datetime.datetime(2010, 1, 1))
self._run_test("{t!d:%Y-%m-%d}", "2010-01-01")
+ self._run_test("{t!D}" , datetime.datetime(2010, 1, 1))
+ self._run_test("{ds!D}", datetime.datetime(2010, 1, 1))
+ self._run_test("{dt!D}", datetime.datetime(2010, 1, 1))
+ self._run_test("{t!D:%Y-%m-%d}", "2010-01-01")
self._run_test("{dt!T}", "1262304000")
self._run_test("{l!j}", '["a","b","c"]')
self._run_test("{dt!j}", '"2010-01-01 00:00:00"')
self._run_test("{a!g}", "hello-world")
- self._run_test("{a!L}", 11)
- self._run_test("{l!L}", 3)
- self._run_test("{d!L}", 3)
+ self._run_test("{lang!L}", "English")
+ self._run_test("{'fr'!L}", "French")
+ self._run_test("{a!L}", None)
+ self._run_test("{a!n}", 11)
+ self._run_test("{l!n}", 3)
+ self._run_test("{d!n}", 3)
+ self._run_test("{s!W}", "SPACE")
+ self._run_test("{S!W}", "S P A C E")
self._run_test("{i_str!i}", 12345)
self._run_test("{i_str!f}", 12345.0)
self._run_test("{f_str!f}", 12.45)
@@ -201,7 +226,7 @@ class TestFormatter(unittest.TestCase):
self._run_test("{j:[b:]}" , v)
self._run_test("{j:[b::]}" , v)
- def test_maxlen(self):
+ def test_specifier_maxlen(self):
v = self.kwdict["a"]
self._run_test("{a:L5/foo/}" , "foo")
self._run_test("{a:L50/foo/}", v)
@@ -209,7 +234,7 @@ class TestFormatter(unittest.TestCase):
self._run_test("{a:L50/foo/>51}", "foo")
self._run_test("{a:Lab/foo/}", "foo")
- def test_join(self):
+ def test_specifier_join(self):
self._run_test("{l:J}" , "abc")
self._run_test("{l:J,}" , "a,b,c")
self._run_test("{l:J,/}" , "a,b,c")
@@ -221,7 +246,7 @@ class TestFormatter(unittest.TestCase):
self._run_test("{a:J/}" , self.kwdict["a"])
self._run_test("{a:J, /}" , self.kwdict["a"])
- def test_replace(self):
+ def test_specifier_replace(self):
self._run_test("{a:Rh/C/}" , "CElLo wOrLd")
self._run_test("{a!l:Rh/C/}", "Cello world")
self._run_test("{a!u:Rh/C/}", "HELLO WORLD")
@@ -230,12 +255,12 @@ class TestFormatter(unittest.TestCase):
self._run_test("{a!l:Rl//}" , "heo word")
self._run_test("{name:Rame/othing/}", "Nothing")
- def test_datetime(self):
+ def test_specifier_datetime(self):
self._run_test("{ds:D%Y-%m-%dT%H:%M:%S%z}", "2010-01-01 00:00:00")
- self._run_test("{ds:D%Y}", "2010-01-01T01:00:00+0100")
+ self._run_test("{ds:D%Y}", "2010-01-01T01:00:00+01:00")
self._run_test("{l:D%Y}", "None")
- def test_offset(self):
+ def test_specifier_offset(self):
self._run_test("{dt:O 01:00}", "2010-01-01 01:00:00")
self._run_test("{dt:O+02:00}", "2010-01-01 02:00:00")
self._run_test("{dt:O-03:45}", "2009-12-31 20:15:00")
@@ -246,7 +271,7 @@ class TestFormatter(unittest.TestCase):
self._run_test("{ds:D%Y-%m-%dT%H:%M:%S%z/O1}", "2010-01-01 01:00:00")
self._run_test("{t!d:O2}", "2010-01-01 02:00:00")
- def test_offset_local(self):
+ def test_specifier_offset_local(self):
ts = self.kwdict["dt"].replace(
tzinfo=datetime.timezone.utc).timestamp()
offset = time.localtime(ts).tm_gmtoff
@@ -261,7 +286,7 @@ class TestFormatter(unittest.TestCase):
self._run_test("{dt_dst:O}", str(dt))
self._run_test("{dt_dst:Olocal}", str(dt))
- def test_sort(self):
+ def test_specifier_sort(self):
self._run_test("{l:S}" , "['a', 'b', 'c']")
self._run_test("{l:Sa}", "['a', 'b', 'c']")
self._run_test("{l:Sd}", "['c', 'b', 'a']")
@@ -293,6 +318,19 @@ class TestFormatter(unittest.TestCase):
with self.assertRaises(ValueError):
self._run_test("{a:Xfoo/ */}", "hello wo *")
+ def test_specifier_map(self):
+ self._run_test("{L:Mname/}" ,
+ "['John Doe', 'Jane Smith', 'Max Mustermann']")
+ self._run_test("{L:Mage/}" ,
+ "[42, 24, False]")
+
+ self._run_test("{a:Mname}", self.kwdict["a"])
+ self._run_test("{n:Mname}", "None")
+ self._run_test("{title4:Mname}", "0")
+
+ with self.assertRaises(ValueError):
+ self._run_test("{t:Mname", "")
+
def test_chain_special(self):
# multiple replacements
self._run_test("{a:Rh/C/RE/e/RL/l/}", "Cello wOrld")
@@ -314,6 +352,9 @@ class TestFormatter(unittest.TestCase):
# sort and join
self._run_test("{a:S/J}", " ELLOdhlorw")
+ # map and join
+ self._run_test("{L:Mname/J-}", "John Doe-Jane Smith-Max Mustermann")
+
def test_separator(self):
orig_separator = formatter._SEPARATOR
try:
@@ -420,7 +461,6 @@ class TestFormatter(unittest.TestCase):
self._run_test("\fE name * 2 + ' ' + a", "{}{} {}".format(
self.kwdict["name"], self.kwdict["name"], self.kwdict["a"]))
- @unittest.skipIf(sys.hexversion < 0x3060000, "no fstring support")
def test_fstring(self):
self._run_test("\fF {a}", self.kwdict["a"])
self._run_test("\fF {name}{name} {a}", "{}{} {}".format(
@@ -428,7 +468,6 @@ class TestFormatter(unittest.TestCase):
self._run_test("\fF foo-'\"{a.upper()}\"'-bar",
"""foo-'"{}"'-bar""".format(self.kwdict["a"].upper()))
- @unittest.skipIf(sys.hexversion < 0x3060000, "no fstring support")
def test_template_fstring(self):
with tempfile.TemporaryDirectory() as tmpdirname:
path1 = os.path.join(tmpdirname, "tpl1")
@@ -449,6 +488,90 @@ class TestFormatter(unittest.TestCase):
with self.assertRaises(OSError):
formatter.parse("\fTF /")
+ @unittest.skipIf(jinja2 is None, "no jinja2")
+ def test_jinja(self):
+ formatter.JinjaFormatter.env = None
+
+ self._run_test("\fJ {{a}}", self.kwdict["a"])
+ self._run_test("\fJ {{name}}{{name}} {{a}}", "{}{} {}".format(
+ self.kwdict["name"], self.kwdict["name"], self.kwdict["a"]))
+ self._run_test("\fJ foo-'\"{{a | upper}}\"'-bar",
+ """foo-'"{}"'-bar""".format(self.kwdict["a"].upper()))
+
+ @unittest.skipIf(jinja2 is None, "no jinja2")
+ def test_template_jinja(self):
+ formatter.JinjaFormatter.env = None
+
+ with tempfile.TemporaryDirectory() as tmpdirname:
+ path1 = os.path.join(tmpdirname, "tpl1")
+ path2 = os.path.join(tmpdirname, "tpl2")
+
+ with open(path1, "w") as fp:
+ fp.write("{{a}}")
+ fmt1 = formatter.parse("\fTJ " + path1)
+
+ with open(path2, "w") as fp:
+ fp.write("foo-'\"{{a | upper}}\"'-bar")
+ fmt2 = formatter.parse("\fTJ " + path2)
+
+ self.assertEqual(fmt1.format_map(self.kwdict), self.kwdict["a"])
+ self.assertEqual(fmt2.format_map(self.kwdict),
+ """foo-'"{}"'-bar""".format(self.kwdict["a"].upper()))
+
+ with self.assertRaises(OSError):
+ formatter.parse("\fTJ /")
+
+ @unittest.skipIf(jinja2 is None, "no jinja2")
+ def test_template_jinja_opts(self):
+ formatter.JinjaFormatter.env = None
+
+ with tempfile.TemporaryDirectory() as tmpdirname:
+ path_filters = os.path.join(tmpdirname, "jinja_filters.py")
+ path_template = os.path.join(tmpdirname, "jinja_template.txt")
+
+ config.set((), "jinja", {
+ "environment": {
+ "variable_start_string": "(((",
+ "variable_end_string" : ")))",
+ "keep_trailing_newline": True,
+ },
+ "filters": path_filters,
+ })
+
+ with open(path_filters, "w") as fp:
+ fp.write(r"""
+import re
+
+def datetime_format(value, format="%H:%M %d-%m-%y"):
+ return value.strftime(format)
+
+def sanitize(value):
+ return re.sub(r"\s+", " ", value.strip())
+
+__filters__ = {
+ "dt_fmt": datetime_format,
+ "sanitize_whitespace": sanitize,
+}
+""")
+
+ with open(path_template, "w") as fp:
+ fp.write("""\
+Present Day is ((( dt | dt_fmt("%B %d, %Y") )))
+Present Time is ((( dt | dt_fmt("%H:%M:%S") )))
+
+Hello ((( s | sanitize_whitespace ))).
+I hope there is enough "(((S|sanitize_whitespace)))" for you.
+""")
+ fmt = formatter.parse("\fTJ " + path_template)
+
+ self.assertEqual(fmt.format_map(self.kwdict), """\
+Present Day is January 01, 2010
+Present Time is 00:00:00
+
+Hello SPACE.
+I hope there is enough "S P A C E" for you.
+""")
+
def test_module(self):
with tempfile.TemporaryDirectory() as tmpdirname:
path = os.path.join(tmpdirname, "testmod.py")
@@ -488,10 +611,10 @@ def noarg():
fmt4 = formatter.parse("\fM " + path + ":lengths")
self.assertEqual(fmt1.format_map(self.kwdict), "'Title' by Name")
- self.assertEqual(fmt2.format_map(self.kwdict), "136")
+ self.assertEqual(fmt2.format_map(self.kwdict), "168")
self.assertEqual(fmt3.format_map(self.kwdict), "'Title' by Name")
- self.assertEqual(fmt4.format_map(self.kwdict), "136")
+ self.assertEqual(fmt4.format_map(self.kwdict), "168")
with self.assertRaises(TypeError):
self.assertEqual(fmt0.format_map(self.kwdict), "")
diff --git a/test/test_job.py b/test/test_job.py
index 3e6f85b..3aa28e8 100644
--- a/test/test_job.py
+++ b/test/test_job.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-# Copyright 2021-2023 Mike Fährmann
+# Copyright 2021-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -282,7 +282,12 @@ class TestDataJob(TestJob):
tjob = self.jobclass(extr, file=io.StringIO())
tjob.run()
self.assertEqual(
- tjob.data[-1], ("ZeroDivisionError", "division by zero"))
+ tjob.data[-1],
+ (-1, {
+ "error" : "ZeroDivisionError",
+ "message": "division by zero",
+ })
+ )
def test_private(self):
config.set(("output",), "private", True)
@@ -364,7 +369,7 @@ class TestExtractor(Extractor):
def __init__(self, match):
Extractor.__init__(self, match)
self.user = {"id": 123, "name": "test"}
- if match.group(1) == "self":
+ if match[1] == "self":
self.user["self"] = self.user
def items(self):
diff --git a/test/test_postprocessor.py b/test/test_postprocessor.py
index 76e728c..2e39cc7 100644
--- a/test/test_postprocessor.py
+++ b/test/test_postprocessor.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-# Copyright 2019-2023 Mike Fährmann
+# Copyright 2019-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -10,7 +10,7 @@
import os
import sys
import unittest
-from unittest.mock import Mock, mock_open, patch
+from unittest.mock import Mock, mock_open, patch, call
import shutil
import logging
@@ -20,7 +20,7 @@ import collections
from datetime import datetime
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-from gallery_dl import extractor, output, path # noqa E402
+from gallery_dl import extractor, output, path, util # noqa E402
from gallery_dl import postprocessor, config # noqa E402
from gallery_dl.postprocessor.common import PostProcessor # noqa E402
@@ -209,7 +209,10 @@ class ExecTest(BasePostprocessorTest):
self.pathfmt.realpath,
self.pathfmt.realdirectory,
self.pathfmt.filename),
- shell=True)
+ shell=True,
+ creationflags=0,
+ start_new_session=False,
+ )
i.wait.assert_called_once_with()
def test_command_list(self):
@@ -231,8 +234,46 @@ class ExecTest(BasePostprocessorTest):
self.pathfmt.realdirectory.upper(),
],
shell=False,
+ creationflags=0,
+ start_new_session=False,
)
+ def test_command_many(self):
+ self._create({
+ "commands": [
+ "echo {} {_path} {_directory} {_filename} && rm {};",
+ ["~/script.sh", "{category}", "\fE _directory.upper()"],
+ ]
+ })
+
+ with patch("gallery_dl.util.Popen") as p:
+ i = Mock()
+ i.wait.return_value = 0
+ p.return_value = i
+ self._trigger(("after",))
+
+ self.assertEqual(p.call_args_list, [
+ call(
+ "echo {0} {0} {1} {2} && rm {0};".format(
+ self.pathfmt.realpath,
+ self.pathfmt.realdirectory,
+ self.pathfmt.filename),
+ shell=True,
+ creationflags=0,
+ start_new_session=False,
+ ),
+ call(
+ [
+ os.path.expanduser("~/script.sh"),
+ self.pathfmt.kwdict["category"],
+ self.pathfmt.realdirectory.upper(),
+ ],
+ shell=False,
+ creationflags=0,
+ start_new_session=False,
+ ),
+ ])
+
def test_command_returncode(self):
self._create({
"command": "echo {}",
@@ -264,6 +305,49 @@ class ExecTest(BasePostprocessorTest):
self.assertTrue(p.called)
self.assertFalse(i.wait.called)
+ @unittest.skipIf(util.WINDOWS, "not POSIX")
+ def test_session_posix(self):
+ self._create({
+ "session": True,
+ "command": ["echo", "foobar"],
+ })
+
+ with patch("gallery_dl.util.Popen") as p:
+ i = Mock()
+ i.wait.return_value = 0
+ p.return_value = i
+ self._trigger(("after",))
+
+ p.assert_called_once_with(
+ ["echo", "foobar"],
+ shell=False,
+ creationflags=0,
+ start_new_session=True,
+ )
+ i.wait.assert_called_once_with()
+
+ @unittest.skipIf(not util.WINDOWS, "not Windows")
+ def test_session_windows(self):
+ self._create({
+ "session": True,
+ "command": ["echo", "foobar"],
+ })
+
+ with patch("gallery_dl.util.Popen") as p:
+ i = Mock()
+ i.wait.return_value = 0
+ p.return_value = i
+ self._trigger(("after",))
+
+ import subprocess
+ p.assert_called_once_with(
+ ["echo", "foobar"],
+ shell=False,
+ creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
+ start_new_session=False,
+ )
+ i.wait.assert_called_once_with()
+
class HashTest(BasePostprocessorTest):
@@ -345,9 +429,7 @@ class MetadataTest(BasePostprocessorTest):
path = self.pathfmt.realpath + ".JSON"
m.assert_called_once_with(path, "w", encoding="utf-8")
- if sys.hexversion >= 0x3060000:
- # python 3.4 & 3.5 have random order without 'sort: True'
- self.assertEqual(self._output(m), """{
+ self.assertEqual(self._output(m), """{
"category": "test",
"filename": "file",
"extension": "ext",
@@ -713,8 +795,7 @@ class MetadataTest(BasePostprocessorTest):
}
""")
- @staticmethod
- def _output(mock):
+ def _output(self, mock):
return "".join(
call[1][0]
for call in mock.mock_calls
@@ -727,32 +808,32 @@ class MtimeTest(BasePostprocessorTest):
def test_mtime_datetime(self):
self._create(None, {"date": datetime(1980, 1, 1)})
self._trigger()
- self.assertEqual(self.pathfmt.kwdict["_mtime"], 315532800)
+ self.assertEqual(self.pathfmt.kwdict["_mtime_meta"], 315532800)
def test_mtime_timestamp(self):
self._create(None, {"date": 315532800})
self._trigger()
- self.assertEqual(self.pathfmt.kwdict["_mtime"], 315532800)
+ self.assertEqual(self.pathfmt.kwdict["_mtime_meta"], 315532800)
def test_mtime_none(self):
self._create(None, {"date": None})
self._trigger()
- self.assertNotIn("_mtime", self.pathfmt.kwdict)
+ self.assertNotIn("_mtime_meta", self.pathfmt.kwdict)
def test_mtime_undefined(self):
self._create(None, {})
self._trigger()
- self.assertNotIn("_mtime", self.pathfmt.kwdict)
+ self.assertNotIn("_mtime_meta", self.pathfmt.kwdict)
def test_mtime_key(self):
self._create({"key": "foo"}, {"foo": 315532800})
self._trigger()
- self.assertEqual(self.pathfmt.kwdict["_mtime"], 315532800)
+ self.assertEqual(self.pathfmt.kwdict["_mtime_meta"], 315532800)
def test_mtime_value(self):
self._create({"value": "{foo}"}, {"foo": 315532800})
self._trigger()
- self.assertEqual(self.pathfmt.kwdict["_mtime"], 315532800)
+ self.assertEqual(self.pathfmt.kwdict["_mtime_meta"], 315532800)
class PythonTest(BasePostprocessorTest):
@@ -945,8 +1026,8 @@ class ZipTest(BasePostprocessorTest):
self._trigger(("finalize",))
self.assertEqual(pp.zfile.write.call_count, 3)
- for call in pp.zfile.write.call_args_list:
- args, kwargs = call
+ for call_args in pp.zfile.write.call_args_list:
+ args, kwargs = call_args
self.assertEqual(len(args), 2)
self.assertEqual(len(kwargs), 0)
self.assertEqual(args[0], self.pathfmt.temppath)
diff --git a/test/test_results.py b/test/test_results.py
index 6e04e1d..4b1c4c1 100644
--- a/test/test_results.py
+++ b/test/test_results.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-# Copyright 2015-2023 Mike Fährmann
+# Copyright 2015-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -80,9 +80,9 @@ class TestExtractorResults(unittest.TestCase):
@classmethod
def tearDownClass(cls):
if cls._skipped:
- print("\n\nSkipped tests:")
- for url, exc in cls._skipped:
- print('- {} ("{}")'.format(url, exc))
+ sys.stdout.write("\n\nSkipped tests:\n")
+ for url, reason in cls._skipped:
+ sys.stdout.write(f'- {url} ("{reason}")\n')
def assertRange(self, value, range, msg=None):
if range.step > 1:
@@ -91,6 +91,24 @@ class TestExtractorResults(unittest.TestCase):
self.assertLessEqual(value, range.stop, msg=msg)
self.assertGreaterEqual(value, range.start, msg=msg)
+ def assertLogEqual(self, expected, output):
+ if isinstance(expected, str):
+ expected = (expected,)
+ self.assertEqual(len(expected), len(output), "#log/count")
+
+ for exp, out in zip(expected, output):
+ level, name, message = out.split(":", 2)
+
+ if isinstance(exp, str):
+ return self.assertEqual(exp, message, "#log")
+
+ self.assertEqual(exp[0].lower(), level.lower(), "#log/level")
+ if len(exp) < 3:
+ self.assertEqual(exp[1], message, "#log/message")
+ else:
+ self.assertEqual(exp[1], name , "#log/name")
+ self.assertEqual(exp[2], message, "#log/message")
+
def _run_test(self, result):
if result.get("#fail"):
del result["#fail"]
@@ -145,7 +163,11 @@ class TestExtractorResults(unittest.TestCase):
return
try:
- tjob.run()
+ if "#log" in result:
+ with self.assertLogs() as log_info:
+ tjob.run()
+ else:
+ tjob.run()
except exception.StopExtraction:
pass
except exception.HttpError as exc:
@@ -156,6 +178,9 @@ class TestExtractorResults(unittest.TestCase):
self.skipTest(exc)
raise
+ if "#log" in result:
+ self.assertLogEqual(result["#log"], log_info.output)
+
if result.get("#archive", True):
self.assertEqual(
len(set(tjob.archive_list)),
@@ -220,13 +245,15 @@ class TestExtractorResults(unittest.TestCase):
for url, pat in zip(tjob.url_list, pattern):
self.assertRegex(url, pat, msg="#pattern")
- if "#urls" in result:
- expected = result["#urls"]
+ if "#results" in result:
+ expected = result["#results"]
if isinstance(expected, str):
- self.assertTrue(tjob.url_list, msg="#urls")
- self.assertEqual(tjob.url_list[0], expected, msg="#urls")
+ self.assertTrue(tjob.url_list, msg="#results")
+ self.assertEqual(
+ tjob.url_list[0], expected, msg="#results")
else:
- self.assertSequenceEqual(tjob.url_list, expected, msg="#urls")
+ self.assertSequenceEqual(
+ tjob.url_list, expected, msg="#results")
metadata = {k: v for k, v in result.items() if k[0] != "#"}
if metadata:
@@ -235,56 +262,74 @@ class TestExtractorResults(unittest.TestCase):
def _test_kwdict(self, kwdict, tests, parent=None):
for key, test in tests.items():
+
if key.startswith("?"):
key = key[1:]
if key not in kwdict:
continue
+ if key.endswith("[*]"):
+ key = key[:-3]
+ subtest = True
+ else:
+ subtest = False
+
path = "{}.{}".format(parent, key) if parent else key
+
if key.startswith("!"):
self.assertNotIn(key[1:], kwdict, msg=path)
continue
+
self.assertIn(key, kwdict, msg=path)
value = kwdict[key]
- if isinstance(test, dict):
- self._test_kwdict(value, test, path)
- elif isinstance(test, type):
- self.assertIsInstance(value, test, msg=path)
- elif isinstance(test, range):
- self.assertRange(value, test, msg=path)
- elif isinstance(test, set):
- try:
- self.assertIn(value, test, msg=path)
- except AssertionError:
- self.assertIn(type(value), test, msg=path)
- elif isinstance(test, list):
- subtest = False
- for idx, item in enumerate(test):
- if isinstance(item, dict):
- subtest = True
- subpath = "{}[{}]".format(path, idx)
- self._test_kwdict(value[idx], item, subpath)
- if not subtest:
- self.assertEqual(test, value, msg=path)
- elif isinstance(test, str):
- if test.startswith("re:"):
- self.assertRegex(value, test[3:], msg=path)
- elif test.startswith("dt:"):
- self.assertIsInstance(value, datetime.datetime, msg=path)
- self.assertEqual(test[3:], str(value), msg=path)
- elif test.startswith("type:"):
- self.assertEqual(test[5:], type(value).__name__, msg=path)
- elif test.startswith("len:"):
- cls, _, length = test[4:].rpartition(":")
- if cls:
- self.assertEqual(
- cls, type(value).__name__, msg=path + "/type")
- self.assertEqual(int(length), len(value), msg=path)
- else:
- self.assertEqual(test, value, msg=path)
+ if subtest:
+ self.assertNotIsInstance(value, str, msg=path)
+ for idx, item in enumerate(value):
+ subpath = "{}[{}]".format(path, idx)
+ self._test_kwdict_value(item, test, subpath)
else:
+ self._test_kwdict_value(value, test, path)
+
+ def _test_kwdict_value(self, value, test, path):
+ if isinstance(test, dict):
+ self._test_kwdict(value, test, path)
+ elif isinstance(test, type):
+ self.assertIsInstance(value, test, msg=path)
+ elif isinstance(test, range):
+ self.assertRange(value, test, msg=path)
+ elif isinstance(test, set):
+ try:
+ self.assertIn(value, test, msg=path)
+ except AssertionError:
+ self.assertIn(type(value), test, msg=path)
+ elif isinstance(test, list):
+ subtest = False
+ for idx, item in enumerate(test):
+ if isinstance(item, dict):
+ subtest = True
+ subpath = "{}[{}]".format(path, idx)
+ self._test_kwdict(value[idx], item, subpath)
+ if not subtest:
self.assertEqual(test, value, msg=path)
+ elif isinstance(test, str):
+ if test.startswith("re:"):
+ self.assertRegex(value, test[3:], msg=path)
+ elif test.startswith("dt:"):
+ self.assertIsInstance(value, datetime.datetime, msg=path)
+ self.assertEqual(test[3:], str(value), msg=path)
+ elif test.startswith("type:"):
+ self.assertEqual(test[5:], type(value).__name__, msg=path)
+ elif test.startswith("len:"):
+ cls, _, length = test[4:].rpartition(":")
+ if cls:
+ self.assertEqual(
+ cls, type(value).__name__, msg=path + "/type")
+ self.assertEqual(int(length), len(value), msg=path)
+ else:
+ self.assertEqual(test, value, msg=path)
+ else:
+ self.assertEqual(test, value, msg=path)
class ResultJob(job.DownloadJob):
@@ -402,27 +447,31 @@ class TestPathfmt():
class TestFormatter(formatter.StringFormatter):
- @staticmethod
- def _noop(_):
- return ""
-
def _apply_simple(self, key, fmt):
if key == "extension" or "_parse_optional." in repr(fmt):
- return self._noop
-
- def wrap(obj):
- return fmt(obj[key])
+ def wrap(obj):
+ try:
+ return fmt(obj[key])
+ except KeyError:
+ return ""
+ else:
+ def wrap(obj):
+ return fmt(obj[key])
return wrap
def _apply(self, key, funcs, fmt):
if key == "extension" or "_parse_optional." in repr(fmt):
- return self._noop
-
- def wrap(obj):
- obj = obj[key]
- for func in funcs:
- obj = func(obj)
- return fmt(obj)
+ def wrap(obj):
+ obj = obj[key] if key in obj else ""
+ for func in funcs:
+ obj = func(obj)
+ return fmt(obj)
+ else:
+ def wrap(obj):
+ obj = obj[key]
+ for func in funcs:
+ obj = func(obj)
+ return fmt(obj)
return wrap
@@ -457,7 +506,10 @@ def generate_tests():
"""Dynamically generate extractor unittests"""
def _generate_method(result):
def test(self):
- print("\n" + result["#url"])
+ sys.stdout.write(f"\n{result['#url']}\n")
+ if "#comment" in result:
+ sys.stdout.write(f"# {result['#comment']}\n")
+
try:
self._run_test(result)
except KeyboardInterrupt as exc:
diff --git a/test/test_text.py b/test/test_text.py
index d42507c..13029d2 100644
--- a/test/test_text.py
+++ b/test/test_text.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-# Copyright 2015-2022 Mike Fährmann
+# Copyright 2015-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -23,6 +23,20 @@ INVALID_ALT = ((), [], {}, None, "")
class TestText(unittest.TestCase):
+ def test_re(self):
+ p1 = text.re_compile("foo")
+ p2 = text.re("foo")
+ p3 = text.re("foo")
+
+ Pattern = text.re_module.Pattern
+ self.assertIsInstance(p1, Pattern)
+ self.assertIsInstance(p2, Pattern)
+ self.assertIsInstance(p3, Pattern)
+
+ self.assertEqual(p1, p2)
+ self.assertIsNot(p1, p2)
+ self.assertIs(p2, p3)
+
def test_remove_html(self, f=text.remove_html):
result = "Hello World."
@@ -92,6 +106,17 @@ class TestText(unittest.TestCase):
self.assertEqual(f(1), "1")
self.assertEqual(f(2.3), "23")
+ def test_sanitize_whitespace(self, f=text.sanitize_whitespace):
+ self.assertEqual(f("Hello World"), "Hello World")
+ self.assertEqual(f("Hello\tWorld"), "Hello World")
+ self.assertEqual(f(" Hello World "), "Hello World")
+ self.assertEqual(f("\tHello \n\tWorld "), "Hello World")
+
+ self.assertEqual(f(""), "")
+ self.assertEqual(f(" "), "")
+ self.assertEqual(f(" "), "")
+ self.assertEqual(f(" \t\n "), "")
+
def test_ensure_http_scheme(self, f=text.ensure_http_scheme):
result = "https://example.org/filename.ext"
@@ -241,6 +266,29 @@ class TestText(unittest.TestCase):
self.assertEqual(f(txt , value, ">") , (None, -1))
self.assertEqual(f(txt , "<" , value), (None, -1))
+ def test_rextr(self, f=text.rextr):
+ txt = "<a><b>"
+ self.assertEqual(f(txt, "<", ">"), "b")
+ self.assertEqual(f(txt, "X", ">"), "")
+ self.assertEqual(f(txt, "<", "X"), "")
+
+ # 'pos' argument
+ for i in range(10, 3, -1):
+ self.assertEqual(f(txt, "<", ">", i), "b")
+ for i in range(3, 0, -1):
+ self.assertEqual(f(txt, "<", ">", i), "a")
+
+ # 'default' argument
+ self.assertEqual(f(txt, "[", "]", -1, "none"), "none")
+ self.assertEqual(f(txt, "[", "]", None, "none"), "none")
+ self.assertEqual(f(txt, "[", "]", default="none"), "none")
+
+ # invalid arguments
+ for value in INVALID:
+ self.assertEqual(f(value, "<" , ">") , "")
+ self.assertEqual(f(txt , value, ">") , "")
+ self.assertEqual(f(txt , "<" , value), "")
+
def test_extract_all(self, f=text.extract_all):
txt = "[c][b][a]: xyz! [d][e"
@@ -336,6 +384,8 @@ class TestText(unittest.TestCase):
)
def test_parse_bytes(self, f=text.parse_bytes):
+ self.assertEqual(f(0), 0)
+ self.assertEqual(f(50), 50)
self.assertEqual(f("0"), 0)
self.assertEqual(f("50"), 50)
self.assertEqual(f("50k"), 50 * 1024**1)
@@ -343,10 +393,13 @@ class TestText(unittest.TestCase):
self.assertEqual(f("50g"), 50 * 1024**3)
self.assertEqual(f("50t"), 50 * 1024**4)
self.assertEqual(f("50p"), 50 * 1024**5)
+ self.assertEqual(f(" 50p "), 50 * 1024**5)
# fractions
+ self.assertEqual(f(123.456), 123)
self.assertEqual(f("123.456"), 123)
self.assertEqual(f("123.567"), 124)
+ self.assertEqual(f(" 123.89 "), 124)
self.assertEqual(f("0.5M"), round(0.5 * 1024**2))
# invalid arguments
@@ -405,8 +458,12 @@ class TestText(unittest.TestCase):
# missing value
self.assertEqual(f("bar"), {})
+ self.assertEqual(f("bar="), {"bar": ""})
self.assertEqual(f("foo=1&bar"), {"foo": "1"})
+ self.assertEqual(f("foo=1&bar="), {"foo": "1", "bar": ""})
self.assertEqual(f("foo=1&bar&baz=3"), {"foo": "1", "baz": "3"})
+ self.assertEqual(f("foo=1&bar=&baz=3"),
+ {"foo": "1", "bar": "", "baz": "3"})
# keys with identical names
self.assertEqual(f("foo=1&foo=2"), {"foo": "1"})
@@ -424,6 +481,8 @@ class TestText(unittest.TestCase):
self.assertEqual(f(""), {})
self.assertEqual(f("foo=1"), {"foo": "1"})
self.assertEqual(f("foo=1&bar=2"), {"foo": "1", "bar": "2"})
+ self.assertEqual(f("%C3%A4%26=%E3%81%82%E3%81%A8&%23=%3F"),
+ {"ä&": "あと", "#": "?"})
# missing value
self.assertEqual(f("bar"), {})
@@ -441,6 +500,21 @@ class TestText(unittest.TestCase):
for value in INVALID:
self.assertEqual(f(value), {})
+ def test_build_query(self, f=text.build_query):
+ # standard usage
+ self.assertEqual(f({}), "")
+ self.assertEqual(f({"foo": "1"}), "foo=1")
+ self.assertEqual(f({"foo": "1", "bar": "2"}), "foo=1&bar=2")
+
+ # missing value
+ self.assertEqual(f({"bar": ""}), "bar=")
+ self.assertEqual(f({"foo": "1", "bar": ""}), "foo=1&bar=")
+ self.assertEqual(f({"foo": "1", "bar": "", "baz": "3"}),
+ "foo=1&bar=&baz=3")
+
+ self.assertEqual(f({"ä&": "あと", "#": "?"}),
+ "%C3%A4%26=%E3%81%82%E3%81%A8&%23=%3F")
+
def test_parse_timestamp(self, f=text.parse_timestamp):
null = util.datetime_utcfromtimestamp(0)
value = util.datetime_utcfromtimestamp(1555816235)
diff --git a/test/test_util.py b/test/test_util.py
index 27f78ec..00e8c4b 100644
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-# Copyright 2015-2023 Mike Fährmann
+# Copyright 2015-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -10,6 +10,7 @@
import os
import sys
import unittest
+from unittest.mock import patch
import io
import time
@@ -27,11 +28,18 @@ from gallery_dl import util, text, exception # noqa E402
class TestRange(unittest.TestCase):
- def test_parse_empty(self, f=util.RangePredicate._parse):
+ def setUp(self):
+ self.predicate = util.RangePredicate("")
+
+ def test_parse_empty(self):
+ f = self.predicate._parse
+
self.assertEqual(f(""), [])
self.assertEqual(f([]), [])
- def test_parse_digit(self, f=util.RangePredicate._parse):
+ def test_parse_digit(self):
+ f = self.predicate._parse
+
self.assertEqual(f("2"), [range(2, 3)])
self.assertEqual(
@@ -41,7 +49,9 @@ class TestRange(unittest.TestCase):
range(4, 5)],
)
- def test_parse_range(self, f=util.RangePredicate._parse):
+ def test_parse_range(self):
+ f = self.predicate._parse
+
self.assertEqual(f("1-2"), [range(1, 3)])
self.assertEqual(f("2-"), [range(2, sys.maxsize)])
self.assertEqual(f("-3"), [range(1, 4)])
@@ -61,7 +71,9 @@ class TestRange(unittest.TestCase):
range(2, 7)],
)
- def test_parse_slice(self, f=util.RangePredicate._parse):
+ def test_parse_slice(self):
+ f = self.predicate._parse
+
self.assertEqual(f("2:4") , [range(2, 4)])
self.assertEqual(f("3::") , [range(3, sys.maxsize)])
self.assertEqual(f(":4:") , [range(1, 4)])
@@ -149,6 +161,10 @@ class TestPredicate(unittest.TestCase):
self.assertFalse(pred(url, {"a": 2}))
+ pred = util.FilterPredicate("re.search(r'.+', url)")
+ self.assertTrue(pred(url, {"url": "https://example.org/"}))
+ self.assertFalse(pred(url, {"url": ""}))
+
def test_build_predicate(self):
pred = util.build_predicate([])
self.assertIsInstance(pred, type(lambda: True))
@@ -390,6 +406,89 @@ def hash(value):
self.assertEqual(expr(value), result)
+class TestDatetime(unittest.TestCase):
+
+ def test_to_datetime(self, f=util.to_datetime):
+
+ def _assert(value, expected):
+ result = f(value)
+ self.assertIsInstance(result, datetime.datetime)
+ self.assertEqual(result, expected, msg=repr(value))
+
+ dt = datetime.datetime(2010, 1, 1)
+ self.assertIs(f(dt), dt)
+
+ _assert(dt , dt)
+ _assert(1262304000 , dt)
+ _assert(1262304000.0 , dt)
+ _assert(1262304000.123, dt)
+ _assert("1262304000" , dt)
+
+ _assert("2010-01-01" , dt)
+ _assert("2010-01-01 00:00:00" , dt)
+ _assert("2010-01-01T00:00:00" , dt)
+ _assert("2010-01-01T00:00:00.123456" , dt)
+ _assert("2009-12-31T19:00:00-05:00" , dt)
+ _assert("2009-12-31T19:00:00.123456-05:00", dt)
+ _assert("2010-01-01T00:00:00Z" , dt)
+ _assert("2010-01-01T00:00:00.123456Z" , dt)
+
+ _assert(0 , util.EPOCH)
+ _assert("" , util.EPOCH)
+ _assert("foo", util.EPOCH)
+ _assert(None , util.EPOCH)
+ _assert(() , util.EPOCH)
+ _assert([] , util.EPOCH)
+ _assert({} , util.EPOCH)
+ _assert((1, 2, 3), util.EPOCH)
+
+ @unittest.skipIf(sys.hexversion < 0x30b0000,
+ "extended fromisoformat timezones")
+ def test_to_datetime_tz(self, f=util.to_datetime):
+
+ def _assert(value, expected):
+ result = f(value)
+ self.assertIsInstance(result, datetime.datetime)
+ self.assertEqual(result, expected, msg=repr(value))
+
+ dt = datetime.datetime(2010, 1, 1)
+
+ _assert("2009-12-31T19:00:00-05" , dt)
+ _assert("2009-12-31T19:00:00-0500" , dt)
+ _assert("2009-12-31T19:00:00.123456-05" , dt)
+ _assert("2009-12-31T19:00:00.123456-0500" , dt)
+
+ def test_datetime_to_timestamp(self, f=util.datetime_to_timestamp):
+ self.assertEqual(f(util.EPOCH), 0.0)
+ self.assertEqual(f(datetime.datetime(2010, 1, 1)), 1262304000.0)
+ self.assertEqual(f(datetime.datetime(2010, 1, 1, 0, 0, 0, 128000)),
+ 1262304000.128000)
+ with self.assertRaises(TypeError):
+ f(None)
+
+ def test_datetime_to_timestamp_string(
+ self, f=util.datetime_to_timestamp_string):
+ self.assertEqual(f(util.EPOCH), "0")
+ self.assertEqual(f(datetime.datetime(2010, 1, 1)), "1262304000")
+ self.assertEqual(f(None), "")
+
+ def test_datetime_from_timestamp(
+ self, f=util.datetime_from_timestamp):
+ self.assertEqual(f(0.0), util.EPOCH)
+ self.assertEqual(f(1262304000.0), datetime.datetime(2010, 1, 1))
+ self.assertEqual(f(1262304000.128000).replace(microsecond=0),
+ datetime.datetime(2010, 1, 1, 0, 0, 0))
+
+ def test_datetime_utcfromtimestamp(
+ self, f=util.datetime_utcfromtimestamp):
+ self.assertEqual(f(0.0), util.EPOCH)
+ self.assertEqual(f(1262304000.0), datetime.datetime(2010, 1, 1))
+
+ def test_datetime_utcnow(
+ self, f=util.datetime_utcnow):
+ self.assertIsInstance(f(), datetime.datetime)
+
+
class TestOther(unittest.TestCase):
def test_bencode(self):
@@ -492,6 +591,7 @@ class TestOther(unittest.TestCase):
def test_noop(self):
self.assertEqual(util.noop(), None)
+ self.assertEqual(util.noop(...), None)
def test_md5(self):
self.assertEqual(util.md5(b""),
@@ -552,17 +652,21 @@ value = 123
self.assertEqual(module.value, 123)
self.assertIs(module.datetime, datetime)
- def test_build_duration_func(self, f=util.build_duration_func):
+ def test_build_selection_func(self, f=util.build_selection_func):
- def test_single(df, v):
+ def test_single(df, v, type=None):
for _ in range(10):
self.assertEqual(df(), v)
+ if type is not None:
+ self.assertIsInstance(df(), type)
- def test_range(df, lower, upper):
+ def test_range(df, lower, upper, type=None):
for __ in range(10):
v = df()
self.assertGreaterEqual(v, lower)
self.assertLessEqual(v, upper)
+ if type is not None:
+ self.assertIsInstance(v, type)
for v in (0, 0.0, "", None, (), []):
self.assertIsNone(f(v))
@@ -570,16 +674,24 @@ value = 123
for v in (0, 0.0, "", None, (), []):
test_single(f(v, 1.0), 1.0)
- test_single(f(3), 3)
- test_single(f(3.0), 3.0)
- test_single(f("3"), 3)
- test_single(f("3.0-"), 3)
- test_single(f(" 3 -"), 3)
+ test_single(f(3) , 3 , float)
+ test_single(f(3.0) , 3.0, float)
+ test_single(f("3") , 3 , float)
+ test_single(f("3.0-") , 3 , float)
+ test_single(f(" 3 -"), 3 , float)
- test_range(f((2, 4)), 2, 4)
- test_range(f([2, 4]), 2, 4)
- test_range(f("2-4"), 2, 4)
- test_range(f(" 2.0 - 4 "), 2, 4)
+ test_range(f((2, 4)) , 2, 4, float)
+ test_range(f([2.0, 4.0]) , 2, 4, float)
+ test_range(f("2-4") , 2, 4, float)
+ test_range(f(" 2.0 - 4 "), 2, 4, float)
+
+ pb = text.parse_bytes
+ test_single(f("3", 0, pb) , 3, int)
+ test_single(f("3.0-", 0, pb) , 3, int)
+ test_single(f(" 3 -", 0, pb), 3, int)
+
+ test_range(f("2k-4k", 0, pb) , 2048, 4096, int)
+ test_range(f(" 2.0k - 4k ", 0, pb), 2048, 4096, int)
def test_extractor_filter(self):
# empty
@@ -765,40 +877,16 @@ value = 123
self.assertEqual(f(["a", "b", "c"]), "a, b, c")
self.assertEqual(f([1, 2, 3]), "1, 2, 3")
- def test_datetime_to_timestamp(self, f=util.datetime_to_timestamp):
- self.assertEqual(f(util.EPOCH), 0.0)
- self.assertEqual(f(datetime.datetime(2010, 1, 1)), 1262304000.0)
- self.assertEqual(f(datetime.datetime(2010, 1, 1, 0, 0, 0, 128000)),
- 1262304000.128000)
- with self.assertRaises(TypeError):
- f(None)
-
- def test_datetime_to_timestamp_string(
- self, f=util.datetime_to_timestamp_string):
- self.assertEqual(f(util.EPOCH), "0")
- self.assertEqual(f(datetime.datetime(2010, 1, 1)), "1262304000")
- self.assertEqual(f(None), "")
-
- def test_datetime_from_timestamp(
- self, f=util.datetime_from_timestamp):
- self.assertEqual(f(0.0), util.EPOCH)
- self.assertEqual(f(1262304000.0), datetime.datetime(2010, 1, 1))
- self.assertEqual(f(1262304000.128000).replace(microsecond=0),
- datetime.datetime(2010, 1, 1, 0, 0, 0))
-
- def test_datetime_utcfromtimestamp(
- self, f=util.datetime_utcfromtimestamp):
- self.assertEqual(f(0.0), util.EPOCH)
- self.assertEqual(f(1262304000.0), datetime.datetime(2010, 1, 1))
-
- def test_datetime_utcnow(
- self, f=util.datetime_utcnow):
- self.assertIsInstance(f(), datetime.datetime)
-
def test_universal_none(self):
obj = util.NONE
self.assertFalse(obj)
+ self.assertEqual(obj, obj)
+ self.assertEqual(obj, None)
+ self.assertNotEqual(obj, False)
+ self.assertNotEqual(obj, 0)
+ self.assertNotEqual(obj, "")
+
self.assertEqual(len(obj), 0)
self.assertEqual(int(obj), 0)
self.assertEqual(hash(obj), 0)
@@ -873,6 +961,26 @@ value = 123
i += 1
self.assertEqual(i, 0)
+ def test_HTTPBasicAuth(self, f=util.HTTPBasicAuth):
+ class Request:
+ headers = {}
+ request = Request()
+
+ auth = f("", "")
+ auth(request)
+ self.assertEqual(request.headers["Authorization"],
+ b"Basic Og==")
+
+ f("foo", "bar")(request)
+ self.assertEqual(request.headers["Authorization"],
+ b"Basic Zm9vOmJhcg==")
+
+ f("ewsxcvbhnjtr",
+ "RVXQ4i9Ju5ypi86VGJ8MqhDYpDKluS0sxiSRBAG7ymB3Imok")(request)
+ self.assertEqual(request.headers["Authorization"],
+ b"Basic ZXdzeGN2YmhuanRyOlJWWFE0aTlKdTV5cGk4NlZHSjhNc"
+ b"WhEWXBES2x1UzBzeGlTUkJBRzd5bUIzSW1vaw==")
+
def test_module_proxy(self):
proxy = util.ModuleProxy()
@@ -887,6 +995,16 @@ value = 123
self.assertIs(proxy["abc.def.ghi"], util.NONE)
self.assertIs(proxy["os.path2"], util.NONE)
+ def test_lazy_prompt(self):
+ prompt = util.LazyPrompt()
+
+ with patch("getpass.getpass") as p:
+ p.return_value = "***"
+ result = str(prompt)
+
+ self.assertEqual(result, "***")
+ p.assert_called_once_with()
+
def test_null_context(self):
with util.NullContext():
pass
@@ -901,6 +1019,28 @@ value = 123
except ValueError as exc:
self.assertIs(exc, exc_orig)
+ def test_null_response(self):
+ response = util.NullResponse("https://example.org")
+
+ self.assertEqual(response.url, "https://example.org")
+ self.assertEqual(response.status_code, 900)
+ self.assertEqual(response.reason, "")
+ self.assertEqual(response.text, "")
+ self.assertEqual(response.content, b"")
+ self.assertEqual(response.json(), {})
+
+ self.assertFalse(response.ok)
+ self.assertFalse(response.is_redirect)
+ self.assertFalse(response.is_permanent_redirect)
+ self.assertFalse(response.history)
+
+ self.assertEqual(response.encoding, "utf-8")
+ self.assertEqual(response.apparent_encoding, "utf-8")
+ self.assertEqual(response.cookies.get("foo"), None)
+ self.assertEqual(response.headers.get("foo"), None)
+ self.assertEqual(response.links.get("next"), None)
+ self.assertEqual(response.close(), None)
+
class TestExtractor():
category = "test_category"
diff --git a/test/test_ytdl.py b/test/test_ytdl.py
index f7eb671..ecc6d2f 100644
--- a/test/test_ytdl.py
+++ b/test/test_ytdl.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-# Copyright 2022-2023 Mike Fährmann
+# Copyright 2022-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -26,6 +26,7 @@ class Test_CommandlineArguments(unittest.TestCase):
raise unittest.SkipTest("cannot import module '{}'".format(
cls.module_name))
cls.default = ytdl.parse_command_line(cls.module, [])
+ cls.ytdlp = hasattr(cls.module, "cookies")
def test_ignore_errors(self):
self._("--ignore-errors" , "ignoreerrors", True)
@@ -155,21 +156,21 @@ class Test_CommandlineArguments(unittest.TestCase):
def test_subs(self):
opts = self._(["--convert-subs", "srt"])
conv = {"key": "FFmpegSubtitlesConvertor", "format": "srt"}
- if self.module_name == "yt_dlp":
+ if self.ytdlp:
conv["when"] = "before_dl"
self.assertEqual(opts["postprocessors"][0], conv)
def test_embed(self):
subs = {"key": "FFmpegEmbedSubtitle"}
thumb = {"key": "EmbedThumbnail", "already_have_thumbnail": False}
- if self.module_name == "yt_dlp":
+ if self.ytdlp:
subs["already_have_subtitle"] = False
opts = self._(["--embed-subs", "--embed-thumbnail"])
self.assertEqual(opts["postprocessors"][:2], [subs, thumb])
thumb["already_have_thumbnail"] = True
- if self.module_name == "yt_dlp":
+ if self.ytdlp:
subs["already_have_subtitle"] = True
thumb["already_have_thumbnail"] = "all"
@@ -212,7 +213,7 @@ class Test_CommandlineArguments(unittest.TestCase):
"--ignore-config",
]
- if self.module_name != "yt_dlp":
+ if not self.ytdlp:
cmdline.extend((
"--dump-json",
"--dump-single-json",