diff options
Diffstat (limited to 'gallery_dl/extractor/tiktok.py')
| -rw-r--r-- | gallery_dl/extractor/tiktok.py | 1205 |
1 files changed, 1123 insertions, 82 deletions
diff --git a/gallery_dl/extractor/tiktok.py b/gallery_dl/extractor/tiktok.py index a4c7171..daf2b69 100644 --- a/gallery_dl/extractor/tiktok.py +++ b/gallery_dl/extractor/tiktok.py @@ -6,10 +6,15 @@ """Extractors for https://www.tiktok.com/""" -from .common import Extractor, Message +from .common import Extractor, Message, Dispatch from .. import text, util, ytdl, exception +import functools +import itertools +import random +import time BASE_PATTERN = r"(?:https?://)?(?:www\.)?tiktokv?\.com" +USER_PATTERN = BASE_PATTERN + r"/@([\w_.-]+)" class TiktokExtractor(Extractor): @@ -17,18 +22,24 @@ class TiktokExtractor(Extractor): category = "tiktok" directory_fmt = ("{category}", "{user}") filename_fmt = ( - "{id}{num:?_//>02} {title[b:150]}{img_id|audio_id:? [/]/}.{extension}") - archive_fmt = "{id}_{num}_{img_id}" + "{id}{num:?_//>02} {title[b:150]}{file_id:? [/]/}.{extension}") + archive_fmt = "{id}_{num}_{file_id}" root = "https://www.tiktok.com" cookies_domain = ".tiktok.com" + rehydration_data_cache = {} + rehydration_data_app_context_cache = {} def _init(self): + self.photo = self.config("photos", True) self.audio = self.config("audio", True) self.video = self.config("videos", True) self.cover = self.config("covers", False) + self.range = self.config("tiktok-range") or "" + self.range_predicate = util.RangePredicate(self.range) + def items(self): - for tiktok_url in self.urls(): + for tiktok_url in self.posts(): tiktok_url = self._sanitize_url(tiktok_url) data = self._extract_rehydration_data(tiktok_url) if "webapp.video-detail" not in data: @@ -39,7 +50,7 @@ class TiktokExtractor(Extractor): data = self._extract_rehydration_data(tiktok_url) video_detail = data["webapp.video-detail"] - if not self._check_status_code(video_detail, tiktok_url): + if not self._check_status_code(video_detail, tiktok_url, "post"): continue post = video_detail["itemInfo"]["itemStruct"] @@ -51,22 +62,23 @@ class TiktokExtractor(Extractor): ytdl_media = False if "imagePost" in post: - if not original_title: - title = f"TikTok photo #{post['id']}" - img_list = post["imagePost"]["images"] - for i, img in enumerate(img_list, 1): - url = img["imageURL"]["urlList"][0] - text.nameext_from_url(url, post) - post.update({ - "type" : "image", - "image" : img, - "title" : title, - "num" : i, - "img_id": post["filename"].partition("~")[0], - "width" : img["imageWidth"], - "height": img["imageHeight"], - }) - yield Message.Url, url, post + if self.photo: + if not original_title: + title = f"TikTok photo #{post['id']}" + img_list = post["imagePost"]["images"] + for i, img in enumerate(img_list, 1): + url = img["imageURL"]["urlList"][0] + text.nameext_from_url(url, post) + post.update({ + "type" : "image", + "image" : img, + "title" : title, + "num" : i, + "file_id": post["filename"].partition("~")[0], + "width" : img["imageWidth"], + "height": img["imageHeight"], + }) + yield Message.Url, url, post if self.audio and "music" in post: if self.audio == "ytdl": @@ -75,8 +87,10 @@ class TiktokExtractor(Extractor): yield Message.Url, url, post elif "video" in post: - if self.video: + if self.video == "ytdl": ytdl_media = "video" + elif self.video and (url := self._extract_video(post)): + yield Message.Url, url, post if self.cover and (url := self._extract_cover(post, "video")): yield Message.Url, url, post @@ -93,7 +107,7 @@ class TiktokExtractor(Extractor): "extension" : "mp3" if ytdl_media == "audio" else "mp4", "title" : title, "num" : 0, - "img_id" : "", + "file_id" : "", "width" : 0, "height" : 0, }) @@ -102,7 +116,8 @@ class TiktokExtractor(Extractor): def _sanitize_url(self, url): return text.ensure_http_scheme(url.replace("/photo/", "/video/", 1)) - def _extract_rehydration_data(self, url): + def _extract_rehydration_data(self, url, additional_keys=[], *, + has_keys=[]): tries = 0 while True: try: @@ -115,8 +130,14 @@ class TiktokExtractor(Extractor): data = text.extr( html, '<script id="__UNIVERSAL_DATA_FOR_REHYDRATION__" ' 'type="application/json">', '</script>') - return util.json_loads(data)["__DEFAULT_SCOPE__"] - except ValueError: + data = util.json_loads(data)["__DEFAULT_SCOPE__"] + for key in additional_keys: + data = data[key] + for assert_key in has_keys: + if assert_key not in data: + raise KeyError(assert_key) + return data + except (ValueError, KeyError): # We failed to retrieve rehydration data. This happens # relatively frequently when making many requests, so # retry. @@ -128,6 +149,88 @@ class TiktokExtractor(Extractor): self._retries) self.sleep(self._timeout, "retry") + def _extract_rehydration_data_user(self, profile_url, additional_keys=()): + if profile_url in self.rehydration_data_cache: + data = self.rehydration_data_cache[profile_url] + else: + data = self._extract_rehydration_data( + profile_url, + has_keys=["webapp.user-detail", "webapp.app-context"] + ) + self.rehydration_data_cache[profile_url] = \ + data["webapp.user-detail"] + self.rehydration_data_app_context_cache = \ + data["webapp.app-context"] + data = data["webapp.user-detail"] + if not self._check_status_code(data, profile_url, "profile"): + raise exception.ExtractionError( + "%s: could not extract rehydration data", profile_url) + try: + for key in additional_keys: + data = data[key] + except KeyError as exc: + self.log.traceback(exc) + raise exception.ExtractionError( + "%s: could not extract rehydration data (%s)", + profile_url, ", ".join(additional_keys)) + return data + + def _ensure_rehydration_data_app_context_cache_is_populated(self): + if not self.rehydration_data_app_context_cache: + self.rehydration_data_app_context_cache = \ + self._extract_rehydration_data_user( + "https://www.tiktok.com/", ["webapp.app-context"]) + + def _extract_sec_uid(self, profile_url, user_name): + sec_uid = self._extract_id( + profile_url, user_name, r"MS4wLjABAAAA[\w-]{64}", "secUid") + if sec_uid is None: + raise exception.AbortExtraction( + f"{user_name}: unable to extract secondary user ID") + return sec_uid + + def _extract_author_id(self, profile_url, user_name): + author_id = self._extract_id( + profile_url, user_name, r"[0-9]+", "id") + if author_id is None: + raise exception.AbortExtraction( + f"{user_name}: unable to extract user ID") + return author_id + + def _extract_id(self, profile_url, user_name, regex, id_key): + match = text.re(regex).fullmatch + + if match(user_name) is not None: + # If it was provided in the URL, then we can skip extracting it + # from the rehydration data. + return user_name + + id = self._extract_rehydration_data_user( + profile_url, ("userInfo", "user", id_key)) + return None if match(id) is None else id + + def _extract_video(self, post): + video = post["video"] + try: + url = video["playAddr"] + except KeyError: + raise exception.ExtractionError("Failed to extract video URL, you " + "may need cookies to continue") + text.nameext_from_url(url, post) + post.update({ + "type" : "video", + "image" : None, + "title" : post["desc"] or f"TikTok video #{post['id']}", + "duration" : video.get("duration"), + "num" : 0, + "file_id" : video.get("id"), + "width" : video.get("width"), + "height" : video.get("height"), + }) + if not post["extension"]: + post["extension"] = video.get("format", "mp4") + return url + def _extract_audio(self, post): audio = post["music"] url = audio["playUrl"] @@ -138,8 +241,7 @@ class TiktokExtractor(Extractor): "title" : post["desc"] or f"TikTok audio #{post['id']}", "duration" : audio.get("duration"), "num" : 0, - "img_id" : "", - "audio_id" : audio.get("id"), + "file_id" : audio.get("id"), "width" : 0, "height" : 0, }) @@ -164,22 +266,38 @@ class TiktokExtractor(Extractor): "title" : post["desc"] or f"TikTok {type} cover #{post['id']}", "duration" : media.get("duration"), "num" : 0, - "img_id" : "", - "cover_id" : cover_id, + "file_id" : cover_id, "width" : 0, "height" : 0, }) return url - def _check_status_code(self, detail, url): + def _check_status_code(self, detail, url, type_of_url): status = detail.get("statusCode") if not status: return True if status == 10222: - self.log.error("%s: Login required to access this post", url) + # Video count workaround ported from yt-dlp: sometimes TikTok + # reports a profile as private even though we have the cookies to + # access it. We know that we can access it if we can see the + # videos stats. If we can't, we assume that we don't have access + # to the profile. + # We only care about this workaround for webapp.user-detail + # objects, so always fail the workaround for e.g. + # webapp.video-detail objects. + video_count = self._extract_video_count_from_user_detail(detail) + if video_count is None: + self.log.error("%s: Login required to access this %s", url, + type_of_url) + elif video_count > 0: + return True + else: + self.log.error("%s: Login required to access this %s, or this " + "profile has no videos posted", url, + type_of_url) elif status == 10204: - self.log.error("%s: Requested post not available", url) + self.log.error("%s: Requested %s not available", url, type_of_url) elif status == 10231: self.log.error("%s: Region locked - Try downloading with a " "VPN/proxy connection", url) @@ -189,14 +307,26 @@ class TiktokExtractor(Extractor): url, status, detail.get("statusMsg") or "") return False + def _extract_video_count_from_user_detail(self, detail): + user_info = detail.get("userInfo") + if not user_info: + return None + stats = user_info.get("stats") or user_info.get("statsV2") + try: + # stats.videoCount is an int, but statsV2.videoCount is a + # string, so we must explicitly convert the attribute. + return int(stats["videoCount"]) + except (KeyError, ValueError): + return None + class TiktokPostExtractor(TiktokExtractor): """Extract a single video or photo TikTok link""" subcategory = "post" - pattern = rf"{BASE_PATTERN}/(?:@([\w_.-]*)|share)/(?:phot|vide)o/(\d+)" + pattern = BASE_PATTERN + r"/(?:@([\w_.-]*)|share)/(?:phot|vide)o/(\d+)" example = "https://www.tiktok.com/@USER/photo/1234567890" - def urls(self): + def posts(self): user, post_id = self.groups url = f"{self.root}/@{user or ''}/video/{post_id}" return (url,) @@ -223,31 +353,92 @@ class TiktokVmpostExtractor(TiktokExtractor): yield Message.Queue, url.partition("?")[0], data -class TiktokUserExtractor(TiktokExtractor): - """Extract a TikTok user's profile""" - subcategory = "user" - pattern = rf"{BASE_PATTERN}/@([\w_.-]+)/?(?:$|\?|#)" +class TiktokUserExtractor(Dispatch, TiktokExtractor): + """Extractor for a TikTok user profile""" + pattern = USER_PATTERN + r"/?(?:$|\?|#)" example = "https://www.tiktok.com/@USER" - def _init(self): - self.avatar = self.config("avatar", True) + def items(self): + base = f"{self.root}/@{self.groups[0]}/" + return self._dispatch_extractors(( + (TiktokAvatarExtractor , base + "avatar"), + (TiktokPostsExtractor , base + "posts"), + (TiktokRepostsExtractor, base + "reposts"), + (TiktokStoriesExtractor, base + "stories"), + (TiktokLikesExtractor , base + "likes"), + (TiktokSavedExtractor , base + "saved"), + ), ("avatar", "posts")) + + +class TiktokAvatarExtractor(TiktokExtractor): + subcategory = "avatar" + pattern = USER_PATTERN + r"/avatar" + example = "https://www.tiktok.com/@USER/avatar" def items(self): - """Attempt to use yt-dlp/youtube-dl to extract links from a - user's page""" + user_name = self.groups[0] + profile_url = f"{self.root}/@{user_name}" + + data = self._extract_rehydration_data_user( + profile_url, ("userInfo", "user")) + data["user"] = data.get("uniqueId", user_name) + avatar_url = data.get("avatarLarger") or data.get("avatarMedium") \ + or data["avatarThumb"] + avatar = text.nameext_from_url(avatar_url, data.copy()) + avatar.update({ + "type" : "avatar", + "title" : "@" + data["user"], + "id" : data["id"], + "file_id": avatar["filename"].partition("~")[0], + "num" : 0, + }) + yield Message.Directory, "", avatar + yield Message.Url, avatar_url, avatar + + +class TiktokPostsExtractor(TiktokExtractor): + subcategory = "posts" + pattern = USER_PATTERN + r"/posts" + example = "https://www.tiktok.com/@USER/posts" + + def posts(self): + user_name = self.groups[0] + profile_url = f"{self.root}/@{user_name}" + self.user_provided_cookies = bool(self.cookies) + + # If set to "ytdl", we shall first go via yt-dlp. If that fails, + # we shall attempt to extract directly. + if self.config("ytdl", False): + if posts := self._extract_posts_ytdl(profile_url): + return posts + ytdl = True + self.log.warning("Could not extract TikTok user " + f"{user_name} via yt-dlp or youtube-dl, " + "attempting the extraction directly") + else: + ytdl = False + + if posts := self._extract_posts_api(profile_url, user_name): + return posts + + message = "Could not extract any posts from TikTok user " \ + f"{user_name}" + if not ytdl: + message += ", try extracting post information using " \ + "yt-dlp with the -o " \ + "tiktok-user-extractor=ytdl argument" + self.log.warning(message) + return () + + def _extract_posts_ytdl(self, profile_url): try: module = ytdl.import_module(self.config("module")) except (ImportError, SyntaxError) as exc: self.log.error("Cannot import module '%s'", getattr(exc, "name", "")) self.log.traceback(exc) - raise exception.ExtractionError("yt-dlp or youtube-dl is required " - "for this feature!") - - ytdl_range = self.config("tiktok-range") - if ytdl_range is None or not ytdl_range and ytdl_range != 0: - ytdl_range = "" + return [] extr_opts = { "extract_flat" : True, @@ -257,7 +448,7 @@ class TiktokUserExtractor(TiktokExtractor): "retries" : self._retries, "socket_timeout" : self._timeout, "nocheckcertificate" : not self._verify, - "playlist_items" : str(ytdl_range), + "playlist_items" : str(self.range), } if self._proxies: user_opts["proxy"] = self._proxies.get("http") @@ -271,39 +462,889 @@ class TiktokUserExtractor(TiktokExtractor): for cookie in self.cookies: set_cookie(cookie) - user_name = self.groups[0] - profile_url = f"{self.root}/@{user_name}" - if self.avatar: - try: - avatar_url, avatar = self._generate_avatar( - user_name, profile_url) - except Exception as exc: - self.log.warning("Unable to extract 'avatar' URL (%s: %s)", - exc.__class__.__name__, exc) - else: - yield Message.Directory, "", avatar - yield Message.Url, avatar_url, avatar - with ytdl_instance as ydl: info_dict = ydl._YoutubeDL__extract_info( profile_url, ydl.get_info_extractor("TikTokUser"), False, {}, True) - # This should include video and photo posts in /video/ URL form. - for video in info_dict["entries"]: - data = {"_extractor": TiktokPostExtractor} - yield Message.Queue, video["url"].partition("?")[0], data - - def _generate_avatar(self, user_name, profile_url): - data = self._extract_rehydration_data(profile_url) - data = data["webapp.user-detail"]["userInfo"]["user"] - data["user"] = user_name - avatar_url = data["avatarLarger"] - avatar = text.nameext_from_url(avatar_url, data.copy()) - avatar.update({ - "type" : "avatar", - "title" : "@" + user_name, - "id" : data["id"], - "img_id": avatar["filename"].partition("~")[0], - "num" : 0, - }) - return (avatar_url, avatar) + # This should be a list of video and photo post URLs in /video/ + # format. + return [video["url"].partition("?")[0] + for video in info_dict["entries"]] + + def _extract_posts_api(self, profile_url, user_name): + self.post_order = self.config("order-posts") or "desc" + if self.post_order not in ["desc", "asc", "reverse", "popular"]: + self.post_order = "desc" + + sec_uid = self._extract_sec_uid(profile_url, user_name) + if not self.user_provided_cookies: + if self.post_order != "desc": + self.log.warning( + "%s: no cookies have been provided so the order-posts " + "option will not take effect. You must provide cookies in " + "order to extract a profile's posts in non-descending " + "order", + profile_url + ) + return self._extract_posts_api_legacy( + profile_url, sec_uid, self.range_predicate) + try: + return self._extract_posts_api_order( + profile_url, sec_uid, self.range_predicate) + except Exception as exc: + self.log.error( + "%s: failed to extract user posts using post/item_list (make " + "sure you provide valid cookies). Attempting with legacy " + "creator/item_list endpoint that does not support post " + "ordering", + profile_url + ) + self.log.traceback(exc) + return self._extract_posts_api_legacy( + profile_url, sec_uid, self.range_predicate) + + def _extract_posts_api_order(self, profile_url, sec_uid, range_predicate): + post_item_list_request_type = "0" + if self.post_order in ["asc", "reverse"]: + post_item_list_request_type = "2" + elif self.post_order in ["popular"]: + post_item_list_request_type = "1" + query_parameters = { + "secUid": sec_uid, + "post_item_list_request_type": post_item_list_request_type, + "count": "15", + "needPinnedItemIds": "false", + } + request = TiktokPostItemListRequest(range_predicate) + request.execute(self, profile_url, query_parameters) + return request.generate_urls(profile_url, self.video, self.photo, + self.audio) + + def _extract_posts_api_legacy(self, profile_url, sec_uid, range_predicate): + query_parameters = { + "secUid": sec_uid, + "type": "1", + "count": "15", + } + request = TiktokCreatorItemListRequest(range_predicate) + request.execute(self, profile_url, query_parameters) + return request.generate_urls(profile_url, self.video, self.photo, + self.audio) + + +class TiktokRepostsExtractor(TiktokExtractor): + subcategory = "reposts" + pattern = USER_PATTERN + r"/reposts" + example = "https://www.tiktok.com/@USER/reposts" + + def posts(self): + user_name = self.groups[0] + profile_url = f"{self.root}/@{user_name}" + + query_parameters = { + "secUid": self._extract_sec_uid(profile_url, user_name), + "post_item_list_request_type": "0", + "needPinnedItemIds": "false", + "count": "15", + } + request = TiktokRepostItemListRequest(self.range_predicate) + request.execute(self, profile_url, query_parameters) + return request.generate_urls(profile_url, self.video, self.photo, + self.audio) + + +class TiktokStoriesExtractor(TiktokExtractor): + subcategory = "stories" + pattern = USER_PATTERN + r"/stories" + example = "https://www.tiktok.com/@USER/stories" + + def posts(self): + user_name = self.groups[0] + profile_url = f"{self.root}/@{user_name}" + + query_parameters = { + "authorId": self._extract_author_id(profile_url, user_name), + "loadBackward": "false", + "count": "5", + } + request = TiktokStoryItemListRequest() + request.execute(self, profile_url, query_parameters) + return request.generate_urls(profile_url, self.video, self.photo, + self.audio) + + +class TiktokLikesExtractor(TiktokExtractor): + subcategory = "likes" + pattern = USER_PATTERN + r"/like[sd]" + example = "https://www.tiktok.com/@USER/liked" + + def posts(self): + user_name = self.groups[0] + profile_url = f"{self.root}/@{user_name}" + + query_parameters = { + "secUid": self._extract_sec_uid(profile_url, user_name), + "post_item_list_request_type": "0", + "needPinnedItemIds": "false", + "count": "15", + } + request = TiktokFavoriteItemListRequest(self.range_predicate) + request.execute(self, profile_url, query_parameters) + return request.generate_urls(profile_url, self.video, self.photo, + self.audio) + + +class TiktokSavedExtractor(TiktokExtractor): + subcategory = "saved" + pattern = USER_PATTERN + r"/saved" + example = "https://www.tiktok.com/@USER/saved" + + def posts(self): + user_name = self.groups[0] + profile_url = f"{self.root}/@{user_name}" + + query_parameters = { + "secUid": self._extract_sec_uid(profile_url, user_name), + "post_item_list_request_type": "0", + "needPinnedItemIds": "false", + "count": "15", + } + request = TiktokSavedPostItemListRequest(self.range_predicate) + request.execute(self, profile_url, query_parameters) + return request.generate_urls(profile_url, self.video, self.photo, + self.audio) + + +class TiktokFollowingExtractor(TiktokUserExtractor): + """Extract all of the stories of all of the users you follow""" + subcategory = "following" + pattern = rf"{BASE_PATTERN}/following" + example = "https://www.tiktok.com/following" + + def items(self): + """Attempt to extract all of the stories of all of the accounts + the user follows""" + + query_parameters = { + "storyFeedScene": "3", + "count": "15", + } + request = TiktokStoryUserListRequest() + if not request.execute(self, self.url, query_parameters): + self.log.error("%s: could not extract follower list, make sure " + "you are using logged-in cookies", self.url) + users = request.generate_urls() + if len(users) == 0: + self.log.warning("%s: No followers with stories could be " + "extracted", self.url) + + entries = [] + # Batch all of the users up into groups of at most ten and use the + # batch endpoint to improve performance. The response to the story user + # list request may also include the user themselves, so skip them if + # they ever turn up. + for b in range((len(users) - 1) // 10 + 1): + batch_number = b + 1 + user_batch = users[b*10:batch_number*10] + + # Handle edge case where final batch is composed of a single user + # and that user is the one we need to skip. If we don't handle this + # here (or when we generate the author ID list later), we will + # trigger an AssertionError for an empty author ID list. + if len(user_batch) == 1: + if self._is_current_user(user_batch[0][0]): + continue + + self.log.info("TikTok user stories, batch %d: %s", batch_number, + ", ".join([profile_url for user_id, profile_url in + user_batch if not self._is_current_user( + user_id)])) + + # Since we've already extracted all of the author IDs, we should be + # able to avoid having to request rehydration data (except for one + # time, since it's required to make _is_current_user() work), but + # we should keep this mechanism in place for safety. + author_ids = [self._extract_author_id(profile_url, user_id) + for user_id, profile_url in user_batch + if not self._is_current_user(user_id)] + query_parameters = { + "authorIds": ",".join(author_ids), + "storyCallScene": "2", + } + request = TiktokStoryBatchItemListRequest() + request.execute(self, f"Batch {batch_number}", query_parameters) + # We technically don't need to have the correct user name in the + # URL and it's easier to just ignore it here. + entries += request.generate_urls("https://www.tiktok.com/@_", + self.video, self.photo, + self.audio) + + for video in entries: + data = {"_extractor": TiktokPostExtractor} + yield Message.Queue, video, data + + def _is_current_user(self, user_id): + self._ensure_rehydration_data_app_context_cache_is_populated() + if "user" not in self.rehydration_data_app_context_cache: + return False + if "uid" not in self.rehydration_data_app_context_cache["user"]: + return False + return self.rehydration_data_app_context_cache["user"]["uid"] == \ + user_id + + +class TiktokPaginationCursor: + def current_page(self): + """Must return the page the cursor is currently pointing to. + + Returns + ------- + int + The current value of the cursor. + """ + + return 0 + + def next_page(self, data, query_parameters): + """Must progress the cursor to the next page. + + Parameters + ---------- + data : dict + The response of the most recent request. + query_parameters : dict + All of the query parameters used for the most recent + request. + + Returns + ------- + bool + True if the cursor detects that we've reached the end, False + otherwise. + """ + + return True + + +class TiktokTimeCursor(TiktokPaginationCursor): + def __init__(self, *, reverse=True): + super().__init__() + self.cursor = 0 + # If we expect the cursor to go up or down as we go to the next page. + # True for down, False for up. + self.reverse = reverse + + def current_page(self): + return self.cursor + + def next_page(self, data, query_parameters): + skip_fallback_logic = self.cursor == 0 + new_cursor = int(data.get("cursor", 0)) + no_cursor = not new_cursor + if not skip_fallback_logic: + # If the new cursor doesn't go in the direction we expect, use the + # fallback logic instead. + if self.reverse and (new_cursor > self.cursor or no_cursor): + new_cursor = self.fallback_cursor(data) + elif not self.reverse and (new_cursor < self.cursor or no_cursor): + new_cursor = self.fallback_cursor(data) + elif no_cursor: + raise exception.ExtractionError("Could not extract next cursor") + self.cursor = new_cursor + return not data.get("hasMore", False) + + def fallback_cursor(self, data): + try: + return int(data["itemList"][-1]["createTime"]) * 1000 + except Exception: + return 7 * 86_400_000 * (-1 if self.reverse else 1) + + +class TiktokForwardTimeCursor(TiktokTimeCursor): + def __init__(self): + super().__init__(reverse=False) + + +class TiktokBackwardTimeCursor(TiktokTimeCursor): + def __init__(self): + super().__init__(reverse=True) + + +class TiktokPopularTimeCursor(TiktokTimeCursor): + def __init__(self): + super().__init__(reverse=True) + + def fallback_cursor(self, data): + # Don't really know what to do here, all I know is that the cursor + # for the popular item feed goes down and it does not appear to be + # based on item list timestamps at all. + return -50_000 + + +class TiktokLegacyTimeCursor(TiktokPaginationCursor): + def __init__(self): + super().__init__() + self.cursor = int(time.time()) * 1000 + + def current_page(self): + return self.cursor + + def next_page(self, data, query_parameters): + old_cursor = self.cursor + try: + self.cursor = int(data["itemList"][-1]["createTime"]) * 1000 + except Exception: + self.cursor = 0 + if not self.cursor or old_cursor == self.cursor: + # User may not have posted within this ~1 week look back, + # so manually adjust the cursor. + self.cursor = old_cursor - 7 * 86_400_000 + # In case 'hasMorePrevious' is wrong, break if we have + # gone back before TikTok existed. + has_more_previous = data.get("hasMorePrevious") + return self.cursor < 1472706000000 or not has_more_previous + + +class TiktokItemCursor(TiktokPaginationCursor): + def __init__(self, list_key: str = "itemList"): + super().__init__() + self.cursor = 0 + self.list_key = list_key + + def current_page(self): + return self.cursor + + def next_page(self, data, query_parameters): + # We should offset the cursor by the number of items in the response. + # Sometimes less items are returned than what was requested in the + # count parameter! We could fall back onto the count query parameter + # but we could miss out on some posts, and truth is if the expected + # item list isn't in the response, the extraction was going to fail + # anyway. + self.cursor += len(data[self.list_key]) + return not data.get("hasMore", False) + + +class TiktokPaginationRequest: + def __init__(self, endpoint): + self.endpoint = endpoint + self._regenerate_device_id() + self.items = {} + + def execute(self, extractor, url, query_parameters): + """Performs requests until all pages have been retrieved. + + The items retrieved from this request are stored in self.items. + Each call to execute() will clear the previous value of + self.items. + + Usually extractors want a simple list of URLs. For this, each + request subtype is to implement generate_urls(). + + Parameters + ---------- + extractor : TiktokExtractor + The TikTok extractor performing the request. + url : str + The URL associated with this request for logging purposes. + query_parameters : dict[str, str] + The query parameters to apply to this request. + + Returns + ------- + bool + True if the request was performed successfully and all items + were retrieved, False if no items or only some items could + be retrieved. + """ + + self.validate_query_parameters(query_parameters) + self.items = {} + cursor_type = self.cursor_type(query_parameters) + cursor = cursor_type() if cursor_type else None + for page in itertools.count(start=1): + extractor.log.info("%s: retrieving %s page %d", url, self.endpoint, + page) + tries = 0 + while True: + try: + data, final_parameters = self._request_data( + extractor, + cursor, + query_parameters + ) + incoming_items = self.extract_items(data) + self._detect_duplicate_pages(extractor, url, + set(self.items.keys()), + set(incoming_items.keys())) + self.items.update(incoming_items) + if cursor: + final_page_reached = cursor.next_page(data, + final_parameters) + exit_early = self.exit_early(extractor, url) + if exit_early or final_page_reached: + return True + # Continue to next page and reset tries counter. + break + else: + # This request has no cursor: return immediately. + return True + except Exception as exc: + if tries >= extractor._retries: + extractor.log.error("%s: failed to retrieve %s page " + "%d", url, self.endpoint, page) + extractor.log.traceback(exc) + return False + tries += 1 + extractor.log.warning("%s: failed to retrieve %s page %d", + url, self.endpoint, page) + extractor.sleep(extractor._timeout, "retry") + + def validate_query_parameters(self, query_parameters): + """Used to validate the given parameters for this type of + pagination request. + + For developer purposes only. You should call + super().validate_query_parameters() for most requests as they + will usually have a count parameter. + + Parameters + ---------- + query_parameters : dict[str, str] + The query parameters to validate. + + Raises + ------- + AssertionError + If mandatory query parameters are not given, or they are + given in the wrong format. + """ + + assert "count" in query_parameters + assert type(query_parameters["count"]) is str + assert query_parameters["count"].isdigit() + assert query_parameters["count"] != "0" + + def cursor_type(self, query_parameters): + """Used to determine which type of cursor to use for this + request, if any. + + Parameters + ---------- + query_parameters : dict[str, str] + The query parameters given to the execute() call. + + Returns + ------- + Type[TiktokPaginationCursor] | None + The type of cursor to use, if any. + """ + + return None + + def extract_items(self, data): + """Used to extract data from the response of a request. + + Parameters + ---------- + data : dict + The data given by TikTok. + + Returns + ------- + dict + Each item from the response data, keyed on a unique ID. + + Raises + ------ + Exception + If items could not be extracted. + """ + + return {} + + def exit_early(self, extractor, url): + """Used to determine if we should exit early from the request. + + You have access to the items extracted so far (self.items). + + Parameters + ---------- + extractor : TiktokExtractor + The extractor making the requests. + url : str + The URL associated with the executing request for logging + purposes. + + Returns + ------- + bool + True if we should exit early, False otherwise. + """ + + return False + + def generate_urls(self): + """Used to convert the items retrieved from the request into a + list of URLs. + + Returns + ------- + list + Ideally one URL for each item, although subclasses are + permitted to return a list of any format they wish. + """ + + return [] + + def _regenerate_device_id(self): + self.device_id = str(random.randint( + 7_250_000_000_000_000_000, 7_325_099_899_999_994_577)) + + def _request_data(self, extractor, cursor, query_parameters): + # Implement simple 1 retry mechanism without delays that handles the + # flaky post/item_list endpoint. + retries = 0 + while True: + try: + url, final_parameters = self._build_api_request_url( + cursor, + query_parameters + ) + response = extractor.request(url) + return (util.json_loads(response.text), final_parameters) + except ValueError: + if retries == 1: + raise + extractor.log.warning( + "Could not decode response for this page, trying again" + ) + retries += 1 + + def _build_api_request_url(self, cursor, extra_parameters): + query_parameters = { + "aid": "1988", + "app_language": "en", + "app_name": "tiktok_web", + "browser_language": "en-US", + "browser_name": "Mozilla", + "browser_online": "true", + "browser_platform": "Win32", + "browser_version": "5.0 (Windows)", + "channel": "tiktok_web", + "cookie_enabled": "true", + "device_id": self.device_id, + "device_platform": "web_pc", + "focus_state": "true", + "from_page": "user", + "history_len": "2", + "is_fullscreen": "false", + "is_page_visible": "true", + "language": "en", + "os": "windows", + "priority_region": "", + "referer": "", + "region": "US", + "screen_height": "1080", + "screen_width": "1920", + "tz_name": "UTC", + "verifyFp": "verify_" + "".join(random.choices( + "0123456789abcdef", k=7)), + "webcast_language": "en", + } + if cursor: + # We must not write this as a floating-point number: + query_parameters["cursor"] = str(int(cursor.current_page())) + for key, value in extra_parameters.items(): + query_parameters[key] = f"{value}" + query_str = text.build_query(query_parameters) + return (f"https://www.tiktok.com/api/{self.endpoint}/?{query_str}", + query_parameters) + + def _detect_duplicate_pages(self, extractor, url, seen_ids, incoming_ids): + if incoming_ids and incoming_ids == seen_ids: + # TikTok API keeps sending the same page, likely due to + # a bad device ID. Generate a new one and try again. + self._regenerate_device_id() + extractor.log.warning("%s: TikTok API keeps sending the same " + "page. Taking measures to avoid an infinite " + "loop", url) + raise exception.ExtractionError( + "TikTok API keeps sending the same page") + + +class TiktokItemListRequest(TiktokPaginationRequest): + def __init__(self, endpoint, type_of_items, range_predicate): + super().__init__(endpoint) + self.type_of_items = type_of_items + self.range_predicate = range_predicate + self.exit_early_due_to_no_items = False + + def extract_items(self, data): + if "itemList" not in data: + self.exit_early_due_to_no_items = True + return {} + return {item["id"]: item for item in data["itemList"]} + + def exit_early(self, extractor, url): + if self.exit_early_due_to_no_items: + extractor.log.warning("%s: could not extract any %s for this user", + url, self.type_of_items) + return True + if not self.range_predicate: + # No range predicate given. + return False + if len(self.range_predicate.ranges) == 0: + # No range predicates given in the predicate object. + return False + # If our current selection of items can't satisfy the upper bound of + # the predicate, we must continue extracting them until we can. + return len(self.items) > self.range_predicate.upper + + def generate_urls(self, profile_url, video, photo, audio): + urls = [] + for index, id in enumerate(self.items.keys()): + if not self._matches_filters(self.items.get(id), index + 1, video, + photo, audio): + continue + # Try to grab the author's unique ID, but don't cause the + # extraction to fail if we can't, it's not imperative that the + # URLs include the actual poster's unique ID. + try: + url = f"https://www.tiktok.com/@" \ + f"{self.items[id]['author']['uniqueId']}/video/{id}" + except KeyError: + # Use the given profile URL as a back up. + url = f"{profile_url}/video/{id}" + urls.append(url) + return urls + + def _matches_filters(self, item, index, video, photo, audio): + # First, check if this index falls within any of our configured ranges. + # If it doesn't, we filter it out. + if self.range_predicate: + range_match = len(self.range_predicate.ranges) == 0 + for range in self.range_predicate.ranges: + if index in range: + range_match = True + break + if not range_match: + return False + + # Then, we apply basic video/photo filtering. + if not item: + return True + is_image_post = "imagePost" in item + if not photo and not audio and is_image_post: + return False + if not video and not is_image_post: + return False + return True + + +class TiktokCreatorItemListRequest(TiktokItemListRequest): + """A less flaky version of the post/item_list endpoint that doesn't + support latest/popular/oldest ordering.""" + + def __init__(self, range_predicate): + super().__init__("creator/item_list", "posts", range_predicate) + + def validate_query_parameters(self, query_parameters): + super().validate_query_parameters(query_parameters) + assert "secUid" in query_parameters + assert "type" in query_parameters + # Pagination type: 0 == oldest-to-newest, 1 == newest-to-oldest. + # NOTE: ^ this type parameter doesn't seem to do what yt-dlp thinks it + # does. post/item_list is the only way to get an ordered feed + # based on latest/popular/oldest. + assert query_parameters["type"] == "0" or \ + query_parameters["type"] == "1" + + def cursor_type(self, query_parameters): + return TiktokLegacyTimeCursor + + +class TiktokPostItemListRequest(TiktokItemListRequest): + """Retrieves posts in latest/popular/oldest ordering. + + Very often, this request will just return an empty response, making + it quite flaky, but the next attempt to make the request usually + does return a response. For this reason creator/item_list was kept + as a backup, though it doesn't seem to support ordering. + + It also doesn't work without cookies. + """ + + def __init__(self, range_predicate): + super().__init__("post/item_list", "posts", range_predicate) + + def validate_query_parameters(self, query_parameters): + super().validate_query_parameters(query_parameters) + assert "secUid" in query_parameters + assert "post_item_list_request_type" in query_parameters + # Pagination type: + # 0 == newest-to-oldest. + # 1 == popular. + # 2 == oldest-to-newest. + assert query_parameters["post_item_list_request_type"] in \ + ["0", "1", "2"] + assert "needPinnedItemIds" in query_parameters + # If this value is set to "true", and "post_item_list_request_type" is + # set to "0", pinned posts will always show up first in the resulting + # itemList. It keeps our logic simpler if we avoid this behavior by + # setting this parameter to "false" (especially if we were to use a + # really small "count" value like "1" or "2"). + assert query_parameters["needPinnedItemIds"] in ["false"] + + def cursor_type(self, query_parameters): + request_type = query_parameters["post_item_list_request_type"] + if request_type == "2": + return TiktokForwardTimeCursor + elif request_type == "1": + return TiktokPopularTimeCursor + else: + return TiktokBackwardTimeCursor + + +class TiktokFavoriteItemListRequest(TiktokItemListRequest): + """Retrieves a user's liked posts. + + Appears to only support descending order, but it can work without + cookies. + """ + + def __init__(self, range_predicate): + super().__init__("favorite/item_list", "liked posts", range_predicate) + + def validate_query_parameters(self, query_parameters): + super().validate_query_parameters(query_parameters) + assert "secUid" in query_parameters + assert "post_item_list_request_type" in query_parameters + assert query_parameters["post_item_list_request_type"] == "0" + assert "needPinnedItemIds" in query_parameters + assert query_parameters["needPinnedItemIds"] in ["false"] + + def cursor_type(self, query_parameters): + return TiktokPopularTimeCursor + + +class TiktokRepostItemListRequest(TiktokItemListRequest): + """Retrieves a user's reposts. + + Appears to only support descending order, but it can work without + cookies. + """ + + def __init__(self, range_predicate): + super().__init__("repost/item_list", "reposts", range_predicate) + + def validate_query_parameters(self, query_parameters): + super().validate_query_parameters(query_parameters) + assert "secUid" in query_parameters + assert "post_item_list_request_type" in query_parameters + assert query_parameters["post_item_list_request_type"] == "0" + assert "needPinnedItemIds" in query_parameters + assert query_parameters["needPinnedItemIds"] in ["false"] + + def cursor_type(self, query_parameters): + return TiktokItemCursor + + +class TiktokSavedPostItemListRequest(TiktokItemListRequest): + """Retrieves a user's saved posts. + + Appears to only support descending order, but it can work without + cookies. + """ + + def __init__(self, range_predicate): + super().__init__("user/collect/item_list", "saved posts", + range_predicate) + + def validate_query_parameters(self, query_parameters): + super().validate_query_parameters(query_parameters) + assert "secUid" in query_parameters + assert "post_item_list_request_type" in query_parameters + assert query_parameters["post_item_list_request_type"] == "0" + assert "needPinnedItemIds" in query_parameters + assert query_parameters["needPinnedItemIds"] in ["false"] + + def cursor_type(self, query_parameters): + return TiktokPopularTimeCursor + + +class TiktokStoryItemListRequest(TiktokItemListRequest): + def __init__(self): + super().__init__("story/item_list", "stories", None) + + def validate_query_parameters(self, query_parameters): + super().validate_query_parameters(query_parameters) + assert "authorId" in query_parameters + assert "loadBackward" in query_parameters + assert query_parameters["loadBackward"] in ["true", "false"] + + def cursor_type(self, query_parameters): + return TiktokItemCursor + + +class TiktokStoryBatchItemListRequest(TiktokItemListRequest): + def __init__(self): + super().__init__("story/batch/item_list", "stories", None) + + def validate_query_parameters(self, query_parameters): + # This request type does not need a count parameter so don't invoke + # super().validate_query_parameters(). + assert "authorIds" in query_parameters + # I'd recommend between 1-10 users at a time, as that's what I see in + # the webapp. + author_count = query_parameters["authorIds"].count(",") + 1 + assert author_count >= 1 and author_count <= 10 + # Not sure what this parameter does. + assert "storyCallScene" in query_parameters + assert query_parameters["storyCallScene"] == "2" + + def extract_items(self, data): + # We need to extract each itemList within the response and combine each + # of them into a single list of items. If even one of the users doesn't + # have an item list, "exit early," but continue to gather the rest + # (this request doesn't use a cursor anyway so there is no concept of + # exiting early). + items = {} + if type(data.get("batchStoryItemLists")) is not list: + self.exit_early_due_to_no_items = True + return items + for userStories in data["batchStoryItemLists"]: + items.update(super().extract_items(userStories)) + return items + + +class TiktokStoryUserListRequest(TiktokPaginationRequest): + def __init__(self): + super().__init__("story/user_list") + self.exit_early_due_to_no_cookies = False + + def validate_query_parameters(self, query_parameters): + super().validate_query_parameters(query_parameters) + # Not sure what this parameter does. + assert "storyFeedScene" in query_parameters + assert query_parameters["storyFeedScene"] == "3" + + def cursor_type(self, query_parameters): + return functools.partial(TiktokItemCursor, "storyUsers") + + def extract_items(self, data): + if "storyUsers" not in data: + self.exit_early_due_to_no_cookies = True + return {} + return {item["user"]["id"]: item["user"]["uniqueId"] + for item in data["storyUsers"]} + + def exit_early(self, extractor, url): + if self.exit_early_due_to_no_cookies: + extractor.log.error("You must provide cookies to extract the " + "stories of your following list") + return self.exit_early_due_to_no_cookies + + def generate_urls(self): + return [(id, f"https://www.tiktok.com/@{name}") + for id, name in self.items.items()] |
