# -*- coding: utf-8 -*- # Copyright 2025 Mike Fährmann # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. """Extractors for https://www.bellazon.com/""" from .common import Extractor, Message from .. import text, exception BASE_PATTERN = r"(?:https?://)?(?:www\.)?bellazon\.com/main" class BellazonExtractor(Extractor): """Base class for bellazon extractors""" category = "bellazon" root = "https://www.bellazon.com/main" directory_fmt = ("{category}", "{thread[section]}", "{thread[title]} ({thread[id]})") filename_fmt = "{post[id]}_{num:>02}_{id}_{filename}.{extension}" archive_fmt = "{post[id]}/{id}_{filename}" def items(self): native = (self.root + "/", self.root[6:] + "/") extract_urls = text.re( r'(?s)<(' r'(?:video .*?]*?src|a [^>]*?href)="([^"]+).*?' r'|img [^>]*?src="([^"]+)"[^>]*>' r')' ).findall if self.config("quoted", False): strip_quoted = None else: strip_quoted = text.re(r"(?s)
").sub for post in self.posts(): if strip_quoted is None: urls = extract_urls(post["content"]) else: urls = extract_urls(strip_quoted("", post["content"])) data = {"post": post} post["count"] = data["count"] = len(urls) yield Message.Directory, "", data data["num"] = data["num_internal"] = data["num_external"] = 0 for info, url, url_img in urls: url = text.unescape(url or url_img) if url.startswith(native): if ( "/uploads/emoticons/" in url or "/profile/" in url or "/topic/" in url ): continue data["num"] += 1 data["num_internal"] += 1 if not (alt := text.extr(info, ' alt="', '"')) or ( alt.startswith("post-") and "_thumb." in alt): dc = text.nameext_from_url(url, data.copy()) else: dc = data.copy() dc["name"] = name = text.unescape(alt) dc["filename"] = name.partition(".")[0] dc["id"] = text.extr(info, 'data-fileid="', '"') if ext := text.extr(info, 'data-fileext="', '"'): dc["extension"] = ext elif "/core/interface/file/attachment.php" in url: if not dc["id"]: dc["id"] = \ url.rpartition("?id=")[2].partition("&")[0] if name := text.extr(info, ">", "<").strip(): dc["name"] = name = text.unescape(name) text.nameext_from_name(name, dc) else: dc["extension"] = text.ext_from_url(url) if url[0] == "/": url = "https:" + url yield Message.Url, url, dc else: data["num"] += 1 data["num_external"] += 1 yield Message.Queue, url, data def _pagination(self, base, pnum=None): base = self.root + base if pnum is None: url = base + "/" pnum = 1 else: url = f"{base}/page/{pnum}/" pnum = None while True: page = self.request(url).text yield page if pnum is None or ' rel="next" ' not in page or text.extr( page, " rel=\"next\" data-page='", "'") == str(pnum): return pnum += 1 url = f"{base}/page/{pnum}/" def _pagination_reverse(self, base, pnum=None): base = self.root + base url = f"{base}/page/{'9999' if pnum is None else pnum}/" with self.request(url) as response: parts = response.url.rsplit("/", 3) pnum = text.parse_int(parts[2]) if parts[1] == "page" else 1 page = response.text while True: yield page pnum -= 1 if pnum > 1: url = f"{base}/page/{pnum}/" elif pnum == 1: url = base + "/" else: return page = self.request(url).text def _parse_thread(self, page): schema = self._extract_jsonld(page) author = schema["author"] stats = schema["interactionStatistic"] url_t = schema["url"] url_a = author.get("url") or "" path = text.split_html(text.extr( page, '