# -*- coding: utf-8 -*-

# Copyright 2024 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.

"""Extractors for https://cohost.org/"""

from .common import Extractor, Message
from .. import text, util

BASE_PATTERN = r"(?:https?://)?(?:www\.)?cohost\.org"


class CohostExtractor(Extractor):
    """Base class for cohost extractors"""
    category = "cohost"
    root = "https://cohost.org"
    directory_fmt = ("{category}", "{postingProject[handle]}")
    filename_fmt = ("{postId}{headline:?_//[b:200]}{num:?_//}.{extension}")
    archive_fmt = "{postId}_{num}"

    def _init(self):
        self.replies = self.config("replies", True)
        self.pinned = self.config("pinned", False)
        self.shares = self.config("shares", False)
        self.asks = self.config("asks", True)

        self.avatar = self.config("avatar", False)
        if self.avatar:
            # remember avatar URLs already seen to download each only once
            self._urls_avatar = {None, ""}

        self.background = self.config("background", False)
        if self.background:
            # remember header URLs already seen to download each only once
            self._urls_background = {None, ""}

    def items(self):
        for post in self.posts():
            reason = post.get("limitedVisibilityReason")
            if reason and reason != "none":
                if reason == "log-in-first":
                    reason = ("This page's posts are visible only to users "
                              "who are logged in.")
                self.log.warning('%s: "%s"', post["postId"], reason)

            files = self._extract_files(post)
            post["count"] = len(files)
            post["date"] = text.parse_datetime(
                post["publishedAt"], "%Y-%m-%dT%H:%M:%S.%fZ")

            yield Message.Directory, post

            project = post["postingProject"]
            if self.avatar:
                url = project.get("avatarURL")
                if url not in self._urls_avatar:
                    self._urls_avatar.add(url)
                    p = post.copy()
                    p["postId"] = p["kind"] = "avatar"
                    p["headline"] = p["num"] = ""
                    yield Message.Url, url, text.nameext_from_url(url, p)

            if self.background:
                url = project.get("headerURL")
                if url not in self._urls_background:
                    self._urls_background.add(url)
                    p = post.copy()
                    p["postId"] = p["kind"] = "background"
                    p["headline"] = p["num"] = ""
                    yield Message.Url, url, text.nameext_from_url(url, p)

            for post["num"], file in enumerate(files, 1):
                url = file["fileURL"]
                post.update(file)
                text.nameext_from_url(url, post)
                yield Message.Url, url, post

    def posts(self):
        return ()

    def _request_api(self, endpoint, input):
        url = "{}/api/v1/trpc/{}".format(self.root, endpoint)
        # the tRPC API expects a batched JSON object as 'input' parameter
        params = {"batch": "1", "input": util.json_dumps({"0": input})}
        headers = {"content-type": "application/json"}

        data = self.request(url, params=params, headers=headers).json()
        return data[0]["result"]["data"]

    def _extract_files(self, post):
        files = []
        self._extract_blocks(post, files)

        if self.shares and post.get("shareTree"):
            for share in post["shareTree"]:
                self._extract_blocks(share, files, share)
        del post["shareTree"]

        return files

    def _extract_blocks(self, post, files, shared=None):
        post["content"] = content = []

        for block in post.pop("blocks") or ():
            try:
                type = block["type"]
                if type == "attachment":
                    file = block["attachment"].copy()
                    file["shared"] = shared
                    files.append(file)
                elif type == "attachment-row":
                    for att in block["attachments"]:
                        file = att["attachment"].copy()
                        file["shared"] = shared
                        files.append(file)
                elif type == "markdown":
                    content.append(block["markdown"]["content"])
                elif type == "ask":
                    post["ask"] = block["ask"]
                else:
                    self.log.debug("%s: Unsupported block type '%s'",
                                   post["postId"], type)
            except Exception as exc:
                self.log.debug("%s: %s", exc.__class__.__name__, exc)


class CohostUserExtractor(CohostExtractor):
    """Extractor for media from a cohost user"""
    subcategory = "user"
    pattern = BASE_PATTERN + r"/([^/?#]+)/?(?:$|\?|#)"
    example = "https://cohost.org/USER"

    def posts(self):
        empty = 0
        params = {
            "projectHandle": self.groups[0],
            "page": 0,
            "options": {
                "pinnedPostsAtTop"    : True if self.pinned else False,
                "hideReplies"         : not self.replies,
                "hideShares"          : not self.shares,
                "hideAsks"            : not self.asks,
                "viewingOnProjectPage": True,
            },
        }

        while True:
            data = self._request_api("posts.profilePosts", params)

            posts = data["posts"]
            if posts:
                empty = 0
                yield from posts
            else:
                empty += 1

            pagination = data["pagination"]
            if not pagination.get("morePagesForward"):
                return
            if empty >= 3:
                # stop after 3 consecutive pages without results
                return self.log.debug("Empty API results")
            params["page"] = pagination["nextPage"]


class CohostPostExtractor(CohostExtractor):
    """Extractor for media from a single cohost post"""
    subcategory = "post"
    pattern = BASE_PATTERN + r"/([^/?#]+)/post/(\d+)"
    example = "https://cohost.org/USER/post/12345"

    def posts(self):
        endpoint = "posts.singlePost"
        params = {
            "handle": self.groups[0],
            "postId": int(self.groups[1]),
        }
        data = self._request_api(endpoint, params)

        post = data["post"]
        try:
            post["comments"] = data["comments"][self.groups[1]]
        except LookupError:
            post["comments"] = ()
        return (post,)


class CohostTagExtractor(CohostExtractor):
    """Extractor for tagged posts"""
    subcategory = "tag"
    pattern = BASE_PATTERN + r"/([^/?#]+)/tagged/([^/?#]+)(?:\?([^#]+))?"
    example = "https://cohost.org/USER/tagged/TAG"

    def posts(self):
        user, tag, query = self.groups
        url = "{}/{}/tagged/{}".format(self.root, user, tag)
        params = text.parse_query(query)
        post_feed_key = ("tagged-post-feed" if user == "rc" else
                         "project-tagged-post-feed")

        while True:
            page = self.request(url, params=params).text
            # post data is embedded as JSON inside a
            # <script id="__COHOST_LOADER_STATE__"> element
            data = util.json_loads(text.extr(
                page, 'id="__COHOST_LOADER_STATE__">', '</script>'))

            try:
                feed = data[post_feed_key]
            except KeyError:
                feed = data.popitem()[1]
            yield from feed["posts"]

            pagination = feed["paginationMode"]
            if not pagination.get("morePagesForward"):
                return
            params["refTimestamp"] = pagination["refTimestamp"]
            params["skipPosts"] = \
                pagination["currentSkip"] + pagination["idealPageStride"]


class CohostLikesExtractor(CohostExtractor):
    """Extractor for liked posts"""
    subcategory = "likes"
    pattern = BASE_PATTERN + r"/rc/liked-posts"
    example = "https://cohost.org/rc/liked-posts"

    def posts(self):
        url = "{}/rc/liked-posts".format(self.root)
        params = {}

        while True:
            page = self.request(url, params=params).text
            # same embedded JSON state as on tag pages
            data = util.json_loads(text.extr(
                page, 'id="__COHOST_LOADER_STATE__">', '</script>'))

            try:
                feed = data["liked-posts-feed"]
            except KeyError:
                feed = data.popitem()[1]
            yield from feed["posts"]

            pagination = feed["paginationMode"]
            if not pagination.get("morePagesForward"):
                return
            params["refTimestamp"] = pagination["refTimestamp"]
            params["skipPosts"] = \
                pagination["currentSkip"] + pagination["idealPageStride"]