1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
# -*- coding: utf-8 -*-
# Copyright 2021-2023 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://www.pillowfort.social/"""
from .common import Extractor, Message
from ..cache import cache
from .. import text, exception
import re
BASE_PATTERN = r"(?:https?://)?www\.pillowfort\.social"
class PillowfortExtractor(Extractor):
"""Base class for pillowfort extractors"""
category = "pillowfort"
root = "https://www.pillowfort.social"
directory_fmt = ("{category}", "{username}")
filename_fmt = ("{post_id} {title|original_post[title]:?/ /}"
"{num:>02}.{extension}")
archive_fmt = "{id}"
cookies_domain = "www.pillowfort.social"
def __init__(self, match):
Extractor.__init__(self, match)
self.item = match.group(1)
def items(self):
self.login()
inline = self.config("inline", True)
reblogs = self.config("reblogs", False)
external = self.config("external", False)
if inline:
inline = re.compile(r'src="(https://img\d+\.pillowfort\.social'
r'/posts/[^"]+)').findall
for post in self.posts():
if "original_post" in post and not reblogs:
continue
files = post.pop("media")
if inline:
for url in inline(post["content"]):
files.append({"url": url})
post["date"] = text.parse_datetime(
post["created_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
post["post_id"] = post.pop("id")
yield Message.Directory, post
post["num"] = 0
for file in files:
url = file["url"] or file.get("b2_lg_url")
if not url:
continue
if file.get("embed_code"):
if not external:
continue
msgtype = Message.Queue
else:
post["num"] += 1
msgtype = Message.Url
post.update(file)
text.nameext_from_url(url, post)
post["hash"], _, post["filename"] = \
post["filename"].partition("_")
if "id" not in file:
post["id"] = post["hash"]
if "created_at" in file:
post["date"] = text.parse_datetime(
file["created_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
yield msgtype, url, post
def login(self):
if self.cookies.get("_Pf_new_session", domain=self.cookies_domain):
return
if self.cookies.get("remember_user_token", domain=self.cookies_domain):
return
username, password = self._get_auth_info()
if username:
self.cookies_update(self._login_impl(username, password))
@cache(maxage=14*86400, keyarg=1)
def _login_impl(self, username, password):
self.log.info("Logging in as %s", username)
url = "https://www.pillowfort.social/users/sign_in"
page = self.request(url).text
auth = text.extr(page, 'name="authenticity_token" value="', '"')
headers = {"Origin": self.root, "Referer": url}
data = {
"utf8" : "✓",
"authenticity_token": auth,
"user[email]" : username,
"user[password]" : password,
"user[remember_me]" : "1",
}
response = self.request(url, method="POST", headers=headers, data=data)
if not response.history:
raise exception.AuthenticationError()
return {
cookie.name: cookie.value
for cookie in response.history[0].cookies
}
class PillowfortPostExtractor(PillowfortExtractor):
"""Extractor for a single pillowfort post"""
subcategory = "post"
pattern = BASE_PATTERN + r"/posts/(\d+)"
example = "https://www.pillowfort.social/posts/12345"
def posts(self):
url = "{}/posts/{}/json/".format(self.root, self.item)
return (self.request(url).json(),)
class PillowfortUserExtractor(PillowfortExtractor):
"""Extractor for all posts of a pillowfort user"""
subcategory = "user"
pattern = BASE_PATTERN + r"/(?!posts/)([^/?#]+(?:/tagged/[^/?#]+)?)"
example = "https://www.pillowfort.social/USER"
def posts(self):
url = "{}/{}/json/".format(self.root, self.item)
params = {"p": 1}
while True:
posts = self.request(url, params=params).json()["posts"]
yield from posts
if len(posts) < 20:
return
params["p"] += 1
|