summaryrefslogtreecommitdiffstats
path: root/gallery_dl/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'gallery_dl/util.py')
-rw-r--r--gallery_dl/util.py260
1 files changed, 161 insertions, 99 deletions
diff --git a/gallery_dl/util.py b/gallery_dl/util.py
index ba31ea7..4027ac6 100644
--- a/gallery_dl/util.py
+++ b/gallery_dl/util.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
-# Copyright 2017-2023 Mike Fährmann
+# Copyright 2017-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -8,7 +8,6 @@
"""Utility functions and classes"""
-import re
import os
import sys
import json
@@ -27,11 +26,6 @@ from http.cookiejar import Cookie
from email.utils import mktime_tz, parsedate_tz
from . import text, version, exception
-try:
- re_compile = re._compiler.compile
-except AttributeError:
- re_compile = re.sre_compile.compile
-
def bencode(num, alphabet="0123456789"):
"""Encode an integer into a base-N encoded string"""
@@ -48,8 +42,7 @@ def bdecode(data, alphabet="0123456789"):
num = 0
base = len(alphabet)
for c in data:
- num *= base
- num += alphabet.index(c)
+ num = num * base + alphabet.find(c)
return num
@@ -135,7 +128,7 @@ def false(_, __=None):
return False
-def noop():
+def noop(_=None):
"""Does nothing"""
@@ -159,18 +152,17 @@ def sha1(s):
def generate_token(size=16):
"""Generate a random token with hexadecimal digits"""
- data = random.getrandbits(size * 8).to_bytes(size, "big")
- return binascii.hexlify(data).decode()
+ return random.getrandbits(size * 8).to_bytes(size, "big").hex()
def format_value(value, suffixes="kMGTPEZY"):
- value = format(value)
+ value = str(value)
value_len = len(value)
index = value_len - 4
if index >= 0:
offset = (value_len - 1) % 3 + 1
- return (value[:offset] + "." + value[offset:offset+2] +
- suffixes[index // 3])
+ return (f"{value[:offset]}.{value[offset:offset+2]}"
+ f"{suffixes[index // 3]}")
return value
@@ -236,6 +228,34 @@ def to_string(value):
return str(value)
+def to_datetime(value):
+ """Convert 'value' to a datetime object"""
+ if not value:
+ return EPOCH
+
+ if isinstance(value, datetime.datetime):
+ return value
+
+ if isinstance(value, str):
+ try:
+ if value[-1] == "Z":
+ # compat for Python < 3.11
+ value = value[:-1]
+ dt = datetime.datetime.fromisoformat(value)
+ if dt.tzinfo is None:
+ if dt.microsecond:
+ dt = dt.replace(microsecond=0)
+ else:
+ # convert to naive UTC
+ dt = dt.astimezone(datetime.timezone.utc).replace(
+ microsecond=0, tzinfo=None)
+ return dt
+ except Exception:
+ pass
+
+ return text.parse_timestamp(value, EPOCH)
+
+
def datetime_to_timestamp(dt):
"""Convert naive UTC datetime to Unix timestamp"""
return (dt - EPOCH) / SECOND
@@ -298,7 +318,32 @@ def dump_response(response, fp, headers=False, content=True, hide_auth=True):
request = response.request
req_headers = request.headers.copy()
res_headers = response.headers.copy()
- outfmt = """\
+
+ if hide_auth:
+ if authorization := req_headers.get("Authorization"):
+ atype, sep, _ = str(authorization).partition(" ")
+ req_headers["Authorization"] = f"{atype} ***" if sep else "***"
+
+ if cookie := req_headers.get("Cookie"):
+ req_headers["Cookie"] = ";".join(
+ c.partition("=")[0] + "=***"
+ for c in cookie.split(";")
+ )
+
+ if set_cookie := res_headers.get("Set-Cookie"):
+ res_headers["Set-Cookie"] = re(r"(^|, )([^ =]+)=[^,;]*").sub(
+ r"\1\2=***", set_cookie)
+
+ request_headers = "\n".join(
+ f"{name}: {value}"
+ for name, value in req_headers.items()
+ )
+ response_headers = "\n".join(
+ f"{name}: {value}"
+ for name, value in res_headers.items()
+ )
+
+ output = f"""\
{request.method} {request.url}
Status: {response.status_code} {response.reason}
@@ -307,49 +352,17 @@ Request Headers
{request_headers}
"""
if request.body:
- outfmt += """
+ output = f"""{output}
Request Body
------------
{request.body}
"""
- outfmt += """
+ output = f"""{output}
Response Headers
----------------
{response_headers}
"""
- if hide_auth:
- authorization = req_headers.get("Authorization")
- if authorization:
- atype, sep, _ = str(authorization).partition(" ")
- req_headers["Authorization"] = atype + " ***" if sep else "***"
-
- cookie = req_headers.get("Cookie")
- if cookie:
- req_headers["Cookie"] = ";".join(
- c.partition("=")[0] + "=***"
- for c in cookie.split(";")
- )
-
- set_cookie = res_headers.get("Set-Cookie")
- if set_cookie:
- res_headers["Set-Cookie"] = re.sub(
- r"(^|, )([^ =]+)=[^,;]*", r"\1\2=***", set_cookie,
- )
-
- fmt_nv = "{}: {}".format
-
- fp.write(outfmt.format(
- request=request,
- response=response,
- request_headers="\n".join(
- fmt_nv(name, value)
- for name, value in req_headers.items()
- ),
- response_headers="\n".join(
- fmt_nv(name, value)
- for name, value in res_headers.items()
- ),
- ).encode())
+ fp.write(output.encode())
if content:
if headers:
@@ -361,14 +374,11 @@ def extract_headers(response):
headers = response.headers
data = dict(headers)
- hcd = headers.get("content-disposition")
- if hcd:
- name = text.extr(hcd, 'filename="', '"')
- if name:
+ if hcd := headers.get("content-disposition"):
+ if name := text.extr(hcd, 'filename="', '"'):
text.nameext_from_url(name, data)
- hlm = headers.get("last-modified")
- if hlm:
+ if hlm := headers.get("last-modified"):
data["date"] = datetime.datetime(*parsedate_tz(hlm)[:6])
return data
@@ -488,8 +498,7 @@ def cookiestxt_load(fp):
def cookiestxt_store(fp, cookies):
"""Write 'cookies' in Netscape cookies.txt format to 'fp'"""
- write = fp.write
- write("# Netscape HTTP Cookie File\n\n")
+ fp.write("# Netscape HTTP Cookie File\n\n")
for cookie in cookies:
if not cookie.domain:
@@ -503,7 +512,7 @@ def cookiestxt_store(fp, cookies):
value = cookie.value
domain = cookie.domain
- write("\t".join((
+ fp.write("\t".join((
domain,
"TRUE" if domain and domain[0] == "." else "FALSE",
cookie.path,
@@ -568,8 +577,7 @@ class HTTPBasicAuth():
def __init__(self, username, password):
self.authorization = b"Basic " + binascii.b2a_base64(
- username.encode("latin1") + b":" + str(password).encode("latin1")
- )[:-1]
+ f"{username}:{password}".encode("latin1"), newline=False)
def __call__(self, request):
request.headers["Authorization"] = self.authorization
@@ -611,6 +619,28 @@ class NullContext():
pass
+class NullResponse():
+ __slots__ = ("url", "reason")
+
+ ok = is_redirect = is_permanent_redirect = False
+ cookies = headers = history = links = {}
+ encoding = apparent_encoding = "utf-8"
+ content = b""
+ text = ""
+ status_code = 900
+ close = noop
+
+ def __init__(self, url, reason=""):
+ self.url = url
+ self.reason = str(reason)
+
+ def __str__(self):
+ return "900 " + self.reason
+
+ def json(self):
+ return {}
+
+
class CustomNone():
"""None-style type that supports more operations than regular None"""
__slots__ = ()
@@ -622,15 +652,14 @@ class CustomNone():
def __call__(self, *args, **kwargs):
return self
- @staticmethod
- def __next__():
+ def __next__(self):
raise StopIteration
def __eq__(self, other):
- return self is other
+ return other is self or other is None
def __ne__(self, other):
- return self is not other
+ return other is not self and other is not None
__lt__ = true
__le__ = true
@@ -671,25 +700,40 @@ class CustomNone():
__abs__ = identity
__invert__ = identity
- @staticmethod
- def __len__():
+ def __len__(self):
return 0
__int__ = __len__
__hash__ = __len__
__index__ = __len__
- @staticmethod
- def __format__(_):
+ def __format__(self, _):
return "None"
- @staticmethod
- def __str__():
+ def __str__(self):
return "None"
__repr__ = __str__
+class Flags():
+
+ def __init__(self):
+ self.FILE = self.POST = self.CHILD = self.DOWNLOAD = None
+
+ def process(self, flag):
+ value = self.__dict__[flag]
+ self.__dict__[flag] = None
+
+ if value == "abort":
+ raise exception.AbortExtraction()
+ if value == "terminate":
+ raise exception.TerminateExtraction()
+ if value == "restart":
+ raise exception.RestartExtraction()
+ raise exception.StopExtraction()
+
+
# v137.0 release of Firefox on 2025-04-01 has ordinal 739342
# 735506 == 739342 - 137 * 28
# v135.0 release of Chrome on 2025-04-01 has ordinal 739342
@@ -701,19 +745,30 @@ class CustomNone():
_ff_ver = (datetime.date.today().toordinal() - 735506) // 28
# _ch_ver = _ff_ver - 2
+re = text.re
+re_compile = text.re_compile
+
NONE = CustomNone()
+FLAGS = Flags()
EPOCH = datetime.datetime(1970, 1, 1)
SECOND = datetime.timedelta(0, 1)
WINDOWS = (os.name == "nt")
SENTINEL = object()
EXECUTABLE = getattr(sys, "frozen", False)
+SPECIAL_EXTRACTORS = {"oauth", "recursive", "generic"}
+
+EXTS_IMAGE = {"jpg", "jpeg", "png", "gif", "bmp", "svg", "psd", "ico",
+ "webp", "avif", "heic", "heif"}
+EXTS_VIDEO = {"mp4", "m4v", "mov", "webm", "mkv", "ogv", "flv", "avi", "wmv"}
+EXTS_ARCHIVE = {"zip", "rar", "7z", "tar", "gz", "bz2", "lzma", "xz"}
+
USERAGENT = "gallery-dl/" + version.__version__
-USERAGENT_FIREFOX = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{}.0) "
- "Gecko/20100101 Firefox/{}.0").format(_ff_ver, _ff_ver)
+USERAGENT_FIREFOX = (f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; "
+ f"rv:{_ff_ver}.0) Gecko/20100101 Firefox/{_ff_ver}.0")
USERAGENT_CHROME = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
- "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{}.0.0.0 "
- "Safari/537.36").format(_ff_ver - 2)
-SPECIAL_EXTRACTORS = {"oauth", "recursive", "generic"}
+ "AppleWebKit/537.36 (KHTML, like Gecko) "
+ f"Chrome/{_ff_ver - 2}.0.0.0 Safari/537.36")
+
GLOBALS = {
"contains" : contains,
"parse_int": text.parse_int,
@@ -721,12 +776,16 @@ GLOBALS = {
"datetime" : datetime.datetime,
"timedelta": datetime.timedelta,
"abort" : raises(exception.StopExtraction),
+ "error" : raises(exception.AbortExtraction),
"terminate": raises(exception.TerminateExtraction),
"restart" : raises(exception.RestartExtraction),
"hash_sha1": sha1,
"hash_md5" : md5,
"std" : ModuleProxy(),
- "re" : re,
+ "re" : text.re_module,
+ "exts_image" : EXTS_IMAGE,
+ "exts_video" : EXTS_VIDEO,
+ "exts_archive": EXTS_ARCHIVE,
}
@@ -786,10 +845,12 @@ def compile_expression_defaultdict_impl(expr, name="<expr>", globals=None):
def compile_expression_tryexcept(expr, name="<expr>", globals=None):
code_object = compile(expr, name, "eval")
+ if globals is None:
+ globals = GLOBALS
- def _eval(locals=None, globals=(globals or GLOBALS), co=code_object):
+ def _eval(locals=None):
try:
- return eval(co, globals, locals)
+ return eval(code_object, globals, locals)
except exception.GalleryDLException:
raise
except Exception:
@@ -803,7 +864,7 @@ compile_expression = compile_expression_tryexcept
def compile_filter(expr, name="<filter>", globals=None):
if not isinstance(expr, str):
- expr = "(" + ") and (".join(expr) + ")"
+ expr = f"({') and ('.join(expr)})"
return compile_expression(expr, name, globals)
@@ -826,25 +887,25 @@ def import_file(path):
return __import__(name.replace("-", "_"))
-def build_duration_func(duration, min=0.0):
- if not duration:
+def build_selection_func(value, min=0.0, conv=float):
+ if not value:
if min:
return lambda: min
return None
- if isinstance(duration, str):
- lower, _, upper = duration.partition("-")
- lower = float(lower)
+ if isinstance(value, str):
+ lower, _, upper = value.partition("-")
else:
try:
- lower, upper = duration
+ lower, upper = value
except TypeError:
- lower, upper = duration, None
+ lower, upper = value, None
+ lower = conv(lower)
if upper:
- upper = float(upper)
+ upper = conv(upper)
return functools.partial(
- random.uniform,
+ random.uniform if lower.__class__ is float else random.randint,
lower if lower > min else min,
upper if upper > min else min,
)
@@ -854,6 +915,9 @@ def build_duration_func(duration, min=0.0):
return lambda: lower
+build_duration_func = build_selection_func
+
+
def build_extractor_filter(categories, negate=True, special=None):
"""Build a function that takes an Extractor class as argument
and returns True if that class is allowed by 'categories'
@@ -931,13 +995,13 @@ def build_proxy_map(proxies, log=None):
proxies[scheme] = "http://" + proxy.lstrip("/")
return proxies
- if log:
+ if log is not None:
log.warning("invalid proxy specifier: %s", proxies)
def build_predicate(predicates):
if not predicates:
- return lambda url, kwdict: True
+ return true
elif len(predicates) == 1:
return predicates[0]
return functools.partial(chain_predicates, predicates)
@@ -977,8 +1041,7 @@ class RangePredicate():
return True
return False
- @staticmethod
- def _parse(rangespec):
+ def _parse(self, rangespec):
"""Parse an integer range string and return the resulting ranges
Examples:
@@ -987,7 +1050,6 @@ class RangePredicate():
_parse("1:2,4:8:2") -> [(1,1), (4,7,2)]
"""
ranges = []
- append = ranges.append
if isinstance(rangespec, str):
rangespec = rangespec.split(",")
@@ -999,7 +1061,7 @@ class RangePredicate():
elif ":" in group:
start, _, stop = group.partition(":")
stop, _, step = stop.partition(":")
- append(range(
+ ranges.append(range(
int(start) if start.strip() else 1,
int(stop) if stop.strip() else sys.maxsize,
int(step) if step.strip() else 1,
@@ -1007,14 +1069,14 @@ class RangePredicate():
elif "-" in group:
start, _, stop = group.partition("-")
- append(range(
+ ranges.append(range(
int(start) if start.strip() else 1,
int(stop) + 1 if stop.strip() else sys.maxsize,
))
else:
start = int(group)
- append(range(start, start+1))
+ ranges.append(range(start, start+1))
return ranges
@@ -1037,7 +1099,7 @@ class FilterPredicate():
"""Predicate; True if evaluating the given expression returns True"""
def __init__(self, expr, target="image"):
- name = "<{} filter>".format(target)
+ name = f"<{target} filter>"
self.expr = compile_filter(expr, name)
def __call__(self, _, kwdict):