summaryrefslogtreecommitdiffstats
path: root/gallery_dl/extractor/cohost.py
diff options
context:
space:
mode:
Diffstat (limited to 'gallery_dl/extractor/cohost.py')
-rw-r--r--gallery_dl/extractor/cohost.py250
1 files changed, 0 insertions, 250 deletions
diff --git a/gallery_dl/extractor/cohost.py b/gallery_dl/extractor/cohost.py
deleted file mode 100644
index 6a43224..0000000
--- a/gallery_dl/extractor/cohost.py
+++ /dev/null
@@ -1,250 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright 2024 Mike Fährmann
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 as
-# published by the Free Software Foundation.
-
-"""Extractors for https://cohost.org/"""
-
-from .common import Extractor, Message
-from .. import text, util
-
-BASE_PATTERN = r"(?:https?://)?(?:www\.)?cohost\.org"
-
-
-class CohostExtractor(Extractor):
- """Base class for cohost extractors"""
- category = "cohost"
- root = "https://cohost.org"
- directory_fmt = ("{category}", "{postingProject[handle]}")
- filename_fmt = ("{postId}{headline:?_//[b:200]}{num:?_//}.{extension}")
- archive_fmt = "{postId}_{num}"
-
- def _init(self):
- self.replies = self.config("replies", True)
- self.pinned = self.config("pinned", False)
- self.shares = self.config("shares", False)
- self.asks = self.config("asks", True)
-
- self.avatar = self.config("avatar", False)
- if self.avatar:
- self._urls_avatar = {None, ""}
-
- self.background = self.config("background", False)
- if self.background:
- self._urls_background = {None, ""}
-
- def items(self):
- for post in self.posts():
- reason = post.get("limitedVisibilityReason")
- if reason and reason != "none":
- if reason == "log-in-first":
- reason = ("This page's posts are visible only to users "
- "who are logged in.")
- self.log.warning('%s: "%s"', post["postId"], reason)
-
- files = self._extract_files(post)
- post["count"] = len(files)
- post["date"] = text.parse_datetime(
- post["publishedAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
-
- yield Message.Directory, post
-
- project = post["postingProject"]
- if self.avatar:
- url = project.get("avatarURL")
- if url not in self._urls_avatar:
- self._urls_avatar.add(url)
- p = post.copy()
- p["postId"] = p["kind"] = "avatar"
- p["headline"] = p["num"] = ""
- yield Message.Url, url, text.nameext_from_url(url, p)
-
- if self.background:
- url = project.get("headerURL")
- if url not in self._urls_background:
- self._urls_background.add(url)
- p = post.copy()
- p["postId"] = p["kind"] = "background"
- p["headline"] = p["num"] = ""
- yield Message.Url, url, text.nameext_from_url(url, p)
-
- for post["num"], file in enumerate(files, 1):
- url = file["fileURL"]
- post.update(file)
- text.nameext_from_url(url, post)
- yield Message.Url, url, post
-
- def posts(self):
- return ()
-
- def _request_api(self, endpoint, input):
- url = "{}/api/v1/trpc/{}".format(self.root, endpoint)
- params = {"batch": "1", "input": util.json_dumps({"0": input})}
- headers = {"content-type": "application/json"}
-
- data = self.request(url, params=params, headers=headers).json()
- return data[0]["result"]["data"]
-
- def _extract_files(self, post):
- files = []
-
- self._extract_blocks(post, files)
- if self.shares and post.get("shareTree"):
- for share in post["shareTree"]:
- self._extract_blocks(share, files, share)
- del post["shareTree"]
-
- return files
-
- def _extract_blocks(self, post, files, shared=None):
- post["content"] = content = []
-
- for block in post.pop("blocks") or ():
- try:
- type = block["type"]
- if type == "attachment":
- file = block["attachment"].copy()
- file["shared"] = shared
- files.append(file)
- elif type == "attachment-row":
- for att in block["attachments"]:
- file = att["attachment"].copy()
- file["shared"] = shared
- files.append(file)
- elif type == "markdown":
- content.append(block["markdown"]["content"])
- elif type == "ask":
- post["ask"] = block["ask"]
- else:
- self.log.debug("%s: Unsupported block type '%s'",
- post["postId"], type)
- except Exception as exc:
- self.log.debug("%s: %s", exc.__class__.__name__, exc)
-
-
-class CohostUserExtractor(CohostExtractor):
- """Extractor for media from a cohost user"""
- subcategory = "user"
- pattern = BASE_PATTERN + r"/([^/?#]+)/?(?:$|\?|#)"
- example = "https://cohost.org/USER"
-
- def posts(self):
- empty = 0
- params = {
- "projectHandle": self.groups[0],
- "page": 0,
- "options": {
- "pinnedPostsAtTop" : True if self.pinned else False,
- "hideReplies" : not self.replies,
- "hideShares" : not self.shares,
- "hideAsks" : not self.asks,
- "viewingOnProjectPage": True,
- },
- }
-
- while True:
- data = self._request_api("posts.profilePosts", params)
-
- posts = data["posts"]
- if posts:
- empty = 0
- yield from posts
- else:
- empty += 1
-
- pagination = data["pagination"]
- if not pagination.get("morePagesForward"):
- return
- if empty >= 3:
- return self.log.debug("Empty API results")
- params["page"] = pagination["nextPage"]
-
-
-class CohostPostExtractor(CohostExtractor):
- """Extractor for media from a single cohost post"""
- subcategory = "post"
- pattern = BASE_PATTERN + r"/([^/?#]+)/post/(\d+)"
- example = "https://cohost.org/USER/post/12345"
-
- def posts(self):
- endpoint = "posts.singlePost"
- params = {
- "handle": self.groups[0],
- "postId": int(self.groups[1]),
- }
-
- data = self._request_api(endpoint, params)
- post = data["post"]
-
- try:
- post["comments"] = data["comments"][self.groups[1]]
- except LookupError:
- post["comments"] = ()
-
- return (post,)
-
-
-class CohostTagExtractor(CohostExtractor):
- """Extractor for tagged posts"""
- subcategory = "tag"
- pattern = BASE_PATTERN + r"/([^/?#]+)/tagged/([^/?#]+)(?:\?([^#]+))?"
- example = "https://cohost.org/USER/tagged/TAG"
-
- def posts(self):
- user, tag, query = self.groups
- url = "{}/{}/tagged/{}".format(self.root, user, tag)
- params = text.parse_query(query)
- post_feed_key = ("tagged-post-feed" if user == "rc" else
- "project-tagged-post-feed")
-
- while True:
- page = self.request(url, params=params).text
- data = util.json_loads(text.extr(
- page, 'id="__COHOST_LOADER_STATE__">', '</script>'))
-
- try:
- feed = data[post_feed_key]
- except KeyError:
- feed = data.popitem()[1]
-
- yield from feed["posts"]
-
- pagination = feed["paginationMode"]
- if not pagination.get("morePagesForward"):
- return
- params["refTimestamp"] = pagination["refTimestamp"]
- params["skipPosts"] = \
- pagination["currentSkip"] + pagination["idealPageStride"]
-
-
-class CohostLikesExtractor(CohostExtractor):
- """Extractor for liked posts"""
- subcategory = "likes"
- pattern = BASE_PATTERN + r"/rc/liked-posts"
- example = "https://cohost.org/rc/liked-posts"
-
- def posts(self):
- url = "{}/rc/liked-posts".format(self.root)
- params = {}
-
- while True:
- page = self.request(url, params=params).text
- data = util.json_loads(text.extr(
- page, 'id="__COHOST_LOADER_STATE__">', '</script>'))
-
- try:
- feed = data["liked-posts-feed"]
- except KeyError:
- feed = data.popitem()[1]
-
- yield from feed["posts"]
-
- pagination = feed["paginationMode"]
- if not pagination.get("morePagesForward"):
- return
- params["refTimestamp"] = pagination["refTimestamp"]
- params["skipPosts"] = \
- pagination["currentSkip"] + pagination["idealPageStride"]