aboutsummaryrefslogtreecommitdiffstats
path: root/gallery_dl/extractor/cohost.py
diff options
context:
space:
mode:
Diffstat (limited to 'gallery_dl/extractor/cohost.py')
-rw-r--r--gallery_dl/extractor/cohost.py223
1 files changed, 223 insertions, 0 deletions
diff --git a/gallery_dl/extractor/cohost.py b/gallery_dl/extractor/cohost.py
new file mode 100644
index 0000000..e1f6040
--- /dev/null
+++ b/gallery_dl/extractor/cohost.py
@@ -0,0 +1,223 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2024 Mike Fährmann
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+
+"""Extractors for https://cohost.org/"""
+
+from .common import Extractor, Message
+from .. import text, util
+
+BASE_PATTERN = r"(?:https?://)?(?:www\.)?cohost\.org"
+
+
+class CohostExtractor(Extractor):
+ """Base class for cohost extractors"""
+ category = "cohost"
+ root = "https://cohost.org"
+ directory_fmt = ("{category}", "{postingProject[handle]}")
+ filename_fmt = ("{postId}_{headline|plainTextBody:?/_/[:100]}"
+ "{num}.{extension}")
+ archive_fmt = "{postId}_{num}"
+
+ def _init(self):
+ self.replies = self.config("replies", True)
+ self.pinned = self.config("pinned", False)
+ self.shares = self.config("shares", False)
+ self.asks = self.config("asks", True)
+
+ def items(self):
+ for post in self.posts():
+ reason = post.get("limitedVisibilityReason")
+ if reason and reason != "none":
+ if reason == "log-in-first":
+ reason = ("This page's posts are visible only to users "
+ "who are logged in.")
+ self.log.warning('%s: "%s"', post["postId"], reason)
+
+ files = self._extract_files(post)
+ post["count"] = len(files)
+ post["date"] = text.parse_datetime(
+ post["publishedAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
+
+ yield Message.Directory, post
+ for post["num"], file in enumerate(files, 1):
+ url = file["fileURL"]
+ post.update(file)
+ text.nameext_from_url(url, post)
+ yield Message.Url, url, post
+
+ def posts(self):
+ return ()
+
+ def _request_api(self, endpoint, input):
+ url = "{}/api/v1/trpc/{}".format(self.root, endpoint)
+ params = {"batch": "1", "input": util.json_dumps({"0": input})}
+ headers = {"content-type": "application/json"}
+
+ data = self.request(url, params=params, headers=headers).json()
+ return data[0]["result"]["data"]
+
+ def _extract_files(self, post):
+ files = []
+
+ self._extract_blocks(post, files)
+ if self.shares and post.get("shareTree"):
+ for share in post["shareTree"]:
+ self._extract_blocks(share, files, share)
+ del post["shareTree"]
+
+ return files
+
+ def _extract_blocks(self, post, files, shared=None):
+ post["content"] = content = []
+
+ for block in post.pop("blocks") or ():
+ try:
+ type = block["type"]
+ if type == "attachment":
+ file = block["attachment"].copy()
+ file["shared"] = shared
+ files.append(file)
+ elif type == "attachment-row":
+ for att in block["attachments"]:
+ file = att["attachment"].copy()
+ file["shared"] = shared
+ files.append(file)
+ elif type == "markdown":
+ content.append(block["markdown"]["content"])
+ elif type == "ask":
+ post["ask"] = block["ask"]
+ else:
+ self.log.debug("%s: Unsupported block type '%s'",
+ post["postId"], type)
+ except Exception as exc:
+ self.log.debug("%s: %s", exc.__class__.__name__, exc)
+
+
+class CohostUserExtractor(CohostExtractor):
+ """Extractor for media from a cohost user"""
+ subcategory = "user"
+ pattern = BASE_PATTERN + r"/([^/?#]+)/?(?:$|\?|#)"
+ example = "https://cohost.org/USER"
+
+ def posts(self):
+ empty = 0
+ params = {
+ "projectHandle": self.groups[0],
+ "page": 0,
+ "options": {
+ "pinnedPostsAtTop" : bool(self.pinned),
+ "hideReplies" : not self.replies,
+ "hideShares" : not self.shares,
+ "hideAsks" : not self.asks,
+ "viewingOnProjectPage": True,
+ },
+ }
+
+ while True:
+ data = self._request_api("posts.profilePosts", params)
+
+ posts = data["posts"]
+ if posts:
+ empty = 0
+ yield from posts
+ else:
+ empty += 1
+
+ pagination = data["pagination"]
+ if not pagination.get("morePagesForward"):
+ return
+ if empty >= 3:
+ return self.log.debug("Empty API results")
+ params["page"] = pagination["nextPage"]
+
+
+class CohostPostExtractor(CohostExtractor):
+ """Extractor for media from a single cohost post"""
+ subcategory = "post"
+ pattern = BASE_PATTERN + r"/([^/?#]+)/post/(\d+)"
+ example = "https://cohost.org/USER/post/12345"
+
+ def posts(self):
+ endpoint = "posts.singlePost"
+ params = {
+ "handle": self.groups[0],
+ "postId": int(self.groups[1]),
+ }
+
+ data = self._request_api(endpoint, params)
+ post = data["post"]
+
+ try:
+ post["comments"] = data["comments"][self.groups[1]]
+ except LookupError:
+ post["comments"] = ()
+
+ return (post,)
+
+
+class CohostTagExtractor(CohostExtractor):
+ """Extractor for tagged posts"""
+ subcategory = "tag"
+ pattern = BASE_PATTERN + r"/([^/?#]+)/tagged/([^/?#]+)(?:\?([^#]+))?"
+ example = "https://cohost.org/USER/tagged/TAG"
+
+ def posts(self):
+ user, tag, query = self.groups
+ url = "{}/{}/tagged/{}".format(self.root, user, tag)
+ params = text.parse_query(query)
+ post_feed_key = ("tagged-post-feed" if user == "rc" else
+ "project-tagged-post-feed")
+
+ while True:
+ page = self.request(url, params=params).text
+ data = util.json_loads(text.extr(
+ page, 'id="__COHOST_LOADER_STATE__">', '</script>'))
+
+ try:
+ feed = data[post_feed_key]
+ except KeyError:
+ feed = data.popitem()[1]
+
+ yield from feed["posts"]
+
+ pagination = feed["paginationMode"]
+ if not pagination.get("morePagesForward"):
+ return
+ params["refTimestamp"] = pagination["refTimestamp"]
+ params["skipPosts"] = \
+ pagination["currentSkip"] + pagination["idealPageStride"]
+
+
+class CohostLikesExtractor(CohostExtractor):
+ """Extractor for liked posts"""
+ subcategory = "likes"
+ pattern = BASE_PATTERN + r"/rc/liked-posts"
+ example = "https://cohost.org/rc/liked-posts"
+
+ def posts(self):
+ url = "{}/rc/liked-posts".format(self.root)
+ params = {}
+
+ while True:
+ page = self.request(url, params=params).text
+ data = util.json_loads(text.extr(
+ page, 'id="__COHOST_LOADER_STATE__">', '</script>'))
+
+ try:
+ feed = data["liked-posts-feed"]
+ except KeyError:
+ feed = data.popitem()[1]
+
+ yield from feed["posts"]
+
+ pagination = feed["paginationMode"]
+ if not pagination.get("morePagesForward"):
+ return
+ params["refTimestamp"] = pagination["refTimestamp"]
+ params["skipPosts"] = \
+ pagination["currentSkip"] + pagination["idealPageStride"]