diff options
Diffstat (limited to 'gallery_dl/util.py')
| -rw-r--r-- | gallery_dl/util.py | 592 |
1 files changed, 32 insertions, 560 deletions
diff --git a/gallery_dl/util.py b/gallery_dl/util.py index 935bf99..4a7fdbf 100644 --- a/gallery_dl/util.py +++ b/gallery_dl/util.py @@ -12,20 +12,14 @@ import re import os import sys import json -import time import random -import shutil -import string -import _string import sqlite3 import binascii import datetime -import operator import functools import itertools import urllib.parse from http.cookiejar import Cookie -from email.utils import mktime_tz, parsedate_tz from . import text, exception @@ -97,15 +91,15 @@ def generate_token(size=16): return binascii.hexlify(data).decode() -def format_value(value, unit="B", suffixes="kMGTPEZY"): +def format_value(value, suffixes="kMGTPEZY"): value = format(value) value_len = len(value) index = value_len - 4 if index >= 0: offset = (value_len - 1) % 3 + 1 return (value[:offset] + "." + value[offset:offset+2] + - suffixes[index // 3] + unit) - return value + unit + suffixes[index // 3]) + return value def combine_dict(a, b): @@ -139,6 +133,17 @@ def delete_items(obj, keys): del obj[key] +def enumerate_reversed(iterable, start=0, length=None): + """Enumerate 'iterable' and return its elements in reverse order""" + start -= 1 + if length is None: + length = len(iterable) + return zip( + range(length - start, start, -1), + reversed(iterable), + ) + + def number_to_string(value, numbers=(int, float)): """Convert numbers (int, float) to string; Return everything else as is.""" return str(value) if value.__class__ in numbers else value @@ -409,6 +414,24 @@ def compile_expression(expr, name="<expr>", globals=GLOBALS): return functools.partial(eval, code_object, globals) +def build_duration_func(duration, min=0.0): + if not duration: + return None + + try: + lower, upper = duration + except TypeError: + pass + else: + return functools.partial( + random.uniform, + lower if lower > min else min, + upper if upper > min else min, + ) + + return functools.partial(identity, duration if duration > min else min) + + def build_predicate(predicates): if not predicates: return lambda url, kwdict: True @@ -534,557 +557,6 @@ class ExtendedUrl(): return self.value -class Formatter(): - """Custom, extended version of string.Formatter - - This string formatter implementation is a mostly performance-optimized - variant of the original string.Formatter class. Unnecessary features have - been removed (positional arguments, unused argument check) and new - formatting options have been added. - - Extra Conversions: - - "l": calls str.lower on the target value - - "u": calls str.upper - - "c": calls str.capitalize - - "C": calls string.capwords - - "j". calls json.dumps - - "t": calls str.strip - - "d": calls text.parse_timestamp - - "U": calls urllib.parse.unquote - - "S": calls util.to_string() - - "T": calls util.to_timestamü() - - Example: {f!l} -> "example"; {f!u} -> "EXAMPLE" - - Extra Format Specifiers: - - "?<before>/<after>/": - Adds <before> and <after> to the actual value if it evaluates to True. - Otherwise the whole replacement field becomes an empty string. - Example: {f:?-+/+-/} -> "-+Example+-" (if "f" contains "Example") - -> "" (if "f" is None, 0, "") - - - "L<maxlen>/<replacement>/": - Replaces the output with <replacement> if its length (in characters) - exceeds <maxlen>. Otherwise everything is left as is. - Example: {f:L5/too long/} -> "foo" (if "f" is "foo") - -> "too long" (if "f" is "foobar") - - - "J<separator>/": - Joins elements of a list (or string) using <separator> - Example: {f:J - /} -> "a - b - c" (if "f" is ["a", "b", "c"]) - - - "R<old>/<new>/": - Replaces all occurrences of <old> with <new> - Example: {f:R /_/} -> "f_o_o_b_a_r" (if "f" is "f o o b a r") - """ - CACHE = {} - CONVERSIONS = { - "l": str.lower, - "u": str.upper, - "c": str.capitalize, - "C": string.capwords, - "j": json.dumps, - "t": str.strip, - "T": to_timestamp, - "d": text.parse_timestamp, - "U": urllib.parse.unquote, - "S": to_string, - "s": str, - "r": repr, - "a": ascii, - } - - def __init__(self, format_string, default=None): - self.default = default - key = (format_string, default) - - try: - self.result, self.fields = self.CACHE[key] - except KeyError: - self.result = [] - self.fields = [] - - for literal_text, field_name, format_spec, conv in \ - _string.formatter_parser(format_string): - if literal_text: - self.result.append(literal_text) - if field_name: - self.fields.append(( - len(self.result), - self._field_access(field_name, format_spec, conv), - )) - self.result.append("") - - self.CACHE[key] = (self.result, self.fields) - - if len(self.result) == 1: - if self.fields: - self.format_map = self.fields[0][1] - else: - self.format_map = lambda _: format_string - del self.result, self.fields - - def format_map(self, kwdict): - """Apply 'kwdict' to the initial format_string and return its result""" - result = self.result - for index, func in self.fields: - result[index] = func(kwdict) - return "".join(result) - - def _field_access(self, field_name, format_spec, conversion): - fmt = self._parse_format_spec(format_spec, conversion) - - if "|" in field_name: - return self._apply_list([ - self._parse_field_name(fn) - for fn in field_name.split("|") - ], fmt) - else: - key, funcs = self._parse_field_name(field_name) - if funcs: - return self._apply(key, funcs, fmt) - return self._apply_simple(key, fmt) - - @staticmethod - def _parse_field_name(field_name): - first, rest = _string.formatter_field_name_split(field_name) - funcs = [] - - for is_attr, key in rest: - if is_attr: - func = operator.attrgetter - else: - func = operator.itemgetter - try: - if ":" in key: - start, _, stop = key.partition(":") - stop, _, step = stop.partition(":") - start = int(start) if start else None - stop = int(stop) if stop else None - step = int(step) if step else None - key = slice(start, stop, step) - except TypeError: - pass # key is an integer - - funcs.append(func(key)) - - return first, funcs - - def _parse_format_spec(self, format_spec, conversion): - fmt = self._build_format_func(format_spec) - if not conversion: - return fmt - - conversion = self.CONVERSIONS[conversion] - if fmt is format: - return conversion - else: - def chain(obj): - return fmt(conversion(obj)) - return chain - - def _build_format_func(self, format_spec): - if format_spec: - fmt = format_spec[0] - if fmt == "?": - return self._parse_optional(format_spec) - if fmt == "L": - return self._parse_maxlen(format_spec) - if fmt == "J": - return self._parse_join(format_spec) - if fmt == "R": - return self._parse_replace(format_spec) - return self._default_format(format_spec) - return format - - def _apply(self, key, funcs, fmt): - def wrap(kwdict): - try: - obj = kwdict[key] - for func in funcs: - obj = func(obj) - except Exception: - obj = self.default - return fmt(obj) - return wrap - - def _apply_simple(self, key, fmt): - def wrap(kwdict): - return fmt(kwdict[key] if key in kwdict else self.default) - return wrap - - def _apply_list(self, lst, fmt): - def wrap(kwdict): - for key, funcs in lst: - try: - obj = kwdict[key] - for func in funcs: - obj = func(obj) - if obj: - break - except Exception: - pass - else: - obj = self.default - return fmt(obj) - return wrap - - def _parse_optional(self, format_spec): - before, after, format_spec = format_spec.split("/", 2) - before = before[1:] - fmt = self._build_format_func(format_spec) - - def optional(obj): - return before + fmt(obj) + after if obj else "" - return optional - - def _parse_maxlen(self, format_spec): - maxlen, replacement, format_spec = format_spec.split("/", 2) - maxlen = text.parse_int(maxlen[1:]) - fmt = self._build_format_func(format_spec) - - def mlen(obj): - obj = fmt(obj) - return obj if len(obj) <= maxlen else replacement - return mlen - - def _parse_join(self, format_spec): - separator, _, format_spec = format_spec.partition("/") - separator = separator[1:] - fmt = self._build_format_func(format_spec) - - def join(obj): - return fmt(separator.join(obj)) - return join - - def _parse_replace(self, format_spec): - old, new, format_spec = format_spec.split("/", 2) - old = old[1:] - fmt = self._build_format_func(format_spec) - - def replace(obj): - return fmt(obj.replace(old, new)) - return replace - - @staticmethod - def _default_format(format_spec): - def wrap(obj): - return format(obj, format_spec) - return wrap - - -class PathFormat(): - EXTENSION_MAP = { - "jpeg": "jpg", - "jpe" : "jpg", - "jfif": "jpg", - "jif" : "jpg", - "jfi" : "jpg", - } - - def __init__(self, extractor): - config = extractor.config - kwdefault = config("keywords-default") - - filename_fmt = config("filename") - try: - if filename_fmt is None: - filename_fmt = extractor.filename_fmt - elif isinstance(filename_fmt, dict): - self.filename_conditions = [ - (compile_expression(expr), - Formatter(fmt, kwdefault).format_map) - for expr, fmt in filename_fmt.items() if expr - ] - self.build_filename = self.build_filename_conditional - filename_fmt = filename_fmt.get("", extractor.filename_fmt) - - self.filename_formatter = Formatter( - filename_fmt, kwdefault).format_map - except Exception as exc: - raise exception.FilenameFormatError(exc) - - directory_fmt = config("directory") - try: - if directory_fmt is None: - directory_fmt = extractor.directory_fmt - elif isinstance(directory_fmt, dict): - self.directory_conditions = [ - (compile_expression(expr), [ - Formatter(fmt, kwdefault).format_map - for fmt in fmts - ]) - for expr, fmts in directory_fmt.items() if expr - ] - self.build_directory = self.build_directory_conditional - directory_fmt = directory_fmt.get("", extractor.directory_fmt) - - self.directory_formatters = [ - Formatter(dirfmt, kwdefault).format_map - for dirfmt in directory_fmt - ] - except Exception as exc: - raise exception.DirectoryFormatError(exc) - - self.kwdict = {} - self.directory = self.realdirectory = \ - self.filename = self.extension = self.prefix = \ - self.path = self.realpath = self.temppath = "" - self.delete = self._create_directory = False - - extension_map = config("extension-map") - if extension_map is None: - extension_map = self.EXTENSION_MAP - self.extension_map = extension_map.get - - restrict = config("path-restrict", "auto") - replace = config("path-replace", "_") - if restrict == "auto": - restrict = "\\\\|/<>:\"?*" if WINDOWS else "/" - elif restrict == "unix": - restrict = "/" - elif restrict == "windows": - restrict = "\\\\|/<>:\"?*" - elif restrict == "ascii": - restrict = "^0-9A-Za-z_." - self.clean_segment = self._build_cleanfunc(restrict, replace) - - remove = config("path-remove", "\x00-\x1f\x7f") - self.clean_path = self._build_cleanfunc(remove, "") - - strip = config("path-strip", "auto") - if strip == "auto": - strip = ". " if WINDOWS else "" - elif strip == "unix": - strip = "" - elif strip == "windows": - strip = ". " - self.strip = strip - - basedir = extractor._parentdir - if not basedir: - basedir = config("base-directory") - sep = os.sep - if basedir is None: - basedir = "." + sep + "gallery-dl" + sep - elif basedir: - basedir = expand_path(basedir) - altsep = os.altsep - if altsep and altsep in basedir: - basedir = basedir.replace(altsep, sep) - if basedir[-1] != sep: - basedir += sep - basedir = self.clean_path(basedir) - self.basedirectory = basedir - - @staticmethod - def _build_cleanfunc(chars, repl): - if not chars: - return identity - elif isinstance(chars, dict): - def func(x, table=str.maketrans(chars)): - return x.translate(table) - elif len(chars) == 1: - def func(x, c=chars, r=repl): - return x.replace(c, r) - else: - return functools.partial( - re.compile("[" + chars + "]").sub, repl) - return func - - def open(self, mode="wb"): - """Open file and return a corresponding file object""" - return open(self.temppath, mode) - - def exists(self): - """Return True if the file exists on disk""" - if self.extension and os.path.exists(self.realpath): - return self.check_file() - return False - - @staticmethod - def check_file(): - return True - - def _enum_file(self): - num = 1 - try: - while True: - self.prefix = str(num) + "." - self.set_extension(self.extension, False) - os.stat(self.realpath) # raises OSError if file doesn't exist - num += 1 - except OSError: - pass - return False - - def set_directory(self, kwdict): - """Build directory path and create it if necessary""" - self.kwdict = kwdict - sep = os.sep - - segments = self.build_directory(kwdict) - if segments: - self.directory = directory = self.basedirectory + self.clean_path( - sep.join(segments) + sep) - else: - self.directory = directory = self.basedirectory - - if WINDOWS: - # Enable longer-than-260-character paths on Windows - directory = "\\\\?\\" + os.path.abspath(directory) - - # abspath() in Python 3.7+ removes trailing path separators (#402) - if directory[-1] != sep: - directory += sep - - self.realdirectory = directory - self._create_directory = True - - def set_filename(self, kwdict): - """Set general filename data""" - self.kwdict = kwdict - self.temppath = self.prefix = "" - - ext = kwdict["extension"] - kwdict["extension"] = self.extension = self.extension_map(ext, ext) - - if self.extension: - self.build_path() - else: - self.filename = "" - - def set_extension(self, extension, real=True): - """Set filename extension""" - extension = self.extension_map(extension, extension) - if real: - self.extension = extension - self.kwdict["extension"] = self.prefix + extension - self.build_path() - - def fix_extension(self, _=None): - """Fix filenames without a given filename extension""" - if not self.extension: - self.set_extension("", False) - if self.path[-1] == ".": - self.path = self.path[:-1] - self.temppath = self.realpath = self.realpath[:-1] - return True - - def build_filename(self, kwdict): - """Apply 'kwdict' to filename format string""" - try: - return self.clean_path(self.clean_segment( - self.filename_formatter(kwdict))) - except Exception as exc: - raise exception.FilenameFormatError(exc) - - def build_filename_conditional(self, kwdict): - try: - for condition, formatter in self.filename_conditions: - if condition(kwdict): - break - else: - formatter = self.filename_formatter - return self.clean_path(self.clean_segment(formatter(kwdict))) - except Exception as exc: - raise exception.FilenameFormatError(exc) - - def build_directory(self, kwdict): - """Apply 'kwdict' to directory format strings""" - segments = [] - append = segments.append - strip = self.strip - - try: - for formatter in self.directory_formatters: - segment = formatter(kwdict).strip() - if strip: - # remove trailing dots and spaces (#647) - segment = segment.rstrip(strip) - if segment: - append(self.clean_segment(segment)) - return segments - except Exception as exc: - raise exception.DirectoryFormatError(exc) - - def build_directory_conditional(self, kwdict): - segments = [] - append = segments.append - strip = self.strip - - try: - for condition, formatters in self.directory_conditions: - if condition(kwdict): - break - else: - formatters = self.directory_formatters - for formatter in formatters: - segment = formatter(kwdict).strip() - if strip: - segment = segment.rstrip(strip) - if segment: - append(self.clean_segment(segment)) - return segments - except Exception as exc: - raise exception.DirectoryFormatError(exc) - - def build_path(self): - """Combine directory and filename to full paths""" - if self._create_directory: - os.makedirs(self.realdirectory, exist_ok=True) - self._create_directory = False - self.filename = filename = self.build_filename(self.kwdict) - self.path = self.directory + filename - self.realpath = self.realdirectory + filename - if not self.temppath: - self.temppath = self.realpath - - def part_enable(self, part_directory=None): - """Enable .part file usage""" - if self.extension: - self.temppath += ".part" - else: - self.set_extension("part", False) - if part_directory: - self.temppath = os.path.join( - part_directory, - os.path.basename(self.temppath), - ) - - def part_size(self): - """Return size of .part file""" - try: - return os.stat(self.temppath).st_size - except OSError: - pass - return 0 - - def finalize(self): - """Move tempfile to its target location""" - if self.delete: - self.delete = False - os.unlink(self.temppath) - return - - if self.temppath != self.realpath: - # Move temp file to its actual location - try: - os.replace(self.temppath, self.realpath) - except OSError: - shutil.copyfile(self.temppath, self.realpath) - os.unlink(self.temppath) - - mtime = self.kwdict.get("_mtime") - if mtime: - # Set file modification time - try: - if isinstance(mtime, str): - mtime = mktime_tz(parsedate_tz(mtime)) - os.utime(self.realpath, (time.time(), mtime)) - except Exception: - pass - - class DownloadArchive(): def __init__(self, path, extractor): |
