diff options
Diffstat (limited to 'gallery_dl/extractor/civitai.py')
| -rw-r--r-- | gallery_dl/extractor/civitai.py | 387 |
1 files changed, 281 insertions, 106 deletions
diff --git a/gallery_dl/extractor/civitai.py b/gallery_dl/extractor/civitai.py index 56fe851..dc5b777 100644 --- a/gallery_dl/extractor/civitai.py +++ b/gallery_dl/extractor/civitai.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2024 Mike Fährmann +# Copyright 2024-2025 Mike Fährmann # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as @@ -8,7 +8,7 @@ """Extractors for https://www.civitai.com/""" -from .common import Extractor, Message +from .common import Extractor, Message, Dispatch from .. import text, util, exception from ..cache import memcache import itertools @@ -22,9 +22,9 @@ class CivitaiExtractor(Extractor): """Base class for civitai extractors""" category = "civitai" root = "https://civitai.com" - directory_fmt = ("{category}", "{username|user[username]}", "images") - filename_fmt = "{file[id]|id|filename}.{extension}" - archive_fmt = "{file[uuid]|uuid}" + directory_fmt = ("{category}", "{user[username]}", "images") + filename_fmt = "{file[id]}.{extension}" + archive_fmt = "{file[uuid]}" request_interval = (0.5, 1.5) def _init(self): @@ -35,8 +35,7 @@ class CivitaiExtractor(Extractor): self.log.debug("Using tRPC API") self.api = CivitaiTrpcAPI(self) - quality = self.config("quality") - if quality: + if quality := self.config("quality"): if not isinstance(quality, str): quality = ",".join(quality) self._image_quality = quality @@ -45,8 +44,7 @@ class CivitaiExtractor(Extractor): self._image_quality = "original=true" self._image_ext = "png" - quality_video = self.config("quality-videos") - if quality_video: + if quality_video := self.config("quality-videos"): if not isinstance(quality_video, str): quality_video = ",".join(quality_video) if quality_video[0] == "+": @@ -59,28 +57,27 @@ class CivitaiExtractor(Extractor): self._video_quality = "quality=100" self._video_ext = "webm" - metadata = self.config("metadata") - if metadata: + if metadata := self.config("metadata"): if isinstance(metadata, str): metadata = metadata.split(",") elif not isinstance(metadata, (list, tuple)): - metadata = ("generation", "version") + metadata = ("generation", "version", "post") self._meta_generation = ("generation" in metadata) self._meta_version = ("version" in metadata) + self._meta_post = ("post" in metadata) else: - self._meta_generation = self._meta_version = False + self._meta_generation = self._meta_version = self._meta_post = \ + False def items(self): - models = self.models() - if models: + if models := self.models(): data = {"_extractor": CivitaiModelExtractor} for model in models: - url = "{}/models/{}".format(self.root, model["id"]) + url = f"{self.root}/models/{model['id']}" yield Message.Queue, url, data return - posts = self.posts() - if posts: + if posts := self.posts(): for post in posts: if "images" in post: @@ -105,27 +102,37 @@ class CivitaiExtractor(Extractor): yield Message.Url, file["url"], file return - images = self.images() - if images: - for image in images: + if images := self.images(): + for file in images: + + data = { + "file": file, + "user": file.pop("user"), + } if self._meta_generation: - image["generation"] = \ - self._extract_meta_generation(image) + data["generation"] = \ + self._extract_meta_generation(file) if self._meta_version: - image["model"], image["version"] = \ - self._extract_meta_version(image, False) - image["date"] = text.parse_datetime( - image["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ") - - url = self._url(image) - text.nameext_from_url(url, image) - if not image["extension"]: - image["extension"] = ( - self._video_ext if image.get("type") == "video" else + data["model"], data["version"] = \ + self._extract_meta_version(file, False) + if "post" in file: + data["post"] = file.pop("post") + if self._meta_post and "post" not in data: + data["post"] = post = self._extract_meta_post(file) + if post: + post.pop("user", None) + file["date"] = text.parse_datetime( + file["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ") + + data["url"] = url = self._url(file) + text.nameext_from_url(url, data) + if not data["extension"]: + data["extension"] = ( + self._video_ext if file.get("type") == "video" else self._image_ext) - yield Message.Directory, image - yield Message.Url, url, image + yield Message.Directory, data + yield Message.Url, url, data return def models(self): @@ -151,12 +158,13 @@ class CivitaiExtractor(Extractor): image["uuid"] = url name = image.get("name") if not name: - mime = image.get("mimeType") or self._image_ext - name = "{}.{}".format(image.get("id"), mime.rpartition("/")[2]) - return ( - "https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA/{}/{}/{}".format( - url, quality, name) - ) + if mime := image.get("mimeType"): + name = f"{image.get('id')}.{mime.rpartition('/')[2]}" + else: + ext = self._video_ext if video else self._image_ext + name = f"{image.get('id')}.{ext}" + return (f"https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA" + f"/{url}/{quality}/{name}") def _image_results(self, images): for num, file in enumerate(images, 1): @@ -171,10 +179,29 @@ class CivitaiExtractor(Extractor): self._image_ext) if "id" not in file and data["filename"].isdecimal(): file["id"] = text.parse_int(data["filename"]) + if "date" not in file: + file["date"] = text.parse_datetime( + file["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ") if self._meta_generation: file["generation"] = self._extract_meta_generation(file) yield data + def _image_reactions(self): + self._require_auth() + + params = self.params + params["authed"] = True + params["useIndex"] = False + if "reactions" not in params: + params["reactions"] = ("Like", "Dislike", "Heart", "Laugh", "Cry") + return self.api.images(params) + + def _require_auth(self): + if "Authorization" not in self.api.headers and \ + not self.cookies.get( + "__Secure-civitai-token", domain=".civitai.com"): + raise exception.AuthRequired(("'api-key'", "cookies")) + def _parse_query(self, value): return text.parse_query_list( value, {"tags", "reactions", "baseModels", "tools", "techniques", @@ -186,10 +213,18 @@ class CivitaiExtractor(Extractor): except Exception as exc: return self.log.debug("", exc_info=exc) + def _extract_meta_post(self, image): + try: + post = self.api.post(image["postId"]) + post["date"] = text.parse_datetime( + post["publishedAt"], "%Y-%m-%dT%H:%M:%S.%fZ") + return post + except Exception as exc: + return self.log.debug("", exc_info=exc) + def _extract_meta_version(self, item, is_post=True): try: - version_id = self._extract_version_id(item, is_post) - if version_id: + if version_id := self._extract_version_id(item, is_post): version = self.api.model_version(version_id).copy() return version.pop("model", None), version except Exception as exc: @@ -197,12 +232,11 @@ class CivitaiExtractor(Extractor): return None, None def _extract_version_id(self, item, is_post=True): - version_id = item.get("modelVersionId") - if version_id: + if version_id := item.get("modelVersionId"): return version_id - - version_ids = item.get("modelVersionIds") - if version_ids: + if version_ids := item.get("modelVersionIds"): + return version_ids[0] + if version_ids := item.get("modelVersionIdsManual"): return version_ids[0] if is_post: @@ -285,16 +319,15 @@ class CivitaiModelExtractor(CivitaiExtractor): if not sep: name = ext ext = "bin" - file["uuid"] = "model-{}-{}-{}".format( - model["id"], version["id"], file["id"]) + file["uuid"] = f"model-{model['id']}-{version['id']}-{file['id']}" files.append({ "num" : num, "file" : file, "filename" : name, "extension": ext, - "url" : (file.get("downloadUrl") or - "{}/api/download/models/{}".format( - self.root, version["id"])), + "url" : ( + file.get("downloadUrl") or + f"{self.root}/api/download/models/{version['id']}"), "_http_headers" : { "Authorization": self.api.headers.get("Authorization")}, "_http_validate": self._validate_file_model, @@ -308,7 +341,7 @@ class CivitaiModelExtractor(CivitaiExtractor): else: params = { "modelVersionId": version["id"], - "prioritizedUserIds": [user["id"]], + "prioritizedUserIds": (user["id"],), "period": "AllTime", "sort": "Most Reactions", "limit": 20, @@ -327,8 +360,7 @@ class CivitaiModelExtractor(CivitaiExtractor): alert = text.extr( response.text, 'mantine-Alert-message">', "</div></div></div>") if alert: - msg = "\"{}\" - 'api-key' required".format( - text.remove_html(alert)) + msg = f"\"{text.remove_html(alert)}\" - 'api-key' required" else: msg = "'api-key' required to download this file" self.log.warning(msg) @@ -366,14 +398,26 @@ class CivitaiTagExtractor(CivitaiExtractor): return self.api.models_tag(tag) -class CivitaiSearchExtractor(CivitaiExtractor): - subcategory = "search" +class CivitaiSearchModelsExtractor(CivitaiExtractor): + subcategory = "search-models" pattern = BASE_PATTERN + r"/search/models\?([^#]+)" example = "https://civitai.com/search/models?query=QUERY" def models(self): - params = text.parse_query(self.groups[0]) - return self.api.models(params) + params = self._parse_query(self.groups[0]) + return CivitaiSearchAPI(self).search_models( + params.get("query"), params.get("sortBy"), self.api.nsfw) + + +class CivitaiSearchImagesExtractor(CivitaiExtractor): + subcategory = "search-images" + pattern = BASE_PATTERN + r"/search/images\?([^#]+)" + example = "https://civitai.com/search/images?query=QUERY" + + def images(self): + params = self._parse_query(self.groups[0]) + return CivitaiSearchAPI(self).search_images( + params.get("query"), params.get("sortBy"), self.api.nsfw) class CivitaiModelsExtractor(CivitaiExtractor): @@ -382,7 +426,7 @@ class CivitaiModelsExtractor(CivitaiExtractor): example = "https://civitai.com/models" def models(self): - params = text.parse_query(self.groups[0]) + params = self._parse_query(self.groups[0]) return self.api.models(params) @@ -392,26 +436,32 @@ class CivitaiImagesExtractor(CivitaiExtractor): example = "https://civitai.com/images" def images(self): - params = text.parse_query(self.groups[0]) + params = self._parse_query(self.groups[0]) return self.api.images(params) -class CivitaiUserExtractor(CivitaiExtractor): - subcategory = "user" +class CivitaiPostsExtractor(CivitaiExtractor): + subcategory = "posts" + pattern = BASE_PATTERN + r"/posts(?:/?\?([^#]+))?(?:$|#)" + example = "https://civitai.com/posts" + + def posts(self): + params = self._parse_query(self.groups[0]) + return self.api.posts(params) + + +class CivitaiUserExtractor(Dispatch, CivitaiExtractor): pattern = USER_PATTERN + r"/?(?:$|\?|#)" example = "https://civitai.com/user/USER" - def initialize(self): - pass - def items(self): - base = "{}/user/{}/".format(self.root, self.groups[0]) + base = f"{self.root}/user/{self.groups[0]}/" return self._dispatch_extractors(( (CivitaiUserModelsExtractor, base + "models"), (CivitaiUserPostsExtractor , base + "posts"), (CivitaiUserImagesExtractor, base + "images"), (CivitaiUserVideosExtractor, base + "videos"), - ), ("user-models", "user-posts")) + ), ("user-images", "user-videos")) class CivitaiUserModelsExtractor(CivitaiExtractor): @@ -446,29 +496,17 @@ class CivitaiUserImagesExtractor(CivitaiExtractor): example = "https://civitai.com/user/USER/images" def __init__(self, match): - self.params = self._parse_query(match.group(2)) + user, query = match.groups() + self.params = self._parse_query(query) if self.params.get("section") == "reactions": - self.subcategory = "reactions" - self.images = self.images_reactions + self.subcategory = "reactions-images" + self.images = self._image_reactions + else: + self.params["username"] = text.unquote(user) CivitaiExtractor.__init__(self, match) def images(self): - params = self.params - params["username"] = text.unquote(self.groups[0]) - return self.api.images(params) - - def images_reactions(self): - if "Authorization" not in self.api.headers and \ - not self.cookies.get( - "__Secure-civitai-token", domain=".civitai.com"): - raise exception.AuthorizationError("api-key or cookies required") - - params = self.params - params["authed"] = True - params["useIndex"] = False - if "reactions" not in params: - params["reactions"] = ("Like", "Dislike", "Heart", "Laugh", "Cry") - return self.api.images(params) + return self.api.images(self.params) class CivitaiUserVideosExtractor(CivitaiExtractor): @@ -477,14 +515,40 @@ class CivitaiUserVideosExtractor(CivitaiExtractor): pattern = USER_PATTERN + r"/videos/?(?:\?([^#]+))?" example = "https://civitai.com/user/USER/videos" - def images(self): - self._image_ext = "mp4" + def __init__(self, match): + user, query = match.groups() + self.params = self._parse_query(query) + self.params["types"] = ("video",) + if self.params.get("section") == "reactions": + self.subcategory = "reactions-videos" + self.images = self._image_reactions + else: + self.params["username"] = text.unquote(user) + CivitaiExtractor.__init__(self, match) - user, query = self.groups - params = self._parse_query(query) - params["types"] = ["video"] - params["username"] = text.unquote(user) - return self.api.images(params) + images = CivitaiUserImagesExtractor.images + + +class CivitaiGeneratedExtractor(CivitaiExtractor): + """Extractor for your generated files feed""" + subcategory = "generated" + filename_fmt = "{filename}.{extension}" + directory_fmt = ("{category}", "generated") + pattern = f"{BASE_PATTERN}/generate" + example = "https://civitai.com/generate" + + def items(self): + self._require_auth() + + for gen in self.api.orchestrator_queryGeneratedImages(): + gen["date"] = text.parse_datetime( + gen["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ") + yield Message.Directory, gen + for step in gen.pop("steps", ()): + for image in step.pop("images", ()): + data = {"file": image, **step, **gen} + url = image["url"] + yield Message.Url, url, text.nameext_from_url(url, data) class CivitaiRestAPI(): @@ -498,8 +562,7 @@ class CivitaiRestAPI(): self.root = extractor.root + "/api" self.headers = {"Content-Type": "application/json"} - api_key = extractor.config("api-key") - if api_key: + if api_key := extractor.config("api-key"): extractor.log.debug("Using api_key authentication") self.headers["Authorization"] = "Bearer " + api_key @@ -528,12 +591,12 @@ class CivitaiRestAPI(): }) def model(self, model_id): - endpoint = "/v1/models/{}".format(model_id) + endpoint = f"/v1/models/{model_id}" return self._call(endpoint) @memcache(keyarg=1) def model_version(self, model_version_id): - endpoint = "/v1/model-versions/{}".format(model_version_id) + endpoint = f"/v1/model-versions/{model_version_id}" return self._call(endpoint) def models(self, params): @@ -572,13 +635,12 @@ class CivitaiTrpcAPI(): self.root = extractor.root + "/api/trpc/" self.headers = { "content-type" : "application/json", - "x-client-version": "5.0.701", + "x-client-version": "5.0.920", "x-client-date" : "", "x-client" : "web", "x-fingerprint" : "undefined", } - api_key = extractor.config("api-key") - if api_key: + if api_key := extractor.config("api-key"): extractor.log.debug("Using api_key authentication") self.headers["Authorization"] = "Bearer " + api_key @@ -607,11 +669,11 @@ class CivitaiTrpcAPI(): "useIndex" : True, "period" : "AllTime", "sort" : "Newest", - "types" : ["image"], + "types" : ("image",), "withMeta" : False, # Metadata Only "fromPlatform" : False, # Made On-Site "browsingLevel": self.nsfw, - "include" : ["cosmetics"], + "include" : ("cosmetics",), }) params = self._type_params(params) @@ -690,9 +752,10 @@ class CivitaiTrpcAPI(): "followed" : False, "draftOnly" : False, "pending" : True, - "include" : ["cosmetics"], + "include" : ("cosmetics",), }) + params = self._type_params(params) return self._pagination(endpoint, params, meta) def user(self, username): @@ -700,6 +763,15 @@ class CivitaiTrpcAPI(): params = {"username": username} return (self._call(endpoint, params),) + def orchestrator_queryGeneratedImages(self): + endpoint = "orchestrator.queryGeneratedImages" + params = { + "ascending": False, + "tags" : ("gen",), + "authed" : True, + } + return self._pagination(endpoint, params) + def _call(self, endpoint, params, meta=None): url = self.root + endpoint headers = self.headers @@ -765,4 +837,107 @@ class CivitaiTrpcAPI(): def _bool(value): - return True if value == "true" else False + return value == "true" + + +class CivitaiSearchAPI(): + + def __init__(self, extractor): + self.extractor = extractor + self.root = "https://search.civitai.com" + self.headers = { + "Authorization": "Bearer ab8565e5ab8dc2d8f0d4256d204781cb63fe8b031" + "eb3779cbbed38a7b5308e5c", + "Content-Type": "application/json", + "X-Meilisearch-Client": "Meilisearch instant-meilisearch (v0.13.5)" + " ; Meilisearch JavaScript (v0.34.0)", + "Origin": extractor.root, + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "same-site", + "Priority": "u=4", + } + + def search(self, query, type, facets, nsfw=31): + endpoint = "/multi-search" + + query = { + "q" : query, + "indexUid": type, + "facets" : facets, + "attributesToHighlight": (), + "highlightPreTag" : "__ais-highlight__", + "highlightPostTag": "__/ais-highlight__", + "limit" : 51, + "offset": 0, + "filter": (self._generate_filter(nsfw),), + } + + return self._pagination(endpoint, query) + + def search_models(self, query, type=None, nsfw=31): + facets = ( + "category.name", + "checkpointType", + "fileFormats", + "lastVersionAtUnix", + "tags.name", + "type", + "user.username", + "version.baseModel", + ) + return self.search(query, type or "models_v9", facets, nsfw) + + def search_images(self, query, type=None, nsfw=31): + facets = ( + "aspectRatio", + "baseModel", + "createdAtUnix", + "tagNames", + "techniqueNames", + "toolNames", + "type", + "user.username", + ) + return self.search(query, type or "images_v6", facets, nsfw) + + def _call(self, endpoint, query): + url = self.root + endpoint + params = util.json_dumps({"queries": (query,)}) + + data = self.extractor.request_json( + url, method="POST", headers=self.headers, data=params) + + return data["results"][0] + + def _pagination(self, endpoint, query): + limit = query["limit"] - 1 + threshold = limit // 2 + + while True: + data = self._call(endpoint, query) + + items = data["hits"] + yield from items + + if len(items) < threshold: + return + query["offset"] += limit + + def _generate_filter(self, level): + fltr = [] + + if level & 1: + fltr.append("1") + if level & 2: + fltr.append("2") + if level & 4: + fltr.append("4") + if level & 8: + fltr.append("8") + if level & 16: + fltr.append("16") + + if not fltr: + return "()" + return "(nsfwLevel=" + " OR nsfwLevel=".join(fltr) + ")" |
