about summary refs log tree commit diff stats
path: root/gallery_dl/extractor/twitter.py
diff options
context:
space:
mode:
Diffstat (limited to 'gallery_dl/extractor/twitter.py')
-rw-r--r-- gallery_dl/extractor/twitter.py 245
1 files changed, 125 insertions, 120 deletions
diff --git a/gallery_dl/extractor/twitter.py b/gallery_dl/extractor/twitter.py
index 896bf28..7252d05 100644
--- a/gallery_dl/extractor/twitter.py
+++ b/gallery_dl/extractor/twitter.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
-# Copyright 2016-2023 Mike Fährmann
+# Copyright 2016-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -8,12 +8,11 @@
"""Extractors for https://x.com/"""
-from .common import Extractor, Message
+from .common import Extractor, Message, Dispatch
from .. import text, util, exception
from ..cache import cache, memcache
import itertools
import random
-import re
BASE_PATTERN = (r"(?:https?://)?(?:www\.|mobile\.)?"
r"(?:(?:[fv]x)?twitter|(?:fix(?:up|v))?x)\.com")
@@ -32,7 +31,7 @@ class TwitterExtractor(Extractor):
def __init__(self, match):
Extractor.__init__(self, match)
- self.user = match.group(1)
+ self.user = match[1]
def _init(self):
self.unavailable = self.config("unavailable", False)
@@ -72,21 +71,16 @@ class TwitterExtractor(Extractor):
self.login()
self.api = TwitterAPI(self)
metadata = self.metadata()
-
- if self.config("expand"):
- tweets = self._expand_tweets(self.tweets())
- self.tweets = lambda : tweets
-
- if self.config("unique", True):
- seen_tweets = set()
- else:
- seen_tweets = None
+ seen_tweets = set() if self.config("unique", True) else None
if self.twitpic:
- self._find_twitpic = re.compile(
+ self._find_twitpic = util.re(
r"https?(://twitpic\.com/(?!photos/)\w+)").findall
- for tweet in self.tweets():
+ tweets = self.tweets()
+ if self.config("expand"):
+ tweets = self._expand_tweets(tweets)
+ for tweet in tweets:
if "legacy" in tweet:
data = tweet["legacy"]
@@ -129,6 +123,12 @@ class TwitterExtractor(Extractor):
tdata.update(metadata)
tdata["count"] = len(files)
yield Message.Directory, tdata
+
+ del tdata["source_id"]
+ del tdata["sensitive_flags"]
+ if "source_user" in tdata:
+ del tdata["source_user"]
+
for tdata["num"], file in enumerate(files, 1):
file.update(tdata)
url = file.pop("url")
@@ -170,8 +170,29 @@ class TwitterExtractor(Extractor):
return files
def _extract_media(self, tweet, entities, files):
+ flags_tweet = None
+
for media in entities:
+ if "sensitive_media_warning" in media:
+ flags_media = media["sensitive_media_warning"]
+
+ flags = []
+ if "adult_content" in flags_media:
+ flags.append("Nudity")
+ if "other" in flags_media:
+ flags.append("Sensitive")
+ if "graphic_violence" in flags_media:
+ flags.append("Violence")
+
+ if flags_tweet is None:
+ flags_tweet = set(flags)
+ else:
+ flags_tweet.update(flags)
+ flags_media = flags
+ else:
+ flags_media = ()
+
if "ext_media_availability" in media:
ext = media["ext_media_availability"]
if ext.get("status") == "Unavailable":
@@ -180,38 +201,22 @@ class TwitterExtractor(Extractor):
if not self.unavailable:
continue
- mtype = media.get("type")
- descr = media.get("ext_alt_text")
- width = media["original_info"].get("width", 0)
- height = media["original_info"].get("height", 0)
-
if "video_info" in media:
if self.videos == "ytdl":
- files.append({
- "url": "ytdl:{}/i/web/status/{}".format(
- self.root, tweet["id_str"]),
- "type" : mtype,
- "width" : width,
- "height" : height,
- "extension" : None,
- "description": descr,
- })
+ url = f"ytdl:{self.root}/i/web/status/{tweet['id_str']}"
+ file = {"url": url, "extension": "mp4"}
elif self.videos:
video_info = media["video_info"]
variant = max(
video_info["variants"],
key=lambda v: v.get("bitrate", 0),
)
- files.append({
- "url" : variant["url"],
- "type" : mtype,
- "width" : width,
- "height" : height,
- "bitrate" : variant.get("bitrate", 0),
- "duration" : video_info.get(
+ file = {
+ "url" : variant["url"],
+ "bitrate" : variant.get("bitrate", 0),
+ "duration": video_info.get(
"duration_millis", 0) / 1000,
- "description": descr,
- })
+ }
elif "media_url_https" in media:
url = media["media_url_https"]
if url[-4] == ".":
@@ -219,16 +224,37 @@ class TwitterExtractor(Extractor):
base += "?format=" + fmt + "&name="
else:
base = url.rpartition("=")[0] + "="
- files.append(text.nameext_from_url(url, {
- "url" : base + self._size_image,
- "type" : mtype,
- "width" : width,
- "height" : height,
- "_fallback" : self._image_fallback(base),
- "description": descr,
- }))
+ file = text.nameext_from_url(url, {
+ "url" : base + self._size_image,
+ "_fallback": self._image_fallback(base),
+ })
else:
files.append({"url": media["media_url"]})
+ continue
+
+ file["type"] = media.get("type")
+ file["width"] = media["original_info"].get("width", 0)
+ file["height"] = media["original_info"].get("height", 0)
+ file["description"] = media.get("ext_alt_text")
+ file["sensitive_flags"] = flags_media
+ self._extract_media_source(file, media)
+ files.append(file)
+
+ tweet["sensitive_flags"] = \
+ () if flags_tweet is None else sorted(flags_tweet)
+
+ def _extract_media_source(self, dest, media):
+ dest["source_id"] = 0
+
+ if "source_status_id_str" in media:
+ try:
+ dest["source_id"] = text.parse_int(
+ media["source_status_id_str"])
+ dest["source_user"] = self._transform_user(
+ media["additional_media_info"]["source_user"]
+ ["user_results"]["result"])
+ except Exception:
+ pass
def _image_fallback(self, base):
for fmt in self._size_fallback:
@@ -252,8 +278,7 @@ class TwitterExtractor(Extractor):
bvals = {bval["key"]: bval["value"]
for bval in card["binding_values"]}
- cbl = self.cards_blacklist
- if cbl:
+ if cbl := self.cards_blacklist:
if name in cbl:
return
if "vanity_url" in bvals:
@@ -288,7 +313,7 @@ class TwitterExtractor(Extractor):
if self.cards == "ytdl":
tweet_id = tweet.get("rest_id") or tweet["id_str"]
- url = "ytdl:{}/i/web/status/{}".format(self.root, tweet_id)
+ url = f"ytdl:{self.root}/i/web/status/{tweet_id}"
files.append({"url": url})
def _extract_twitpic(self, tweet, files):
@@ -313,8 +338,8 @@ class TwitterExtractor(Extractor):
response = self.request(url, fatal=False)
if response.status_code >= 400:
continue
- url = text.extr(response.text, 'name="twitter:image" value="', '"')
- if url:
+ if url := text.extr(
+ response.text, 'name="twitter:image" value="', '"'):
files.append({"url": url})
def _transform_tweet(self, tweet):
@@ -354,12 +379,14 @@ class TwitterExtractor(Extractor):
tget("in_reply_to_status_id_str")),
"conversation_id": text.parse_int(
tget("conversation_id_str")),
+ "source_id" : 0,
"date" : date,
"author" : author,
"user" : self._user or author,
"lang" : legacy["lang"],
"source" : text.extr(source, ">", "<") if source else "",
"sensitive" : tget("possibly_sensitive"),
+ "sensitive_flags": tget("sensitive_flags"),
"favorite_count": tget("favorite_count"),
"quote_count" : tget("quote_count"),
"reply_count" : tget("reply_count"),
@@ -383,12 +410,10 @@ class TwitterExtractor(Extractor):
content = tget("full_text") or tget("text") or ""
entities = legacy["entities"]
- hashtags = entities.get("hashtags")
- if hashtags:
+ if hashtags := entities.get("hashtags"):
tdata["hashtags"] = [t["text"] for t in hashtags]
- mentions = entities.get("user_mentions")
- if mentions:
+ if mentions := entities.get("user_mentions"):
tdata["mentions"] = [{
"id": text.parse_int(u["id_str"]),
"name": u["screen_name"],
@@ -396,8 +421,7 @@ class TwitterExtractor(Extractor):
} for u in mentions]
content = text.unescape(content)
- urls = entities.get("urls")
- if urls:
+ if urls := entities.get("urls"):
for url in urls:
try:
content = content.replace(url["url"], url["expanded_url"])
@@ -417,9 +441,11 @@ class TwitterExtractor(Extractor):
tdata["reply_to"] = legacy["in_reply_to_screen_name"]
if "quoted_by" in legacy:
tdata["quote_by"] = legacy["quoted_by"]
+ if "extended_entities" in legacy:
+ self._extract_media_source(
+ tdata, legacy["extended_entities"]["media"][0])
if tdata["retweet_id"]:
- tdata["content"] = "RT @{}: {}".format(
- author["name"], tdata["content"])
+ tdata["content"] = f"RT @{author['name']}: {tdata['content']}"
tdata["date_original"] = text.parse_timestamp(
((tdata["retweet_id"] >> 22) + 1288834974657) // 1000)
@@ -466,8 +492,7 @@ class TwitterExtractor(Extractor):
}
descr = user["description"]
- urls = entities["description"].get("urls")
- if urls:
+ if urls := entities["description"].get("urls"):
for url in urls:
try:
descr = descr.replace(url["url"], url["expanded_url"])
@@ -577,27 +602,18 @@ class TwitterExtractor(Extractor):
return self.cookies_update(_login_impl(self, username, password))
-class TwitterUserExtractor(TwitterExtractor):
+class TwitterUserExtractor(Dispatch, TwitterExtractor):
"""Extractor for a Twitter user"""
- subcategory = "user"
pattern = (BASE_PATTERN + r"/(?!search)(?:([^/?#]+)/?(?:$|[?#])"
r"|i(?:/user/|ntent/user\?user_id=)(\d+))")
example = "https://x.com/USER"
- def __init__(self, match):
- TwitterExtractor.__init__(self, match)
- user_id = match.group(2)
- if user_id:
- self.user = "id:" + user_id
-
- def initialize(self):
- pass
-
- def finalize(self):
- pass
-
def items(self):
- base = "{}/{}/".format(self.root, self.user)
+ user, user_id = self.groups
+ if user_id is not None:
+ user = "id:" + user_id
+
+ base = f"{self.root}/{user}/"
return self._dispatch_extractors((
(TwitterInfoExtractor , base + "info"),
(TwitterAvatarExtractor , base + "photo"),
@@ -663,12 +679,12 @@ class TwitterTimelineExtractor(TwitterExtractor):
self.api._user_id_by_screen_name(self.user)
# build search query
- query = "from:{} max_id:{}".format(self._user["name"], tweet_id)
+ query = f"from:{self._user['name']} max_id:{tweet_id}"
if self.retweets:
query += " include:retweets include:nativeretweets"
if state <= 2:
- self._cursor_prefix = "2_{}/".format(tweet_id)
+ self._cursor_prefix = f"2_{tweet_id}/"
if reset:
self._cursor = self._cursor_prefix
@@ -684,7 +700,7 @@ class TwitterTimelineExtractor(TwitterExtractor):
if state <= 3:
# yield unfiltered search results
- self._cursor_prefix = "3_{}/".format(tweet_id)
+ self._cursor_prefix = f"3_{tweet_id}/"
if reset:
self._cursor = self._cursor_prefix
@@ -704,7 +720,7 @@ class TwitterTimelineExtractor(TwitterExtractor):
return self.api.user_media
if strategy == "with_replies":
return self.api.user_tweets_and_replies
- raise exception.StopExtraction("Invalid strategy '%s'", strategy)
+ raise exception.AbortExtraction(f"Invalid strategy '{strategy}'")
class TwitterTweetsExtractor(TwitterExtractor):
@@ -847,7 +863,7 @@ class TwitterHashtagExtractor(TwitterExtractor):
example = "https://x.com/hashtag/NAME"
def items(self):
- url = "{}/search?q=%23{}".format(self.root, self.user)
+ url = f"{self.root}/search?q=%23{self.user}"
data = {"_extractor": TwitterSearchExtractor}
yield Message.Queue, url, data
@@ -898,11 +914,10 @@ class TwitterTweetExtractor(TwitterExtractor):
def __init__(self, match):
TwitterExtractor.__init__(self, match)
- self.tweet_id = match.group(2)
+ self.tweet_id = match[2]
def tweets(self):
- conversations = self.config("conversations")
- if conversations:
+ if conversations := self.config("conversations"):
self._accessible = (conversations == "accessible")
return self._tweets_conversation(self.tweet_id)
@@ -919,8 +934,8 @@ class TwitterTweetExtractor(TwitterExtractor):
try:
self._assign_user(tweet["core"]["user_results"]["result"])
except KeyError:
- raise exception.StopExtraction(
- "'%s'", tweet.get("reason") or "Unavailable")
+ raise exception.AbortExtraction(
+ f"'{tweet.get('reason') or 'Unavailable'}'")
yield tweet
@@ -977,7 +992,7 @@ class TwitterQuotesExtractor(TwitterExtractor):
example = "https://x.com/USER/status/12345/quotes"
def items(self):
- url = "{}/search?q=quoted_tweet_id:{}".format(self.root, self.user)
+ url = f"{self.root}/search?q=quoted_tweet_id:{self.user}"
data = {"_extractor": TwitterSearchExtractor}
yield Message.Queue, url, data
@@ -1055,8 +1070,7 @@ class TwitterImageExtractor(Extractor):
TwitterExtractor._init_sizes(self)
def items(self):
- base = "https://pbs.twimg.com/media/{}?format={}&name=".format(
- self.id, self.fmt)
+ base = f"https://pbs.twimg.com/media/{self.id}?format={self.fmt}&name="
data = {
"filename": self.id,
@@ -1233,7 +1247,7 @@ class TwitterAPI():
raise exception.AuthorizationError("NSFW Tweet")
if reason == "Protected":
raise exception.AuthorizationError("Protected Tweet")
- raise exception.StopExtraction("Tweet unavailable ('%s')", reason)
+ raise exception.AbortExtraction(f"Tweet unavailable ('{reason}')")
return tweet
@@ -1391,7 +1405,7 @@ class TwitterAPI():
("viewer", "communities_timeline", "timeline"))
def live_event_timeline(self, event_id):
- endpoint = "/2/live_event/timeline/{}.json".format(event_id)
+ endpoint = f"/2/live_event/timeline/{event_id}.json"
params = self.params.copy()
params["timeline_id"] = "recap"
params["urt"] = "true"
@@ -1399,7 +1413,7 @@ class TwitterAPI():
return self._pagination_legacy(endpoint, params)
def live_event(self, event_id):
- endpoint = "/1.1/live_event/1/{}/timeline.json".format(event_id)
+ endpoint = f"/1.1/live_event/1/{event_id}/timeline.json"
params = self.params.copy()
params["count"] = "0"
params["urt"] = "true"
@@ -1484,9 +1498,9 @@ class TwitterAPI():
return user["rest_id"]
except KeyError:
if "unavailable_message" in user:
- raise exception.NotFoundError("{} ({})".format(
- user["unavailable_message"].get("text"),
- user.get("reason")), False)
+ raise exception.NotFoundError(
+ f"{user['unavailable_message'].get('text')} "
+ f"({user.get('reason')})", False)
else:
raise exception.NotFoundError("user")
@@ -1543,8 +1557,7 @@ class TwitterAPI():
headers=self.headers, fatal=None)
# update 'x-csrf-token' header (#1170)
- csrf_token = response.cookies.get("ct0")
- if csrf_token:
+ if csrf_token := response.cookies.get("ct0"):
self.headers["x-csrf-token"] = csrf_token
remaining = int(response.headers.get("x-rate-limit-remaining", 6))
@@ -1614,13 +1627,12 @@ class TwitterAPI():
except Exception:
pass
- raise exception.StopExtraction(
- "%s %s (%s)", response.status_code, response.reason, errors)
+ raise exception.AbortExtraction(
+ f"{response.status_code} {response.reason} ({errors})")
def _pagination_legacy(self, endpoint, params):
extr = self.extractor
- cursor = extr._init_cursor()
- if cursor:
+ if cursor := extr._init_cursor():
params["cursor"] = cursor
original_retweets = (extr.retweets == "original")
bottom = ("cursor-bottom-", "sq-cursor-bottom")
@@ -1701,8 +1713,7 @@ class TwitterAPI():
yield tweet
if "quoted_status_id_str" in tweet:
- quoted = tweets.get(tweet["quoted_status_id_str"])
- if quoted:
+ if quoted := tweets.get(tweet["quoted_status_id_str"]):
quoted = quoted.copy()
quoted["author"] = users[quoted["user_id_str"]]
quoted["quoted_by"] = tweet["user"]["screen_name"]
@@ -1722,8 +1733,7 @@ class TwitterAPI():
pinned_tweet = extr.pinned
params = {"variables": None}
- cursor = extr._init_cursor()
- if cursor:
+ if cursor := extr._init_cursor():
variables["cursor"] = cursor
if features is None:
features = self.features_pagination
@@ -1772,8 +1782,7 @@ class TwitterAPI():
except LookupError:
extr.log.debug(data)
- user = extr._user_obj
- if user:
+ if user := extr._user_obj:
user = user["legacy"]
if user.get("blocked_by"):
if self.headers["x-twitter-auth-type"] and \
@@ -1784,14 +1793,12 @@ class TwitterAPI():
extr.log.info("Retrying API request as guest")
continue
raise exception.AuthorizationError(
- "{} blocked your account".format(
- user["screen_name"]))
+ f"{user['screen_name']} blocked your account")
elif user.get("protected"):
raise exception.AuthorizationError(
- "{}'s Tweets are protected".format(
- user["screen_name"]))
+ f"{user['screen_name']}'s Tweets are protected")
- raise exception.StopExtraction(
+ raise exception.AbortExtraction(
"Unable to retrieve Tweets from this timeline")
tweets = []
@@ -1924,8 +1931,7 @@ class TwitterAPI():
def _pagination_users(self, endpoint, variables, path=None):
extr = self.extractor
- cursor = extr._init_cursor()
- if cursor:
+ if cursor := extr._init_cursor():
variables["cursor"] = cursor
params = {
"variables": None,
@@ -1970,7 +1976,7 @@ class TwitterAPI():
def _handle_ratelimit(self, response):
rl = self.extractor.config("ratelimit")
if rl == "abort":
- raise exception.StopExtraction("Rate limit exceeded")
+ raise exception.AbortExtraction("Rate limit exceeded")
elif rl and isinstance(rl, str) and rl.startswith("wait:"):
until = None
seconds = text.parse_float(rl.partition(":")[2]) or 60.0
@@ -2000,8 +2006,7 @@ def _login_impl(extr, username, password):
method="POST", fatal=None)
# update 'x-csrf-token' header (#5945)
- csrf_token = response.cookies.get("ct0")
- if csrf_token:
+ if csrf_token := response.cookies.get("ct0"):
headers["x-csrf-token"] = csrf_token
try:
@@ -2019,7 +2024,7 @@ def _login_impl(extr, username, password):
errors = []
for error in data.get("errors") or ():
msg = error.get("message")
- errors.append('"{}"'.format(msg) if msg else "Unknown error")
+ errors.append(f'"{msg}"' if msg else "Unknown error")
extr.log.debug(response.text)
raise exception.AuthenticationError(", ".join(errors))
@@ -2154,7 +2159,7 @@ def _login_impl(extr, username, password):
raise exception.AuthenticationError(
"No 'auth_token' cookie received")
else:
- raise exception.StopExtraction("Unrecognized subtask %s", subtask)
+ raise exception.AbortExtraction(f"Unrecognized subtask {subtask}")
inputs = {"subtask_id": subtask}
inputs.update(data)
@@ -2163,7 +2168,7 @@ def _login_impl(extr, username, password):
"subtask_inputs": [inputs],
}
- extr.sleep(random.uniform(1.0, 3.0), "login ({})".format(subtask))
+ extr.sleep(random.uniform(1.0, 3.0), f"login ({subtask})")
flow_token, subtask = process(data)
return {