diff options
Diffstat (limited to 'gallery_dl/extractor/schalenetwork.py')
| -rw-r--r-- | gallery_dl/extractor/schalenetwork.py | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/gallery_dl/extractor/schalenetwork.py b/gallery_dl/extractor/schalenetwork.py new file mode 100644 index 0000000..d517287 --- /dev/null +++ b/gallery_dl/extractor/schalenetwork.py @@ -0,0 +1,246 @@ +# -*- coding: utf-8 -*- + +# Copyright 2024-2025 Mike Fährmann +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. + +"""Extractors for https://niyaniya.moe/""" + +from .common import GalleryExtractor, Extractor, Message +from .. import text, exception +from ..cache import cache +import collections + +BASE_PATTERN = ( + r"(?i)(?:https?://)?(" + r"(?:niyaniya|shupogaki)\.moe|" + r"(?:koharu|anchira|seia)\.to|" + r"(?:hoshino)\.one" + r")" +) + + +class SchalenetworkExtractor(Extractor): + """Base class for schale.network extractors""" + category = "schalenetwork" + root = "https://niyaniya.moe" + root_api = "https://api.schale.network" + request_interval = (0.5, 1.5) + + def _init(self): + self.headers = { + "Accept" : "*/*", + "Referer": self.root + "/", + "Origin" : self.root, + } + + def _pagination(self, endpoint, params): + url_api = self.root_api + endpoint + + while True: + data = self.request_json( + url_api, params=params, headers=self.headers) + + try: + entries = data["entries"] + except KeyError: + return + + for entry in entries: + url = f"{self.root}/g/{entry['id']}/{entry['public_key']}" + entry["_extractor"] = SchalenetworkGalleryExtractor + yield Message.Queue, url, entry + + try: + if data["limit"] * data["page"] >= data["total"]: + return + except Exception: + pass + params["page"] += 1 + + +class SchalenetworkGalleryExtractor(SchalenetworkExtractor, GalleryExtractor): + """Extractor for schale.network galleries""" + filename_fmt = "{num:>03}.{extension}" + directory_fmt = ("{category}", "{id} {title}") + archive_fmt = "{id}_{num}" + request_interval = 0.0 + pattern = BASE_PATTERN + r"/(?:g|reader)/(\d+)/(\w+)" + example = "https://niyaniya.moe/g/12345/67890abcde/" + + TAG_TYPES = { + 0 : "general", + 1 : "artist", + 2 : "circle", + 3 : "parody", + 4 : "magazine", + 5 : "character", + 6 : "", + 7 : "uploader", + 8 : "male", + 9 : "female", + 10: "mixed", + 11: "language", + 12: "other", + } + + def __init__(self, match): + GalleryExtractor.__init__(self, match) + self.page_url = None + + def _init(self): + self.headers = { + "Accept" : "*/*", + "Referer": self.root + "/", + "Origin" : self.root, + } + + self.fmt = self.config("format") + self.cbz = self.config("cbz", True) + + if self.cbz: + self.filename_fmt = "{id} {title}.{extension}" + self.directory_fmt = ("{category}",) + + def metadata(self, _): + url = f"{self.root_api}/books/detail/{self.groups[1]}/{self.groups[2]}" + self.data = data = self.request_json(url, headers=self.headers) + data["date"] = text.parse_timestamp(data["created_at"] // 1000) + + tags = [] + types = self.TAG_TYPES + tags_data = data["tags"] + + for tag in tags_data: + name = tag["name"] + namespace = tag.get("namespace", 0) + tags.append(types[namespace] + ":" + name) + data["tags"] = tags + + if self.config("tags", False): + tags = collections.defaultdict(list) + for tag in tags_data : + tags[tag.get("namespace", 0)].append(tag["name"]) + for type, values in tags.items(): + data["tags_" + types[type]] = values + + try: + if self.cbz: + data["count"] = len(data["thumbnails"]["entries"]) + del data["thumbnails"] + del data["rels"] + except Exception: + pass + + return data + + def images(self, _): + data = self.data + fmt = self._select_format(data["data"]) + + url = (f"{self.root_api}/books/data/{data['id']}/" + f"{data['public_key']}/{fmt['id']}/{fmt['public_key']}") + params = { + "v": data["updated_at"], + "w": fmt["w"], + } + + if self.cbz: + params["action"] = "dl" + base = self.request_json( + url, method="POST", params=params, headers=self.headers, + )["base"] + url = f"{base}?v={data['updated_at']}&w={fmt['w']}" + info = text.nameext_from_url(base) + if not info["extension"]: + info["extension"] = "cbz" + return ((url, info),) + + data = self.request_json(url, params=params, headers=self.headers) + base = data["base"] + + results = [] + for entry in data["entries"]: + dimensions = entry["dimensions"] + info = { + "w": dimensions[0], + "h": dimensions[1], + "_http_headers": self.headers, + } + results.append((base + entry["path"], info)) + return results + + def _select_format(self, formats): + fmt = self.fmt + + if not fmt or fmt == "best": + fmtids = ("0", "1600", "1280", "980", "780") + elif isinstance(fmt, str): + fmtids = fmt.split(",") + elif isinstance(fmt, list): + fmtids = fmt + else: + fmtids = (str(self.fmt),) + + for fmtid in fmtids: + try: + fmt = formats[fmtid] + if fmt["id"]: + break + except KeyError: + self.log.debug("%s: Format %s is not available", + self.groups[1], fmtid) + else: + raise exception.NotFoundError("format") + + self.log.debug("%s: Selected format %s", self.groups[1], fmtid) + fmt["w"] = fmtid + return fmt + + +class SchalenetworkSearchExtractor(SchalenetworkExtractor): + """Extractor for schale.network search results""" + subcategory = "search" + pattern = BASE_PATTERN + r"/\?([^#]*)" + example = "https://niyaniya.moe/?s=QUERY" + + def items(self): + params = text.parse_query(self.groups[1]) + params["page"] = text.parse_int(params.get("page"), 1) + return self._pagination("/books", params) + + +class SchalenetworkFavoriteExtractor(SchalenetworkExtractor): + """Extractor for schale.network favorites""" + subcategory = "favorite" + pattern = BASE_PATTERN + r"/favorites(?:\?([^#]*))?" + example = "https://niyaniya.moe/favorites" + + def items(self): + self.login() + + params = text.parse_query(self.groups[1]) + params["page"] = text.parse_int(params.get("page"), 1) + return self._pagination("/favorites", params) + + def login(self): + username, password = self._get_auth_info() + if username: + self.headers["Authorization"] = \ + "Bearer " + self._login_impl(username, password) + return + + raise exception.AuthenticationError("Username and password required") + + @cache(maxage=86400, keyarg=1) + def _login_impl(self, username, password): + self.log.info("Logging in as %s", username) + + url = "https://auth.schale.network/login" + data = {"uname": username, "passwd": password} + response = self.request( + url, method="POST", headers=self.headers, data=data) + + return response.json()["session"] |
