# -*- coding: utf-8 -*- # Copyright 2021-2025 Mike Fährmann # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. """Extractors for Gelbooru Beta 0.2 sites""" from . import booru from .. import text, util, exception import collections class GelbooruV02Extractor(booru.BooruExtractor): basecategory = "gelbooru_v02" def __init__(self, match): booru.BooruExtractor.__init__(self, match) self.request_interval = self.config_instance("request-interval", 0.0) self.root_api = self.config_instance("root-api") or self.root def _init(self): self.api_key = self.config("api-key") self.user_id = self.config("user-id") if self.category == "rule34": self._file_url = self._file_url_rule34 def _api_request(self, params): params["api_key"] = self.api_key params["user_id"] = self.user_id url = self.root_api + "/index.php?page=dapi&s=post&q=index" root = self.request_xml(url, params=params) if root.tag == "error": msg = root.text if msg.lower().startswith("missing authentication"): raise exception.AuthRequired( "'api-key' & 'user-id'", "the API", msg) raise exception.AbortExtraction(f"'{msg}'") return root def _pagination(self, params): params["pid"] = self.page_start params["limit"] = self.per_page post = total = None count = 0 while True: try: root = self._api_request(params) except SyntaxError: # ElementTree.ParseError if "tags" not in params or post is None: raise taglist = [tag for tag in params["tags"].split() if not tag.startswith("id:<")] taglist.append("id:<" + str(post.attrib["id"])) params["tags"] = " ".join(taglist) params["pid"] = 0 continue if total is None: try: self.kwdict["total"] = total = int(root.attrib["count"]) if "search_tags" in self.kwdict: self.kwdict["search_count"] = total self.log.debug("%s posts in total", total) except Exception as exc: total = 0 self.log.debug( "Failed to get total number of posts (%s: %s)", exc.__class__.__name__, exc) post = None for post in root: yield post.attrib num = len(root) count += num if num < self.per_page: if not total or count >= total: return if not num: self.log.debug("Empty response - Retrying") continue params["pid"] += 1 def _pagination_html(self, params): url = self.root + "/index.php" params["pid"] = self.page_start * self.per_page data = {} find_ids = text.re(r"\sid=\"p(\d+)").findall while True: page = self.request(url, params=params).text pids = find_ids(page) for data["id"] in pids: for post in self._api_request(data): yield post.attrib if len(pids) < self.per_page: return params["pid"] += self.per_page def _file_url_rule34(self, post): url = post["file_url"] if text.ext_from_url(url) not in util.EXTS_VIDEO: path = url.partition(".")[2] post["_fallback"] = (url,) post["file_url"] = url = "https://wimg." + path return url def _prepare(self, post): post["tags"] = post["tags"].strip() post["date"] = self.parse_datetime( post["created_at"], "%a %b %d %H:%M:%S %z %Y") def _html(self, post): url = f"{self.root}/index.php?page=post&s=view&id={post['id']}" return self.request(url).text def _tags(self, post, page): tag_container = (text.extr(page, '