summaryrefslogtreecommitdiffstats
path: root/gallery_dl/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'gallery_dl/util.py')
-rw-r--r--gallery_dl/util.py592
1 files changed, 32 insertions, 560 deletions
diff --git a/gallery_dl/util.py b/gallery_dl/util.py
index 935bf99..4a7fdbf 100644
--- a/gallery_dl/util.py
+++ b/gallery_dl/util.py
@@ -12,20 +12,14 @@ import re
import os
import sys
import json
-import time
import random
-import shutil
-import string
-import _string
import sqlite3
import binascii
import datetime
-import operator
import functools
import itertools
import urllib.parse
from http.cookiejar import Cookie
-from email.utils import mktime_tz, parsedate_tz
from . import text, exception
@@ -97,15 +91,15 @@ def generate_token(size=16):
return binascii.hexlify(data).decode()
-def format_value(value, unit="B", suffixes="kMGTPEZY"):
+def format_value(value, suffixes="kMGTPEZY"):
value = format(value)
value_len = len(value)
index = value_len - 4
if index >= 0:
offset = (value_len - 1) % 3 + 1
return (value[:offset] + "." + value[offset:offset+2] +
- suffixes[index // 3] + unit)
- return value + unit
+ suffixes[index // 3])
+ return value
def combine_dict(a, b):
@@ -139,6 +133,17 @@ def delete_items(obj, keys):
del obj[key]
def enumerate_reversed(iterable, start=0, length=None):
    """Enumerate 'iterable' and return its elements in reverse order

    Produces the same (index, element) pairs as
    'enumerate(iterable, start)', but beginning with the last element
    and counting the index down towards 'start'.

    'iterable' must support reversed(); it must also support len()
    unless its number of elements is passed explicitly as 'length'.
    """
    if length is None:
        length = len(iterable)

    # Index of the last element. The previous 'length - start' arithmetic
    # (after decrementing 'start') was only correct for start == 1.
    last = start + length - 1

    return zip(
        range(last, start - 1, -1),
        reversed(iterable),
    )
+
+
def number_to_string(value, numbers=(int, float)):
    """Return str(value) if the exact class of 'value' is listed in
    'numbers' (int and float by default); otherwise return it unchanged.
    """
    if value.__class__ not in numbers:
        return value
    return str(value)
@@ -409,6 +414,24 @@ def compile_expression(expr, name="<expr>", globals=GLOBALS):
return functools.partial(eval, code_object, globals)
def build_duration_func(duration, min=0.0):
    """Build a function that returns a duration value

    Returns None if 'duration' is falsy.
    If 'duration' unpacks into two values (lower, upper), the result is a
    partial of random.uniform over that range; otherwise it is a partial
    of identity() bound to the single value.
    Any value that is not greater than 'min' is replaced by 'min'.
    """
    if not duration:
        return None

    try:
        lower, upper = duration
    except TypeError:
        # cannot be unpacked: treat 'duration' as one fixed value
        bounds = None
    else:
        bounds = (lower, upper)

    if bounds is None:
        fixed = duration if duration > min else min
        return functools.partial(identity, fixed)

    clamped = [bound if bound > min else min for bound in bounds]
    return functools.partial(random.uniform, *clamped)
+
+
def build_predicate(predicates):
if not predicates:
return lambda url, kwdict: True
@@ -534,557 +557,6 @@ class ExtendedUrl():
return self.value
-class Formatter():
- """Custom, extended version of string.Formatter
-
- This string formatter implementation is a mostly performance-optimized
- variant of the original string.Formatter class. Unnecessary features have
- been removed (positional arguments, unused argument check) and new
- formatting options have been added.
-
- Extra Conversions:
- - "l": calls str.lower on the target value
- - "u": calls str.upper
- - "c": calls str.capitalize
- - "C": calls string.capwords
- - "j". calls json.dumps
- - "t": calls str.strip
- - "d": calls text.parse_timestamp
- - "U": calls urllib.parse.unquote
- - "S": calls util.to_string()
- - "T": calls util.to_timestamü()
- - Example: {f!l} -> "example"; {f!u} -> "EXAMPLE"
-
- Extra Format Specifiers:
- - "?<before>/<after>/":
- Adds <before> and <after> to the actual value if it evaluates to True.
- Otherwise the whole replacement field becomes an empty string.
- Example: {f:?-+/+-/} -> "-+Example+-" (if "f" contains "Example")
- -> "" (if "f" is None, 0, "")
-
- - "L<maxlen>/<replacement>/":
- Replaces the output with <replacement> if its length (in characters)
- exceeds <maxlen>. Otherwise everything is left as is.
- Example: {f:L5/too long/} -> "foo" (if "f" is "foo")
- -> "too long" (if "f" is "foobar")
-
- - "J<separator>/":
- Joins elements of a list (or string) using <separator>
- Example: {f:J - /} -> "a - b - c" (if "f" is ["a", "b", "c"])
-
- - "R<old>/<new>/":
- Replaces all occurrences of <old> with <new>
- Example: {f:R /_/} -> "f_o_o_b_a_r" (if "f" is "f o o b a r")
- """
- CACHE = {}
- CONVERSIONS = {
- "l": str.lower,
- "u": str.upper,
- "c": str.capitalize,
- "C": string.capwords,
- "j": json.dumps,
- "t": str.strip,
- "T": to_timestamp,
- "d": text.parse_timestamp,
- "U": urllib.parse.unquote,
- "S": to_string,
- "s": str,
- "r": repr,
- "a": ascii,
- }
-
- def __init__(self, format_string, default=None):
- self.default = default
- key = (format_string, default)
-
- try:
- self.result, self.fields = self.CACHE[key]
- except KeyError:
- self.result = []
- self.fields = []
-
- for literal_text, field_name, format_spec, conv in \
- _string.formatter_parser(format_string):
- if literal_text:
- self.result.append(literal_text)
- if field_name:
- self.fields.append((
- len(self.result),
- self._field_access(field_name, format_spec, conv),
- ))
- self.result.append("")
-
- self.CACHE[key] = (self.result, self.fields)
-
- if len(self.result) == 1:
- if self.fields:
- self.format_map = self.fields[0][1]
- else:
- self.format_map = lambda _: format_string
- del self.result, self.fields
-
- def format_map(self, kwdict):
- """Apply 'kwdict' to the initial format_string and return its result"""
- result = self.result
- for index, func in self.fields:
- result[index] = func(kwdict)
- return "".join(result)
-
- def _field_access(self, field_name, format_spec, conversion):
- fmt = self._parse_format_spec(format_spec, conversion)
-
- if "|" in field_name:
- return self._apply_list([
- self._parse_field_name(fn)
- for fn in field_name.split("|")
- ], fmt)
- else:
- key, funcs = self._parse_field_name(field_name)
- if funcs:
- return self._apply(key, funcs, fmt)
- return self._apply_simple(key, fmt)
-
- @staticmethod
- def _parse_field_name(field_name):
- first, rest = _string.formatter_field_name_split(field_name)
- funcs = []
-
- for is_attr, key in rest:
- if is_attr:
- func = operator.attrgetter
- else:
- func = operator.itemgetter
- try:
- if ":" in key:
- start, _, stop = key.partition(":")
- stop, _, step = stop.partition(":")
- start = int(start) if start else None
- stop = int(stop) if stop else None
- step = int(step) if step else None
- key = slice(start, stop, step)
- except TypeError:
- pass # key is an integer
-
- funcs.append(func(key))
-
- return first, funcs
-
- def _parse_format_spec(self, format_spec, conversion):
- fmt = self._build_format_func(format_spec)
- if not conversion:
- return fmt
-
- conversion = self.CONVERSIONS[conversion]
- if fmt is format:
- return conversion
- else:
- def chain(obj):
- return fmt(conversion(obj))
- return chain
-
- def _build_format_func(self, format_spec):
- if format_spec:
- fmt = format_spec[0]
- if fmt == "?":
- return self._parse_optional(format_spec)
- if fmt == "L":
- return self._parse_maxlen(format_spec)
- if fmt == "J":
- return self._parse_join(format_spec)
- if fmt == "R":
- return self._parse_replace(format_spec)
- return self._default_format(format_spec)
- return format
-
- def _apply(self, key, funcs, fmt):
- def wrap(kwdict):
- try:
- obj = kwdict[key]
- for func in funcs:
- obj = func(obj)
- except Exception:
- obj = self.default
- return fmt(obj)
- return wrap
-
- def _apply_simple(self, key, fmt):
- def wrap(kwdict):
- return fmt(kwdict[key] if key in kwdict else self.default)
- return wrap
-
- def _apply_list(self, lst, fmt):
- def wrap(kwdict):
- for key, funcs in lst:
- try:
- obj = kwdict[key]
- for func in funcs:
- obj = func(obj)
- if obj:
- break
- except Exception:
- pass
- else:
- obj = self.default
- return fmt(obj)
- return wrap
-
- def _parse_optional(self, format_spec):
- before, after, format_spec = format_spec.split("/", 2)
- before = before[1:]
- fmt = self._build_format_func(format_spec)
-
- def optional(obj):
- return before + fmt(obj) + after if obj else ""
- return optional
-
- def _parse_maxlen(self, format_spec):
- maxlen, replacement, format_spec = format_spec.split("/", 2)
- maxlen = text.parse_int(maxlen[1:])
- fmt = self._build_format_func(format_spec)
-
- def mlen(obj):
- obj = fmt(obj)
- return obj if len(obj) <= maxlen else replacement
- return mlen
-
- def _parse_join(self, format_spec):
- separator, _, format_spec = format_spec.partition("/")
- separator = separator[1:]
- fmt = self._build_format_func(format_spec)
-
- def join(obj):
- return fmt(separator.join(obj))
- return join
-
- def _parse_replace(self, format_spec):
- old, new, format_spec = format_spec.split("/", 2)
- old = old[1:]
- fmt = self._build_format_func(format_spec)
-
- def replace(obj):
- return fmt(obj.replace(old, new))
- return replace
-
- @staticmethod
- def _default_format(format_spec):
- def wrap(obj):
- return format(obj, format_spec)
- return wrap
-
-
-class PathFormat():
- EXTENSION_MAP = {
- "jpeg": "jpg",
- "jpe" : "jpg",
- "jfif": "jpg",
- "jif" : "jpg",
- "jfi" : "jpg",
- }
-
- def __init__(self, extractor):
- config = extractor.config
- kwdefault = config("keywords-default")
-
- filename_fmt = config("filename")
- try:
- if filename_fmt is None:
- filename_fmt = extractor.filename_fmt
- elif isinstance(filename_fmt, dict):
- self.filename_conditions = [
- (compile_expression(expr),
- Formatter(fmt, kwdefault).format_map)
- for expr, fmt in filename_fmt.items() if expr
- ]
- self.build_filename = self.build_filename_conditional
- filename_fmt = filename_fmt.get("", extractor.filename_fmt)
-
- self.filename_formatter = Formatter(
- filename_fmt, kwdefault).format_map
- except Exception as exc:
- raise exception.FilenameFormatError(exc)
-
- directory_fmt = config("directory")
- try:
- if directory_fmt is None:
- directory_fmt = extractor.directory_fmt
- elif isinstance(directory_fmt, dict):
- self.directory_conditions = [
- (compile_expression(expr), [
- Formatter(fmt, kwdefault).format_map
- for fmt in fmts
- ])
- for expr, fmts in directory_fmt.items() if expr
- ]
- self.build_directory = self.build_directory_conditional
- directory_fmt = directory_fmt.get("", extractor.directory_fmt)
-
- self.directory_formatters = [
- Formatter(dirfmt, kwdefault).format_map
- for dirfmt in directory_fmt
- ]
- except Exception as exc:
- raise exception.DirectoryFormatError(exc)
-
- self.kwdict = {}
- self.directory = self.realdirectory = \
- self.filename = self.extension = self.prefix = \
- self.path = self.realpath = self.temppath = ""
- self.delete = self._create_directory = False
-
- extension_map = config("extension-map")
- if extension_map is None:
- extension_map = self.EXTENSION_MAP
- self.extension_map = extension_map.get
-
- restrict = config("path-restrict", "auto")
- replace = config("path-replace", "_")
- if restrict == "auto":
- restrict = "\\\\|/<>:\"?*" if WINDOWS else "/"
- elif restrict == "unix":
- restrict = "/"
- elif restrict == "windows":
- restrict = "\\\\|/<>:\"?*"
- elif restrict == "ascii":
- restrict = "^0-9A-Za-z_."
- self.clean_segment = self._build_cleanfunc(restrict, replace)
-
- remove = config("path-remove", "\x00-\x1f\x7f")
- self.clean_path = self._build_cleanfunc(remove, "")
-
- strip = config("path-strip", "auto")
- if strip == "auto":
- strip = ". " if WINDOWS else ""
- elif strip == "unix":
- strip = ""
- elif strip == "windows":
- strip = ". "
- self.strip = strip
-
- basedir = extractor._parentdir
- if not basedir:
- basedir = config("base-directory")
- sep = os.sep
- if basedir is None:
- basedir = "." + sep + "gallery-dl" + sep
- elif basedir:
- basedir = expand_path(basedir)
- altsep = os.altsep
- if altsep and altsep in basedir:
- basedir = basedir.replace(altsep, sep)
- if basedir[-1] != sep:
- basedir += sep
- basedir = self.clean_path(basedir)
- self.basedirectory = basedir
-
- @staticmethod
- def _build_cleanfunc(chars, repl):
- if not chars:
- return identity
- elif isinstance(chars, dict):
- def func(x, table=str.maketrans(chars)):
- return x.translate(table)
- elif len(chars) == 1:
- def func(x, c=chars, r=repl):
- return x.replace(c, r)
- else:
- return functools.partial(
- re.compile("[" + chars + "]").sub, repl)
- return func
-
- def open(self, mode="wb"):
- """Open file and return a corresponding file object"""
- return open(self.temppath, mode)
-
- def exists(self):
- """Return True if the file exists on disk"""
- if self.extension and os.path.exists(self.realpath):
- return self.check_file()
- return False
-
- @staticmethod
- def check_file():
- return True
-
- def _enum_file(self):
- num = 1
- try:
- while True:
- self.prefix = str(num) + "."
- self.set_extension(self.extension, False)
- os.stat(self.realpath) # raises OSError if file doesn't exist
- num += 1
- except OSError:
- pass
- return False
-
- def set_directory(self, kwdict):
- """Build directory path and create it if necessary"""
- self.kwdict = kwdict
- sep = os.sep
-
- segments = self.build_directory(kwdict)
- if segments:
- self.directory = directory = self.basedirectory + self.clean_path(
- sep.join(segments) + sep)
- else:
- self.directory = directory = self.basedirectory
-
- if WINDOWS:
- # Enable longer-than-260-character paths on Windows
- directory = "\\\\?\\" + os.path.abspath(directory)
-
- # abspath() in Python 3.7+ removes trailing path separators (#402)
- if directory[-1] != sep:
- directory += sep
-
- self.realdirectory = directory
- self._create_directory = True
-
- def set_filename(self, kwdict):
- """Set general filename data"""
- self.kwdict = kwdict
- self.temppath = self.prefix = ""
-
- ext = kwdict["extension"]
- kwdict["extension"] = self.extension = self.extension_map(ext, ext)
-
- if self.extension:
- self.build_path()
- else:
- self.filename = ""
-
- def set_extension(self, extension, real=True):
- """Set filename extension"""
- extension = self.extension_map(extension, extension)
- if real:
- self.extension = extension
- self.kwdict["extension"] = self.prefix + extension
- self.build_path()
-
- def fix_extension(self, _=None):
- """Fix filenames without a given filename extension"""
- if not self.extension:
- self.set_extension("", False)
- if self.path[-1] == ".":
- self.path = self.path[:-1]
- self.temppath = self.realpath = self.realpath[:-1]
- return True
-
- def build_filename(self, kwdict):
- """Apply 'kwdict' to filename format string"""
- try:
- return self.clean_path(self.clean_segment(
- self.filename_formatter(kwdict)))
- except Exception as exc:
- raise exception.FilenameFormatError(exc)
-
- def build_filename_conditional(self, kwdict):
- try:
- for condition, formatter in self.filename_conditions:
- if condition(kwdict):
- break
- else:
- formatter = self.filename_formatter
- return self.clean_path(self.clean_segment(formatter(kwdict)))
- except Exception as exc:
- raise exception.FilenameFormatError(exc)
-
- def build_directory(self, kwdict):
- """Apply 'kwdict' to directory format strings"""
- segments = []
- append = segments.append
- strip = self.strip
-
- try:
- for formatter in self.directory_formatters:
- segment = formatter(kwdict).strip()
- if strip:
- # remove trailing dots and spaces (#647)
- segment = segment.rstrip(strip)
- if segment:
- append(self.clean_segment(segment))
- return segments
- except Exception as exc:
- raise exception.DirectoryFormatError(exc)
-
- def build_directory_conditional(self, kwdict):
- segments = []
- append = segments.append
- strip = self.strip
-
- try:
- for condition, formatters in self.directory_conditions:
- if condition(kwdict):
- break
- else:
- formatters = self.directory_formatters
- for formatter in formatters:
- segment = formatter(kwdict).strip()
- if strip:
- segment = segment.rstrip(strip)
- if segment:
- append(self.clean_segment(segment))
- return segments
- except Exception as exc:
- raise exception.DirectoryFormatError(exc)
-
- def build_path(self):
- """Combine directory and filename to full paths"""
- if self._create_directory:
- os.makedirs(self.realdirectory, exist_ok=True)
- self._create_directory = False
- self.filename = filename = self.build_filename(self.kwdict)
- self.path = self.directory + filename
- self.realpath = self.realdirectory + filename
- if not self.temppath:
- self.temppath = self.realpath
-
- def part_enable(self, part_directory=None):
- """Enable .part file usage"""
- if self.extension:
- self.temppath += ".part"
- else:
- self.set_extension("part", False)
- if part_directory:
- self.temppath = os.path.join(
- part_directory,
- os.path.basename(self.temppath),
- )
-
- def part_size(self):
- """Return size of .part file"""
- try:
- return os.stat(self.temppath).st_size
- except OSError:
- pass
- return 0
-
- def finalize(self):
- """Move tempfile to its target location"""
- if self.delete:
- self.delete = False
- os.unlink(self.temppath)
- return
-
- if self.temppath != self.realpath:
- # Move temp file to its actual location
- try:
- os.replace(self.temppath, self.realpath)
- except OSError:
- shutil.copyfile(self.temppath, self.realpath)
- os.unlink(self.temppath)
-
- mtime = self.kwdict.get("_mtime")
- if mtime:
- # Set file modification time
- try:
- if isinstance(mtime, str):
- mtime = mktime_tz(parsedate_tz(mtime))
- os.utime(self.realpath, (time.time(), mtime))
- except Exception:
- pass
-
-
class DownloadArchive():
def __init__(self, path, extractor):