diff options
| author | 2025-08-16 07:00:33 -0400 | |
|---|---|---|
| committer | 2025-08-16 07:00:33 -0400 | |
| commit | 3d18761f620a294ea6c5bff13c5994b93b29f3ed (patch) | |
| tree | 092fa6f8128bc187512be532801670417f215986 /gallery_dl/extractor/idolcomplex.py | |
| parent | a6e995c093de8aae2e91a0787281bb34c0b871eb (diff) | |
New upstream version 1.30.3. (ref: upstream/1.30.3)
Diffstat (limited to 'gallery_dl/extractor/idolcomplex.py')
| -rw-r--r-- | gallery_dl/extractor/idolcomplex.py | 269 |
1 files changed, 21 insertions, 248 deletions
diff --git a/gallery_dl/extractor/idolcomplex.py b/gallery_dl/extractor/idolcomplex.py index 075e1f6..26fd595 100644 --- a/gallery_dl/extractor/idolcomplex.py +++ b/gallery_dl/extractor/idolcomplex.py @@ -6,266 +6,39 @@ # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. -"""Extractors for https://idol.sankakucomplex.com/""" +"""Extractors for https://www.idolcomplex.com/""" -from .sankaku import SankakuExtractor -from .common import Message -from ..cache import cache -from .. import text, util, exception -import collections -import re +from . import sankaku -BASE_PATTERN = r"(?:https?://)?idol\.sankakucomplex\.com(?:/[a-z]{2})?" +BASE_PATTERN = (r"(?:https?://)?(?:www\.)?" + r"idol(?:\.sankaku)?complex\.com(?:/[a-z]{2})?") -class IdolcomplexExtractor(SankakuExtractor): +class IdolcomplexBase(): """Base class for idolcomplex extractors""" category = "idolcomplex" - root = "https://idol.sankakucomplex.com" - cookies_domain = "idol.sankakucomplex.com" - cookies_names = ("_idolcomplex_session",) - referer = False - request_interval = (3.0, 6.0) - - def __init__(self, match): - SankakuExtractor.__init__(self, match) - self.logged_in = True - self.start_page = 1 - self.start_post = 0 + root = "https://www.idolcomplex.com" + cookies_domain = ".idolcomplex.com" def _init(self): - self.find_pids = re.compile( - r" href=[\"#]/\w\w/posts/(\w+)" - ).findall - self.find_tags = re.compile( - r'tag-type-([^"]+)">\s*<a [^>]*?href="/[^?]*\?tags=([^"]+)' - ).findall - - def items(self): - self.login() - data = self.metadata() - - for post_id in util.advance(self.post_ids(), self.start_post): - post = self._extract_post(post_id) - url = post["file_url"] - post.update(data) - text.nameext_from_url(url, post) - yield Message.Directory, post - yield Message.Url, url, post - - def skip(self, num): - self.start_post += num - return num - - def post_ids(self): - """Return an iterable containing all relevant post ids""" - - def 
login(self): - if self.cookies_check(self.cookies_names): - return - - username, password = self._get_auth_info() - if username: - return self.cookies_update(self._login_impl(username, password)) - - self.logged_in = False - - @cache(maxage=90*86400, keyarg=1) - def _login_impl(self, username, password): - self.log.info("Logging in as %s", username) - - url = self.root + "/users/login" - page = self.request(url).text - - headers = { - "Referer": url, - } - url = self.root + (text.extr(page, '<form action="', '"') or - "/en/user/authenticate") - data = { - "authenticity_token": text.unescape(text.extr( - page, 'name="authenticity_token" value="', '"')), - "url" : "", - "user[name]" : username, - "user[password]": password, - "commit" : "Login", - } - self.sleep(10, "login") - response = self.request(url, method="POST", headers=headers, data=data) - - if not response.history or response.url.endswith( - ("/users/login", "/user/home")): - raise exception.AuthenticationError() - return {c.name: c.value for c in response.history[0].cookies} - - def _extract_post(self, post_id): - url = self.root + "/posts/" + post_id - page = self.request(url, retries=10).text - extr = text.extract_from(page) - - vavg = extr('id="rating"', "</ul>") - vcnt = extr('>Votes</strong>:', "<") - pid = extr(">Post ID:", "<") - created = extr(' title="', '"') - - if file_url := extr('>Original:', 'id='): - file_url = extr(' href="', '"') - width = extr(">", "x") - height = extr("", " ") - else: - width = extr('<object width=', ' ') - height = extr('height=', '>') - file_url = extr('<embed src="', '"') - - rating = extr(">Rating:", "<br") - - data = { - "id" : pid.strip(), - "md5" : file_url.rpartition("/")[2].partition(".")[0], - "vote_average": (1.0 * vavg.count('class="star-full"') + - 0.5 * vavg.count('class="star-half"')), - "vote_count" : text.parse_int(vcnt), - "created_at" : created, - "date" : text.parse_datetime( - created, "%Y-%m-%d %H:%M:%S.%f"), - "rating" : 
text.remove_html(rating).lower(), - "file_url" : "https:" + text.unescape(file_url), - "width" : text.parse_int(width), - "height" : text.parse_int(height), - } - - tags = collections.defaultdict(list) - tags_list = [] - tags_html = text.extr(page, '<ul id="tag-sidebar"', '</ul>') - for tag_type, tag_name in self.find_tags(tags_html or ""): - tags[tag_type].append(text.unquote(tag_name)) - for key, value in tags.items(): - data["tags_" + key] = " ".join(value) - tags_list += value - data["tags"] = " ".join(tags_list) - - return data + self.api = sankaku.SankakuAPI(self) + self.api.ROOT = "https://i.sankakuapi.com" + self.api.headers["Origin"] = self.root -class IdolcomplexTagExtractor(IdolcomplexExtractor): - """Extractor for images from idol.sankakucomplex.com by search-tags""" - subcategory = "tag" - directory_fmt = ("{category}", "{search_tags}") - archive_fmt = "t_{search_tags}_{id}" - pattern = BASE_PATTERN + r"/(?:posts/?)?\?([^#]*)" - example = "https://idol.sankakucomplex.com/en/posts?tags=TAGS" - per_page = 20 +class IdolcomplexTagExtractor(IdolcomplexBase, sankaku.SankakuTagExtractor): + """Extractor for idolcomplex tag searches""" + pattern = BASE_PATTERN + r"(?:/posts)?/?\?([^#]*)" + example = "https://www.idolcomplex.com/en/posts?tags=TAGS" - def __init__(self, match): - IdolcomplexExtractor.__init__(self, match) - query = text.parse_query(match[1]) - self.tags = text.unquote(query.get("tags", "").replace("+", " ")) - self.start_page = text.parse_int(query.get("page"), 1) - self.next = text.parse_int(query.get("next"), 0) - def skip(self, num): - if self.next: - self.start_post += num - else: - pages, posts = divmod(num, self.per_page) - self.start_page += pages - self.start_post += posts - return num - - def metadata(self): - if not self.next: - max_page = 50 if self.logged_in else 25 - if self.start_page > max_page: - self.log.info("Traversing from page %d to page %d", - max_page, self.start_page) - self.start_post += self.per_page * (self.start_page 
- max_page) - self.start_page = max_page - - tags = self.tags.split() - if not self.logged_in and len(tags) > 4: - raise exception.AbortExtraction( - "Non-members can only search up to 4 tags at once") - return {"search_tags": " ".join(tags)} - - def post_ids(self): - url = self.root + "/en/posts" - - params = {"auto_page": "t"} - if self.next: - params["next"] = self.next - else: - params["page"] = self.start_page - params["tags"] = self.tags - - while True: - response = self.request(url, params=params, retries=10) - if response.history and "/posts/premium" in response.url: - self.log.warning("HTTP redirect to %s", response.url) - page = response.text - - yield from text.extract_iter(page, '"id":"', '"') - - next_page_url = text.extr(page, 'next-page-url="', '"') - if not next_page_url: - return - - url, _, next_params = text.unquote( - text.unescape(text.unescape(next_page_url))).partition("?") - next_params = text.parse_query(next_params) - - if "next" in next_params: - # stop if the same "next" value occurs twice in a row (#265) - if "next" in params and params["next"] == next_params["next"]: - return - next_params["page"] = "2" - - if url[0] == "/": - url = self.root + url - params = next_params - - -class IdolcomplexPoolExtractor(IdolcomplexExtractor): - """Extractor for image-pools from idol.sankakucomplex.com""" - subcategory = "pool" - directory_fmt = ("{category}", "pool", "{pool}") - archive_fmt = "p_{pool}_{id}" +class IdolcomplexPoolExtractor(IdolcomplexBase, sankaku.SankakuPoolExtractor): + """Extractor for idolcomplex pools""" pattern = BASE_PATTERN + r"/pools?/(?:show/)?(\w+)" - example = "https://idol.sankakucomplex.com/pools/0123456789abcdef" - per_page = 24 - - def skip(self, num): - pages, posts = divmod(num, self.per_page) - self.start_page += pages - self.start_post += posts - return num - - def metadata(self): - return {"pool": self.groups[0]} - - def post_ids(self): - if not self.logged_in: - self.log.warning("Login required") - - url = 
self.root + "/pools/show/" + self.groups[0] - params = {"page": self.start_page} - - while True: - page = self.request(url, params=params, retries=10).text - pos = page.find('id="pool-show"') + 1 - post_ids = self.find_pids(page, pos) - - yield from post_ids - if len(post_ids) < self.per_page: - return - params["page"] += 1 - + example = "https://www.idolcomplex.com/en/pools/0123456789abcdef" -class IdolcomplexPostExtractor(IdolcomplexExtractor): - """Extractor for single images from idol.sankakucomplex.com""" - subcategory = "post" - archive_fmt = "{id}" - pattern = BASE_PATTERN + r"/posts?/(?:show/)?(\w+)" - example = "https://idol.sankakucomplex.com/posts/0123456789abcdef" - def post_ids(self): - return (self.groups[0],) +class IdolcomplexPostExtractor(IdolcomplexBase, sankaku.SankakuPostExtractor): + """Extractor for individual idolcomplex posts""" + pattern = BASE_PATTERN + r"/posts?(?:/show)?/(\w+)" + example = "https://www.idolcomplex.com/en/posts/0123456789abcdef" |
