about summary refs log tree commit diff stats
path: root/gallery_dl/extractor/civitai.py
diff options
context:
space:
mode:
Diffstat (limited to 'gallery_dl/extractor/civitai.py')
-rw-r--r-- gallery_dl/extractor/civitai.py 174
1 files changed, 133 insertions, 41 deletions
diff --git a/gallery_dl/extractor/civitai.py b/gallery_dl/extractor/civitai.py
index 3e657d6..725af3a 100644
--- a/gallery_dl/extractor/civitai.py
+++ b/gallery_dl/extractor/civitai.py
@@ -22,17 +22,17 @@ class CivitaiExtractor(Extractor):
category = "civitai"
root = "https://civitai.com"
directory_fmt = ("{category}", "{username|user[username]}", "images")
- filename_fmt = "{id}.{extension}"
- archive_fmt = "{hash}"
+ filename_fmt = "{file[id]|id|filename}.{extension}"
+ archive_fmt = "{file[hash]|hash}"
request_interval = (0.5, 1.5)
def _init(self):
- if self.config("api") == "trpc":
- self.log.debug("Using tRPC API")
- self.api = CivitaiTrpcAPI(self)
- else:
+ if self.config("api") == "rest":
self.log.debug("Using REST API")
self.api = CivitaiRestAPI(self)
+ else:
+ self.log.debug("Using tRPC API")
+ self.api = CivitaiTrpcAPI(self)
quality = self.config("quality")
if quality:
@@ -53,6 +53,30 @@ class CivitaiExtractor(Extractor):
yield Message.Queue, url, data
return
+ posts = self.posts()
+ if posts:
+ for post in posts:
+
+ if "images" in post:
+ images = post["images"]
+ else:
+ images = self.api.images_post(post["id"])
+
+ post = self.api.post(post["id"])
+ post["date"] = text.parse_datetime(
+ post["publishedAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
+ data = {
+ "post": post,
+ "user": post["user"],
+ }
+ del post["user"]
+
+ yield Message.Directory, data
+ for file in self._image_results(images):
+ file.update(data)
+ yield Message.Url, file["url"], file
+ return
+
images = self.images()
if images:
for image in images:
@@ -68,6 +92,9 @@ class CivitaiExtractor(Extractor):
def models(self):
return ()
+ def posts(self):
+ return ()
+
def images(self):
return ()
@@ -87,13 +114,26 @@ class CivitaiExtractor(Extractor):
url, self._image_quality, name)
)
+ def _image_results(self, images):
+ for num, file in enumerate(images, 1):
+ data = text.nameext_from_url(file["url"], {
+ "num" : num,
+ "file": file,
+ "url" : self._url(file),
+ })
+ if not data["extension"]:
+ data["extension"] = self._image_ext
+ if "id" not in file and data["filename"].isdecimal():
+ file["id"] = text.parse_int(data["filename"])
+ yield data
+
class CivitaiModelExtractor(CivitaiExtractor):
subcategory = "model"
directory_fmt = ("{category}", "{user[username]}",
"{model[id]}{model[name]:? //}",
"{version[id]}{version[name]:? //}")
- filename_fmt = "{filename}.{extension}"
+ filename_fmt = "{file[id]}.{extension}"
archive_fmt = "{file[hash]}"
pattern = BASE_PATTERN + r"/models/(\d+)(?:/?\?modelVersionId=(\d+))?"
example = "https://civitai.com/models/12345/TITLE"
@@ -183,23 +223,11 @@ class CivitaiModelExtractor(CivitaiExtractor):
}
images = self.api.images(params, defaults=False)
- return [
- text.nameext_from_url(file["url"], {
- "num" : num,
- "file": file,
- "url" : self._url(file),
- })
- for num, file in enumerate(images, 1)
- ]
+ return self._image_results(images)
def _extract_files_gallery(self, model, version, user):
images = self.api.images_gallery(model, version, user)
- for num, file in enumerate(images, 1):
- yield text.nameext_from_url(file["url"], {
- "num" : num,
- "file": file,
- "url" : self._url(file),
- })
+ return self._image_results(images)
def _validate_file_model(self, response):
if response.headers.get("Content-Type", "").startswith("text/html"):
@@ -224,6 +252,17 @@ class CivitaiImageExtractor(CivitaiExtractor):
return self.api.image(self.groups[0])
+class CivitaiPostExtractor(CivitaiExtractor):
+ subcategory = "post"
+ directory_fmt = ("{category}", "{username|user[username]}", "posts",
+ "{post[id]}{post[title]:? //}")
+ pattern = BASE_PATTERN + r"/posts/(\d+)"
+ example = "https://civitai.com/posts/12345"
+
+ def posts(self):
+ return ({"id": int(self.groups[0])},)
+
+
class CivitaiTagModelsExtractor(CivitaiExtractor):
subcategory = "tag-models"
pattern = BASE_PATTERN + r"/(?:tag/|models\?tag=)([^/?&#]+)"
@@ -266,8 +305,9 @@ class CivitaiUserExtractor(CivitaiExtractor):
base = "{}/user/{}/".format(self.root, self.groups[0])
return self._dispatch_extractors((
(CivitaiUserModelsExtractor, base + "models"),
+ (CivitaiUserPostsExtractor , base + "posts"),
(CivitaiUserImagesExtractor, base + "images"),
- ), ("user-models", "user-images"))
+ ), ("user-models", "user-posts"))
class CivitaiUserModelsExtractor(CivitaiExtractor):
@@ -281,6 +321,19 @@ class CivitaiUserModelsExtractor(CivitaiExtractor):
return self.api.models(params)
+class CivitaiUserPostsExtractor(CivitaiExtractor):
+ subcategory = "user-posts"
+ directory_fmt = ("{category}", "{username|user[username]}", "posts",
+ "{post[id]}{post[title]:? //}")
+ pattern = USER_PATTERN + r"/posts/?(?:\?([^#]+))?"
+ example = "https://civitai.com/user/USER/posts"
+
+ def posts(self):
+ params = text.parse_query(self.groups[1])
+ params["username"] = text.unquote(self.groups[0])
+ return self.api.posts(params)
+
+
class CivitaiUserImagesExtractor(CivitaiExtractor):
subcategory = "user-images"
pattern = USER_PATTERN + r"/images/?(?:\?([^#]+))?"
@@ -373,7 +426,7 @@ class CivitaiTrpcAPI():
self.root = extractor.root + "/api/trpc/"
self.headers = {
"content-type" : "application/json",
- "x-client-version": "5.0.94",
+ "x-client-version": "5.0.146",
"x-client-date" : "",
"x-client" : "web",
"x-fingerprint" : "undefined",
@@ -399,7 +452,7 @@ class CivitaiTrpcAPI():
endpoint = "image.getInfinite"
if defaults:
- params_ = {
+ params = self._merge_params(params, {
"useIndex" : True,
"period" : "AllTime",
"sort" : "Newest",
@@ -408,12 +461,9 @@ class CivitaiTrpcAPI():
"fromPlatform" : False, # Made On-Site
"browsingLevel": self.nsfw,
"include" : ["cosmetics"],
- }
- params_.update(params)
- else:
- params_ = params
+ })
- return self._pagination(endpoint, params_)
+ return self._pagination(endpoint, params)
def images_gallery(self, model, version, user):
endpoint = "image.getImagesAsPostsInfinite"
@@ -430,6 +480,13 @@ class CivitaiTrpcAPI():
for post in self._pagination(endpoint, params):
yield from post["images"]
+ def images_post(self, post_id):
+ params = {
+ "postId" : int(post_id),
+ "pending": True,
+ }
+ return self.images(params)
+
def model(self, model_id):
endpoint = "model.getById"
params = {"id": int(model_id)}
@@ -444,7 +501,7 @@ class CivitaiTrpcAPI():
endpoint = "model.getAll"
if defaults:
- params_ = {
+ params = self._merge_params(params, {
"period" : "AllTime",
"periodMode" : "published",
"sort" : "Newest",
@@ -455,36 +512,71 @@ class CivitaiTrpcAPI():
"fromPlatform" : False,
"supportsGeneration": False,
"browsingLevel": self.nsfw,
- }
- params_.update(params)
- else:
- params_ = params
+ })
+
+ return self._pagination(endpoint, params)
+
+ def post(self, post_id):
+ endpoint = "post.get"
+ params = {"id": int(post_id)}
+ return self._call(endpoint, params)
- return self._pagination(endpoint, params_)
+ def posts(self, params, defaults=True):
+ endpoint = "post.getInfinite"
+ meta = {"cursor": ("Date",)}
+
+ if defaults:
+ params = self._merge_params(params, {
+ "browsingLevel": self.nsfw,
+ "period" : "AllTime",
+ "periodMode" : "published",
+ "sort" : "Newest",
+ "followed" : False,
+ "draftOnly" : False,
+ "pending" : True,
+ "include" : ["cosmetics"],
+ })
+
+ return self._pagination(endpoint, params, meta)
def user(self, username):
endpoint = "user.getCreator"
params = {"username": username}
return (self._call(endpoint, params),)
- def _call(self, endpoint, params):
+ def _call(self, endpoint, params, meta=None):
url = self.root + endpoint
headers = self.headers
- params = {"input": util.json_dumps({"json": params})}
+ if meta:
+ input = {"json": params, "meta": {"values": meta}}
+ else:
+ input = {"json": params}
+
+ params = {"input": util.json_dumps(input)}
headers["x-client-date"] = str(int(time.time() * 1000))
- response = self.extractor.request(url, headers=headers, params=params)
+ response = self.extractor.request(url, params=params, headers=headers)
return response.json()["result"]["data"]["json"]
- def _pagination(self, endpoint, params):
+ def _pagination(self, endpoint, params, meta=None):
+ if "cursor" not in params:
+ params["cursor"] = None
+ meta_ = {"cursor": ("undefined",)}
+
while True:
- data = self._call(endpoint, params)
+ data = self._call(endpoint, params, meta_)
yield from data["items"]
try:
if not data["nextCursor"]:
return
- params["cursor"] = data["nextCursor"]
except KeyError:
return
+
+ params["cursor"] = data["nextCursor"]
+ meta_ = meta
+
+ def _merge_params(self, params_user, params_default):
+ params_default.update(params_user)
+ return params_default