about summary refs log tree commit diff stats
path: root/gallery_dl/extractor/civitai.py
diff options
context:
space:
mode:
Diffstat (limited to 'gallery_dl/extractor/civitai.py')
-rw-r--r-- gallery_dl/extractor/civitai.py 387
1 files changed, 281 insertions, 106 deletions
diff --git a/gallery_dl/extractor/civitai.py b/gallery_dl/extractor/civitai.py
index 56fe851..dc5b777 100644
--- a/gallery_dl/extractor/civitai.py
+++ b/gallery_dl/extractor/civitai.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
-# Copyright 2024 Mike Fährmann
+# Copyright 2024-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -8,7 +8,7 @@
"""Extractors for https://www.civitai.com/"""
-from .common import Extractor, Message
+from .common import Extractor, Message, Dispatch
from .. import text, util, exception
from ..cache import memcache
import itertools
@@ -22,9 +22,9 @@ class CivitaiExtractor(Extractor):
"""Base class for civitai extractors"""
category = "civitai"
root = "https://civitai.com"
- directory_fmt = ("{category}", "{username|user[username]}", "images")
- filename_fmt = "{file[id]|id|filename}.{extension}"
- archive_fmt = "{file[uuid]|uuid}"
+ directory_fmt = ("{category}", "{user[username]}", "images")
+ filename_fmt = "{file[id]}.{extension}"
+ archive_fmt = "{file[uuid]}"
request_interval = (0.5, 1.5)
def _init(self):
@@ -35,8 +35,7 @@ class CivitaiExtractor(Extractor):
self.log.debug("Using tRPC API")
self.api = CivitaiTrpcAPI(self)
- quality = self.config("quality")
- if quality:
+ if quality := self.config("quality"):
if not isinstance(quality, str):
quality = ",".join(quality)
self._image_quality = quality
@@ -45,8 +44,7 @@ class CivitaiExtractor(Extractor):
self._image_quality = "original=true"
self._image_ext = "png"
- quality_video = self.config("quality-videos")
- if quality_video:
+ if quality_video := self.config("quality-videos"):
if not isinstance(quality_video, str):
quality_video = ",".join(quality_video)
if quality_video[0] == "+":
@@ -59,28 +57,27 @@ class CivitaiExtractor(Extractor):
self._video_quality = "quality=100"
self._video_ext = "webm"
- metadata = self.config("metadata")
- if metadata:
+ if metadata := self.config("metadata"):
if isinstance(metadata, str):
metadata = metadata.split(",")
elif not isinstance(metadata, (list, tuple)):
- metadata = ("generation", "version")
+ metadata = ("generation", "version", "post")
self._meta_generation = ("generation" in metadata)
self._meta_version = ("version" in metadata)
+ self._meta_post = ("post" in metadata)
else:
- self._meta_generation = self._meta_version = False
+ self._meta_generation = self._meta_version = self._meta_post = \
+ False
def items(self):
- models = self.models()
- if models:
+ if models := self.models():
data = {"_extractor": CivitaiModelExtractor}
for model in models:
- url = "{}/models/{}".format(self.root, model["id"])
+ url = f"{self.root}/models/{model['id']}"
yield Message.Queue, url, data
return
- posts = self.posts()
- if posts:
+ if posts := self.posts():
for post in posts:
if "images" in post:
@@ -105,27 +102,37 @@ class CivitaiExtractor(Extractor):
yield Message.Url, file["url"], file
return
- images = self.images()
- if images:
- for image in images:
+ if images := self.images():
+ for file in images:
+
+ data = {
+ "file": file,
+ "user": file.pop("user"),
+ }
if self._meta_generation:
- image["generation"] = \
- self._extract_meta_generation(image)
+ data["generation"] = \
+ self._extract_meta_generation(file)
if self._meta_version:
- image["model"], image["version"] = \
- self._extract_meta_version(image, False)
- image["date"] = text.parse_datetime(
- image["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
-
- url = self._url(image)
- text.nameext_from_url(url, image)
- if not image["extension"]:
- image["extension"] = (
- self._video_ext if image.get("type") == "video" else
+ data["model"], data["version"] = \
+ self._extract_meta_version(file, False)
+ if "post" in file:
+ data["post"] = file.pop("post")
+ if self._meta_post and "post" not in data:
+ data["post"] = post = self._extract_meta_post(file)
+ if post:
+ post.pop("user", None)
+ file["date"] = text.parse_datetime(
+ file["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
+
+ data["url"] = url = self._url(file)
+ text.nameext_from_url(url, data)
+ if not data["extension"]:
+ data["extension"] = (
+ self._video_ext if file.get("type") == "video" else
self._image_ext)
- yield Message.Directory, image
- yield Message.Url, url, image
+ yield Message.Directory, data
+ yield Message.Url, url, data
return
def models(self):
@@ -151,12 +158,13 @@ class CivitaiExtractor(Extractor):
image["uuid"] = url
name = image.get("name")
if not name:
- mime = image.get("mimeType") or self._image_ext
- name = "{}.{}".format(image.get("id"), mime.rpartition("/")[2])
- return (
- "https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA/{}/{}/{}".format(
- url, quality, name)
- )
+ if mime := image.get("mimeType"):
+ name = f"{image.get('id')}.{mime.rpartition('/')[2]}"
+ else:
+ ext = self._video_ext if video else self._image_ext
+ name = f"{image.get('id')}.{ext}"
+ return (f"https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA"
+ f"/{url}/{quality}/{name}")
def _image_results(self, images):
for num, file in enumerate(images, 1):
@@ -171,10 +179,29 @@ class CivitaiExtractor(Extractor):
self._image_ext)
if "id" not in file and data["filename"].isdecimal():
file["id"] = text.parse_int(data["filename"])
+ if "date" not in file:
+ file["date"] = text.parse_datetime(
+ file["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
if self._meta_generation:
file["generation"] = self._extract_meta_generation(file)
yield data
+ def _image_reactions(self):
+ self._require_auth()
+
+ params = self.params
+ params["authed"] = True
+ params["useIndex"] = False
+ if "reactions" not in params:
+ params["reactions"] = ("Like", "Dislike", "Heart", "Laugh", "Cry")
+ return self.api.images(params)
+
+ def _require_auth(self):
+ if "Authorization" not in self.api.headers and \
+ not self.cookies.get(
+ "__Secure-civitai-token", domain=".civitai.com"):
+ raise exception.AuthRequired(("'api-key'", "cookies"))
+
def _parse_query(self, value):
return text.parse_query_list(
value, {"tags", "reactions", "baseModels", "tools", "techniques",
@@ -186,10 +213,18 @@ class CivitaiExtractor(Extractor):
except Exception as exc:
return self.log.debug("", exc_info=exc)
+ def _extract_meta_post(self, image):
+ try:
+ post = self.api.post(image["postId"])
+ post["date"] = text.parse_datetime(
+ post["publishedAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
+ return post
+ except Exception as exc:
+ return self.log.debug("", exc_info=exc)
+
def _extract_meta_version(self, item, is_post=True):
try:
- version_id = self._extract_version_id(item, is_post)
- if version_id:
+ if version_id := self._extract_version_id(item, is_post):
version = self.api.model_version(version_id).copy()
return version.pop("model", None), version
except Exception as exc:
@@ -197,12 +232,11 @@ class CivitaiExtractor(Extractor):
return None, None
def _extract_version_id(self, item, is_post=True):
- version_id = item.get("modelVersionId")
- if version_id:
+ if version_id := item.get("modelVersionId"):
return version_id
-
- version_ids = item.get("modelVersionIds")
- if version_ids:
+ if version_ids := item.get("modelVersionIds"):
+ return version_ids[0]
+ if version_ids := item.get("modelVersionIdsManual"):
return version_ids[0]
if is_post:
@@ -285,16 +319,15 @@ class CivitaiModelExtractor(CivitaiExtractor):
if not sep:
name = ext
ext = "bin"
- file["uuid"] = "model-{}-{}-{}".format(
- model["id"], version["id"], file["id"])
+ file["uuid"] = f"model-{model['id']}-{version['id']}-{file['id']}"
files.append({
"num" : num,
"file" : file,
"filename" : name,
"extension": ext,
- "url" : (file.get("downloadUrl") or
- "{}/api/download/models/{}".format(
- self.root, version["id"])),
+ "url" : (
+ file.get("downloadUrl") or
+ f"{self.root}/api/download/models/{version['id']}"),
"_http_headers" : {
"Authorization": self.api.headers.get("Authorization")},
"_http_validate": self._validate_file_model,
@@ -308,7 +341,7 @@ class CivitaiModelExtractor(CivitaiExtractor):
else:
params = {
"modelVersionId": version["id"],
- "prioritizedUserIds": [user["id"]],
+ "prioritizedUserIds": (user["id"],),
"period": "AllTime",
"sort": "Most Reactions",
"limit": 20,
@@ -327,8 +360,7 @@ class CivitaiModelExtractor(CivitaiExtractor):
alert = text.extr(
response.text, 'mantine-Alert-message">', "</div></div></div>")
if alert:
- msg = "\"{}\" - 'api-key' required".format(
- text.remove_html(alert))
+ msg = f"\"{text.remove_html(alert)}\" - 'api-key' required"
else:
msg = "'api-key' required to download this file"
self.log.warning(msg)
@@ -366,14 +398,26 @@ class CivitaiTagExtractor(CivitaiExtractor):
return self.api.models_tag(tag)
-class CivitaiSearchExtractor(CivitaiExtractor):
- subcategory = "search"
+class CivitaiSearchModelsExtractor(CivitaiExtractor):
+ subcategory = "search-models"
pattern = BASE_PATTERN + r"/search/models\?([^#]+)"
example = "https://civitai.com/search/models?query=QUERY"
def models(self):
- params = text.parse_query(self.groups[0])
- return self.api.models(params)
+ params = self._parse_query(self.groups[0])
+ return CivitaiSearchAPI(self).search_models(
+ params.get("query"), params.get("sortBy"), self.api.nsfw)
+
+
+class CivitaiSearchImagesExtractor(CivitaiExtractor):
+ subcategory = "search-images"
+ pattern = BASE_PATTERN + r"/search/images\?([^#]+)"
+ example = "https://civitai.com/search/images?query=QUERY"
+
+ def images(self):
+ params = self._parse_query(self.groups[0])
+ return CivitaiSearchAPI(self).search_images(
+ params.get("query"), params.get("sortBy"), self.api.nsfw)
class CivitaiModelsExtractor(CivitaiExtractor):
@@ -382,7 +426,7 @@ class CivitaiModelsExtractor(CivitaiExtractor):
example = "https://civitai.com/models"
def models(self):
- params = text.parse_query(self.groups[0])
+ params = self._parse_query(self.groups[0])
return self.api.models(params)
@@ -392,26 +436,32 @@ class CivitaiImagesExtractor(CivitaiExtractor):
example = "https://civitai.com/images"
def images(self):
- params = text.parse_query(self.groups[0])
+ params = self._parse_query(self.groups[0])
return self.api.images(params)
-class CivitaiUserExtractor(CivitaiExtractor):
- subcategory = "user"
+class CivitaiPostsExtractor(CivitaiExtractor):
+ subcategory = "posts"
+ pattern = BASE_PATTERN + r"/posts(?:/?\?([^#]+))?(?:$|#)"
+ example = "https://civitai.com/posts"
+
+ def posts(self):
+ params = self._parse_query(self.groups[0])
+ return self.api.posts(params)
+
+
+class CivitaiUserExtractor(Dispatch, CivitaiExtractor):
pattern = USER_PATTERN + r"/?(?:$|\?|#)"
example = "https://civitai.com/user/USER"
- def initialize(self):
- pass
-
def items(self):
- base = "{}/user/{}/".format(self.root, self.groups[0])
+ base = f"{self.root}/user/{self.groups[0]}/"
return self._dispatch_extractors((
(CivitaiUserModelsExtractor, base + "models"),
(CivitaiUserPostsExtractor , base + "posts"),
(CivitaiUserImagesExtractor, base + "images"),
(CivitaiUserVideosExtractor, base + "videos"),
- ), ("user-models", "user-posts"))
+ ), ("user-images", "user-videos"))
class CivitaiUserModelsExtractor(CivitaiExtractor):
@@ -446,29 +496,17 @@ class CivitaiUserImagesExtractor(CivitaiExtractor):
example = "https://civitai.com/user/USER/images"
def __init__(self, match):
- self.params = self._parse_query(match.group(2))
+ user, query = match.groups()
+ self.params = self._parse_query(query)
if self.params.get("section") == "reactions":
- self.subcategory = "reactions"
- self.images = self.images_reactions
+ self.subcategory = "reactions-images"
+ self.images = self._image_reactions
+ else:
+ self.params["username"] = text.unquote(user)
CivitaiExtractor.__init__(self, match)
def images(self):
- params = self.params
- params["username"] = text.unquote(self.groups[0])
- return self.api.images(params)
-
- def images_reactions(self):
- if "Authorization" not in self.api.headers and \
- not self.cookies.get(
- "__Secure-civitai-token", domain=".civitai.com"):
- raise exception.AuthorizationError("api-key or cookies required")
-
- params = self.params
- params["authed"] = True
- params["useIndex"] = False
- if "reactions" not in params:
- params["reactions"] = ("Like", "Dislike", "Heart", "Laugh", "Cry")
- return self.api.images(params)
+ return self.api.images(self.params)
class CivitaiUserVideosExtractor(CivitaiExtractor):
@@ -477,14 +515,40 @@ class CivitaiUserVideosExtractor(CivitaiExtractor):
pattern = USER_PATTERN + r"/videos/?(?:\?([^#]+))?"
example = "https://civitai.com/user/USER/videos"
- def images(self):
- self._image_ext = "mp4"
+ def __init__(self, match):
+ user, query = match.groups()
+ self.params = self._parse_query(query)
+ self.params["types"] = ("video",)
+ if self.params.get("section") == "reactions":
+ self.subcategory = "reactions-videos"
+ self.images = self._image_reactions
+ else:
+ self.params["username"] = text.unquote(user)
+ CivitaiExtractor.__init__(self, match)
- user, query = self.groups
- params = self._parse_query(query)
- params["types"] = ["video"]
- params["username"] = text.unquote(user)
- return self.api.images(params)
+ images = CivitaiUserImagesExtractor.images
+
+
+class CivitaiGeneratedExtractor(CivitaiExtractor):
+ """Extractor for your generated files feed"""
+ subcategory = "generated"
+ filename_fmt = "{filename}.{extension}"
+ directory_fmt = ("{category}", "generated")
+ pattern = f"{BASE_PATTERN}/generate"
+ example = "https://civitai.com/generate"
+
+ def items(self):
+ self._require_auth()
+
+ for gen in self.api.orchestrator_queryGeneratedImages():
+ gen["date"] = text.parse_datetime(
+ gen["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
+ yield Message.Directory, gen
+ for step in gen.pop("steps", ()):
+ for image in step.pop("images", ()):
+ data = {"file": image, **step, **gen}
+ url = image["url"]
+ yield Message.Url, url, text.nameext_from_url(url, data)
class CivitaiRestAPI():
@@ -498,8 +562,7 @@ class CivitaiRestAPI():
self.root = extractor.root + "/api"
self.headers = {"Content-Type": "application/json"}
- api_key = extractor.config("api-key")
- if api_key:
+ if api_key := extractor.config("api-key"):
extractor.log.debug("Using api_key authentication")
self.headers["Authorization"] = "Bearer " + api_key
@@ -528,12 +591,12 @@ class CivitaiRestAPI():
})
def model(self, model_id):
- endpoint = "/v1/models/{}".format(model_id)
+ endpoint = f"/v1/models/{model_id}"
return self._call(endpoint)
@memcache(keyarg=1)
def model_version(self, model_version_id):
- endpoint = "/v1/model-versions/{}".format(model_version_id)
+ endpoint = f"/v1/model-versions/{model_version_id}"
return self._call(endpoint)
def models(self, params):
@@ -572,13 +635,12 @@ class CivitaiTrpcAPI():
self.root = extractor.root + "/api/trpc/"
self.headers = {
"content-type" : "application/json",
- "x-client-version": "5.0.701",
+ "x-client-version": "5.0.920",
"x-client-date" : "",
"x-client" : "web",
"x-fingerprint" : "undefined",
}
- api_key = extractor.config("api-key")
- if api_key:
+ if api_key := extractor.config("api-key"):
extractor.log.debug("Using api_key authentication")
self.headers["Authorization"] = "Bearer " + api_key
@@ -607,11 +669,11 @@ class CivitaiTrpcAPI():
"useIndex" : True,
"period" : "AllTime",
"sort" : "Newest",
- "types" : ["image"],
+ "types" : ("image",),
"withMeta" : False, # Metadata Only
"fromPlatform" : False, # Made On-Site
"browsingLevel": self.nsfw,
- "include" : ["cosmetics"],
+ "include" : ("cosmetics",),
})
params = self._type_params(params)
@@ -690,9 +752,10 @@ class CivitaiTrpcAPI():
"followed" : False,
"draftOnly" : False,
"pending" : True,
- "include" : ["cosmetics"],
+ "include" : ("cosmetics",),
})
+ params = self._type_params(params)
return self._pagination(endpoint, params, meta)
def user(self, username):
@@ -700,6 +763,15 @@ class CivitaiTrpcAPI():
params = {"username": username}
return (self._call(endpoint, params),)
+ def orchestrator_queryGeneratedImages(self):
+ endpoint = "orchestrator.queryGeneratedImages"
+ params = {
+ "ascending": False,
+ "tags" : ("gen",),
+ "authed" : True,
+ }
+ return self._pagination(endpoint, params)
+
def _call(self, endpoint, params, meta=None):
url = self.root + endpoint
headers = self.headers
@@ -765,4 +837,107 @@ class CivitaiTrpcAPI():
def _bool(value):
- return True if value == "true" else False
+ return value == "true"
+
+
+class CivitaiSearchAPI():
+
+ def __init__(self, extractor):
+ self.extractor = extractor
+ self.root = "https://search.civitai.com"
+ self.headers = {
+ "Authorization": "Bearer ab8565e5ab8dc2d8f0d4256d204781cb63fe8b031"
+ "eb3779cbbed38a7b5308e5c",
+ "Content-Type": "application/json",
+ "X-Meilisearch-Client": "Meilisearch instant-meilisearch (v0.13.5)"
+ " ; Meilisearch JavaScript (v0.34.0)",
+ "Origin": extractor.root,
+ "Sec-Fetch-Dest": "empty",
+ "Sec-Fetch-Mode": "cors",
+ "Sec-Fetch-Site": "same-site",
+ "Priority": "u=4",
+ }
+
+ def search(self, query, type, facets, nsfw=31):
+ endpoint = "/multi-search"
+
+ query = {
+ "q" : query,
+ "indexUid": type,
+ "facets" : facets,
+ "attributesToHighlight": (),
+ "highlightPreTag" : "__ais-highlight__",
+ "highlightPostTag": "__/ais-highlight__",
+ "limit" : 51,
+ "offset": 0,
+ "filter": (self._generate_filter(nsfw),),
+ }
+
+ return self._pagination(endpoint, query)
+
+ def search_models(self, query, type=None, nsfw=31):
+ facets = (
+ "category.name",
+ "checkpointType",
+ "fileFormats",
+ "lastVersionAtUnix",
+ "tags.name",
+ "type",
+ "user.username",
+ "version.baseModel",
+ )
+ return self.search(query, type or "models_v9", facets, nsfw)
+
+ def search_images(self, query, type=None, nsfw=31):
+ facets = (
+ "aspectRatio",
+ "baseModel",
+ "createdAtUnix",
+ "tagNames",
+ "techniqueNames",
+ "toolNames",
+ "type",
+ "user.username",
+ )
+ return self.search(query, type or "images_v6", facets, nsfw)
+
+ def _call(self, endpoint, query):
+ url = self.root + endpoint
+ params = util.json_dumps({"queries": (query,)})
+
+ data = self.extractor.request_json(
+ url, method="POST", headers=self.headers, data=params)
+
+ return data["results"][0]
+
+ def _pagination(self, endpoint, query):
+ limit = query["limit"] - 1
+ threshold = limit // 2
+
+ while True:
+ data = self._call(endpoint, query)
+
+ items = data["hits"]
+ yield from items
+
+ if len(items) < threshold:
+ return
+ query["offset"] += limit
+
+ def _generate_filter(self, level):
+ fltr = []
+
+ if level & 1:
+ fltr.append("1")
+ if level & 2:
+ fltr.append("2")
+ if level & 4:
+ fltr.append("4")
+ if level & 8:
+ fltr.append("8")
+ if level & 16:
+ fltr.append("16")
+
+ if not fltr:
+ return "()"
+ return "(nsfwLevel=" + " OR nsfwLevel=".join(fltr) + ")"