aboutsummaryrefslogtreecommitdiffstats
path: root/gallery_dl/extractor/iwara.py
diff options
context:
space:
mode:
authorLibravatarUnit 193 <unit193@unit193.net>2025-07-31 01:22:07 -0400
committerLibravatarUnit 193 <unit193@unit193.net>2025-07-31 01:22:07 -0400
commitd9539f96cc7ac112b7d8faad022190fbbc88c745 (patch)
tree471249d60b9202c00d7d82abec8b296fc881292e /gallery_dl/extractor/iwara.py
parent889fc15f272118bf277737b6fac29d3faeffc641 (diff)
parenta6e995c093de8aae2e91a0787281bb34c0b871eb (diff)
Update upstream source from tag 'upstream/1.30.2'
Update to upstream version '1.30.2' with Debian dir f0dcd28a671f8600479182ff128e05ba8904a0d8
Diffstat (limited to 'gallery_dl/extractor/iwara.py')
-rw-r--r--gallery_dl/extractor/iwara.py440
1 files changed, 440 insertions, 0 deletions
diff --git a/gallery_dl/extractor/iwara.py b/gallery_dl/extractor/iwara.py
new file mode 100644
index 0000000..934b301
--- /dev/null
+++ b/gallery_dl/extractor/iwara.py
@@ -0,0 +1,440 @@
+# -*- coding: utf-8 -*-
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+
+"""Extractors for https://www.iwara.tv/"""
+
+from .common import Extractor, Message, Dispatch
+from .. import text, util, exception
+from ..cache import cache, memcache
+import hashlib
+
+BASE_PATTERN = r"(?:https?://)?(?:www\.)?iwara\.tv"
+USER_PATTERN = rf"{BASE_PATTERN}/profile/([^/?#]+)"
+
+
+class IwaraExtractor(Extractor):
+ """Base class for iwara.tv extractors"""
+ category = "iwara"
+ root = "https://www.iwara.tv"
+ directory_fmt = ("{category}", "{user[name]}")
+ filename_fmt = "{date} {id} {title[:200]} {filename}.{extension}"
+ archive_fmt = "{type} {user[name]} {id} {file_id}"
+
+ def _init(self):
+ self.api = IwaraAPI(self)
+
+ def items_image(self, images, user=None):
+ for image in images:
+ try:
+ if "image" in image:
+ # could extract 'date_favorited' here
+ image = image["image"]
+ if not (files := image.get("files")):
+ image = self.api.image(image["id"])
+ files = image["files"]
+
+ group_info = self.extract_media_info(image, "file", False)
+ group_info["user"] = (self.extract_user_info(image)
+ if user is None else user)
+ except Exception as exc:
+ self.status |= 1
+ self.log.error("Failed to process image %s (%s: %s)",
+ image["id"], exc.__class__.__name__, exc)
+ continue
+
+ group_info["count"] = len(files)
+ yield Message.Directory, group_info
+ for num, file in enumerate(files, 1):
+ file_info = self.extract_media_info(file, None)
+ file_id = file_info["file_id"]
+ url = (f"https://i.iwara.tv/image/original/"
+ f"{file_id}/{file_id}.{file_info['extension']}")
+ yield Message.Url, url, {**file_info, **group_info, "num": num}
+
+ def items_video(self, videos, user=None):
+ for video in videos:
+ try:
+ if "video" in video:
+ video = video["video"]
+ if "fileUrl" not in video:
+ video = self.api.video(video["id"])
+ file_url = video["fileUrl"]
+ sources = self.api.source(file_url)
+ source = next((s for s in sources
+ if s.get("name") == "Source"), None)
+ download_url = source.get('src', {}).get('download')
+
+ info = self.extract_media_info(video, "file")
+ info["count"] = info["num"] = 1
+ info["user"] = (self.extract_user_info(video)
+ if user is None else user)
+ except Exception as exc:
+ self.status |= 1
+ self.log.error("Failed to process video %s (%s: %s)",
+ video["id"], exc.__class__.__name__, exc)
+ continue
+
+ yield Message.Directory, info
+ yield Message.Url, f"https:{download_url}", info
+
+ def items_user(self, users, key=None):
+ base = f"{self.root}/profile/"
+ for user in users:
+ if key is not None:
+ user = user[key]
+ if (username := user["username"]) is None:
+ continue
+ user["type"] = "user"
+ user["_extractor"] = IwaraUserExtractor
+ yield Message.Queue, f"{base}{username}", user
+
+ def items_by_type(self, type, results):
+ if type == "image":
+ return self.items_image(results)
+ if type == "video":
+ return self.items_video(results)
+ if type == "user":
+ return self.items_user(results)
+
+ raise exception.AbortExtraction(f"Unsupported result type '{type}'")
+
+ def extract_media_info(self, item, key, include_file_info=True):
+ title = t.strip() if (t := item.get("title")) else ""
+
+ if include_file_info:
+ file_info = item if key is None else item.get(key) or {}
+ filename, _, extension = file_info.get("name", "").rpartition(".")
+
+ return {
+ "id" : item["id"],
+ "file_id" : file_info.get("id"),
+ "title" : title,
+ "filename" : filename,
+ "extension": extension,
+ "date" : text.parse_datetime(
+ file_info.get("createdAt"), "%Y-%m-%dT%H:%M:%S.%fZ"),
+ "date_updated": text.parse_datetime(
+ file_info.get("updatedAt"), "%Y-%m-%dT%H:%M:%S.%fZ"),
+ "mime" : file_info.get("mime"),
+ "size" : file_info.get("size"),
+ "width" : file_info.get("width"),
+ "height" : file_info.get("height"),
+ "duration" : file_info.get("duration"),
+ "type" : file_info.get("type"),
+ }
+ else:
+ return {
+ "id" : item["id"],
+ "title": title,
+ }
+
+ def extract_user_info(self, profile):
+ user = profile.get("user") or {}
+ return {
+ "id" : user.get("id"),
+ "name" : user.get("username"),
+ "nick" : user.get("name").strip(),
+ "status" : user.get("status"),
+ "role" : user.get("role"),
+ "premium": user.get("premium"),
+ "date" : text.parse_datetime(
+ user.get("createdAt"), "%Y-%m-%dT%H:%M:%S.000Z"),
+ "description": profile.get("body"),
+ }
+
+ def _user_params(self):
+ user, qs = self.groups
+ params = text.parse_query(qs)
+ profile = self.api.profile(user)
+ params["user"] = profile["user"]["id"]
+ return self.extract_user_info(profile), params
+
+
+class IwaraUserExtractor(Dispatch, IwaraExtractor):
+ """Extractor for iwara.tv profile pages"""
+ pattern = rf"{USER_PATTERN}/?$"
+ example = "https://www.iwara.tv/profile/USERNAME"
+
+ def items(self):
+ base = f"{self.root}/profile/{self.groups[0]}/"
+ return self._dispatch_extractors((
+ (IwaraUserImagesExtractor , f"{base}images"),
+ (IwaraUserVideosExtractor , f"{base}videos"),
+ (IwaraUserPlaylistsExtractor, f"{base}playlists"),
+ ), ("user-images", "user-videos"))
+
+
+class IwaraUserImagesExtractor(IwaraExtractor):
+ subcategory = "user-images"
+ pattern = rf"{USER_PATTERN}/images(?:\?([^#]+))?"
+ example = "https://www.iwara.tv/profile/USERNAME/images"
+
+ def items(self):
+ user, params = self._user_params()
+ return self.items_image(self.api.images(params), user)
+
+
+class IwaraUserVideosExtractor(IwaraExtractor):
+ subcategory = "user-videos"
+ pattern = rf"{USER_PATTERN}/videos(?:\?([^#]+))?"
+ example = "https://www.iwara.tv/profile/USERNAME/videos"
+
+ def items(self):
+ user, params = self._user_params()
+ return self.items_video(self.api.videos(params), user)
+
+
+class IwaraUserPlaylistsExtractor(IwaraExtractor):
+ subcategory = "user-playlists"
+ pattern = rf"{USER_PATTERN}/playlists(?:\?([^#]+))?"
+ example = "https://www.iwara.tv/profile/USERNAME/playlists"
+
+ def items(self):
+ base = f"{self.root}/playlist/"
+
+ for playlist in self.api.playlists(self._user_params()[1]):
+ playlist["type"] = "playlist"
+ playlist["_extractor"] = IwaraPlaylistExtractor
+ url = f"{base}{playlist['id']}"
+ yield Message.Queue, url, playlist
+
+
+class IwaraFollowingExtractor(IwaraExtractor):
+ subcategory = "following"
+ pattern = rf"{USER_PATTERN}/following"
+ example = "https://www.iwara.tv/profile/USERNAME/following"
+
+ def items(self):
+ uid = self.api.profile(self.groups[0])["user"]["id"]
+ return self.items_user(self.api.user_following(uid), "user")
+
+
+class IwaraFollowersExtractor(IwaraExtractor):
+ subcategory = "followers"
+ pattern = rf"{USER_PATTERN}/followers"
+ example = "https://www.iwara.tv/profile/USERNAME/followers"
+
+ def items(self):
+ uid = self.api.profile(self.groups[0])["user"]["id"]
+ return self.items_user(self.api.user_followers(uid), "follower")
+
+
+class IwaraImageExtractor(IwaraExtractor):
+ """Extractor for individual iwara.tv image pages"""
+ subcategory = "image"
+ pattern = rf"{BASE_PATTERN}/image/([^/?#]+)"
+ example = "https://www.iwara.tv/image/ID"
+
+ def items(self):
+ return self.items_image((self.api.image(self.groups[0]),))
+
+
+class IwaraVideoExtractor(IwaraExtractor):
+ """Extractor for individual iwara.tv videos"""
+ subcategory = "video"
+ pattern = rf"{BASE_PATTERN}/video/([^/?#]+)"
+ example = "https://www.iwara.tv/video/ID"
+
+ def items(self):
+ return self.items_video((self.api.video(self.groups[0]),))
+
+
+class IwaraPlaylistExtractor(IwaraExtractor):
+ """Extractor for individual iwara.tv playlist pages"""
+ subcategory = "playlist"
+ pattern = rf"{BASE_PATTERN}/playlist/([^/?#]+)"
+ example = "https://www.iwara.tv/playlist/ID"
+
+ def items(self):
+ return self.items_video(self.api.playlist(self.groups[0]))
+
+
+class IwaraFavoriteExtractor(IwaraExtractor):
+ subcategory = "favorite"
+ pattern = rf"{BASE_PATTERN}/favorites(?:/(image|video)s)?"
+ example = "https://www.iwara.tv/favorites/videos"
+
+ def items(self):
+ type = self.groups[0] or "vidoo"
+ return self.items_by_type(type, self.api.favorites(type))
+
+
+class IwaraSearchExtractor(IwaraExtractor):
+ """Extractor for iwara.tv search pages"""
+ subcategory = "search"
+ pattern = rf"{BASE_PATTERN}/search\?([^#]+)"
+ example = "https://www.iwara.tv/search?query=QUERY&type=TYPE"
+
+ def items(self):
+ params = text.parse_query(self.groups[0])
+ type = params.get("type")
+ self.kwdict["search_tags"] = query = params.get("query")
+ return self.items_by_type(type, self.api.search(type, query))
+
+
+class IwaraTagExtractor(IwaraExtractor):
+ """Extractor for iwara.tv tag search"""
+ subcategory = "tag"
+ pattern = rf"{BASE_PATTERN}/(image|video)s(?:\?([^#]+))?"
+ example = "https://www.iwara.tv/videos?tags=TAGS"
+
+ def items(self):
+ type, qs = self.groups
+ params = text.parse_query(qs)
+ self.kwdict["search_tags"] = params.get("tags")
+ return self.items_by_type(type, self.api.media(type, params))
+
+
+class IwaraAPI():
+ """Interface for the Iwara API"""
+ root = "https://api.iwara.tv"
+
+ def __init__(self, extractor):
+ self.extractor = extractor
+ self.headers = {
+ "Referer" : f"{extractor.root}/",
+ "Content-Type": "application/json",
+ "Origin" : extractor.root,
+ }
+
+ self.username, self.password = extractor._get_auth_info()
+ if not self.username:
+ self.authenticate = util.noop
+
+ def image(self, image_id):
+ endpoint = f"/image/{image_id}"
+ return self._call(endpoint)
+
+ def video(self, video_id):
+ endpoint = f"/video/{video_id}"
+ return self._call(endpoint)
+
+ def playlist(self, playlist_id):
+ endpoint = f"/playlist/{playlist_id}"
+ return self._pagination(endpoint)
+
+ def detail(self, media):
+ endpoint = f"/{media['type']}/{media['id']}"
+ return self._call(endpoint)
+
+ def images(self, params):
+ endpoint = "/images"
+ params.setdefault("rating", "all")
+ return self._pagination(endpoint, params)
+
+ def videos(self, params):
+ endpoint = "/videos"
+ params.setdefault("rating", "all")
+ return self._pagination(endpoint, params)
+
+ def playlists(self, params):
+ endpoint = "/playlists"
+ return self._pagination(endpoint, params)
+
+ def media(self, type, params):
+ endpoint = f"/{type}s"
+ params.setdefault("rating", "all")
+ return self._pagination(endpoint, params)
+
+ def favorites(self, type):
+ if not self.username:
+ raise exception.AuthRequired("'username' & 'password'")
+ endpoint = f"/favorites/{type}s"
+ return self._pagination(endpoint)
+
+ def search(self, type, query):
+ endpoint = "/search"
+ params = {"type": type, "query": query}
+ return self._pagination(endpoint, params)
+
+ @memcache(keyarg=1)
+ def profile(self, username):
+ endpoint = f"/profile/{username}"
+ return self._call(endpoint)
+
+ def user_following(self, user_id):
+ endpoint = f"/user/{user_id}/following"
+ return self._pagination(endpoint)
+
+ def user_followers(self, user_id):
+ endpoint = f"/user/{user_id}/followers"
+ return self._pagination(endpoint)
+
+ def source(self, file_url):
+ base, _, query = file_url.partition("?")
+ if not (expires := text.extr(query, "expires=", "&")):
+ return ()
+ file_id = base.rpartition("/")[2]
+ sha_postfix = "5nFp9kmbNnHdAFhaqMvt"
+ sha_key = f"{file_id}_{expires}_{sha_postfix}"
+ hash = hashlib.sha1(sha_key.encode()).hexdigest()
+ headers = {"X-Version": hash, **self.headers}
+ return self.extractor.request_json(file_url, headers=headers)
+
+ def authenticate(self):
+ self.headers["Authorization"] = self._authenticate_impl(self.username)
+
+ @cache(maxage=3600, keyarg=1)
+ def _authenticate_impl(self, username):
+ refresh_token = _refresh_token_cache(username)
+ if refresh_token is None:
+ self.extractor.log.info("Logging in as %s", username)
+
+ url = f"{self.root}/user/login"
+ json = {
+ "email" : username,
+ "password": self.password
+ }
+ data = self.extractor.request_json(
+ url, method="POST", headers=self.headers, json=json,
+ fatal=False)
+
+ if not (refresh_token := data.get("token")):
+ self.extractor.log.debug(data)
+ raise exception.AuthenticationError(data.get("message"))
+ _refresh_token_cache.update(username, refresh_token)
+
+ self.extractor.log.info("Refreshing access token for %s", username)
+
+ url = f"{self.root}/user/token"
+ headers = {"Authorization": f"Bearer {refresh_token}", **self.headers}
+ data = self.extractor.request_json(
+ url, method="POST", headers=headers, fatal=False)
+
+ if not (access_token := data.get("accessToken")):
+ self.extractor.log.debug(data)
+ raise exception.AuthenticationError(data.get("message"))
+ return f"Bearer {access_token}"
+
+ def _call(self, endpoint, params=None, headers=None):
+ if headers is None:
+ headers = self.headers
+
+ url = self.root + endpoint
+ self.authenticate()
+ return self.extractor.request_json(url, params=params, headers=headers)
+
+ def _pagination(self, endpoint, params=None):
+ if params is None:
+ params = {}
+ params["page"] = 0
+ params["limit"] = 50
+
+ while True:
+ data = self._call(endpoint, params)
+
+ if not (results := data.get("results")):
+ break
+ yield from results
+
+ if len(results) < params["limit"]:
+ break
+ params["page"] += 1
+
+
+@cache(maxage=28*86400, keyarg=0)
+def _refresh_token_cache(username):
+ return None