# -*- coding: utf-8 -*-
# Copyright 2015-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://www.deviantart.com/"""
from .common import Extractor, Message, Dispatch
from .. import text, util, exception
from ..cache import cache, memcache
import collections
import mimetypes
import binascii
import time
# Regex prefix shared by the extractor URL patterns. It accepts both URL
# shapes of a DeviantArt profile and captures the user name in group 1
# (path form) or group 2 (subdomain form).
BASE_PATTERN = (
    r"(?:https?://)?"
    r"(?:"
    # [www.][fx]deviantart.com/<user> -- but never the '/watch/' path
    r"(?:www\.)?(?:fx)?deviantart\.com/(?!watch/)([\w-]+)"
    r"|"
    # <user>.[fx]deviantart.com -- where <user> is not 'www'
    r"(?!www\.)([\w-]+)\.(?:fx)?deviantart\.com"
    r")"
)
# Avatar URL that a user's "usericon" is compared against to detect
# accounts without a custom avatar.
DEFAULT_AVATAR = "https://a.deviantart.net/avatars/default.gif"
class DeviantartExtractor(Extractor):
"""Base class for deviantart extractors"""
# site identifier; referenced as '{category}' by the two format
# definitions below
category = "deviantart"
# base URL; items() uses it to build per-user avatar URLs
root = "https://www.deviantart.com"
# format fields for the target directory: category, then user name
directory_fmt = ("{category}", "{username}")
# format string for file names: category, numeric index, title, extension
filename_fmt = "{category}_{index}_{title}.{extension}"
cookies_domain = ".deviantart.com"
# cookie names login() passes to cookies_check() to decide whether a
# login is still required
cookies_names = ("auth", "auth_secure", "userinfo")
# NOTE(review): not referenced in the visible part of this file;
# presumably the time of the most recent request -- confirm against
# the code that reads it
_last_request = 0
def __init__(self, match):
    """Set up the extractor from a URL 'match'.

    The user name is taken from capture group 1 or, if that is empty,
    group 2, and stored lower-cased in 'self.user' (an empty string when
    neither group matched). 'self.offset' starts at 0.
    """
    Extractor.__init__(self, match)
    name = match[1] or match[2] or ""
    self.user = name.lower()
    self.offset = 0
def _init(self):
    """Read all config options and prepare the helpers derived from them.

    Sets the option attributes ('jwt', 'flat', 'extra', 'quality',
    'original', 'previews', 'intermediary', 'comments',
    'comments_avatars'), creates the OAuth API object and selects the
    callables used later for quality substitution, content updates and
    journal output.
    """
    cfg = self.config

    self.jwt = cfg("jwt", False)
    self.flat = cfg("flat", True)
    self.extra = cfg("extra", False)
    self.quality = cfg("quality", "100")
    self.original = cfg("original", True)
    self.previews = cfg("previews", False)
    self.intermediary = cfg("intermediary", True)
    self.comments_avatars = cfg("comments-avatars", False)
    # avatars are taken from comments, so they imply comment extraction
    self.comments = self.comments_avatars or cfg("comments", False)

    self.api = DeviantartOAuthAPI(self)
    self.eclipse_api = None
    self.group = False
    self._premium_cache = {}

    # 'auto-unwatch': collect entries in a list and register the
    # finalizer that processes them
    auto_unwatch = cfg("auto-unwatch")
    self.unwatch = [] if auto_unwatch else None
    if auto_unwatch:
        self.finalize = self._unwatch_premium

    # turn the 'quality' option into a replacement string plus the
    # matching regex substitution function
    quality = self.quality
    if quality == "png":
        self.quality = "-fullview.png?"
        self.quality_sub = util.re(r"-fullview\.[a-z0-9]+\?").sub
    elif quality:
        self.quality = f",q_{quality}"
        self.quality_sub = util.re(r",q_\d+").sub

    if self.intermediary:
        self.intermediary_subn = util.re(
            r"(/f/[^/]+/[^/]+)/v\d+/.*").subn

    # a string value starting with "image" enables the image-only
    # content updater
    original = self.original
    if isinstance(original, str) and original.lower().startswith("image"):
        self.original = True
        self._update_content = self._update_content_image
    else:
        self._update_content = self._update_content_default

    # previews == "all" also emits previews for image deviations
    if self.previews == "all":
        self.previews = True
        self.previews_images = True
    else:
        self.previews_images = False

    journals = cfg("journals", "html")
    self.commit_journal = (
        self._commit_journal_html if journals == "html" else
        self._commit_journal_text if journals == "text" else
        None
    )
def request(self, url, **kwargs):
    """Send a request, retrying for as long as it is blocked.

    'fatal' defaults to False unless the caller passes it. A response
    counts as blocked when its status is 403 and its body contains
    b"Request blocked."; in that case wait 300 seconds and try again.
    Any other response is returned as is.
    """
    kwargs.setdefault("fatal", False)

    while True:
        response = Extractor.request(self, url, **kwargs)
        blocked = (response.status_code == 403 and
                   b"Request blocked." in response.content)
        if not blocked:
            return response
        self.wait(seconds=300, reason="CloudFront block")
def skip(self, num):
    """Add 'num' to the current offset and return 'num'."""
    self.offset = self.offset + num
    return num
def login(self):
    """Make sure session cookies are available.

    Returns True when the cookies named in 'cookies_names' are already
    present, or after logging in with the configured credentials and
    storing the resulting cookies. Returns None when no username is
    configured.
    """
    if self.cookies_check(self.cookies_names):
        return True

    username, password = self._get_auth_info()
    if not username:
        return None

    self.cookies_update(_login_impl(self, username, password))
    return True
def items(self):
    """Yield the messages for every deviation returned by deviations().

    Before iterating, a given user name is resolved through
    _user_details(); depending on the 'group' option an unresolved name
    is either treated as a group or aborts the extraction.

    Tuples from deviations() are forwarded as Message.Queue entries.
    Deleted and access-locked deviations are skipped. For everything
    else a Message.Directory is emitted, followed by the results of
    commit() for content/download, videos, flash, journals and previews,
    plus queue entries for comment avatars and (with 'extra') sta.sh
    links found in the deviation's texts.
    """
    if self.user:
        group = self.config("group", True)
        if group:
            user = _user_details(self, self.user)
            if user:
                self.user = user["username"]
                self.group = False
            elif group == "skip":
                self.log.info("Skipping group '%s'", self.user)
                raise exception.AbortExtraction()
            else:
                self.subcategory = "group-" + self.subcategory
                self.group = True

    for deviation in self.deviations():
        # (url, data) pairs are handed to another extractor unchanged
        if isinstance(deviation, tuple):
            url, data = deviation
            yield Message.Queue, url, data
            continue

        if deviation["is_deleted"]:
            # prevent crashing in case the deviation really is
            # deleted
            self.log.debug(
                "Skipping %s (deleted)", deviation["deviationid"])
            continue

        if deviation.get("tier_access") == "locked":
            self.log.debug(
                "Skipping %s (access locked)", deviation["deviationid"])
            continue

        if "premium_folder_data" in deviation:
            premium = self._fetch_premium(deviation)
            if not premium:
                continue
            deviation.update(premium)

        self.prepare(deviation)
        yield Message.Directory, deviation

        # main file: embedded content or the download endpoint
        if "content" in deviation:
            content = self._extract_content(deviation)
            yield self.commit(deviation, content)
        elif deviation["is_downloadable"]:
            content = self.api.deviation_download(deviation["deviationid"])
            deviation["is_original"] = True
            yield self.commit(deviation, content)

        # video: pick the entry with the highest numeric quality
        videos = deviation.get("videos")
        if videos:
            best = max(
                videos,
                key=lambda vid: text.parse_int(vid["quality"][:-1]))
            deviation["is_original"] = False
            yield self.commit(deviation, best)

        if "flash" in deviation:
            deviation["is_original"] = True
            yield self.commit(deviation, deviation["flash"])

        if self.commit_journal:
            journal = self._extract_journal(deviation)
            if journal:
                if self.extra:
                    deviation["_journal"] = journal["html"]
                deviation["is_original"] = True
                yield self.commit_journal(deviation, journal)

        if self.comments_avatars:
            for comment in deviation["comments"]:
                author = comment["user"]
                name = author["username"].lower()
                if author["usericon"] == DEFAULT_AVATAR:
                    self.log.debug(
                        "Skipping avatar of '%s' (default)", name)
                    continue
                _user_details.update(name, author)
                avatar_url = f"{self.root}/{name}/avatar/"
                comment["_extractor"] = DeviantartAvatarExtractor
                yield Message.Queue, avatar_url, comment

        if self.previews and "preview" in deviation:
            preview = deviation["preview"]
            # flag is only set while the preview is being committed
            deviation["is_preview"] = True
            if self.previews_images:
                yield self.commit(deviation, preview)
            else:
                # only non-image files get their preview downloaded
                mtype = mimetypes.guess_type(
                    "a." + deviation["extension"], False)[0]
                if mtype and not mtype.startswith("image/"):
                    yield self.commit(deviation, preview)
            del deviation["is_preview"]

        if not self.extra:
            continue

        # ref: https://www.deviantart.com
        #      /developers/http/v1/20210526/object/editor_text
        # the value of "features" is a JSON string with forward
        # slashes escaped
        if "text_content" in deviation:
            features = deviation["text_content"]["body"]["features"]
            text_content = features.replace("\\/", "/")
        else:
            text_content = None

        sources = (
            text_content,
            deviation.get("description"),
            deviation.get("_journal"),
        )
        for txt in sources:
            if txt is None:
                continue
            for match in DeviantartStashExtractor.pattern.finditer(txt):
                url = text.ensure_http_scheme(match[0])
                deviation["_extractor"] = DeviantartStashExtractor
                yield Message.Queue, url, deviation
def deviations(self):
    """Return an iterable containing all relevant Deviation-objects

    This base version has no implementation and returns None. items()
    iterates over the result, so concrete extractors presumably
    override this method.
    """
    return None
def prepare(self, deviation):
    """Adjust the contents of a Deviation-object

    Fills in, in place: 'index' and 'index_base36', 'username' and
    '_username', an integer 'published_time' with the matching 'date',
    'comments' (only when comment extraction is enabled) and the
    'filename' metadata value.
    """
    if "index" not in deviation:
        try:
            url = deviation["url"]
            if url.startswith((
                "https://www.deviantart.com/stash/", "https://sta.sh",
            )):
                # sta.sh: the ID is the base36 prefix of the file name
                # in the content URL, minus its first character
                filename = deviation["content"]["src"].split("/")[5]
                base36 = filename.partition("-")[0][1:]
                deviation["index_base36"] = base36
                deviation["index"] = id_from_base36(base36)
            else:
                # regular deviation: numeric ID after the last '-'
                deviation["index"] = text.parse_int(
                    url.rpartition("-")[2])
        except KeyError:
            deviation["index"] = 0
            deviation["index_base36"] = "0"

    if "index_base36" not in deviation:
        deviation["index_base36"] = base36_from_id(deviation["index"])

    # prefer the user this extractor was created for
    if self.user:
        username = self.user
    else:
        username = deviation["author"]["username"]
    deviation["username"] = username
    deviation["_username"] = username.lower()

    published = text.parse_int(deviation["published_time"])
    deviation["published_time"] = published
    deviation["date"] = text.parse_timestamp(published)

    if self.comments:
        # skip the request when the stats report no comments
        if deviation["stats"]["comments"]:
            comments = self._extract_comments(
                deviation["deviationid"], "deviation")
        else:
            comments = ()
        deviation["comments"] = comments

    # filename metadata
    replace_nonword = util.re(r"\W").sub
    title = replace_nonword("_", deviation["title"].lower())
    author = replace_nonword("_", deviation["author"]["username"].lower())
    deviation["filename"] = "".join((
        title, "_by_", author, "-d", deviation["index_base36"],
    ))
def commit(self, deviation, target):
    """Build the Message.Url tuple for downloading 'target'.

    A copy of 'target' (with 'filename' and 'extension' set) is stored
    as deviation["target"]. The extension comes from the target's own
    'filename' if it has one, otherwise from its 'src' URL. Unless
    already present, 'is_original' is set to whether the URL lacks a
    "/v1/" path component.
    """
    url = target["src"]
    name = target.get("filename") or url

    info = target.copy()
    info["filename"] = deviation["filename"]
    deviation["target"] = info

    extension = text.ext_from_url(name)
    info["extension"] = extension
    deviation["extension"] = extension

    deviation.setdefault("is_original", "/v1/" not in url)
    return Message.Url, url, deviation
def _commit_journal_html(self, deviation, journal):
# Start of the HTML journal writer: escapes the title, reads the
# deviation URL and the journal HTML, and formats SHADOW_TEMPLATE with
# the first entry of 'thumbs' (falling back to 'files') if there is one.
# An empty journal only triggers a warning.
#
# NOTE(review): from the 'elif' line below onwards this block is damaged.
# The 'elif' has no ':' and indexes the result of startswith(""), the
# lone ')' has no opening bracket, and 'content' is used without being
# assigned anywhere in the visible lines. The tail builds a text journal
# (JOURNAL_TEMPLATE_TEXT, extension "txt"), so it presumably belongs to a
# separate text variant whose 'def' line was lost together with the rest
# of the HTML variant -- restore from the original file before editing.
title = text.escape(deviation["title"])
url = deviation["url"]
thumbs = deviation.get("thumbs") or deviation.get("files")
html = journal["html"]
shadow = SHADOW_TEMPLATE.format_map(thumbs[0]) if thumbs else ""
if not html:
self.log.warning("%s: Empty journal content", deviation["index"])
if "css" in journal:
css, cls = journal["css"], "withskin"
elif html.startswith("")[2]
head, _, tail = html.rpartition("")
)
txt = JOURNAL_TEMPLATE_TEXT.format(
title=deviation["title"],
username=deviation["author"]["username"],
date=deviation["date"],
content=content,
)
deviation["extension"] = "txt"
return Message.Url, txt, deviation
def _extract_journal(self, deviation):
# Return the journal/literature content of 'deviation' as a dict of the
# form {"html": ...}, or None when it has neither an 'excerpt' nor a
# 'body' entry.
#
# NOTE(review): the two delimiter strings passed to the first text.extr()
# call below are split across several lines and are not valid string
# literals as they stand; their original content cannot be recovered
# from this view.
if "excerpt" in deviation:
# # empty 'html'
# return self.api.deviation_content(deviation["deviationid"])
# use a page stored under '_page' (removing it from the deviation),
# otherwise fetch the deviation's URL
if "_page" in deviation:
page = deviation["_page"]
del deviation["_page"]
else:
page = self._limited_request(deviation["url"]).text
# extract journal html from webpage
html = text.extr(
page,
"
Literature Text
",
"
")
if html:
return {"html": html}
self.log.debug("%s: Failed to extract journal HTML from webpage. "
"Falling back to __INITIAL_STATE__ markup.",
deviation["index"])
# parse __INITIAL_STATE__ as fallback
# the JSON is embedded as a string literal, so escaped backslashes and
# quotes are unescaped before parsing
state = util.json_loads(text.extr(
page, 'window.__INITIAL_STATE__ = JSON.parse("', '");')
.replace("\\\\", "\\").replace("\\'", "'").replace('\\"', '"'))
deviations = state["@@entities"]["deviation"]
# popitem() removes and returns one entry of the mapping;
# presumably the page state holds only this deviation -- confirm
content = deviations.popitem()[1]["textContent"]
if html := self._textcontent_to_html(deviation, content):
return {"html": html}
# last resort: the excerpt with newlines replaced by spaces
return {"html": content["excerpt"].replace("\n", " ")}
# no excerpt: use a 'body' entry, removing it from the deviation
if "body" in deviation:
return {"html": deviation.pop("body")}
return None
def _textcontent_to_html(self, deviation, content):
html = content["html"]
markup = html.get("markup")
if not markup or markup[0] != "{":
return markup
if html["type"] == "tiptap":
try:
return self._tiptap_to_html(markup)
except Exception as exc:
self.log.debug("", exc_info=exc)
self.log.error("%s: '%s: %s'", deviation["index"],
exc.__class__.__name__, exc)
self.log.warning("%s: Unsupported '%s' markup.",
deviation["index"], html["type"])
def _tiptap_to_html(self, markup):
# Convert a tiptap JSON document ('markup', a JSON string) to HTML:
# every block in data["document"]["content"] is rendered into the
# 'html' fragment list by _tiptap_process_content(), and the fragments
# are joined into one string.
#
# NOTE(review): the fragments appended before and after the loop are
# string literals broken across two lines with no visible content;
# presumably an opening and a closing wrapper element -- restore from
# the original file.
html = []
html.append('
')
data = util.json_loads(markup)
for block in data["document"]["content"]:
self._tiptap_process_content(html, block)
html.append("
")
return "".join(html)
def _tiptap_process_content(self, html, content):
# Render one tiptap node ('content') into the fragment list 'html',
# dispatching on content["type"]. Paragraphs recurse into their
# children; 'text' nodes go to _tiptap_process_text(); headings and list
# or quote nodes go through _tiptap_process_children(); 'da-deviation'
# nodes go to _tiptap_process_deviation(). Unknown types are logged with
# a warning.
#
# NOTE(review): most HTML literals in this block are damaged -- several
# are empty strings, some are split across lines, and the first append
# of the 'heading' branch has mismatched quotes. Values computed for the
# output ('level', 'width', 'height', 'url', 'src', the anchor 'attrs')
# are never used in the visible text. Restore from the original file
# before changing any logic here.
# NOTE(review): the local name 'type' shadows the builtin.
type = content["type"]
if type == "paragraph":
if children := content.get("content"):
html.append('
')
for block in children:
self._tiptap_process_content(html, block)
html.append("
")
else:
html.append('
')
elif type == "text":
self._tiptap_process_text(html, content)
elif type == "heading":
attrs = content["attrs"]
level = str(attrs.get("level") or "3")
html.append("')
html.append('')
self._tiptap_process_children(html, content)
html.append("")
elif type in ("listItem", "bulletList", "orderedList", "blockquote"):
# the second character tells the four names apart:
# l-i-stItem, b-u-lletList, o-r-deredList; anything else is blockquote
c = type[1]
tag = (
"li" if c == "i" else
"ul" if c == "u" else
"ol" if c == "r" else
"blockquote"
)
html.append("<" + tag + ">")
self._tiptap_process_children(html, content)
# NOTE(review): the opening tag above starts with "<"; this closing
# fragment starts with an empty string -- presumably "</" was lost
html.append("" + tag + ">")
elif type == "anchor":
attrs = content["attrs"]
html.append('')
elif type == "hardBreak":
html.append("
")
elif type == "horizontalRule":
html.append("")
elif type == "da-deviation":
self._tiptap_process_deviation(html, content)
elif type == "da-mention":
user = content["attrs"]["user"]["username"]
html.append('@')
html.append(user)
html.append('')
elif type == "da-gif":
attrs = content["attrs"]
width = str(attrs.get("width") or "")
height = str(attrs.get("height") or "")
url = text.escape(attrs.get("url") or "")
html.append('')
elif type == "da-video":
src = text.escape(content["attrs"].get("src") or "")
html.append('
'
'
')
else:
self.log.warning("Unsupported content type '%s'", type)
def _tiptap_process_text(self, html, content):
    """Append the HTML for one tiptap 'text' node to 'html'.

    html:    list of HTML fragments, extended in place
    content: text node; 'text' holds the raw string, the optional
             'marks' list holds its formatting marks

    Supported marks are 'link' (<a>), 'bold' (<strong>), 'italic'
    (<em>), 'underline' (<u>) and 'strike' (<s>). Opening tags are
    emitted in mark order and closed in reverse order around the
    escaped text. A 'textStyle' mark with no key besides 'type' is
    ignored; any other mark only triggers a warning.

    Previously every mark appended empty strings, so all formatting and
    link targets were silently dropped from the output.
    """
    simple_tags = {
        "bold"     : "strong",
        "italic"   : "em",
        "underline": "u",
        "strike"   : "s",
    }

    closing = []
    for mark in content.get("marks") or ():
        mark_type = mark["type"]

        if mark_type == "link":
            attrs = mark.get("attrs") or {}
            # escape the target so it cannot break out of the attribute
            href = text.escape(attrs.get("href") or "")
            html.append('<a href="' + href + '">')
            closing.append("</a>")
        elif isinstance(mark_type, str) and mark_type in simple_tags:
            tag = simple_tags[mark_type]
            html.append("<" + tag + ">")
            closing.append("</" + tag + ">")
        elif mark_type == "textStyle" and len(mark) <= 1:
            # bare style marker without attributes: nothing to render
            pass
        else:
            self.log.warning("Unsupported text marker '%s'", mark_type)

    html.append(text.escape(content["text"]))
    # close tags innermost-first
    html.extend(reversed(closing))
def _tiptap_process_children(self, html, content):
if children := content.get("content"):
for block in children:
self._tiptap_process_content(html, block)
def _tiptap_process_indentation(self, html, attrs):
itype = ("text-indent" if attrs.get("indentType") == "line" else
"margin-inline-start")
isize = str((attrs.get("indentation") or 0) * 24)
html.append(itype + ":" + isize + "px")
def _tiptap_process_deviation(self, html, content):
dev = content["attrs"]["deviation"]
media = dev.get("media") or ()
html.append('
')
html.append('')
if "baseUri" in media:
url, formats = self._eclipse_media(media)
full = formats["fullview"]
html.append('')
html.append('')
html.append("")
elif "textContent" in dev:
html.append('