# -*- coding: utf-8 -*-

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.

"""Extractors for https://www.facebook.com/"""

from .common import Extractor, Message, Dispatch
from .. import text, util, exception
from ..cache import memcache

BASE_PATTERN = r"(?:https?://)?(?:[\w-]+\.)?facebook\.com"
USER_PATTERN = (
    BASE_PATTERN +
    r"/(?!media/|photo/|photo.php|watch/|permalink.php)"
    r"(?:profile\.php\?id=|people/[^/?#]+/)?([^/?]+)")


class FacebookExtractor(Extractor):
    """Base class for Facebook extractors"""
    category = "facebook"
    root = "https://www.facebook.com"
    directory_fmt = ("{category}", "{username}", "{title} ({set_id})")
    filename_fmt = "{id}.{extension}"
    archive_fmt = "{id}.{extension}"

    def _init(self):
        headers = self.session.headers
        headers["Accept"] = (
            "text/html,application/xhtml+xml,application/xml;q=0.9,"
            "image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8"
        )
        headers["Sec-Fetch-Dest"] = "empty"
        headers["Sec-Fetch-Mode"] = "navigate"
        headers["Sec-Fetch-Site"] = "same-origin"

        self.fallback_retries = self.config("fallback-retries", 2)
        self.videos = self.config("videos", True)
        self.author_followups = self.config("author-followups", False)

    def decode_all(self, txt):
        return text.unescape(
            txt.encode().decode("unicode_escape")
            .encode("utf_16", "surrogatepass").decode("utf_16")
        ).replace("\\/", "/")

    def parse_set_page(self, set_page):
        directory = {
            "set_id": text.extr(
                set_page, '"mediaSetToken":"', '"'
            ) or text.extr(
                set_page, '"mediasetToken":"', '"'
            ),
            "username": self.decode_all(
                text.extr(
                    set_page, '"user":{"__isProfile":"User","name":"', '","'
                ) or text.extr(
                    set_page, '"actors":[{"__typename":"User","name":"', '","'
                )
            ),
            "user_id": text.extr(
                set_page, '"owner":{"__typename":"User","id":"', '"'
            ),
            "user_pfbid": "",
            "title": self.decode_all(text.extr(
                set_page, '"title":{"text":"', '"'
            )),
            "first_photo_id": text.extr(
                set_page,
                '{"__typename":"Photo","__isMedia":"Photo","',
                '","creation_story"'
            ).rsplit('"id":"', 1)[-1] or text.extr(
                set_page, '{"__typename":"Photo","id":"', '"'
            )
        }

        if directory["user_id"].startswith("pfbid"):
            directory["user_pfbid"] = directory["user_id"]
            directory["user_id"] = (
                text.extr(
                    set_page, '"actors":[{"__typename":"User","id":"', '"')
                or text.extr(set_page, '"userID":"', '"')
                or directory["set_id"].split(".")[1])

        return directory

    def parse_photo_page(self, photo_page):
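        """Extract a single photo's metadata from its HTML page

        The surrounding markers passed to text.extr() target JSON
        fragments embedded in the page source; Facebook changes these
        frequently, so fallback markers are tried for some fields.
        """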
r'\"content_owner_id_new\":\"', r'\"') text.nameext_from_url(photo["url"], photo) photo["followups_ids"] = [] for comment_raw in text.extract_iter( photo_page, '{"node":{"id"', '"cursor":null}' ): if ('"is_author_original_poster":true' in comment_raw and '{"__typename":"Photo","id":"' in comment_raw): photo["followups_ids"].append(text.extr( comment_raw, '{"__typename":"Photo","id":"', '"' )) return photo def parse_post_page(self, post_page): first_photo_url = text.extr( text.extr( post_page, '"__isMedia":"Photo"', '"target_group"' ), '"url":"', ',' ) post = { "set_id": text.extr(post_page, '{"mediaset_token":"', '"') or text.extr(first_photo_url, 'set=', '"').rsplit("&", 1)[0] } return post def parse_video_page(self, video_page): video = { "id": text.extr( video_page, '\\"video_id\\":\\"', '\\"' ), "username": self.decode_all(text.extr( video_page, '"actors":[{"__typename":"User","name":"', '","' )), "user_id": text.extr( video_page, '"owner":{"__typename":"User","id":"', '"' ), "date": self.parse_timestamp(text.extr( video_page, '\\"publish_time\\":', ',' )), "type": "video" } if not video["username"]: video["username"] = self.decode_all(text.extr( video_page, '"__typename":"User","id":"' + video["user_id"] + '","name":"', '","' )) first_video_raw = text.extr( video_page, '"permalink_url"', '\\/Period>\\u003C\\/MPD>' ) audio = { **video, "url": self.decode_all(text.extr( text.extr( first_video_raw, "AudioChannelConfiguration", "BaseURL>\\u003C" ), "BaseURL>", "\\u003C\\/" )), "type": "audio" } video["urls"] = {} for raw_url in text.extract_iter( first_video_raw, 'FBQualityLabel=\\"', '\\u003C\\/BaseURL>' ): resolution = raw_url.split('\\"', 1)[0] video["urls"][resolution] = self.decode_all( raw_url.split('BaseURL>', 1)[1] ) if not video["urls"]: return video, audio video["url"] = max( video["urls"].items(), key=lambda x: text.parse_int(x[0][:-1]) )[1] text.nameext_from_url(video["url"], video) audio["filename"] = video["filename"] audio["extension"] = "m4a" return video, audio def photo_page_request_wrapper(self, url, **kwargs): LEFT_OFF_TXT = "" if url.endswith("&set=") else ( "\nYou can use this URL to continue from " "where you left off (added \"&setextract\"): " "\n" + url + "&setextract" ) res = self.request(url, **kwargs) if res.url.startswith(self.root + "/login"): raise exception.AuthRequired( message=("You must be logged in to continue viewing images." + LEFT_OFF_TXT)) if b'{"__dr":"CometErrorRoot.react"}' in res.content: raise exception.AbortExtraction( "You've been temporarily blocked from viewing images.\n" "Please try using a different account, " "using a VPN or waiting before you retry." + LEFT_OFF_TXT) return res def extract_set(self, set_data): set_id = set_data["set_id"] all_photo_ids = [set_data["first_photo_id"]] retries = 0 i = 0 while i < len(all_photo_ids): photo_id = all_photo_ids[i] photo_url = f"{self.root}/photo/?fbid={photo_id}&set={set_id}" photo_page = self.photo_page_request_wrapper(photo_url).text photo = self.parse_photo_page(photo_page) photo["num"] = i + 1 if self.author_followups: for followup_id in photo["followups_ids"]: if followup_id not in all_photo_ids: self.log.debug( "Found a followup in comments: %s", followup_id ) all_photo_ids.append(followup_id) if not photo["url"]: if retries < self.fallback_retries and self._interval_429: seconds = self._interval_429() self.log.warning( "Failed to find photo download URL for %s. 
" "Retrying in %s seconds.", photo_url, seconds, ) self.wait(seconds=seconds, reason="429 Too Many Requests") retries += 1 continue else: self.log.error( "Failed to find photo download URL for " + photo_url + ". Skipping." ) retries = 0 else: retries = 0 photo.update(set_data) yield Message.Directory, "", photo yield Message.Url, photo["url"], photo if not photo["next_photo_id"]: self.log.debug( "Can't find next image in the set. " "Extraction is over." ) elif photo["next_photo_id"] in all_photo_ids: if photo["next_photo_id"] != photo["id"]: self.log.debug( "Detected a loop in the set, it's likely finished. " "Extraction is over." ) elif int(photo["next_photo_id"]) > int(photo["id"]) + i*120: self.log.info( "Detected jump to the beginning of the set. (%s -> %s)", photo["id"], photo["next_photo_id"]) if self.config("loop", False): all_photo_ids.append(photo["next_photo_id"]) else: all_photo_ids.append(photo["next_photo_id"]) i += 1 @memcache(keyarg=1) def _extract_profile(self, profile, set_id=False): if set_id: url = f"{self.root}/{profile}/photos_by" else: url = f"{self.root}/{profile}" return self._extract_profile_page(url) def _extract_profile_page(self, url): for _ in range(self.fallback_retries + 1): page = self.request(url).text if page.find('>Page Not Found', 0, 3000) > 0: break if ('"props":{"title":"This content isn\'t available right now"' in page): raise exception.AuthRequired( "authenticated cookies", "profile", "This content isn't available right now") set_id = self._extract_profile_set_id(page) user = self._extract_profile_user(page) if set_id or user: user["set_id"] = set_id return user self.log.debug("Got empty profile photos page, retrying...") return {} def _extract_profile_set_id(self, profile_photos_page): set_ids_raw = text.extr( profile_photos_page, '"pageItems"', '"page_info"' ) set_id = text.extr( set_ids_raw, 'set=', '"' ).rsplit("&", 1)[0] or text.extr( set_ids_raw, '\\/photos\\/', '\\/' ) return set_id def _extract_profile_user(self, page): data = text.extr(page, '","user":{"', '},"viewer":{') user = None try: user = util.json_loads(f'{{"{data}}}') if user["id"].startswith("pfbid"): user["user_pfbid"] = user["id"] user["id"] = text.extr(page, '"userID":"', '"') user["username"] = (text.extr(page, '"userVanity":"', '"') or text.extr(page, '"vanity":"', '"')) user["profile_tabs"] = [ edge["node"] for edge in (user["profile_tabs"]["profile_user"] ["timeline_nav_app_sections"]["edges"]) ] if bio := text.extr(page, '"best_description":{"text":"', '"'): user["biography"] = self.decode_all(bio) elif (pos := page.find( '"__module_operation_ProfileCometTileView_profileT')) >= 0: user["biography"] = self.decode_all(text.rextr( page, '"text":"', '"', pos)) else: user["biography"] = text.unescape(text.remove_html(text.extr( page, "", "