diff options
Diffstat (limited to 'gallery_dl/extractor/civitai.py')
| -rw-r--r-- | gallery_dl/extractor/civitai.py | 174 |
1 files changed, 133 insertions, 41 deletions
diff --git a/gallery_dl/extractor/civitai.py b/gallery_dl/extractor/civitai.py index 3e657d6..725af3a 100644 --- a/gallery_dl/extractor/civitai.py +++ b/gallery_dl/extractor/civitai.py @@ -22,17 +22,17 @@ class CivitaiExtractor(Extractor): category = "civitai" root = "https://civitai.com" directory_fmt = ("{category}", "{username|user[username]}", "images") - filename_fmt = "{id}.{extension}" - archive_fmt = "{hash}" + filename_fmt = "{file[id]|id|filename}.{extension}" + archive_fmt = "{file[hash]|hash}" request_interval = (0.5, 1.5) def _init(self): - if self.config("api") == "trpc": - self.log.debug("Using tRPC API") - self.api = CivitaiTrpcAPI(self) - else: + if self.config("api") == "rest": self.log.debug("Using REST API") self.api = CivitaiRestAPI(self) + else: + self.log.debug("Using tRPC API") + self.api = CivitaiTrpcAPI(self) quality = self.config("quality") if quality: @@ -53,6 +53,30 @@ class CivitaiExtractor(Extractor): yield Message.Queue, url, data return + posts = self.posts() + if posts: + for post in posts: + + if "images" in post: + images = post["images"] + else: + images = self.api.images_post(post["id"]) + + post = self.api.post(post["id"]) + post["date"] = text.parse_datetime( + post["publishedAt"], "%Y-%m-%dT%H:%M:%S.%fZ") + data = { + "post": post, + "user": post["user"], + } + del post["user"] + + yield Message.Directory, data + for file in self._image_results(images): + file.update(data) + yield Message.Url, file["url"], file + return + images = self.images() if images: for image in images: @@ -68,6 +92,9 @@ class CivitaiExtractor(Extractor): def models(self): return () + def posts(self): + return () + def images(self): return () @@ -87,13 +114,26 @@ class CivitaiExtractor(Extractor): url, self._image_quality, name) ) + def _image_results(self, images): + for num, file in enumerate(images, 1): + data = text.nameext_from_url(file["url"], { + "num" : num, + "file": file, + "url" : self._url(file), + }) + if not data["extension"]: + data["extension"] = self._image_ext + if "id" not in file and data["filename"].isdecimal(): + file["id"] = text.parse_int(data["filename"]) + yield data + class CivitaiModelExtractor(CivitaiExtractor): subcategory = "model" directory_fmt = ("{category}", "{user[username]}", "{model[id]}{model[name]:? //}", "{version[id]}{version[name]:? //}") - filename_fmt = "{filename}.{extension}" + filename_fmt = "{file[id]}.{extension}" archive_fmt = "{file[hash]}" pattern = BASE_PATTERN + r"/models/(\d+)(?:/?\?modelVersionId=(\d+))?" example = "https://civitai.com/models/12345/TITLE" @@ -183,23 +223,11 @@ class CivitaiModelExtractor(CivitaiExtractor): } images = self.api.images(params, defaults=False) - return [ - text.nameext_from_url(file["url"], { - "num" : num, - "file": file, - "url" : self._url(file), - }) - for num, file in enumerate(images, 1) - ] + return self._image_results(images) def _extract_files_gallery(self, model, version, user): images = self.api.images_gallery(model, version, user) - for num, file in enumerate(images, 1): - yield text.nameext_from_url(file["url"], { - "num" : num, - "file": file, - "url" : self._url(file), - }) + return self._image_results(images) def _validate_file_model(self, response): if response.headers.get("Content-Type", "").startswith("text/html"): @@ -224,6 +252,17 @@ class CivitaiImageExtractor(CivitaiExtractor): return self.api.image(self.groups[0]) +class CivitaiPostExtractor(CivitaiExtractor): + subcategory = "post" + directory_fmt = ("{category}", "{username|user[username]}", "posts", + "{post[id]}{post[title]:? //}") + pattern = BASE_PATTERN + r"/posts/(\d+)" + example = "https://civitai.com/posts/12345" + + def posts(self): + return ({"id": int(self.groups[0])},) + + class CivitaiTagModelsExtractor(CivitaiExtractor): subcategory = "tag-models" pattern = BASE_PATTERN + r"/(?:tag/|models\?tag=)([^/?&#]+)" @@ -266,8 +305,9 @@ class CivitaiUserExtractor(CivitaiExtractor): base = "{}/user/{}/".format(self.root, self.groups[0]) return self._dispatch_extractors(( (CivitaiUserModelsExtractor, base + "models"), + (CivitaiUserPostsExtractor , base + "posts"), (CivitaiUserImagesExtractor, base + "images"), - ), ("user-models", "user-images")) + ), ("user-models", "user-posts")) class CivitaiUserModelsExtractor(CivitaiExtractor): @@ -281,6 +321,19 @@ class CivitaiUserModelsExtractor(CivitaiExtractor): return self.api.models(params) +class CivitaiUserPostsExtractor(CivitaiExtractor): + subcategory = "user-posts" + directory_fmt = ("{category}", "{username|user[username]}", "posts", + "{post[id]}{post[title]:? //}") + pattern = USER_PATTERN + r"/posts/?(?:\?([^#]+))?" + example = "https://civitai.com/user/USER/posts" + + def posts(self): + params = text.parse_query(self.groups[1]) + params["username"] = text.unquote(self.groups[0]) + return self.api.posts(params) + + class CivitaiUserImagesExtractor(CivitaiExtractor): subcategory = "user-images" pattern = USER_PATTERN + r"/images/?(?:\?([^#]+))?" @@ -373,7 +426,7 @@ class CivitaiTrpcAPI(): self.root = extractor.root + "/api/trpc/" self.headers = { "content-type" : "application/json", - "x-client-version": "5.0.94", + "x-client-version": "5.0.146", "x-client-date" : "", "x-client" : "web", "x-fingerprint" : "undefined", @@ -399,7 +452,7 @@ class CivitaiTrpcAPI(): endpoint = "image.getInfinite" if defaults: - params_ = { + params = self._merge_params(params, { "useIndex" : True, "period" : "AllTime", "sort" : "Newest", @@ -408,12 +461,9 @@ class CivitaiTrpcAPI(): "fromPlatform" : False, # Made On-Site "browsingLevel": self.nsfw, "include" : ["cosmetics"], - } - params_.update(params) - else: - params_ = params + }) - return self._pagination(endpoint, params_) + return self._pagination(endpoint, params) def images_gallery(self, model, version, user): endpoint = "image.getImagesAsPostsInfinite" @@ -430,6 +480,13 @@ class CivitaiTrpcAPI(): for post in self._pagination(endpoint, params): yield from post["images"] + def images_post(self, post_id): + params = { + "postId" : int(post_id), + "pending": True, + } + return self.images(params) + def model(self, model_id): endpoint = "model.getById" params = {"id": int(model_id)} @@ -444,7 +501,7 @@ class CivitaiTrpcAPI(): endpoint = "model.getAll" if defaults: - params_ = { + params = self._merge_params(params, { "period" : "AllTime", "periodMode" : "published", "sort" : "Newest", @@ -455,36 +512,71 @@ class CivitaiTrpcAPI(): "fromPlatform" : False, "supportsGeneration": False, "browsingLevel": self.nsfw, - } - params_.update(params) - else: - params_ = params + }) + + return self._pagination(endpoint, params) + + def post(self, post_id): + endpoint = "post.get" + params = {"id": int(post_id)} + return self._call(endpoint, params) - return self._pagination(endpoint, params_) + def posts(self, params, defaults=True): + endpoint = "post.getInfinite" + meta = {"cursor": ("Date",)} + + if defaults: + params = self._merge_params(params, { + "browsingLevel": self.nsfw, + "period" : "AllTime", + "periodMode" : "published", + "sort" : "Newest", + "followed" : False, + "draftOnly" : False, + "pending" : True, + "include" : ["cosmetics"], + }) + + return self._pagination(endpoint, params, meta) def user(self, username): endpoint = "user.getCreator" params = {"username": username} return (self._call(endpoint, params),) - def _call(self, endpoint, params): + def _call(self, endpoint, params, meta=None): url = self.root + endpoint headers = self.headers - params = {"input": util.json_dumps({"json": params})} + if meta: + input = {"json": params, "meta": {"values": meta}} + else: + input = {"json": params} + + params = {"input": util.json_dumps(input)} headers["x-client-date"] = str(int(time.time() * 1000)) - response = self.extractor.request(url, headers=headers, params=params) + response = self.extractor.request(url, params=params, headers=headers) return response.json()["result"]["data"]["json"] - def _pagination(self, endpoint, params): + def _pagination(self, endpoint, params, meta=None): + if "cursor" not in params: + params["cursor"] = None + meta_ = {"cursor": ("undefined",)} + while True: - data = self._call(endpoint, params) + data = self._call(endpoint, params, meta_) yield from data["items"] try: if not data["nextCursor"]: return - params["cursor"] = data["nextCursor"] except KeyError: return + + params["cursor"] = data["nextCursor"] + meta_ = meta + + def _merge_params(self, params_user, params_default): + params_default.update(params_user) + return params_default |
