Diffstat (limited to 'gallery_dl/extractor/fanbox.py')
-rw-r--r--  gallery_dl/extractor/fanbox.py | 130
1 file changed, 69 insertions, 61 deletions
diff --git a/gallery_dl/extractor/fanbox.py b/gallery_dl/extractor/fanbox.py
index 8981c29..70b06e7 100644
--- a/gallery_dl/extractor/fanbox.py
+++ b/gallery_dl/extractor/fanbox.py
@@ -7,9 +7,8 @@
 """Extractors for https://www.fanbox.cc/"""
 
 from .common import Extractor, Message
-from .. import text
+from .. import text, util
 from ..cache import memcache
-import re
 
 BASE_PATTERN = r"(?:https?://)?(?:www\.)?fanbox\.cc"
 USER_PATTERN = (
@@ -41,8 +40,7 @@ class FanboxExtractor(Extractor):
         }
         self.embeds = self.config("embeds", True)
 
-        includes = self.config("metadata")
-        if includes:
+        if includes := self.config("metadata"):
             if isinstance(includes, str):
                 includes = includes.split(",")
             elif not isinstance(includes, (list, tuple)):
@@ -62,7 +60,23 @@
             FanboxExtractor._warning = False
 
     def items(self):
-        for content_body, post in self.posts():
+        fee_max = self.config("fee-max")
+
+        for item in self.posts():
+            if fee_max is not None and fee_max < item["feeRequired"]:
+                self.log.warning("Skipping post %s (feeRequired of %s > %s)",
+                                 item["id"], item["feeRequired"], fee_max)
+                continue
+
+            try:
+                url = "https://api.fanbox.cc/post.info?postId=" + item["id"]
+                body = self.request_json(url, headers=self.headers)["body"]
+                content_body, post = self._extract_post(body)
+            except Exception as exc:
+                self.log.warning("Skipping post %s (%s: %s)",
+                                 item["id"], exc.__class__.__name__, exc)
+                continue
+
             yield Message.Directory, post
             yield from self._get_urls_from_post(content_body, post)
 
@@ -72,22 +86,17 @@
     def _pagination(self, url):
         while url:
             url = text.ensure_http_scheme(url)
-            body = self.request(url, headers=self.headers).json()["body"]
-            for item in body["items"]:
-                try:
-                    yield self._get_post_data(item["id"])
-                except Exception as exc:
-                    self.log.warning("Skipping post %s (%s: %s)",
-                                     item["id"], exc.__class__.__name__, exc)
+            body = self.request_json(url, headers=self.headers)["body"]
+
+            yield from body["items"]
+
             url = body["nextUrl"]
 
-    def _get_post_data(self, post_id):
+    def _extract_post(self, post):
         """Fetch and process post data"""
-        url = "https://api.fanbox.cc/post.info?postId="+post_id
-        post = self.request(url, headers=self.headers).json()["body"]
+        post["archives"] = ()
 
-        content_body = post.pop("body", None)
-        if content_body:
+        if content_body := post.pop("body", None):
             if "html" in content_body:
                 post["html"] = content_body["html"]
             if post["type"] == "article":
@@ -95,29 +104,30 @@
             if "blocks" in content_body:
                 content = []  # text content
                 images = []  # image IDs in 'body' order
+                files = []  # file IDs in 'body' order
 
-                append = content.append
-                append_img = images.append
                 for block in content_body["blocks"]:
                     if "text" in block:
-                        append(block["text"])
+                        content.append(block["text"])
                     if "links" in block:
                         for link in block["links"]:
-                            append(link["url"])
+                            content.append(link["url"])
                     if "imageId" in block:
-                        append_img(block["imageId"])
-
-                if images and "imageMap" in content_body:
-                    # reorder 'imageMap' (#2718)
-                    image_map = content_body["imageMap"]
-                    content_body["imageMap"] = {
-                        image_id: image_map[image_id]
-                        for image_id in images
-                        if image_id in image_map
-                    }
+                        images.append(block["imageId"])
+                    if "fileId" in block:
+                        files.append(block["fileId"])
 
                 post["content"] = "\n".join(content)
 
+                self._sort_map(content_body, "imageMap", images)
+                if file_map := self._sort_map(content_body, "fileMap", files):
+                    exts = util.EXTS_ARCHIVE
+                    post["archives"] = [
+                        file
+                        for file in file_map.values()
+                        if file.get("extension", "").lower() in exts
+                    ]
+
         post["date"] = text.parse_datetime(post["publishedDatetime"])
         post["text"] = content_body.get("text") if content_body else None
         post["isCoverImage"] = False
@@ -130,8 +140,7 @@
             try:
                 post["plan"] = plans[fee]
             except KeyError:
-                fees = [f for f in plans if f >= fee]
-                if fees:
+                if fees := [f for f in plans if f >= fee]:
                     plan = plans[min(fees)]
                 else:
                     plan = plans[0].copy()
@@ -139,17 +148,30 @@
                 post["plan"] = plans[fee] = plan
         if self._meta_comments:
             if post["commentCount"]:
-                post["comments"] = list(self._get_comment_data(post_id))
+                post["comments"] = list(self._get_comment_data(post["id"]))
             else:
                 post["commentd"] = ()
 
         return content_body, post
 
+    def _sort_map(self, body, key, ids):
+        orig = body.get(key)
+        if not orig:
+            return {} if orig is None else orig
+
+        body[key] = new = {
+            id: orig[id]
+            for id in ids
+            if id in orig
+        }
+
+        return new
+
     @memcache(keyarg=1)
     def _get_user_data(self, creator_id):
         url = "https://api.fanbox.cc/creator.get"
         params = {"creatorId": creator_id}
-        data = self.request(url, params=params, headers=self.headers).json()
+        data = self.request_json(url, params=params, headers=self.headers)
 
         user = data["body"]
         user.update(user.pop("user"))
@@ -160,7 +182,7 @@
     def _get_plan_data(self, creator_id):
         url = "https://api.fanbox.cc/plan.listCreator"
         params = {"creatorId": creator_id}
-        data = self.request(url, params=params, headers=self.headers).json()
+        data = self.request_json(url, params=params, headers=self.headers)
 
         plans = {0: {
             "id"    : "",
@@ -185,7 +207,7 @@
         comments = []
         while url:
             url = text.ensure_http_scheme(url)
-            body = self.request(url, headers=self.headers).json()["body"]
+            body = self.request_json(url, headers=self.headers)["body"]
             data = body["commentList"]
             comments.extend(data["items"])
             url = data["nextUrl"]
@@ -193,9 +215,8 @@
     def _get_urls_from_post(self, content_body, post):
         num = 0
 
-        cover_image = post.get("coverImageUrl")
-        if cover_image:
-            cover_image = re.sub("/c/[0-9a-z_]+", "", cover_image)
+        if cover_image := post.get("coverImageUrl"):
+            cover_image = util.re("/c/[0-9a-z_]+").sub("", cover_image)
             final_post = post.copy()
             final_post["isCoverImage"] = True
             final_post["fileUrl"] = cover_image
@@ -313,10 +334,10 @@
         elif provider == "twitter":
             url = "https://twitter.com/_/status/"+content_id
         elif provider == "google_forms":
-            templ = "https://docs.google.com/forms/d/e/{}/viewform?usp=sf_link"
-            url = templ.format(content_id)
+            url = (f"https://docs.google.com/forms/d/e/"
+                   f"{content_id}/viewform?usp=sf_link")
        else:
-            self.log.warning("service not recognized: {}".format(provider))
+            self.log.warning(f"service not recognized: {provider}")
 
         if url:
             final_post["embed"] = embed
@@ -334,25 +355,16 @@
     pattern = USER_PATTERN + r"(?:/posts)?/?$"
     example = "https://USER.fanbox.cc/"
 
-    def __init__(self, match):
-        FanboxExtractor.__init__(self, match)
-        self.creator_id = match.group(1) or match.group(2)
-
     def posts(self):
         url = "https://api.fanbox.cc/post.paginateCreator?creatorId="
-        return self._pagination_creator(url + self.creator_id)
+        creator_id = self.groups[0] or self.groups[1]
+        return self._pagination_creator(url + creator_id)
 
     def _pagination_creator(self, url):
-        urls = self.request(url, headers=self.headers).json()["body"]
+        urls = self.request_json(url, headers=self.headers)["body"]
         for url in urls:
             url = text.ensure_http_scheme(url)
-            body = self.request(url, headers=self.headers).json()["body"]
-            for item in body:
-                try:
-                    yield self._get_post_data(item["id"])
-                except Exception as exc:
-                    self.log.warning("Skipping post %s (%s: %s)",
-                                     item["id"], exc.__class__.__name__, exc)
+            yield from self.request_json(url, headers=self.headers)["body"]
 
 
 class FanboxPostExtractor(FanboxExtractor):
@@ -361,12 +373,8 @@
     pattern = USER_PATTERN + r"/posts/(\d+)"
     example = "https://USER.fanbox.cc/posts/12345"
 
-    def __init__(self, match):
-        FanboxExtractor.__init__(self, match)
-        self.post_id = match.group(3)
-
    def posts(self):
-        return (self._get_post_data(self.post_id),)
+        return ({"id": self.groups[2], "feeRequired": 0},)
 
 
 class FanboxHomeExtractor(FanboxExtractor):
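Usage note: the fee-max option read in items() above goes through self.config(), so it is set under extractor.fanbox in a gallery-dl configuration file. A minimal sketch (the threshold 500 is a made-up example value; posts are skipped when their feeRequired exceeds it):

    {
        "extractor": {
            "fanbox": {
                "fee-max": 500
            }
        }
    }

Equivalently, it should work as a one-off command-line option, e.g. gallery-dl -o fee-max=500 https://USER.fanbox.cc/; skipped posts are reported with the "Skipping post ... (feeRequired of ... > ...)" warning instead of being fetched.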
