aboutsummaryrefslogtreecommitdiffstats
path: root/gallery_dl/extractor/kemono.py
diff options
context:
space:
mode:
Diffstat (limited to 'gallery_dl/extractor/kemono.py')
-rw-r--r--gallery_dl/extractor/kemono.py680
1 files changed, 680 insertions, 0 deletions
diff --git a/gallery_dl/extractor/kemono.py b/gallery_dl/extractor/kemono.py
new file mode 100644
index 0000000..1e88891
--- /dev/null
+++ b/gallery_dl/extractor/kemono.py
@@ -0,0 +1,680 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2021-2025 Mike Fährmann
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+
+"""Extractors for https://kemono.cr/"""
+
+from .common import Extractor, Message
+from .. import text, util, exception
+from ..cache import cache, memcache
+import itertools
+import json
+
+BASE_PATTERN = (r"(?:https?://)?(?:www\.|beta\.)?"
+ r"(kemono|coomer)\.(cr|s[tu]|party)")
+USER_PATTERN = BASE_PATTERN + r"/([^/?#]+)/user/([^/?#]+)"
+HASH_PATTERN = r"/[0-9a-f]{2}/[0-9a-f]{2}/([0-9a-f]{64})"
+
+
+class KemonoExtractor(Extractor):
+ """Base class for kemono extractors"""
+ category = "kemono"
+ root = "https://kemono.cr"
+ directory_fmt = ("{category}", "{service}", "{user}")
+ filename_fmt = "{id}_{title[:180]}_{num:>02}_{filename[:180]}.{extension}"
+ archive_fmt = "{service}_{user}_{id}_{num}"
+ cookies_domain = ".kemono.cr"
+
+ def __init__(self, match):
+ if match[1] == "coomer":
+ self.category = "coomer"
+ self.root = "https://coomer.st"
+ self.cookies_domain = ".coomer.st"
+ Extractor.__init__(self, match)
+
+ def _init(self):
+ self.api = KemonoAPI(self)
+ self.revisions = self.config("revisions")
+ if self.revisions:
+ self.revisions_unique = (self.revisions == "unique")
+ order = self.config("order-revisions")
+ self.revisions_reverse = order[0] in ("r", "a") if order else False
+
+ self._find_inline = util.re(
+ r'src="(?:https?://(?:kemono\.cr|coomer\.st))?(/inline/[^"]+'
+ r'|/[0-9a-f]{2}/[0-9a-f]{2}/[0-9a-f]{64}\.[^"]+)').findall
+ self._json_dumps = json.JSONEncoder(
+ ensure_ascii=False, check_circular=False,
+ sort_keys=True, separators=(",", ":")).encode
+
+ def items(self):
+ find_hash = util.re(HASH_PATTERN).match
+ generators = self._build_file_generators(self.config("files"))
+ announcements = True if self.config("announcements") else None
+ archives = True if self.config("archives") else False
+ comments = True if self.config("comments") else False
+ dms = True if self.config("dms") else None
+ max_posts = self.config("max-posts")
+ creator_info = {} if self.config("metadata", True) else None
+ exts_archive = util.EXTS_ARCHIVE
+
+ if duplicates := self.config("duplicates"):
+ if isinstance(duplicates, str):
+ duplicates = set(duplicates.split(","))
+ elif isinstance(duplicates, (list, tuple)):
+ duplicates = set(duplicates)
+ else:
+ duplicates = {"file", "attachment", "inline"}
+ else:
+ duplicates = ()
+
+ # prevent files from being sent with gzip compression
+ headers = {"Accept-Encoding": "identity"}
+
+ posts = self.posts()
+ if max_posts:
+ posts = itertools.islice(posts, max_posts)
+ if self.revisions:
+ posts = self._revisions(posts)
+
+ for post in posts:
+ headers["Referer"] = (f"{self.root}/{post['service']}/user/"
+ f"{post['user']}/post/{post['id']}")
+ post["_http_headers"] = headers
+ post["date"] = self._parse_datetime(
+ post.get("published") or post.get("added") or "")
+ service = post["service"]
+ creator_id = post["user"]
+
+ if creator_info is not None:
+ key = f"{service}_{creator_id}"
+ if key not in creator_info:
+ creator = creator_info[key] = self.api.creator_profile(
+ service, creator_id)
+ else:
+ creator = creator_info[key]
+
+ post["user_profile"] = creator
+ post["username"] = creator["name"]
+
+ if comments:
+ try:
+ post["comments"] = self.api.creator_post_comments(
+ service, creator_id, post["id"])
+ except exception.HttpError:
+ post["comments"] = ()
+ if dms is not None:
+ if dms is True:
+ dms = self.api.creator_dms(
+ post["service"], post["user"])
+ try:
+ dms = dms["props"]["dms"]
+ except Exception:
+ dms = ()
+ post["dms"] = dms
+ if announcements is not None:
+ if announcements is True:
+ announcements = self.api.creator_announcements(
+ post["service"], post["user"])
+ post["announcements"] = announcements
+
+ files = []
+ hashes = set()
+ post_archives = post["archives"] = []
+
+ for file in itertools.chain.from_iterable(
+ g(post) for g in generators):
+ url = file["path"]
+
+ if "\\" in url:
+ file["path"] = url = url.replace("\\", "/")
+
+ if match := find_hash(url):
+ file["hash"] = hash = match[1]
+ if file["type"] not in duplicates and hash in hashes:
+ self.log.debug("Skipping %s %s (duplicate)",
+ file["type"], url)
+ continue
+ hashes.add(hash)
+ else:
+ file["hash"] = hash = ""
+
+ if url[0] == "/":
+ url = self.root + "/data" + url
+ elif url.startswith(self.root):
+ url = self.root + "/data" + url[20:]
+ file["url"] = url
+
+ text.nameext_from_url(file.get("name", url), file)
+ ext = text.ext_from_url(url)
+ if not file["extension"]:
+ file["extension"] = ext
+ elif ext == "txt" and file["extension"] != "txt":
+ file["_http_validate"] = _validate
+ elif ext in exts_archive:
+ file["type"] = "archive"
+ if archives:
+ try:
+ data = self.api.file(hash)
+ data.update(file)
+ post_archives.append(data)
+ except Exception as exc:
+ self.log.warning(
+ "%s: Failed to retrieve archive metadata of "
+ "'%s' (%s: %s)", post["id"], file.get("name"),
+ exc.__class__.__name__, exc)
+ post_archives.append(file.copy())
+ else:
+ post_archives.append(file.copy())
+
+ files.append(file)
+
+ post["count"] = len(files)
+ yield Message.Directory, post
+ for post["num"], file in enumerate(files, 1):
+ if "id" in file:
+ del file["id"]
+ post.update(file)
+ yield Message.Url, file["url"], post
+
+ def login(self):
+ username, password = self._get_auth_info()
+ if username:
+ self.cookies_update(self._login_impl(
+ (username, self.cookies_domain), password))
+
+ @cache(maxage=3650*86400, keyarg=1)
+ def _login_impl(self, username, password):
+ username = username[0]
+ self.log.info("Logging in as %s", username)
+
+ url = self.root + "/api/v1/authentication/login"
+ data = {"username": username, "password": password}
+
+ response = self.request(url, method="POST", json=data, fatal=False)
+ if response.status_code >= 400:
+ try:
+ msg = '"' + response.json()["error"] + '"'
+ except Exception:
+ msg = '"Username or password is incorrect"'
+ raise exception.AuthenticationError(msg)
+
+ return {c.name: c.value for c in response.cookies}
+
+ def _file(self, post):
+ file = post["file"]
+ if not file or "path" not in file:
+ return ()
+ file["type"] = "file"
+ return (file,)
+
+ def _attachments(self, post):
+ for attachment in post["attachments"]:
+ attachment["type"] = "attachment"
+ return post["attachments"]
+
+ def _inline(self, post):
+ for path in self._find_inline(post.get("content") or ""):
+ yield {"path": path, "name": path, "type": "inline"}
+
+ def _build_file_generators(self, filetypes):
+ if filetypes is None:
+ return (self._attachments, self._file, self._inline)
+ genmap = {
+ "file" : self._file,
+ "attachments": self._attachments,
+ "inline" : self._inline,
+ }
+ if isinstance(filetypes, str):
+ filetypes = filetypes.split(",")
+ return [genmap[ft] for ft in filetypes]
+
+ def _parse_datetime(self, date_string):
+ if len(date_string) > 19:
+ date_string = date_string[:19]
+ return text.parse_datetime(date_string, "%Y-%m-%dT%H:%M:%S")
+
+ def _revisions(self, posts):
+ return itertools.chain.from_iterable(
+ self._revisions_post(post) for post in posts)
+
+ def _revisions_post(self, post):
+ post["revision_id"] = 0
+
+ try:
+ revs = self.api.creator_post_revisions(
+ post["service"], post["user"], post["id"])
+ except exception.HttpError:
+ post["revision_hash"] = self._revision_hash(post)
+ post["revision_index"] = 1
+ post["revision_count"] = 1
+ return (post,)
+ revs.insert(0, post)
+
+ for rev in revs:
+ rev["revision_hash"] = self._revision_hash(rev)
+
+ if self.revisions_unique:
+ uniq = []
+ last = None
+ for rev in revs:
+ if last != rev["revision_hash"]:
+ last = rev["revision_hash"]
+ uniq.append(rev)
+ revs = uniq
+
+ cnt = idx = len(revs)
+ for rev in revs:
+ rev["revision_index"] = idx
+ rev["revision_count"] = cnt
+ idx -= 1
+
+ if self.revisions_reverse:
+ revs.reverse()
+
+ return revs
+
+ def _revisions_all(self, service, creator_id, post_id):
+ revs = self.api.creator_post_revisions(service, creator_id, post_id)
+
+ cnt = idx = len(revs)
+ for rev in revs:
+ rev["revision_hash"] = self._revision_hash(rev)
+ rev["revision_index"] = idx
+ rev["revision_count"] = cnt
+ idx -= 1
+
+ if self.revisions_reverse:
+ revs.reverse()
+
+ return revs
+
+ def _revision_hash(self, revision):
+ rev = revision.copy()
+ rev.pop("revision_id", None)
+ rev.pop("added", None)
+ rev.pop("next", None)
+ rev.pop("prev", None)
+ rev["file"] = rev["file"].copy()
+ rev["file"].pop("name", None)
+ rev["attachments"] = [a.copy() for a in rev["attachments"]]
+ for a in rev["attachments"]:
+ a.pop("name", None)
+ return util.sha1(self._json_dumps(rev))
+
+
+def _validate(response):
+ return (response.headers["content-length"] != "9" or
+ response.content != b"not found")
+
+
+class KemonoUserExtractor(KemonoExtractor):
+ """Extractor for all posts from a kemono.cr user listing"""
+ subcategory = "user"
+ pattern = USER_PATTERN + r"/?(?:\?([^#]+))?(?:$|\?|#)"
+ example = "https://kemono.cr/SERVICE/user/12345"
+
+ def __init__(self, match):
+ self.subcategory = match[3]
+ KemonoExtractor.__init__(self, match)
+
+ def posts(self):
+ _, _, service, creator_id, query = self.groups
+ params = text.parse_query(query)
+ tag = params.get("tag")
+
+ endpoint = self.config("endpoint")
+ if endpoint == "legacy+":
+ endpoint = self._posts_legacy_plus
+ elif endpoint == "legacy" or tag:
+ endpoint = self.api.creator_posts_legacy
+ else:
+ endpoint = self.api.creator_posts
+
+ return endpoint(service, creator_id,
+ params.get("o"), params.get("q"), tag)
+
+ def _posts_legacy_plus(self, service, creator_id,
+ offset=0, query=None, tags=None):
+ for post in self.api.creator_posts_legacy(
+ service, creator_id, offset, query, tags):
+ yield self.api.creator_post(
+ service, creator_id, post["id"])["post"]
+
+
+class KemonoPostsExtractor(KemonoExtractor):
+ """Extractor for kemono.cr post listings"""
+ subcategory = "posts"
+ pattern = BASE_PATTERN + r"/posts()()(?:/?\?([^#]+))?"
+ example = "https://kemono.cr/posts"
+
+ def posts(self):
+ params = text.parse_query(self.groups[4])
+ return self.api.posts(
+ params.get("o"), params.get("q"), params.get("tag"))
+
+
+class KemonoPostExtractor(KemonoExtractor):
+ """Extractor for a single kemono.cr post"""
+ subcategory = "post"
+ pattern = USER_PATTERN + r"/post/([^/?#]+)(/revisions?(?:/(\d*))?)?"
+ example = "https://kemono.cr/SERVICE/user/12345/post/12345"
+
+ def __init__(self, match):
+ self.subcategory = match[3]
+ KemonoExtractor.__init__(self, match)
+
+ def posts(self):
+ _, _, service, creator_id, post_id, revision, revision_id = self.groups
+ post = self.api.creator_post(service, creator_id, post_id)
+ if not revision:
+ return (post["post"],)
+
+ self.revisions = False
+
+ revs = self._revisions_all(service, creator_id, post_id)
+ if not revision_id:
+ return revs
+
+ for rev in revs:
+ if str(rev["revision_id"]) == revision_id:
+ return (rev,)
+
+ raise exception.NotFoundError("revision")
+
+
+class KemonoDiscordExtractor(KemonoExtractor):
+ """Extractor for kemono.cr discord servers"""
+ subcategory = "discord"
+ directory_fmt = ("{category}", "discord",
+ "{server_id} {server}", "{channel_id} {channel}")
+ filename_fmt = "{id}_{num:>02}_{filename}.{extension}"
+ archive_fmt = "discord_{server_id}_{id}_{num}"
+ pattern = BASE_PATTERN + r"/discord/server/(\d+)[/#](?:channel/)?(\d+)"
+ example = "https://kemono.cr/discord/server/12345/12345"
+
+ def items(self):
+ _, _, server_id, channel_id = self.groups
+
+ try:
+ server, channels = discord_server_info(self, server_id)
+ channel = channels[channel_id]
+ except Exception:
+ raise exception.NotFoundError("channel")
+
+ data = {
+ "server" : server["name"],
+ "server_id" : server["id"],
+ "channel" : channel["name"],
+ "channel_id" : channel["id"],
+ "channel_nsfw" : channel["is_nsfw"],
+ "channel_type" : channel["type"],
+ "channel_topic": channel["topic"],
+ "parent_id" : channel["parent_channel_id"],
+ }
+
+ find_inline = util.re(
+ r"https?://(?:cdn\.discordapp.com|media\.discordapp\.net)"
+ r"(/[A-Za-z0-9-._~:/?#\[\]@!$&'()*+,;%=]+)").findall
+ find_hash = util.re(HASH_PATTERN).match
+
+ posts = self.api.discord_channel(channel_id)
+ if max_posts := self.config("max-posts"):
+ posts = itertools.islice(posts, max_posts)
+
+ for post in posts:
+ files = []
+ for attachment in post["attachments"]:
+ match = find_hash(attachment["path"])
+ attachment["hash"] = match[1] if match else ""
+ attachment["type"] = "attachment"
+ files.append(attachment)
+ for path in find_inline(post["content"] or ""):
+ files.append({"path": "https://cdn.discordapp.com" + path,
+ "name": path, "type": "inline", "hash": ""})
+
+ post.update(data)
+ post["date"] = self._parse_datetime(post["published"])
+ post["count"] = len(files)
+ yield Message.Directory, post
+
+ for post["num"], file in enumerate(files, 1):
+ post["hash"] = file["hash"]
+ post["type"] = file["type"]
+ url = file["path"]
+
+ text.nameext_from_url(file.get("name", url), post)
+ if not post["extension"]:
+ post["extension"] = text.ext_from_url(url)
+
+ if url[0] == "/":
+ url = self.root + "/data" + url
+ elif url.startswith(self.root):
+ url = self.root + "/data" + url[20:]
+ yield Message.Url, url, post
+
+
+class KemonoDiscordServerExtractor(KemonoExtractor):
+ subcategory = "discord-server"
+ pattern = BASE_PATTERN + r"/discord/server/(\d+)$"
+ example = "https://kemono.cr/discord/server/12345"
+
+ def items(self):
+ server_id = self.groups[2]
+ server, channels = discord_server_info(self, server_id)
+ for channel in channels.values():
+ url = (f"{self.root}/discord/server/{server_id}/"
+ f"{channel['id']}#{channel['name']}")
+ yield Message.Queue, url, {
+ "server" : server,
+ "channel" : channel,
+ "_extractor": KemonoDiscordExtractor,
+ }
+
+
+@memcache(keyarg=1)
+def discord_server_info(extr, server_id):
+ server = extr.api.discord_server(server_id)
+ return server, {
+ channel["id"]: channel
+ for channel in server.pop("channels")
+ }
+
+
+class KemonoFavoriteExtractor(KemonoExtractor):
+ """Extractor for kemono.cr favorites"""
+ subcategory = "favorite"
+ pattern = BASE_PATTERN + r"/(?:account/)?favorites()()(?:/?\?([^#]+))?"
+ example = "https://kemono.cr/account/favorites/artists"
+
+ def items(self):
+ self.login()
+
+ params = text.parse_query(self.groups[4])
+ type = params.get("type") or self.config("favorites") or "artist"
+
+ sort = params.get("sort")
+ order = params.get("order") or "desc"
+
+ if type == "artist":
+ users = self.api.account_favorites("artist")
+
+ if not sort:
+ sort = "updated"
+ users.sort(key=lambda x: x[sort] or util.NONE,
+ reverse=(order == "desc"))
+
+ for user in users:
+ service = user["service"]
+ if service == "discord":
+ user["_extractor"] = KemonoDiscordServerExtractor
+ url = f"{self.root}/discord/server/{user['id']}"
+ else:
+ user["_extractor"] = KemonoUserExtractor
+ url = f"{self.root}/{service}/user/{user['id']}"
+ yield Message.Queue, url, user
+
+ elif type == "post":
+ posts = self.api.account_favorites("post")
+
+ if not sort:
+ sort = "faved_seq"
+ posts.sort(key=lambda x: x[sort] or util.NONE,
+ reverse=(order == "desc"))
+
+ for post in posts:
+ post["_extractor"] = KemonoPostExtractor
+ url = (f"{self.root}/{post['service']}/user/"
+ f"{post['user']}/post/{post['id']}")
+ yield Message.Queue, url, post
+
+
+class KemonoArtistsExtractor(KemonoExtractor):
+ """Extractor for kemono artists"""
+ subcategory = "artists"
+ pattern = BASE_PATTERN + r"/artists(?:\?([^#]+))?"
+ example = "https://kemono.cr/artists"
+
+ def items(self):
+ params = text.parse_query(self.groups[2])
+ users = self.api.creators()
+
+ if params.get("service"):
+ service = params["service"].lower()
+ users = [user for user in users
+ if user["service"] == service]
+
+ if params.get("q"):
+ q = params["q"].lower()
+ users = [user for user in users
+ if q in user["name"].lower()]
+
+ sort = params.get("sort_by") or "favorited"
+ order = params.get("order") or "desc"
+ users.sort(key=lambda user: user[sort] or util.NONE,
+ reverse=(order != "asc"))
+
+ for user in users:
+ service = user["service"]
+ if service == "discord":
+ user["_extractor"] = KemonoDiscordServerExtractor
+ url = f"{self.root}/discord/server/{user['id']}"
+ else:
+ user["_extractor"] = KemonoUserExtractor
+ url = f"{self.root}/{service}/user/{user['id']}"
+ yield Message.Queue, url, user
+
+
+class KemonoAPI():
+ """Interface for the Kemono API v1.1.0
+
+ https://kemono.cr/documentation/api
+ """
+
+ def __init__(self, extractor):
+ self.extractor = extractor
+ self.root = extractor.root + "/api/v1"
+
+ def posts(self, offset=0, query=None, tags=None):
+ endpoint = "/posts"
+ params = {"q": query, "o": offset, "tag": tags}
+ return self._pagination(endpoint, params, 50, "posts")
+
+ def file(self, file_hash):
+ endpoint = "/file/" + file_hash
+ return self._call(endpoint)
+
+ def creators(self):
+ endpoint = "/creators.txt"
+ return self._call(endpoint)
+
+ def creator_posts(self, service, creator_id,
+ offset=0, query=None, tags=None):
+ endpoint = f"/{service}/user/{creator_id}"
+ params = {"q": query, "tag": tags, "o": offset}
+ return self._pagination(endpoint, params, 50)
+
+ def creator_posts_legacy(self, service, creator_id,
+ offset=0, query=None, tags=None):
+ endpoint = f"/{service}/user/{creator_id}/posts-legacy"
+ params = {"o": offset, "tag": tags, "q": query}
+ return self._pagination(endpoint, params, 50, "results")
+
+ def creator_announcements(self, service, creator_id):
+ endpoint = f"/{service}/user/{creator_id}/announcements"
+ return self._call(endpoint)
+
+ def creator_dms(self, service, creator_id):
+ endpoint = f"/{service}/user/{creator_id}/dms"
+ return self._call(endpoint)
+
+ def creator_fancards(self, service, creator_id):
+ endpoint = f"/{service}/user/{creator_id}/fancards"
+ return self._call(endpoint)
+
+ def creator_post(self, service, creator_id, post_id):
+ endpoint = f"/{service}/user/{creator_id}/post/{post_id}"
+ return self._call(endpoint)
+
+ def creator_post_comments(self, service, creator_id, post_id):
+ endpoint = f"/{service}/user/{creator_id}/post/{post_id}/comments"
+ return self._call(endpoint)
+
+ def creator_post_revisions(self, service, creator_id, post_id):
+ endpoint = f"/{service}/user/{creator_id}/post/{post_id}/revisions"
+ return self._call(endpoint)
+
+ def creator_profile(self, service, creator_id):
+ endpoint = f"/{service}/user/{creator_id}/profile"
+ return self._call(endpoint)
+
+ def creator_links(self, service, creator_id):
+ endpoint = f"/{service}/user/{creator_id}/links"
+ return self._call(endpoint)
+
+ def creator_tags(self, service, creator_id):
+ endpoint = f"/{service}/user/{creator_id}/tags"
+ return self._call(endpoint)
+
+ def discord_channel(self, channel_id):
+ endpoint = f"/discord/channel/{channel_id}"
+ return self._pagination(endpoint, {}, 150)
+
+ def discord_channel_lookup(self, server_id):
+ endpoint = f"/discord/channel/lookup/{server_id}"
+ return self._call(endpoint)
+
+ def discord_server(self, server_id):
+ endpoint = f"/discord/server/{server_id}"
+ return self._call(endpoint)
+
+ def account_favorites(self, type):
+ endpoint = "/account/favorites"
+ params = {"type": type}
+ return self._call(endpoint, params)
+
+ def _call(self, endpoint, params=None):
+ url = self.root + endpoint
+ response = self.extractor.request(url, params=params)
+ return response.json()
+
+ def _pagination(self, endpoint, params, batch=50, key=False):
+ offset = text.parse_int(params.get("o"))
+ params["o"] = offset - offset % batch
+
+ while True:
+ data = self._call(endpoint, params)
+
+ if key:
+ data = data.get(key)
+ if not data:
+ return
+ yield from data
+
+ if len(data) < batch:
+ return
+ params["o"] += batch