diff options
Diffstat (limited to 'gallery_dl/extractor/cohost.py')
| -rw-r--r-- | gallery_dl/extractor/cohost.py | 250 |
1 files changed, 0 insertions, 250 deletions
diff --git a/gallery_dl/extractor/cohost.py b/gallery_dl/extractor/cohost.py deleted file mode 100644 index 6a43224..0000000 --- a/gallery_dl/extractor/cohost.py +++ /dev/null @@ -1,250 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright 2024 Mike Fährmann -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. - -"""Extractors for https://cohost.org/""" - -from .common import Extractor, Message -from .. import text, util - -BASE_PATTERN = r"(?:https?://)?(?:www\.)?cohost\.org" - - -class CohostExtractor(Extractor): - """Base class for cohost extractors""" - category = "cohost" - root = "https://cohost.org" - directory_fmt = ("{category}", "{postingProject[handle]}") - filename_fmt = ("{postId}{headline:?_//[b:200]}{num:?_//}.{extension}") - archive_fmt = "{postId}_{num}" - - def _init(self): - self.replies = self.config("replies", True) - self.pinned = self.config("pinned", False) - self.shares = self.config("shares", False) - self.asks = self.config("asks", True) - - self.avatar = self.config("avatar", False) - if self.avatar: - self._urls_avatar = {None, ""} - - self.background = self.config("background", False) - if self.background: - self._urls_background = {None, ""} - - def items(self): - for post in self.posts(): - reason = post.get("limitedVisibilityReason") - if reason and reason != "none": - if reason == "log-in-first": - reason = ("This page's posts are visible only to users " - "who are logged in.") - self.log.warning('%s: "%s"', post["postId"], reason) - - files = self._extract_files(post) - post["count"] = len(files) - post["date"] = text.parse_datetime( - post["publishedAt"], "%Y-%m-%dT%H:%M:%S.%fZ") - - yield Message.Directory, post - - project = post["postingProject"] - if self.avatar: - url = project.get("avatarURL") - if url not in self._urls_avatar: - self._urls_avatar.add(url) - p = post.copy() - p["postId"] = p["kind"] = "avatar" - p["headline"] = p["num"] = "" - yield Message.Url, url, text.nameext_from_url(url, p) - - if self.background: - url = project.get("headerURL") - if url not in self._urls_background: - self._urls_background.add(url) - p = post.copy() - p["postId"] = p["kind"] = "background" - p["headline"] = p["num"] = "" - yield Message.Url, url, text.nameext_from_url(url, p) - - for post["num"], file in enumerate(files, 1): - url = file["fileURL"] - post.update(file) - text.nameext_from_url(url, post) - yield Message.Url, url, post - - def posts(self): - return () - - def _request_api(self, endpoint, input): - url = "{}/api/v1/trpc/{}".format(self.root, endpoint) - params = {"batch": "1", "input": util.json_dumps({"0": input})} - headers = {"content-type": "application/json"} - - data = self.request(url, params=params, headers=headers).json() - return data[0]["result"]["data"] - - def _extract_files(self, post): - files = [] - - self._extract_blocks(post, files) - if self.shares and post.get("shareTree"): - for share in post["shareTree"]: - self._extract_blocks(share, files, share) - del post["shareTree"] - - return files - - def _extract_blocks(self, post, files, shared=None): - post["content"] = content = [] - - for block in post.pop("blocks") or (): - try: - type = block["type"] - if type == "attachment": - file = block["attachment"].copy() - file["shared"] = shared - files.append(file) - elif type == "attachment-row": - for att in block["attachments"]: - file = att["attachment"].copy() - file["shared"] = shared - files.append(file) - elif type == "markdown": - content.append(block["markdown"]["content"]) - elif type == "ask": - post["ask"] = block["ask"] - else: - self.log.debug("%s: Unsupported block type '%s'", - post["postId"], type) - except Exception as exc: - self.log.debug("%s: %s", exc.__class__.__name__, exc) - - -class CohostUserExtractor(CohostExtractor): - """Extractor for media from a cohost user""" - subcategory = "user" - pattern = BASE_PATTERN + r"/([^/?#]+)/?(?:$|\?|#)" - example = "https://cohost.org/USER" - - def posts(self): - empty = 0 - params = { - "projectHandle": self.groups[0], - "page": 0, - "options": { - "pinnedPostsAtTop" : True if self.pinned else False, - "hideReplies" : not self.replies, - "hideShares" : not self.shares, - "hideAsks" : not self.asks, - "viewingOnProjectPage": True, - }, - } - - while True: - data = self._request_api("posts.profilePosts", params) - - posts = data["posts"] - if posts: - empty = 0 - yield from posts - else: - empty += 1 - - pagination = data["pagination"] - if not pagination.get("morePagesForward"): - return - if empty >= 3: - return self.log.debug("Empty API results") - params["page"] = pagination["nextPage"] - - -class CohostPostExtractor(CohostExtractor): - """Extractor for media from a single cohost post""" - subcategory = "post" - pattern = BASE_PATTERN + r"/([^/?#]+)/post/(\d+)" - example = "https://cohost.org/USER/post/12345" - - def posts(self): - endpoint = "posts.singlePost" - params = { - "handle": self.groups[0], - "postId": int(self.groups[1]), - } - - data = self._request_api(endpoint, params) - post = data["post"] - - try: - post["comments"] = data["comments"][self.groups[1]] - except LookupError: - post["comments"] = () - - return (post,) - - -class CohostTagExtractor(CohostExtractor): - """Extractor for tagged posts""" - subcategory = "tag" - pattern = BASE_PATTERN + r"/([^/?#]+)/tagged/([^/?#]+)(?:\?([^#]+))?" - example = "https://cohost.org/USER/tagged/TAG" - - def posts(self): - user, tag, query = self.groups - url = "{}/{}/tagged/{}".format(self.root, user, tag) - params = text.parse_query(query) - post_feed_key = ("tagged-post-feed" if user == "rc" else - "project-tagged-post-feed") - - while True: - page = self.request(url, params=params).text - data = util.json_loads(text.extr( - page, 'id="__COHOST_LOADER_STATE__">', '</script>')) - - try: - feed = data[post_feed_key] - except KeyError: - feed = data.popitem()[1] - - yield from feed["posts"] - - pagination = feed["paginationMode"] - if not pagination.get("morePagesForward"): - return - params["refTimestamp"] = pagination["refTimestamp"] - params["skipPosts"] = \ - pagination["currentSkip"] + pagination["idealPageStride"] - - -class CohostLikesExtractor(CohostExtractor): - """Extractor for liked posts""" - subcategory = "likes" - pattern = BASE_PATTERN + r"/rc/liked-posts" - example = "https://cohost.org/rc/liked-posts" - - def posts(self): - url = "{}/rc/liked-posts".format(self.root) - params = {} - - while True: - page = self.request(url, params=params).text - data = util.json_loads(text.extr( - page, 'id="__COHOST_LOADER_STATE__">', '</script>')) - - try: - feed = data["liked-posts-feed"] - except KeyError: - feed = data.popitem()[1] - - yield from feed["posts"] - - pagination = feed["paginationMode"] - if not pagination.get("morePagesForward"): - return - params["refTimestamp"] = pagination["refTimestamp"] - params["skipPosts"] = \ - pagination["currentSkip"] + pagination["idealPageStride"] |
