summary | refs | log | tree | commit | diff | stats
path: root/gallery_dl/extractor/twitter.py
diff options
context:
space:
mode:
Diffstat (limited to 'gallery_dl/extractor/twitter.py')
-rw-r--r-- gallery_dl/extractor/twitter.py | 321
1 files changed, 180 insertions, 141 deletions
diff --git a/gallery_dl/extractor/twitter.py b/gallery_dl/extractor/twitter.py
index a5bd984..ff77828 100644
--- a/gallery_dl/extractor/twitter.py
+++ b/gallery_dl/extractor/twitter.py
@@ -6,17 +6,18 @@
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
-"""Extractors for https://twitter.com/"""
+"""Extractors for https://x.com/"""
from .common import Extractor, Message
from .. import text, util, exception
from ..cache import cache, memcache
import itertools
+import random
import json
import re
BASE_PATTERN = (r"(?:https?://)?(?:www\.|mobile\.)?"
- r"(?:(?:[fv]x)?twitter|(?:fixup)?x)\.com")
+ r"(?:(?:[fv]x)?twitter|(?:fix(?:up|v))?x)\.com")
class TwitterExtractor(Extractor):
@@ -25,9 +26,9 @@ class TwitterExtractor(Extractor):
directory_fmt = ("{category}", "{user[name]}")
filename_fmt = "{tweet_id}_{num}.{extension}"
archive_fmt = "{tweet_id}_{retweet_id}_{num}"
- cookies_domain = ".twitter.com"
+ cookies_domain = ".x.com"
cookies_names = ("auth_token",)
- root = "https://twitter.com"
+ root = "https://x.com"
browser = "firefox"
def __init__(self, match):
@@ -243,8 +244,8 @@ class TwitterExtractor(Extractor):
# collect URLs from entities
for url in tweet["entities"].get("urls") or ():
- url = url["expanded_url"]
- if "//twitpic.com/" not in url or "/photos/" in url:
+ url = url.get("expanded_url") or url.get("url") or ""
+ if not url or "//twitpic.com/" not in url or "/photos/" in url:
continue
if url.startswith("http:"):
url = "https" + url[4:]
@@ -336,12 +337,20 @@ class TwitterExtractor(Extractor):
urls = entities.get("urls")
if urls:
for url in urls:
- content = content.replace(url["url"], url["expanded_url"])
+ try:
+ content = content.replace(url["url"], url["expanded_url"])
+ except KeyError:
+ pass
txt, _, tco = content.rpartition(" ")
tdata["content"] = txt if tco.startswith("https://t.co/") else content
if "birdwatch_pivot" in tweet:
- tdata["birdwatch"] = tweet["birdwatch_pivot"]["subtitle"]["text"]
+ try:
+ tdata["birdwatch"] = \
+ tweet["birdwatch_pivot"]["subtitle"]["text"]
+ except KeyError:
+ self.log.debug("Unable to extract 'birdwatch' note from %s",
+ tweet["birdwatch_pivot"])
if "in_reply_to_screen_name" in legacy:
tdata["reply_to"] = legacy["in_reply_to_screen_name"]
if "quoted_by" in legacy:
@@ -398,7 +407,10 @@ class TwitterExtractor(Extractor):
urls = entities["description"].get("urls")
if urls:
for url in urls:
- descr = descr.replace(url["url"], url["expanded_url"])
+ try:
+ descr = descr.replace(url["url"], url["expanded_url"])
+ except KeyError:
+ pass
udata["description"] = descr
if "url" in entities:
@@ -483,7 +495,13 @@ class TwitterExtractor(Extractor):
username, password = self._get_auth_info()
if username:
- self.cookies_update(_login_impl(self, username, password))
+ return self.cookies_update(_login_impl(self, username, password))
+
+ for cookie in self.cookies:
+ if cookie.domain == ".twitter.com":
+ self.cookies.set(
+ cookie.name, cookie.value, domain=self.cookies_domain,
+ expires=cookie.expires, secure=cookie.secure)
class TwitterUserExtractor(TwitterExtractor):
@@ -491,7 +509,7 @@ class TwitterUserExtractor(TwitterExtractor):
subcategory = "user"
pattern = (BASE_PATTERN + r"/(?!search)(?:([^/?#]+)/?(?:$|[?#])"
r"|i(?:/user/|ntent/user\?user_id=)(\d+))")
- example = "https://twitter.com/USER"
+ example = "https://x.com/USER"
def __init__(self, match):
TwitterExtractor.__init__(self, match)
@@ -519,7 +537,7 @@ class TwitterTimelineExtractor(TwitterExtractor):
"""Extractor for a Twitter user timeline"""
subcategory = "timeline"
pattern = BASE_PATTERN + r"/(?!search)([^/?#]+)/timeline(?!\w)"
- example = "https://twitter.com/USER/timeline"
+ example = "https://x.com/USER/timeline"
def tweets(self):
# yield initial batch of (media) tweets
@@ -566,7 +584,7 @@ class TwitterTweetsExtractor(TwitterExtractor):
"""Extractor for Tweets from a user's Tweets timeline"""
subcategory = "tweets"
pattern = BASE_PATTERN + r"/(?!search)([^/?#]+)/tweets(?!\w)"
- example = "https://twitter.com/USER/tweets"
+ example = "https://x.com/USER/tweets"
def tweets(self):
return self.api.user_tweets(self.user)
@@ -576,7 +594,7 @@ class TwitterRepliesExtractor(TwitterExtractor):
"""Extractor for Tweets from a user's timeline including replies"""
subcategory = "replies"
pattern = BASE_PATTERN + r"/(?!search)([^/?#]+)/with_replies(?!\w)"
- example = "https://twitter.com/USER/with_replies"
+ example = "https://x.com/USER/with_replies"
def tweets(self):
return self.api.user_tweets_and_replies(self.user)
@@ -586,7 +604,7 @@ class TwitterMediaExtractor(TwitterExtractor):
"""Extractor for Tweets from a user's Media timeline"""
subcategory = "media"
pattern = BASE_PATTERN + r"/(?!search)([^/?#]+)/media(?!\w)"
- example = "https://twitter.com/USER/media"
+ example = "https://x.com/USER/media"
def tweets(self):
return self.api.user_media(self.user)
@@ -596,7 +614,7 @@ class TwitterLikesExtractor(TwitterExtractor):
"""Extractor for liked tweets"""
subcategory = "likes"
pattern = BASE_PATTERN + r"/(?!search)([^/?#]+)/likes(?!\w)"
- example = "https://twitter.com/USER/likes"
+ example = "https://x.com/USER/likes"
def metadata(self):
return {"user_likes": self.user}
@@ -609,7 +627,7 @@ class TwitterBookmarkExtractor(TwitterExtractor):
"""Extractor for bookmarked tweets"""
subcategory = "bookmark"
pattern = BASE_PATTERN + r"/i/bookmarks()"
- example = "https://twitter.com/i/bookmarks"
+ example = "https://x.com/i/bookmarks"
def tweets(self):
return self.api.user_bookmarks()
@@ -625,7 +643,7 @@ class TwitterListExtractor(TwitterExtractor):
"""Extractor for Twitter lists"""
subcategory = "list"
pattern = BASE_PATTERN + r"/i/lists/(\d+)/?$"
- example = "https://twitter.com/i/lists/12345"
+ example = "https://x.com/i/lists/12345"
def tweets(self):
return self.api.list_latest_tweets_timeline(self.user)
@@ -635,7 +653,7 @@ class TwitterListMembersExtractor(TwitterExtractor):
"""Extractor for members of a Twitter list"""
subcategory = "list-members"
pattern = BASE_PATTERN + r"/i/lists/(\d+)/members"
- example = "https://twitter.com/i/lists/12345/members"
+ example = "https://x.com/i/lists/12345/members"
def items(self):
self.login()
@@ -646,7 +664,7 @@ class TwitterFollowingExtractor(TwitterExtractor):
"""Extractor for followed users"""
subcategory = "following"
pattern = BASE_PATTERN + r"/(?!search)([^/?#]+)/following(?!\w)"
- example = "https://twitter.com/USER/following"
+ example = "https://x.com/USER/following"
def items(self):
self.login()
@@ -657,7 +675,7 @@ class TwitterSearchExtractor(TwitterExtractor):
"""Extractor for Twitter search results"""
subcategory = "search"
pattern = BASE_PATTERN + r"/search/?\?(?:[^&#]+&)*q=([^&#]+)"
- example = "https://twitter.com/search?q=QUERY"
+ example = "https://x.com/search?q=QUERY"
def metadata(self):
return {"search": text.unquote(self.user)}
@@ -688,7 +706,7 @@ class TwitterHashtagExtractor(TwitterExtractor):
"""Extractor for Twitter hashtags"""
subcategory = "hashtag"
pattern = BASE_PATTERN + r"/hashtag/([^/?#]+)"
- example = "https://twitter.com/hashtag/NAME"
+ example = "https://x.com/hashtag/NAME"
def items(self):
url = "{}/search?q=%23{}".format(self.root, self.user)
@@ -700,7 +718,7 @@ class TwitterCommunityExtractor(TwitterExtractor):
"""Extractor for a Twitter community"""
subcategory = "community"
pattern = BASE_PATTERN + r"/i/communities/(\d+)"
- example = "https://twitter.com/i/communities/12345"
+ example = "https://x.com/i/communities/12345"
def tweets(self):
if self.textonly:
@@ -712,7 +730,7 @@ class TwitterCommunitiesExtractor(TwitterExtractor):
"""Extractor for followed Twitter communities"""
subcategory = "communities"
pattern = BASE_PATTERN + r"/([^/?#]+)/communities/?$"
- example = "https://twitter.com/i/communities"
+ example = "https://x.com/i/communities"
def tweets(self):
return self.api.communities_main_page_timeline(self.user)
@@ -724,7 +742,7 @@ class TwitterEventExtractor(TwitterExtractor):
directory_fmt = ("{category}", "Events",
"{event[id]} {event[short_title]}")
pattern = BASE_PATTERN + r"/i/events/(\d+)"
- example = "https://twitter.com/i/events/12345"
+ example = "https://x.com/i/events/12345"
def metadata(self):
return {"event": self.api.live_event(self.user)}
@@ -736,8 +754,9 @@ class TwitterEventExtractor(TwitterExtractor):
class TwitterTweetExtractor(TwitterExtractor):
"""Extractor for individual tweets"""
subcategory = "tweet"
- pattern = BASE_PATTERN + r"/([^/?#]+|i/web)/status/(\d+)/?$"
- example = "https://twitter.com/USER/status/12345"
+ pattern = (BASE_PATTERN + r"/([^/?#]+|i/web)/status/(\d+)"
+ r"/?(?:$|\?|#|photo/|video/)")
+ example = "https://x.com/USER/status/12345"
def __init__(self, match):
TwitterExtractor.__init__(self, match)
@@ -817,7 +836,7 @@ class TwitterQuotesExtractor(TwitterExtractor):
"""Extractor for quotes of a Tweet"""
subcategory = "quotes"
pattern = BASE_PATTERN + r"/(?:[^/?#]+|i/web)/status/(\d+)/quotes"
- example = "https://twitter.com/USER/status/12345/quotes"
+ example = "https://x.com/USER/status/12345/quotes"
def items(self):
url = "{}/search?q=quoted_tweet_id:{}".format(self.root, self.user)
@@ -830,7 +849,7 @@ class TwitterAvatarExtractor(TwitterExtractor):
filename_fmt = "avatar {date}.{extension}"
archive_fmt = "AV_{user[id]}_{date}"
pattern = BASE_PATTERN + r"/(?!search)([^/?#]+)/photo"
- example = "https://twitter.com/USER/photo"
+ example = "https://x.com/USER/photo"
def tweets(self):
self.api._user_id_by_screen_name(self.user)
@@ -852,7 +871,7 @@ class TwitterBackgroundExtractor(TwitterExtractor):
filename_fmt = "background {date}.{extension}"
archive_fmt = "BG_{user[id]}_{date}"
pattern = BASE_PATTERN + r"/(?!search)([^/?#]+)/header_photo"
- example = "https://twitter.com/USER/header_photo"
+ example = "https://x.com/USER/header_photo"
def tweets(self):
self.api._user_id_by_screen_name(self.user)
@@ -899,7 +918,7 @@ class TwitterAPI():
self.extractor = extractor
self.log = extractor.log
- self.root = "https://twitter.com/i/api"
+ self.root = "https://x.com/i/api"
self._nsfw_warning = True
self._json_dumps = json.JSONEncoder(separators=(",", ":")).encode
@@ -919,7 +938,7 @@ class TwitterAPI():
self.headers = {
"Accept": "*/*",
- "Referer": "https://twitter.com/",
+ "Referer": extractor.root + "/",
"content-type": "application/json",
"x-guest-token": None,
"x-twitter-auth-type": "OAuth2Session" if auth_token else None,
@@ -1262,7 +1281,7 @@ class TwitterAPI():
endpoint = "/1.1/guest/activate.json"
self.log.info("Requesting guest token")
return str(self._call(
- endpoint, None, "POST", False, "https://api.twitter.com",
+ endpoint, None, "POST", False, "https://api.x.com",
)["guest_token"])
def _authenticate_guest(self):
@@ -1288,63 +1307,72 @@ class TwitterAPI():
if csrf_token:
self.headers["x-csrf-token"] = csrf_token
- if response.status_code < 400:
+ remaining = int(response.headers.get("x-rate-limit-remaining", 6))
+ if remaining < 6 and remaining <= random.randrange(1, 6):
+ self._handle_ratelimit(response)
+ continue
+
+ try:
data = response.json()
+ except ValueError:
+ data = {"errors": ({"message": response.text},)}
+
+ errors = data.get("errors")
+ if not errors:
+ return data
+
+ retry = False
+ for error in errors:
+ msg = error.get("message") or "Unspecified"
+ self.log.debug("API error: '%s'", msg)
+
+ if "this account is temporarily locked" in msg:
+ msg = "Account temporarily locked"
+ if self.extractor.config("locked") != "wait":
+ raise exception.AuthorizationError(msg)
+ self.log.warning(msg)
+ self.extractor.input("Press ENTER to retry.")
+ retry = True
+
+ elif "Could not authenticate you" in msg:
+ if not self.extractor.config("relogin", True):
+ continue
- errors = data.get("errors")
- if not errors:
- return data
+ username, password = self.extractor._get_auth_info()
+ if not username:
+ continue
- retry = False
- for error in errors:
- msg = error.get("message") or "Unspecified"
- self.log.debug("API error: '%s'", msg)
+ _login_impl.invalidate(username)
+ self.extractor.cookies_update(
+ _login_impl(self.extractor, username, password))
+ self.__init__(self.extractor)
+ retry = True
- if "this account is temporarily locked" in msg:
- msg = "Account temporarily locked"
- if self.extractor.config("locked") != "wait":
- raise exception.AuthorizationError(msg)
- self.log.warning("%s. Press ENTER to retry.", msg)
- try:
- input()
- except (EOFError, OSError):
- pass
- retry = True
-
- elif msg.lower().startswith("timeout"):
- retry = True
+ elif msg.lower().startswith("timeout"):
+ retry = True
- if not retry:
- return data
- elif self.headers["x-twitter-auth-type"]:
+ if retry:
+ if self.headers["x-twitter-auth-type"]:
self.log.debug("Retrying API request")
continue
+ else:
+ # fall through to "Login Required"
+ response.status_code = 404
- # fall through to "Login Required"
- response.status_code = 404
-
- if response.status_code == 429:
- # rate limit exceeded
- if self.extractor.config("ratelimit") == "abort":
- raise exception.StopExtraction("Rate limit exceeded")
-
- until = response.headers.get("x-rate-limit-reset")
- seconds = None if until else 60
- self.extractor.wait(until=until, seconds=seconds)
- continue
-
- if response.status_code in (403, 404) and \
+ if response.status_code < 400:
+ return data
+ elif response.status_code in (403, 404) and \
not self.headers["x-twitter-auth-type"]:
raise exception.AuthorizationError("Login required")
+ elif response.status_code == 429:
+ self._handle_ratelimit(response)
+ continue
# error
try:
- data = response.json()
- errors = ", ".join(e["message"] for e in data["errors"])
- except ValueError:
- errors = response.text
+ errors = ", ".join(e["message"] for e in errors)
except Exception:
- errors = data.get("errors", "")
+ pass
raise exception.StopExtraction(
"%s %s (%s)", response.status_code, response.reason, errors)
@@ -1680,6 +1708,13 @@ class TwitterAPI():
return
variables["cursor"] = cursor
+ def _handle_ratelimit(self, response):
+ if self.extractor.config("ratelimit") == "abort":
+ raise exception.StopExtraction("Rate limit exceeded")
+
+ until = response.headers.get("x-rate-limit-reset")
+ self.extractor.wait(until=until, seconds=None if until else 60)
+
def _process_tombstone(self, entry, tombstone):
text = (tombstone.get("richText") or tombstone["text"])["text"]
tweet_id = entry["entryId"].rpartition("-")[2]
@@ -1695,22 +1730,22 @@ class TwitterAPI():
@cache(maxage=365*86400, keyarg=1)
def _login_impl(extr, username, password):
- import re
- import random
+ def process(data, params=None):
+ response = extr.request(
+ url, params=params, headers=headers, json=data,
+ method="POST", fatal=None)
- if re.fullmatch(r"[\w.%+-]+@[\w.-]+\.\w{2,}", username):
- extr.log.warning(
- "Login with email is no longer possible. "
- "You need to provide your username or phone number instead.")
-
- def process(response):
try:
data = response.json()
except ValueError:
data = {"errors": ({"message": "Invalid response"},)}
else:
if response.status_code < 400:
- return data["flow_token"]
+ try:
+ return (data["flow_token"],
+ data["subtasks"][0]["subtask_id"])
+ except LookupError:
+ pass
errors = []
for error in data.get("errors") or ():
@@ -1719,9 +1754,13 @@ def _login_impl(extr, username, password):
extr.log.debug(response.text)
raise exception.AuthenticationError(", ".join(errors))
- extr.cookies.clear()
+ cookies = extr.cookies
+ cookies.clear()
api = TwitterAPI(extr)
api._authenticate_guest()
+
+ url = "https://api.x.com/1.1/onboarding/task.json"
+ params = {"flow_name": "login"}
headers = api.headers
extr.log.info("Logging in as %s", username)
@@ -1778,31 +1817,18 @@ def _login_impl(extr, username, password):
"web_modal": 1,
},
}
- url = "https://api.twitter.com/1.1/onboarding/task.json?flow_name=login"
- response = extr.request(url, method="POST", headers=headers, json=data)
- data = {
- "flow_token": process(response),
- "subtask_inputs": [
- {
- "subtask_id": "LoginJsInstrumentationSubtask",
+ flow_token, subtask = process(data, params)
+ while not cookies.get("auth_token"):
+ if subtask == "LoginJsInstrumentationSubtask":
+ data = {
"js_instrumentation": {
"response": "{}",
"link": "next_link",
},
- },
- ],
- }
- url = "https://api.twitter.com/1.1/onboarding/task.json"
- response = extr.request(
- url, method="POST", headers=headers, json=data, fatal=None)
-
- # username
- data = {
- "flow_token": process(response),
- "subtask_inputs": [
- {
- "subtask_id": "LoginEnterUserIdentifierSSO",
+ }
+ elif subtask == "LoginEnterUserIdentifierSSO":
+ data = {
"settings_list": {
"setting_responses": [
{
@@ -1814,48 +1840,61 @@ def _login_impl(extr, username, password):
],
"link": "next_link",
},
- },
- ],
- }
- # url = "https://api.twitter.com/1.1/onboarding/task.json"
- extr.sleep(random.uniform(2.0, 4.0), "login (username)")
- response = extr.request(
- url, method="POST", headers=headers, json=data, fatal=None)
-
- # password
- data = {
- "flow_token": process(response),
- "subtask_inputs": [
- {
- "subtask_id": "LoginEnterPassword",
+ }
+ elif subtask == "LoginEnterPassword":
+ data = {
"enter_password": {
"password": password,
"link": "next_link",
},
- },
- ],
- }
- # url = "https://api.twitter.com/1.1/onboarding/task.json"
- extr.sleep(random.uniform(2.0, 4.0), "login (password)")
- response = extr.request(
- url, method="POST", headers=headers, json=data, fatal=None)
-
- # account duplication check ?
- data = {
- "flow_token": process(response),
- "subtask_inputs": [
- {
- "subtask_id": "AccountDuplicationCheck",
+ }
+ elif subtask == "LoginEnterAlternateIdentifierSubtask":
+ alt = extr.input(
+ "Alternate Identifier (username, email, phone number): ")
+ data = {
+ "enter_text": {
+ "text": alt,
+ "link": "next_link",
+ },
+ }
+ elif subtask == "LoginTwoFactorAuthChallenge":
+ data = {
+ "enter_text": {
+ "text": extr.input("2FA Token: "),
+ "link": "next_link",
+ },
+ }
+ elif subtask == "LoginAcid":
+ data = {
+ "enter_text": {
+ "text": extr.input("Email Verification Code: "),
+ "link": "next_link",
+ },
+ }
+ elif subtask == "AccountDuplicationCheck":
+ data = {
"check_logged_in_account": {
"link": "AccountDuplicationCheck_false",
},
- },
- ],
- }
- # url = "https://api.twitter.com/1.1/onboarding/task.json"
- response = extr.request(
- url, method="POST", headers=headers, json=data, fatal=None)
- process(response)
+ }
+ elif subtask == "ArkoseLogin":
+ raise exception.AuthenticationError("Login requires CAPTCHA")
+ elif subtask == "DenyLoginSubtask":
+ raise exception.AuthenticationError("Login rejected as suspicious")
+ elif subtask == "ArkoseLogin":
+ raise exception.AuthenticationError("No auth token cookie")
+ else:
+ raise exception.StopExtraction("Unrecognized subtask %s", subtask)
+
+ inputs = {"subtask_id": subtask}
+ inputs.update(data)
+ data = {
+ "flow_token": flow_token,
+ "subtask_inputs": [inputs],
+ }
+
+ extr.sleep(random.uniform(1.0, 3.0), "login ({})".format(subtask))
+ flow_token, subtask = process(data)
return {
cookie.name: cookie.value