diff options
| author | 2025-09-07 20:40:45 -0400 | |
|---|---|---|
| committer | 2025-09-07 20:40:45 -0400 | |
| commit | 243b2597edb922fe7e0b0d887e80bb7ebbe72ab7 (patch) | |
| tree | 1a42ddec8ae1f21e3c4c88849818e1ea9140aaaa /gallery_dl/extractor/fansly.py | |
| parent | 1df55d9de48105dace9cc16f1511dba3c9a6da6f (diff) | |
New upstream version 1.30.6.upstream/1.30.6
Diffstat (limited to 'gallery_dl/extractor/fansly.py')
| -rw-r--r-- | gallery_dl/extractor/fansly.py | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/gallery_dl/extractor/fansly.py b/gallery_dl/extractor/fansly.py new file mode 100644 index 0000000..31d242f --- /dev/null +++ b/gallery_dl/extractor/fansly.py @@ -0,0 +1,318 @@ +# -*- coding: utf-8 -*- + +# Copyright 2025 Mike Fährmann +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. + +"""Extractors for https://fansly.com/""" + +from .common import Extractor, Message +from .. import text, util +import time + +BASE_PATTERN = r"(?:https?://)?(?:www\.)?fansly\.com" + + +class FanslyExtractor(Extractor): + """Base class for fansly extractors""" + category = "fansly" + root = "https://fansly.com" + directory_fmt = ("{category}", "{account[username]} ({account[id]})") + filename_fmt = "{id}_{num}_{file[id]}.{extension}" + archive_fmt = "{file[id]}" + + def _init(self): + self.api = FanslyAPI(self) + self.formats = self.config("format") or (303, 302, 1, 2, 4) + + def items(self): + for post in self.posts(): + files = self._extract_files(post) + post["count"] = len(files) + post["date"] = text.parse_timestamp(post["createdAt"]) + + yield Message.Directory, post + for post["num"], file in enumerate(files, 1): + post.update(file) + url = file["url"] + yield Message.Url, url, text.nameext_from_url(url, post) + + def _extract_files(self, post): + files = [] + for attachment in post.pop("attachments"): + try: + self._extract_attachment(files, post, attachment) + except Exception as exc: + self.log.debug("", exc_info=exc) + self.log.error( + "%s/%s, Failed to extract media (%s: %s)", + post["id"], attachment.get("id"), + exc.__class__.__name__, exc) + return files + + def _extract_attachment(self, files, post, attachment): + media = attachment["media"] + variants = { + variant["type"]: variant + for variant in media.pop("variants", ()) + } + variants[media["type"]] = media + + for fmt in self.formats: + if fmt in variants and (variant := variants[fmt]).get("locations"): + break + else: + return self.log.warning( + "%s/%s: Requested format not available", + post["id"], attachment["id"]) + + mime = variant["mimetype"] + location = variant.pop("locations")[0] + if "metadata" in variant: + try: + variant.update(util.json_loads(variant.pop("metadata"))) + except Exception: + pass + + file = { + **variant, + "format": fmt, + "date": text.parse_timestamp(media["createdAt"]), + "date_updated": text.parse_timestamp(media["updatedAt"]), + } + + if "metadata" in location: + # manifest + meta = location["metadata"] + + file["type"] = "video" + files.append({ + "file": file, + "url": f"ytdl:{location['location']}", + # "_fallback": (media["locations"][0]["location"],), + "_ytdl_manifest": + "dash" if mime == "application/dash+xml" else "hls", + "_ytdl_manifest_cookies": ( + ("CloudFront-Key-Pair-Id", meta["Key-Pair-Id"]), + ("CloudFront-Signature" , meta["Signature"]), + ("CloudFront-Policy" , meta["Policy"]), + ), + }) + else: + file["type"] = "image" if mime.startswith("image/") else "video" + files.append({ + "file": file, + "url" : location["location"], + }) + + +class FanslyPostExtractor(FanslyExtractor): + subcategory = "post" + pattern = rf"{BASE_PATTERN}/post/(\d+)" + example = "https://fansly.com/post/1234567890" + + def posts(self): + return self.api.post(self.groups[0]) + + +class FanslyHomeExtractor(FanslyExtractor): + subcategory = "home" + pattern = rf"{BASE_PATTERN}/home(?:/(?:subscribed()|list/(\d+)))?" + example = "https://fansly.com/home" + + def posts(self): + subscribed, list_id = self.groups + if subscribed is not None: + mode = "1" + elif list_id is not None: + mode = None + else: + mode = "0" + return self.api.timeline_home(mode, list_id) + + +class FanslyListExtractor(FanslyExtractor): + subcategory = "list" + pattern = rf"{BASE_PATTERN}/lists/(\d+)" + example = "https://fansly.com/lists/1234567890" + + def items(self): + base = f"{self.root}/" + for account in self.api.lists_itemsnew(self.groups[0]): + account["_extractor"] = FanslyCreatorPostsExtractor + url = f"{base}{account['username']}/posts" + yield Message.Queue, url, account + + +class FanslyListsExtractor(FanslyExtractor): + subcategory = "lists" + pattern = rf"{BASE_PATTERN}/lists" + example = "https://fansly.com/lists" + + def items(self): + base = f"{self.root}/lists/" + for list in self.api.lists_account(): + list["_extractor"] = FanslyListExtractor + url = f"{base}{list['id']}#{list['label']}" + yield Message.Queue, url, list + + +class FanslyCreatorPostsExtractor(FanslyExtractor): + subcategory = "creator-posts" + pattern = rf"{BASE_PATTERN}/([^/?#]+)/posts" + example = "https://fansly.com/CREATOR/posts" + + def posts(self): + creator = self.groups[0] + if creator.startswith("id:"): + account = self.api.account_by_id(creator[3:]) + else: + account = self.api.account(creator) + wall_id = account["walls"][0]["id"] + return self.api.timeline_new(account["id"], wall_id) + + +class FanslyAPI(): + ROOT = "https://apiv3.fansly.com" + + def __init__(self, extractor): + self.extractor = extractor + + token = extractor.config("token") + if not token: + self.extractor.log.warning("No 'token' provided") + + self.headers = { + "fansly-client-ts": None, + "Origin" : extractor.root, + "authorization" : token, + } + + def account(self, username): + endpoint = "/v1/account" + params = {"usernames": username} + return self._call(endpoint, params)[0] + + def account_by_id(self, account_id): + endpoint = "/v1/account" + params = {"ids": account_id} + return self._call(endpoint, params)[0] + + def accounts_by_id(self, account_ids): + endpoint = "/v1/account" + params = {"ids": ",".join(map(str, account_ids))} + return self._call(endpoint, params) + + def lists_account(self): + endpoint = "/v1/lists/account" + params = {"itemId": ""} + return self._call(endpoint, params) + + def lists_itemsnew(self, list_id, sort="3"): + endpoint = "/v1/lists/itemsnew" + params = { + "listId" : list_id, + "limit" : 50, + "after" : None, + "sortMode": sort, + } + return self._pagination(endpoint, params) + + def post(self, post_id): + endpoint = "/v1/post" + params = {"ids": post_id} + return self._update_posts(self._call(endpoint, params)) + + def timeline_home(self, mode="0", list_id=None): + endpoint = "/v1/timeline/home" + params = {"before": "0", "after": "0"} + if list_id is None: + params["mode"] = mode + else: + params["listId"] = list_id + return self._pagination(endpoint, params) + + def timeline_new(self, account_id, wall_id): + endpoint = f"/v1/timelinenew/{account_id}" + params = { + "before" : "0", + "after" : "0", + "wallId" : wall_id, + "contentSearch": "", + } + return self._pagination(endpoint, params) + + def _update_posts(self, response): + accounts = { + account["id"]: account + for account in response["accounts"] + } + media = { + media["id"]: media + for media in response["accountMedia"] + } + bundles = { + bundle["id"]: bundle + for bundle in response["accountMediaBundles"] + } + + posts = response["posts"] + for post in posts: + post["account"] = accounts[post.pop("accountId")] + + attachments = [] + for attachment in post["attachments"]: + cid = attachment["contentId"] + if cid in media: + attachments.append(media[cid]) + elif cid in bundles: + bundle = bundles[cid]["bundleContent"] + bundle.sort(key=lambda c: c["pos"]) + attachments.extend( + media[m["accountMediaId"]] + for m in bundle + if m["accountMediaId"] in media + ) + else: + self.extractor.log.warning( + "%s: Unhandled 'contentId' %s", + post["id"], cid) + post["attachments"] = attachments + return posts + + def _update_items(self, items): + ids = [item["id"] for item in items] + accounts = { + account["id"]: account + for account in self.accounts_by_id(ids) + } + return [accounts[id] for id in ids] + + def _call(self, endpoint, params): + url = f"{self.ROOT}/api{endpoint}" + params["ngsw-bypass"] = "true" + headers = self.headers.copy() + headers["fansly-client-ts"] = str(int(time.time() * 1000)) + + data = self.extractor.request_json( + url, params=params, headers=headers) + return data["response"] + + def _pagination(self, endpoint, params): + while True: + response = self._call(endpoint, params) + + if isinstance(response, list): + if not response: + return + yield from self._update_items(response) + params["after"] = response[-1]["sortId"] + + else: + if not response.get("posts"): + return + posts = self._update_posts(response) + yield from posts + params["before"] = min(p["id"] for p in posts) |
