# -*- coding: utf-8 -*-
# Copyright 2016-2020 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://twitter.com/"""
from .common import Extractor, Message
from .. import text, exception
from ..cache import cache, memcache
import json
import re
class TwitterExtractor(Extractor):
"""Base class for twitter extractors"""
category = "twitter"
directory_fmt = ("{category}", "{user[name]}")
filename_fmt = "{tweet_id}_{num}.{extension}"
archive_fmt = "{tweet_id}_{retweet_id}_{num}"
cookiedomain = ".twitter.com"
root = "https://twitter.com"
sizes = (":orig", ":large", ":medium", ":small")
user_agent = ("Mozilla/5.0 (Windows NT 6.1; WOW64; "
"Trident/7.0; rv:11.0) like Gecko")
def __init__(self, match):
Extractor.__init__(self, match)
self.user = match.group(1)
self._user_dict = None
self.logged_in = False
self.retweets = self.config("retweets", True)
self.twitpic = self.config("twitpic", False)
self.content = self.config("content", False)
self.videos = self.config("videos", True)
if self.content:
self._emoji_sub = re.compile(
r']*>').sub
def items(self):
self.login()
metadata = self.metadata()
yield Message.Version, 1
for tweet in self.tweets():
data = self._data_from_tweet(tweet)
if not data or not self.retweets and data["retweet_id"]:
continue
data.update(metadata)
if self.videos and "-videoContainer" in tweet:
yield Message.Directory, data
if self.videos == "ytdl":
data["extension"] = None
url = "ytdl:{}/i/web/status/{}".format(
self.root, data["tweet_id"])
else:
url = self._video_from_tweet(data["tweet_id"])
if not url:
continue
ext = text.ext_from_url(url)
if ext == "m3u8":
url = "ytdl:" + url
data["extension"] = "mp4"
data["_ytdl_extra"] = {"protocol": "m3u8_native"}
else:
data["extension"] = ext
data["num"] = 1
yield Message.Url, url, data
elif "data-image-url=" in tweet:
yield Message.Directory, data
images = text.extract_iter(
tweet, 'data-image-url="', '"')
for data["num"], url in enumerate(images, 1):
text.nameext_from_url(url, data)
urls = [url + size for size in self.sizes]
yield Message.Urllist, urls, data
if self.twitpic and "//twitpic.com/" in tweet:
urls = [
url for url in text.extract_iter(
tweet, 'data-expanded-url="', '"')
if "//twitpic.com/" in url
]
if "num" not in data:
if urls:
yield Message.Directory, data
data["num"] = 0
for data["num"], url in enumerate(urls, data["num"]+1):
response = self.request(url, fatal=False)
if response.status_code >= 400:
continue
url = text.extract(
response.text, 'name="twitter:image" value="', '"')[0]
yield Message.Url, url, text.nameext_from_url(url, data)
def metadata(self):
"""Return general metadata"""
return {}
def tweets(self):
"""Yield HTML content of all relevant tweets"""
def login(self):
username, password = self._get_auth_info()
if username:
self._update_cookies(self._login_impl(username, password))
self.logged_in = True
@cache(maxage=360*24*3600, keyarg=1)
def _login_impl(self, username, password):
self.log.info("Logging in as %s", username)
headers = {"User-Agent": self.user_agent}
page = self.request(self.root + "/login", headers=headers).text
pos = page.index('name="authenticity_token"')
token = text.extract(page, 'value="', '"', pos-80)[0]
url = self.root + "/sessions"
data = {
"session[username_or_email]": username,
"session[password]" : password,
"authenticity_token" : token,
"ui_metrics" : '{"rf":{},"s":""}',
"scribe_log" : "",
"redirect_after_login" : "",
"remember_me" : "1",
}
response = self.request(url, method="POST", headers=headers, data=data)
if "/error" in response.url:
raise exception.AuthenticationError()
return {
cookie.name: cookie.value
for cookie in self.session.cookies
if cookie.domain and "twitter.com" in cookie.domain
}
def _data_from_tweet(self, tweet):
extr = text.extract_from(tweet)
data = {
"tweet_id" : text.parse_int(extr('data-tweet-id="' , '"')),
"retweet_id": text.parse_int(extr('data-retweet-id="', '"')),
"retweeter" : extr('data-retweeter="' , '"'),
"author" : {
"name" : extr('data-screen-name="', '"'),
"nick" : text.unescape(extr('data-name="' , '"')),
"id" : text.parse_int(extr('data-user-id="' , '"')),
},
}
if not self._user_dict:
if data["retweet_id"]:
for user in json.loads(text.unescape(extr(
'data-reply-to-users-json="', '"'))):
if user["screen_name"] == data["retweeter"]:
break
else:
self.log.warning("Unable to extract user info")
return None
self._user_dict = {
"name": user["screen_name"],
"nick": text.unescape(user["name"]),
"id" : text.parse_int(user["id_str"]),
}
else:
self._user_dict = data["author"]
data["user"] = self._user_dict
data["date"] = text.parse_timestamp(extr('data-time="', '"'))
if self.content:
content = extr('