# -*- coding: utf-8 -*-

# Copyright 2020-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.

"""Extractors for https://www.subscribestar.com/"""

# NOTE(review): this copy of the module looks damaged. Several string
# literals used as search markers are empty or contain only line breaks
# (presumably markup that was stripped), and a span of code is missing
# between '_login_impl' and the tail of '_media_from_post'. Every
# surviving token is kept as found; damaged spots are flagged below.
# Indentation was reconstructed from syntax - confirm ambiguous nesting
# against a pristine copy before relying on it.

from .common import Extractor, Message
from .. import text, util, exception
from ..cache import cache

# Group 1 captures the top-level domain ("com" or "adult").
BASE_PATTERN = r"(?:https?://)?(?:www\.)?subscribestar\.(com|adult)"


class SubscribestarExtractor(Extractor):
    """Base class for subscribestar extractors"""
    category = "subscribestar"
    root = "https://www.subscribestar.com"
    directory_fmt = ("{category}", "{author_name}")
    filename_fmt = "{post_id}_{id}.{extension}"
    archive_fmt = "{id}"
    cookies_domain = ".subscribestar.com"
    cookies_names = ("_personalization_id",)
    # Class-level flag: cleared in login() so the missing-cookie warning
    # is evaluated only once for all instances.
    _warning = True

    def __init__(self, match):
        """Select the '.adult' site variant if matched and store group 2."""
        if match[1] == "adult":
            self.root = "https://subscribestar.adult"
            self.cookies_domain = ".subscribestar.adult"
            self.subcategory += "-adult"
        Extractor.__init__(self, match)
        self.item = match[2]

    def items(self):
        """Log in, then yield a Directory message and Url messages per post.

        Media entries of each post are numbered starting at 1 and merged
        with the post's metadata before being yielded.
        """
        self.login()
        for post_html in self.posts():
            media = self._media_from_post(post_html)
            data = self._data_from_post(post_html)

            content = data["content"]
            # NOTE(review): the markers below are empty strings. An empty
            # string is a substring of every string, so as written this
            # test cannot fail; the original markers were presumably
            # stripped - restore them from a pristine copy.
            if "" in content:
                data["content"] = content = text.extr(
                    content, "", "")
            # NOTE(review): both markers of this rextr() call contain only
            # line breaks here (presumably stripped markup); whether this
            # statement sits inside the 'if' above cannot be told from this
            # copy - confirm.
            data["title"] = text.unescape(text.rextr(content, "

", "

"))

            yield Message.Directory, "", data
            for num, item in enumerate(media, 1):
                item.update(data)
                item["num"] = num

                url = item["url"]
                # Prefer an explicit file name over one derived from the URL.
                if name := (item.get("name") or
                            item.get("original_filename")):
                    text.nameext_from_name(name, item)
                else:
                    text.nameext_from_url(url, item)

                # Site-relative URLs are made absolute.
                if url[0] == "/":
                    url = self.root + url
                yield Message.Url, url, item

    def posts(self):
        """Yield HTML content of all relevant posts"""

    def request(self, url, **kwargs):
        """Send a request, handling the site's redirect behaviours.

        Raises exception.AbortExtraction when the response was redirected
        to a '/verify_subscriber' or '/age_confirmation_warning' URL.
        A short body (< 250 bytes) containing '>redirected<' is treated as
        an HTML redirect notice: its first 'href' target is requested next.
        """
        while True:
            response = Extractor.request(self, url, **kwargs)

            if response.history and (
                    "/verify_subscriber" in response.url or
                    "/age_confirmation_warning" in response.url):
                raise exception.AbortExtraction(
                    "HTTP redirect to " + response.url)

            content = response.content
            if len(content) < 250 and b">redirected<" in content:
                url = text.unescape(text.extr(
                    content, b'href="', b'"').decode())
                self.log.debug("HTML redirect message for %s", url)
                continue

            return response

    def login(self):
        """Ensure session cookies exist, logging in with credentials if set.

        Returns immediately when the cookies named in 'cookies_names' are
        already present. Otherwise logs in if a username is configured and
        warns when no username is set or the cookie is still missing.
        """
        if self.cookies_check(self.cookies_names):
            return

        username, password = self._get_auth_info()
        if username:
            # The (username, domain) tuple is the argument at index 1,
            # matching 'keyarg=1' on _login_impl.
            self.cookies_update(self._login_impl(
                (username, self.cookies_domain), password))

        if self._warning:
            if not username or not self.cookies_check(self.cookies_names):
                self.log.warning("no '_personalization_id' cookie set")
            # NOTE(review): nesting of this assignment (inner vs. outer
            # 'if') is inferred - confirm.
            SubscribestarExtractor._warning = False

    # NOTE(review): 28*86400 is 28 days if 'maxage' is in seconds, and
    # 'keyarg=1' presumably selects the cache key argument - confirm both
    # against the cache module.
    @cache(maxage=28*86400, keyarg=1)
    def _login_impl(self, username, password):
        """Log in to the site; 'username' is a (name, cookie domain) tuple.

        NOTE(review): only the beginning of this method survives in this
        copy; what it submits and returns is not visible here.
        """
        username = username[0]
        self.log.info("Logging in as %s", username)

        if self.root.endswith(".adult"):
            self.cookies.set("18_plus_agreement_generic", "true",
                             domain=self.cookies_domain)

        # load login page
        url = self.root + "/login"
        page = self.request(url).text

        headers = {
            "Accept": "*/*;q=0.5, text/javascript, application/javascript, "
                      "application/ecmascript, application/x-ecmascript",
            "Referer": self.root + "/login",
            # NOTE(review): the start marker is empty and the text after it
            # appears to belong to a later, unrelated expression. Code is
            # missing from here on: the remainder of this method, the
            # '_pagination' method called by SubscribestarUserExtractor,
            # and the head of what is presumably '_media_from_post'.
            "X-CSRF-Token": text.unescape(text.extr(
                page, '', '<')),
                    "url" : text.unescape(text.extr(att, 'href="', '"')),
                    "type": "attachment",
                })

        # Audio uploads: one entry per 'audio_preview-data' element found
        # between the two class markers.
        audios = text.extr(
            html, 'class="uploads-audios"', 'class="post-edit_form"')
        if audios:
            # The first split element precedes the first match and is
            # skipped.
            for audio in text.re(r'class="audio_preview-data[" ]').split(
                    audios)[1:]:
                media.append({
                    "id"  : text.parse_int(text.extr(
                        audio, 'data-upload-id="', '"')),
                    "name": text.unescape(text.extr(
                        audio, 'audio_preview-title">', '<')),
                    "url" : text.unescape(text.extr(audio, 'src="', '"')),
                    "type": "audio",
                })

        return media

    def _data_from_post(self, html):
        """Build the metadata dict for one post from its HTML.

        NOTE(review): 'text.extract_from' presumably returns an extractor
        that advances through 'html', which would make the key order
        significant - confirm against the text module.
        """
        extr = text.extract_from(html)
        return {
            "post_id"    : text.parse_int(extr('data-id="', '"')),
            "author_id"  : text.parse_int(extr('data-user-id="', '"')),
            "author_name": text.unescape(extr('href="/', '"')),
            "author_nick": text.unescape(extr('>', '<')),
            # NOTE(review): the end marker of this call and the markers of
            # the entries below are damaged; the 'content' entry runs into
            # what looks like a separate tag-extraction entry ('?tag=').
            # Restore from a pristine copy.
            "date"       : self._parse_datetime(extr(
                'class="post-date">', '")[2]),
            "content"    : extr(
                '
', '
', '
'), '?tag=', '"')),
        }

    def _parse_datetime(self, dt):
        """Parse a post date string, trying short then full month names.

        A leading "Updated on " (11 characters) is removed first.
        NOTE(review): the 'date is dt' test presumably relies on
        'parse_datetime' returning its input unchanged on failure - confirm.
        """
        if dt.startswith("Updated on "):
            dt = dt[11:]
        date = self.parse_datetime(dt, "%b %d, %Y %I:%M %p")
        if date is dt:
            date = self.parse_datetime(dt, "%B %d, %Y %I:%M %p")
        return date

    def _warn_preview(self):
        """Log a preview-image warning, then disable itself on this instance.

        The instance attribute assigned here shadows this method, so later
        calls on the same instance invoke 'util.noop' instead.
        """
        self.log.warning("Preview image detected")
        self._warn_preview = util.noop


class SubscribestarUserExtractor(SubscribestarExtractor):
    """Extractor for media from a subscribestar user"""
    subcategory = "user"
    pattern = BASE_PATTERN + r"/(?!posts/)([^/?#]+)(?:\?([^#]+))?"
    example = "https://www.subscribestar.com/USER"

    def posts(self):
        """Return the paginated posts of a user, optionally filtered.

        The URL's query string, if any, is forwarded as request parameters;
        a 'tag' parameter is also recorded as 'search_tags' metadata.
        NOTE(review): '_pagination' is not defined in the visible part of
        this file.
        """
        _, user, qs = self.groups
        url = f"{self.root}/{user}"

        if qs is None:
            params = None
        else:
            params = text.parse_query(qs)
            if "tag" in params:
                self.kwdict["search_tags"] = params["tag"]

        return self._pagination(url, params)


class SubscribestarPostExtractor(SubscribestarExtractor):
    """Extractor for media from a single subscribestar post"""
    subcategory = "post"
    pattern = BASE_PATTERN + r"/posts/(\d+)"
    example = "https://www.subscribestar.com/posts/12345"

    def posts(self):
        """Return a 1-tuple holding the HTML of the requested post page."""
        url = f"{self.root}/posts/{self.item}"
        return (self.request(url).text,)

    def _data_from_post(self, html):
        """Build the metadata dict from a single-post page.

        Overrides the base-class version with a different key order and
        different markers for the author fields.
        """
        extr = text.extract_from(html)
        return {
            "post_id"    : text.parse_int(extr('data-id="', '"')),
            # NOTE(review): the markers of the next two entries are damaged
            # (only line breaks remain), and the 'content' entry runs into
            # what looks like a separate tag-extraction entry ('?tag=').
            # Restore from a pristine copy.
            "date"       : self._parse_datetime(extr(
                '
', '<')),
            "content"    : extr(
                '
', '
', '
'), '?tag=', '"')),
            "author_name": text.unescape(extr(
                'class="star_link" href="/', '"')),
            "author_id"  : text.parse_int(extr('data-user-id="', '"')),
            "author_nick": text.unescape(extr('alt="', '"')),
        }