| author | 2025-07-31 01:22:01 -0400 |
|---|---|
| committer | 2025-07-31 01:22:01 -0400 |
| commit | a6e995c093de8aae2e91a0787281bb34c0b871eb (patch) |
| tree | 2d79821b05300d34d8871eb6c9662b359a2de85d /gallery_dl/extractor/kemonoparty.py |
| parent | 7672a750cb74bf31e21d76aad2776367fd476155 (diff) |
New upstream version 1.30.2 (tag: upstream/1.30.2)
Diffstat (limited to 'gallery_dl/extractor/kemonoparty.py')
| -rw-r--r-- | gallery_dl/extractor/kemonoparty.py | 625 |
1 file changed, 0 insertions, 625 deletions
diff --git a/gallery_dl/extractor/kemonoparty.py b/gallery_dl/extractor/kemonoparty.py
deleted file mode 100644
index 4893f19..0000000
--- a/gallery_dl/extractor/kemonoparty.py
+++ /dev/null
@@ -1,625 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright 2021-2023 Mike Fährmann
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 as
-# published by the Free Software Foundation.
-
-"""Extractors for https://kemono.su/"""
-
-from .common import Extractor, Message
-from .. import text, util, exception
-from ..cache import cache
-import itertools
-import json
-import re
-
-BASE_PATTERN = r"(?:https?://)?(?:www\.|beta\.)?(kemono|coomer)\.(su|party)"
-USER_PATTERN = BASE_PATTERN + r"/([^/?#]+)/user/([^/?#]+)"
-HASH_PATTERN = r"/[0-9a-f]{2}/[0-9a-f]{2}/([0-9a-f]{64})"
-
-
-class KemonopartyExtractor(Extractor):
-    """Base class for kemonoparty extractors"""
-    category = "kemonoparty"
-    root = "https://kemono.su"
-    directory_fmt = ("{category}", "{service}", "{user}")
-    filename_fmt = "{id}_{title[:180]}_{num:>02}_{filename[:180]}.{extension}"
-    archive_fmt = "{service}_{user}_{id}_{num}"
-    cookies_domain = ".kemono.su"
-
-    def __init__(self, match):
-        domain = match.group(1)
-        tld = match.group(2)
-        self.category = domain + "party"
-        self.root = text.root_from_url(match.group(0))
-        self.cookies_domain = ".{}.{}".format(domain, tld)
-        Extractor.__init__(self, match)
-
-    def _init(self):
-        self.api = KemonoAPI(self)
-        self.revisions = self.config("revisions")
-        if self.revisions:
-            self.revisions_unique = (self.revisions == "unique")
-            order = self.config("order-revisions")
-            self.revisions_reverse = order[0] in ("r", "a") if order else False
-
-        self._prepare_ddosguard_cookies()
-        self._find_inline = re.compile(
-            r'src="(?:https?://(?:kemono|coomer)\.(?:su|party))?(/inline/[^"]+'
-            r'|/[0-9a-f]{2}/[0-9a-f]{2}/[0-9a-f]{64}\.[^"]+)').findall
-        self._json_dumps = json.JSONEncoder(
-            ensure_ascii=False, check_circular=False,
-            sort_keys=True, separators=(",", ":")).encode
-
-    def items(self):
-        find_hash = re.compile(HASH_PATTERN).match
-        generators = self._build_file_generators(self.config("files"))
-        announcements = True if self.config("announcements") else None
-        archives = True if self.config("archives") else False
-        comments = True if self.config("comments") else False
-        duplicates = True if self.config("duplicates") else False
-        dms = True if self.config("dms") else None
-        max_posts = self.config("max-posts")
-        creator_info = {} if self.config("metadata", True) else None
-        exts_archive = {"zip", "rar", "7z"}
-
-        # prevent files from being sent with gzip compression
-        headers = {"Accept-Encoding": "identity"}
-
-        posts = self.posts()
-        if max_posts:
-            posts = itertools.islice(posts, max_posts)
-        if self.revisions:
-            posts = self._revisions(posts)
-
-        for post in posts:
-            headers["Referer"] = "{}/{}/user/{}/post/{}".format(
-                self.root, post["service"], post["user"], post["id"])
-            post["_http_headers"] = headers
-            post["date"] = self._parse_datetime(
-                post.get("published") or post.get("added") or "")
-            service = post["service"]
-            creator_id = post["user"]
-
-            if creator_info is not None:
-                key = "{}_{}".format(service, creator_id)
-                if key not in creator_info:
-                    creator = creator_info[key] = self.api.creator_profile(
-                        service, creator_id)
-                else:
-                    creator = creator_info[key]
-
-                post["user_profile"] = creator
-                post["username"] = creator["name"]
-
-            if comments:
-                try:
-                    post["comments"] = self.api.creator_post_comments(
-                        service, creator_id, post["id"])
-                except exception.HttpError:
-                    post["comments"] = ()
-            if dms is not None:
-                if dms is True:
-                    dms = self.api.creator_dms(
-                        post["service"], post["user"])
-                    try:
-                        dms = dms["props"]["dms"]
-                    except Exception:
-                        dms = ()
-                post["dms"] = dms
-            if announcements is not None:
-                if announcements is True:
-                    announcements = self.api.creator_announcements(
-                        post["service"], post["user"])
-                post["announcements"] = announcements
-
-            files = []
-            hashes = set()
-            post_archives = post["archives"] = []
-
-            for file in itertools.chain.from_iterable(
-                    g(post) for g in generators):
-                url = file["path"]
-
-                if "\\" in url:
-                    file["path"] = url = url.replace("\\", "/")
-
-                match = find_hash(url)
-                if match:
-                    file["hash"] = hash = match.group(1)
-                    if not duplicates:
-                        if hash in hashes:
-                            self.log.debug("Skipping %s (duplicate)", url)
-                            continue
-                        hashes.add(hash)
-                else:
-                    file["hash"] = hash = ""
-
-                if url[0] == "/":
-                    url = self.root + "/data" + url
-                elif url.startswith(self.root):
-                    url = self.root + "/data" + url[20:]
-                file["url"] = url
-
-                text.nameext_from_url(file.get("name", url), file)
-                ext = text.ext_from_url(url)
-                if not file["extension"]:
-                    file["extension"] = ext
-                elif ext == "txt" and file["extension"] != "txt":
-                    file["_http_validate"] = _validate
-                elif ext in exts_archive:
-                    file["type"] = "archive"
-                    if archives:
-                        try:
-                            data = self.api.file(file["hash"])
-                            data.update(file)
-                            post_archives.append(data)
-                        except Exception as exc:
-                            self.log.warning(
-                                "%s: Failed to retrieve archive metadata of "
-                                "'%s' (%s: %s)", post["id"], file.get("name"),
-                                exc.__class__.__name__, exc)
-                            post_archives.append(file.copy())
-                    else:
-                        post_archives.append(file.copy())
-
-                files.append(file)
-
-            post["count"] = len(files)
-            yield Message.Directory, post
-            for post["num"], file in enumerate(files, 1):
-                if "id" in file:
-                    del file["id"]
-                post.update(file)
-                yield Message.Url, file["url"], post
-
-    def login(self):
-        username, password = self._get_auth_info()
-        if username:
-            self.cookies_update(self._login_impl(
-                (username, self.cookies_domain), password))
-
-    @cache(maxage=3650*86400, keyarg=1)
-    def _login_impl(self, username, password):
-        username = username[0]
-        self.log.info("Logging in as %s", username)
-
-        url = self.root + "/api/v1/authentication/login"
-        data = {"username": username, "password": password}
-
-        response = self.request(url, method="POST", json=data, fatal=False)
-        if response.status_code >= 400:
-            try:
-                msg = '"' + response.json()["error"] + '"'
-            except Exception:
-                msg = '"Username or password is incorrect"'
-            raise exception.AuthenticationError(msg)
-
-        return {c.name: c.value for c in response.cookies}
-
-    def _file(self, post):
-        file = post["file"]
-        if not file or "path" not in file:
-            return ()
-        file["type"] = "file"
-        return (file,)
-
-    def _attachments(self, post):
-        for attachment in post["attachments"]:
-            attachment["type"] = "attachment"
-        return post["attachments"]
-
-    def _inline(self, post):
-        for path in self._find_inline(post.get("content") or ""):
-            yield {"path": path, "name": path, "type": "inline"}
-
-    def _build_file_generators(self, filetypes):
-        if filetypes is None:
-            return (self._attachments, self._file, self._inline)
-        genmap = {
-            "file"       : self._file,
-            "attachments": self._attachments,
-            "inline"     : self._inline,
-        }
-        if isinstance(filetypes, str):
-            filetypes = filetypes.split(",")
-        return [genmap[ft] for ft in filetypes]
-
-    def _parse_datetime(self, date_string):
-        if len(date_string) > 19:
-            date_string = date_string[:19]
-        return text.parse_datetime(date_string, "%Y-%m-%dT%H:%M:%S")
-
-    def _revisions(self, posts):
-        return itertools.chain.from_iterable(
-            self._revisions_post(post) for post in posts)
-
-    def _revisions_post(self, post):
-        post["revision_id"] = 0
-
-        try:
-            revs = self.api.creator_post_revisions(
-                post["service"], post["user"], post["id"])
-        except exception.HttpError:
-            post["revision_hash"] = self._revision_hash(post)
-            post["revision_index"] = 1
-            post["revision_count"] = 1
-            return (post,)
-        revs.insert(0, post)
-
-        for rev in revs:
-            rev["revision_hash"] = self._revision_hash(rev)
-
-        if self.revisions_unique:
-            uniq = []
-            last = None
-            for rev in revs:
-                if last != rev["revision_hash"]:
-                    last = rev["revision_hash"]
-                    uniq.append(rev)
-            revs = uniq
-
-        cnt = idx = len(revs)
-        for rev in revs:
-            rev["revision_index"] = idx
-            rev["revision_count"] = cnt
-            idx -= 1
-
-        if self.revisions_reverse:
-            revs.reverse()
-
-        return revs
-
-    def _revisions_all(self, service, creator_id, post_id):
-        revs = self.api.creator_post_revisions(service, creator_id, post_id)
-
-        cnt = idx = len(revs)
-        for rev in revs:
-            rev["revision_hash"] = self._revision_hash(rev)
-            rev["revision_index"] = idx
-            rev["revision_count"] = cnt
-            idx -= 1
-
-        if self.revisions_reverse:
-            revs.reverse()
-
-        return revs
-
-    def _revision_hash(self, revision):
-        rev = revision.copy()
-        rev.pop("revision_id", None)
-        rev.pop("added", None)
-        rev.pop("next", None)
-        rev.pop("prev", None)
-        rev["file"] = rev["file"].copy()
-        rev["file"].pop("name", None)
-        rev["attachments"] = [a.copy() for a in rev["attachments"]]
-        for a in rev["attachments"]:
-            a.pop("name", None)
-        return util.sha1(self._json_dumps(rev))
-
-
-def _validate(response):
-    return (response.headers["content-length"] != "9" or
-            response.content != b"not found")
-
-
-class KemonopartyUserExtractor(KemonopartyExtractor):
-    """Extractor for all posts from a kemono.su user listing"""
-    subcategory = "user"
-    pattern = USER_PATTERN + r"/?(?:\?([^#]+))?(?:$|\?|#)"
-    example = "https://kemono.su/SERVICE/user/12345"
-
-    def __init__(self, match):
-        self.subcategory = match.group(3)
-        KemonopartyExtractor.__init__(self, match)
-
-    def posts(self):
-        endpoint = self.config("endpoint")
-        if endpoint == "legacy":
-            endpoint = self.api.creator_posts_legacy
-        elif endpoint == "legacy+":
-            endpoint = self._posts_legacy_plus
-        else:
-            endpoint = self.api.creator_posts
-
-        _, _, service, creator_id, query = self.groups
-        params = text.parse_query(query)
-        return endpoint(service, creator_id,
-                        params.get("o"), params.get("q"), params.get("tag"))
-
-    def _posts_legacy_plus(self, service, creator_id,
-                           offset=0, query=None, tags=None):
-        for post in self.api.creator_posts_legacy(
-                service, creator_id, offset, query, tags):
-            yield self.api.creator_post(
-                service, creator_id, post["id"])["post"]
-
-
-class KemonopartyPostsExtractor(KemonopartyExtractor):
-    """Extractor for kemono.su post listings"""
-    subcategory = "posts"
-    pattern = BASE_PATTERN + r"/posts()()(?:/?\?([^#]+))?"
-    example = "https://kemono.su/posts"
-
-    def posts(self):
-        params = text.parse_query(self.groups[4])
-        return self.api.posts(
-            params.get("o"), params.get("q"), params.get("tag"))
-
-
-class KemonopartyPostExtractor(KemonopartyExtractor):
-    """Extractor for a single kemono.su post"""
-    subcategory = "post"
-    pattern = USER_PATTERN + r"/post/([^/?#]+)(/revisions?(?:/(\d*))?)?"
-    example = "https://kemono.su/SERVICE/user/12345/post/12345"
-
-    def __init__(self, match):
-        self.subcategory = match.group(3)
-        KemonopartyExtractor.__init__(self, match)
-
-    def posts(self):
-        _, _, service, creator_id, post_id, revision, revision_id = self.groups
-        post = self.api.creator_post(service, creator_id, post_id)
-        if not revision:
-            return (post["post"],)
-
-        self.revisions = False
-
-        revs = self._revisions_all(service, creator_id, post_id)
-        if not revision_id:
-            return revs
-
-        for rev in revs:
-            if str(rev["revision_id"]) == revision_id:
-                return (rev,)
-
-        raise exception.NotFoundError("revision")
-
-
-class KemonopartyDiscordExtractor(KemonopartyExtractor):
-    """Extractor for kemono.su discord servers"""
-    subcategory = "discord"
-    directory_fmt = ("{category}", "discord", "{server}",
-                     "{channel_name|channel}")
-    filename_fmt = "{id}_{num:>02}_{filename}.{extension}"
-    archive_fmt = "discord_{server}_{id}_{num}"
-    pattern = (BASE_PATTERN + r"/discord/server/(\d+)"
-               r"(?:/(?:channel/)?(\d+)(?:#(.+))?|#(.+))")
-    example = "https://kemono.su/discord/server/12345/12345"
-
-    def items(self):
-        self._prepare_ddosguard_cookies()
-        _, _, server_id, channel_id, channel_name, channel = self.groups
-
-        if channel_id is None:
-            if channel.isdecimal() and len(channel) >= 16:
-                key = "id"
-            else:
-                key = "name"
-        else:
-            key = "id"
-            channel = channel_id
-
-        if not channel_name or not channel_id:
-            for ch in self.api.discord_server(server_id):
-                if ch[key] == channel:
-                    break
-            else:
-                raise exception.NotFoundError("channel")
-            channel_id = ch["id"]
-            channel_name = ch["name"]
-
-        find_inline = re.compile(
-            r"https?://(?:cdn\.discordapp.com|media\.discordapp\.net)"
-            r"(/[A-Za-z0-9-._~:/?#\[\]@!$&'()*+,;%=]+)").findall
-        find_hash = re.compile(HASH_PATTERN).match
-
-        posts = self.api.discord_channel(channel_id)
-        max_posts = self.config("max-posts")
-        if max_posts:
-            posts = itertools.islice(posts, max_posts)
-
-        for post in posts:
-            files = []
-            append = files.append
-            for attachment in post["attachments"]:
-                match = find_hash(attachment["path"])
-                attachment["hash"] = match.group(1) if match else ""
-                attachment["type"] = "attachment"
-                append(attachment)
-            for path in find_inline(post["content"] or ""):
-                append({"path": "https://cdn.discordapp.com" + path,
-                        "name": path, "type": "inline", "hash": ""})
-
-            post["channel_name"] = channel_name
-            post["date"] = self._parse_datetime(post["published"])
-            post["count"] = len(files)
-            yield Message.Directory, post
-
-            for post["num"], file in enumerate(files, 1):
-                post["hash"] = file["hash"]
-                post["type"] = file["type"]
-                url = file["path"]
-
-                text.nameext_from_url(file.get("name", url), post)
-                if not post["extension"]:
-                    post["extension"] = text.ext_from_url(url)
-
-                if url[0] == "/":
-                    url = self.root + "/data" + url
-                elif url.startswith(self.root):
-                    url = self.root + "/data" + url[20:]
-                yield Message.Url, url, post
-
-
-class KemonopartyDiscordServerExtractor(KemonopartyExtractor):
-    subcategory = "discord-server"
-    pattern = BASE_PATTERN + r"/discord/server/(\d+)$"
-    example = "https://kemono.su/discord/server/12345"
-
-    def items(self):
-        server_id = self.groups[2]
-        for channel in self.api.discord_server(server_id):
-            url = "{}/discord/server/{}/{}#{}".format(
-                self.root, server_id, channel["id"], channel["name"])
-            channel["_extractor"] = KemonopartyDiscordExtractor
-            yield Message.Queue, url, channel
-
-
-class KemonopartyFavoriteExtractor(KemonopartyExtractor):
-    """Extractor for kemono.su favorites"""
-    subcategory = "favorite"
-    pattern = BASE_PATTERN + r"/(?:account/)?favorites()()(?:/?\?([^#]+))?"
-    example = "https://kemono.su/account/favorites/artists"
-
-    def items(self):
-        self._prepare_ddosguard_cookies()
-        self.login()
-
-        params = text.parse_query(self.groups[4])
-        type = params.get("type") or self.config("favorites") or "artist"
-
-        sort = params.get("sort")
-        order = params.get("order") or "desc"
-
-        if type == "artist":
-            users = self.api.account_favorites("artist")
-
-            if not sort:
-                sort = "updated"
-            users.sort(key=lambda x: x[sort] or util.NONE,
-                       reverse=(order == "desc"))
-
-            for user in users:
-                service = user["service"]
-                if service == "discord":
-                    user["_extractor"] = KemonopartyDiscordServerExtractor
-                    url = "{}/discord/server/{}".format(
-                        self.root, user["id"])
-                else:
-                    user["_extractor"] = KemonopartyUserExtractor
-                    url = "{}/{}/user/{}".format(
-                        self.root, service, user["id"])
-                yield Message.Queue, url, user
-
-        elif type == "post":
-            posts = self.api.account_favorites("post")
-
-            if not sort:
-                sort = "faved_seq"
-            posts.sort(key=lambda x: x[sort] or util.NONE,
-                       reverse=(order == "desc"))
-
-            for post in posts:
-                post["_extractor"] = KemonopartyPostExtractor
-                url = "{}/{}/user/{}/post/{}".format(
-                    self.root, post["service"], post["user"], post["id"])
-                yield Message.Queue, url, post
-
-
-class KemonoAPI():
-    """Interface for the Kemono API v1.1.0
-
-    https://kemono.su/documentation/api
-    """
-
-    def __init__(self, extractor):
-        self.extractor = extractor
-        self.root = extractor.root + "/api/v1"
-
-    def posts(self, offset=0, query=None, tags=None):
-        endpoint = "/posts"
-        params = {"q": query, "o": offset, "tag": tags}
-        return self._pagination(endpoint, params, 50, "posts")
-
-    def file(self, file_hash):
-        endpoint = "/file/" + file_hash
-        return self._call(endpoint)
-
-    def creator_posts(self, service, creator_id,
-                      offset=0, query=None, tags=None):
-        endpoint = "/{}/user/{}".format(service, creator_id)
-        params = {"q": query, "tag": tags, "o": offset}
-        return self._pagination(endpoint, params, 50)
-
-    def creator_posts_legacy(self, service, creator_id,
-                             offset=0, query=None, tags=None):
-        endpoint = "/{}/user/{}/posts-legacy".format(service, creator_id)
-        params = {"o": offset, "tag": tags, "q": query}
-        return self._pagination(endpoint, params, 50, "results")
-
-    def creator_announcements(self, service, creator_id):
-        endpoint = "/{}/user/{}/announcements".format(service, creator_id)
-        return self._call(endpoint)
-
-    def creator_dms(self, service, creator_id):
-        endpoint = "/{}/user/{}/dms".format(service, creator_id)
-        return self._call(endpoint)
-
-    def creator_fancards(self, service, creator_id):
-        endpoint = "/{}/user/{}/fancards".format(service, creator_id)
-        return self._call(endpoint)
-
-    def creator_post(self, service, creator_id, post_id):
-        endpoint = "/{}/user/{}/post/{}".format(service, creator_id, post_id)
-        return self._call(endpoint)
-
-    def creator_post_comments(self, service, creator_id, post_id):
-        endpoint = "/{}/user/{}/post/{}/comments".format(
-            service, creator_id, post_id)
-        return self._call(endpoint)
-
-    def creator_post_revisions(self, service, creator_id, post_id):
-        endpoint = "/{}/user/{}/post/{}/revisions".format(
-            service, creator_id, post_id)
-        return self._call(endpoint)
-
-    def creator_profile(self, service, creator_id):
-        endpoint = "/{}/user/{}/profile".format(service, creator_id)
-        return self._call(endpoint)
-
-    def creator_links(self, service, creator_id):
-        endpoint = "/{}/user/{}/links".format(service, creator_id)
-        return self._call(endpoint)
-
-    def creator_tags(self, service, creator_id):
-        endpoint = "/{}/user/{}/tags".format(service, creator_id)
-        return self._call(endpoint)
-
-    def discord_channel(self, channel_id):
-        endpoint = "/discord/channel/{}".format(channel_id)
-        return self._pagination(endpoint, {}, 150)
-
-    def discord_server(self, server_id):
-        endpoint = "/discord/channel/lookup/{}".format(server_id)
-        return self._call(endpoint)
-
-    def account_favorites(self, type):
-        endpoint = "/account/favorites"
-        params = {"type": type}
-        return self._call(endpoint, params)
-
-    def _call(self, endpoint, params=None):
-        url = self.root + endpoint
-        response = self.extractor.request(url, params=params)
-        return response.json()
-
-    def _pagination(self, endpoint, params, batch=50, key=False):
-        offset = text.parse_int(params.get("o"))
-        params["o"] = offset - offset % batch
-
-        while True:
-            data = self._call(endpoint, params)
-
-            if key:
-                data = data.get(key)
-            if not data:
-                return
-            yield from data
-
-            if len(data) < batch:
-                return
-            params["o"] += batch
