summaryrefslogtreecommitdiffstats
path: root/gallery_dl/extractor/urlshortener.py
diff options
context:
space:
mode:
Diffstat (limited to 'gallery_dl/extractor/urlshortener.py')
-rw-r--r--gallery_dl/extractor/urlshortener.py69
1 files changed, 69 insertions, 0 deletions
diff --git a/gallery_dl/extractor/urlshortener.py b/gallery_dl/extractor/urlshortener.py
new file mode 100644
index 0000000..1a39b5b
--- /dev/null
+++ b/gallery_dl/extractor/urlshortener.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+
+"""Extractors for general-purpose URL shorteners"""
+
+from .common import BaseExtractor, Message
+from .. import exception
+
+
+class UrlshortenerExtractor(BaseExtractor):
+ """Base class for URL shortener extractors"""
+ basecategory = "urlshortener"
+
+
+INSTANCES = {
+ "bitly": {
+ "root": "https://bit.ly",
+ "pattern": r"bit\.ly",
+ },
+ "tco": {
+ # t.co sends 'http-equiv="refresh"' (200) when using browser UA
+ "headers": {"User-Agent": None},
+ "root": "https://t.co",
+ "pattern": r"t\.co",
+ },
+}
+
+BASE_PATTERN = UrlshortenerExtractor.update(INSTANCES)
+
+
+class UrlshortenerLinkExtractor(UrlshortenerExtractor):
+ """Extractor for general-purpose URL shorteners"""
+ subcategory = "link"
+ pattern = BASE_PATTERN + r"/([^/?&#]+)"
+ test = (
+ ("https://bit.ly/3cWIUgq", {
+ "count": 1,
+ "pattern": "^https://gumroad.com/l/storm_b1",
+ }),
+ ("https://t.co/bCgBY8Iv5n", {
+ "count": 1,
+ "pattern": "^https://twitter.com/elonmusk/status/"
+ "1421395561324896257/photo/1",
+ }),
+ ("https://t.co/abcdefghij", {
+ "exception": exception.NotFoundError,
+ }),
+ )
+
+ def __init__(self, match):
+ UrlshortenerExtractor.__init__(self, match)
+ self.id = match.group(match.lastindex)
+
+ try:
+ self.headers = INSTANCES[self.category]["headers"]
+ except Exception:
+ self.headers = None
+
+ def items(self):
+ response = self.request(
+ "{}/{}".format(self.root, self.id), headers=self.headers,
+ method="HEAD", allow_redirects=False, notfound="URL")
+ try:
+ yield Message.Queue, response.headers["location"], {}
+ except KeyError:
+ raise exception.StopExtraction("Unable to resolve short URL")