diff options
Diffstat (limited to 'pkb_client/client.py')
| -rw-r--r-- | pkb_client/client.py | 455 |
1 files changed, 0 insertions, 455 deletions
diff --git a/pkb_client/client.py b/pkb_client/client.py deleted file mode 100644 index 48c0d82..0000000 --- a/pkb_client/client.py +++ /dev/null @@ -1,455 +0,0 @@ -import json -import logging -from enum import Enum -from pathlib import Path -from typing import Optional, Tuple -from urllib.parse import urljoin - -import requests - -from pkb_client.helper import parse_dns_record - -API_ENDPOINT = "https://porkbun.com/api/json/v3/" -SUPPORTED_DNS_RECORD_TYPES = ["A", "AAAA", "MX", "CNAME", "ALIAS", "TXT", "NS", "SRV", "TLSA", "CAA"] - -# prevent urllib3 to log request with the api key and secret -logging.getLogger("urllib3").setLevel(logging.WARNING) - - -class DNSRestoreMode(Enum): - clear = 0 - replace = 1 - keep = 2 - - def __str__(self): - return self.name - - @staticmethod - def from_string(a): - try: - return DNSRestoreMode[a] - except KeyError: - return a - - -class PKBClient: - """ - API client for Porkbun. - """ - - def __init__(self, api_key: str, secret_api_key: str) -> None: - """ - Creates a new PKBClient object. - - :param api_key: the API key used for Porkbun API calls - :param secret_api_key: the API secret used for Porkbun API calls - """ - - assert api_key is not None and len(api_key) > 0 - assert secret_api_key is not None and len(secret_api_key) > 0 - - self.api_key = api_key - self.secret_api_key = secret_api_key - - def ping(self, **kwargs) -> str: - """ - API ping method: get the current public ip address of the requesting system; can also be used for auth checking - see https://porkbun.com/api/json/v3/documentation#Authentication for more info - - :return: the current public ip address of the requesting system - """ - - url = urljoin(API_ENDPOINT, "ping") - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return json.loads(r.text).get("yourIp", None) - else: - raise Exception("ERROR: ping api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_create(self, - domain: str, - record_type: str, - content: str, - name: Optional[str] = None, - ttl: Optional[int] = 300, - prio: Optional[int] = None, **kwargs) -> str: - """ - API DNS create method: create a new DNS record for given domain - see https://porkbun.com/api/json/v3/documentation#DNS%20Create%20Record for more info - - :param domain: the domain for which the DNS record should be created - :param record_type: the type of the new DNS record; - supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA - :param content: the content of the new DNS record - :param name: the subdomain for which the new DNS record entry should apply; the * can be used for a - wildcard DNS record; if not used, then a DNS record for the root domain will be created - :param ttl: the time to live in seconds of the new DNS record; have to be between 0 and 2147483647 - :param prio: the priority of the new DNS record - - :return: the id of the new created DNS record - """ - - assert domain is not None and len(domain) > 0 - assert record_type in SUPPORTED_DNS_RECORD_TYPES - assert content is not None and len(content) > 0 - assert ttl is None or 300 <= ttl <= 2147483647 - - url = urljoin(API_ENDPOINT, "dns/create/{}".format(domain)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key, - "name": name, - "type": record_type, - "content": content, - "ttl": ttl, - "prio": prio - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return str(json.loads(r.text).get("id", None)) - else: - raise Exception("ERROR: DNS create api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_edit(self, - domain: str, - record_id: str, - record_type: str, - content: str, - name: str = None, - ttl: int = 300, - prio: int = None, - **kwargs) -> bool: - """ - API DNS edit method: edit an existing DNS record specified by the id for a given domain - see https://porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record for more info - - :param domain: the domain for which the DNS record should be edited - :param record_id: the id of the DNS record which should be edited - :param record_type: the new type of the DNS record; - supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA - :param content: the new content of the DNS record - :param name: the new value of the subdomain for which the DNS record should apply; the * can be used for a - wildcard DNS record; if not set, the record will be set for the record domain - :param ttl: the new time to live in seconds of the DNS record, have to be between 0 and 2147483647 - :param prio: the new priority of the DNS record - - :return: True if the editing was successful - """ - - assert domain is not None and len(domain) > 0 - assert record_id is not None and len(record_id) > 0 - assert record_type in SUPPORTED_DNS_RECORD_TYPES - assert content is not None and len(content) > 0 - assert ttl is None or 300 <= ttl <= 2147483647 - - url = urljoin(API_ENDPOINT, "dns/edit/{}/{}".format(domain, record_id)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key, - "name": name, - "type": record_type, - "content": content, - "ttl": ttl, - "prio": prio - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return True - else: - raise Exception("ERROR: DNS edit api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_delete(self, - domain: str, - record_id: str, - **kwargs) -> bool: - """ - API DNS delete method: delete an existing DNS record specified by the id for a given domain - see https://porkbun.com/api/json/v3/documentation#DNS%20Delete%20Record for more info - - :param domain: the domain for which the DNS record should be deleted - :param record_id: the id of the DNS record which should be deleted - - :return: True if the deletion was successful - """ - - assert domain is not None and len(domain) > 0 - assert record_id is not None and len(record_id) > 0 - - url = urljoin(API_ENDPOINT, "dns/delete/{}/{}".format(domain, record_id)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return True - else: - raise Exception("ERROR: DNS delete api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_retrieve(self, domain, **kwargs) -> list: - """ - API DNS retrieve method: retrieve all DNS records for given domain - see https://porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records for more info - - :param domain: the domain for which the DNS records should be retrieved - - :return: list of DNS records as dicts - - The list structure will be: - [ - { - "id": "123456789", - "name": "example.com", - "type": "TXT", - "content": "this is a nice text", - "ttl": "300", - "prio": None, - "notes": "" - }, - { - "id": "234567890", - "name": "example.com", - "type": "A", - "content": "0.0.0.0", - "ttl": "300", - "prio": 0, - "notes": "" - } - ] - """ - - assert domain is not None and len(domain) > 0 - - url = urljoin(API_ENDPOINT, "dns/retrieve/{}".format(domain)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return [parse_dns_record(record) for record in json.loads(r.text).get("records", [])] - else: - raise Exception("ERROR: DNS retrieve api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_export(self, domain: str, filename: str, **kwargs) -> bool: - """ - Export all DNS record from the given domain as json to a file. - This method does not not represent a Porkbun API method. - - :param domain: the domain for which the DNS record should be retrieved and saved - :param filename: the filename where to save the exported DNS records - - :return: True if everything went well - """ - - assert domain is not None and len(domain) > 0 - assert filename is not None and len(filename) > 0 - - print("retrieve current DNS records...") - dns_records = self.dns_retrieve(domain) - - print("save DNS records to {} ...".format(filename)) - # merge the single DNS records into one single dict with the record id as key - dns_records_dict = dict() - for record in dns_records: - dns_records_dict[record["id"]] = record - - filepath = Path(filename) - if filepath.exists(): - raise Exception("File already exists. Please try another filename") - with open(filepath, "w") as f: - json.dump(dns_records_dict, f) - print("export finished") - - return True - - def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, **kwargs) -> bool: - """ - Restore - This method does not not represent a Porkbun API method. - - :param domain: the domain for which the DNS record should be restored - :param filename: the filename from which the DNS records are to be restored - :param restore_mode: The restore mode (DNS records are identified by the record id) - clean: remove all existing DNS records and restore all DNS records from the provided file - replace: replace only existing DNS records with the DNS records from the provided file, - but do not create any new DNS records - keep: keep the existing DNS records and only create new ones for all DNS records from - the specified file if they do not exist - - :return: True if everything went well - """ - - assert domain is not None and len(domain) > 0 - assert filename is not None and len(filename) > 0 - assert isinstance(restore_mode, DNSRestoreMode) - - existing_dns_records = self.dns_retrieve(domain) - - with open(filename, "r") as f: - exported_dns_records_dict = json.load(f) - - if restore_mode is DNSRestoreMode.clear: - print("restore mode: clear") - - try: - # delete all existing DNS records - for record in existing_dns_records: - self.dns_delete(domain, record["id"]) - - # restore all exported records by creating new DNS records - for _, exported_record in exported_dns_records_dict.items(): - name = ".".join(exported_record["name"].split(".")[:-2]) - self.dns_create(domain=domain, - record_type=exported_record["type"], - content=exported_record["content"], - name=name, - ttl=exported_record["ttl"], - prio=exported_record["prio"]) - except Exception as e: - print("something went wrong: {}".format(e.__str__())) - self.__handle_error_backup__(existing_dns_records) - print("import failed") - return False - elif restore_mode is DNSRestoreMode.replace: - print("restore mode: replace") - - try: - for existing_record in existing_dns_records: - record_id = existing_record["id"] - exported_record = exported_dns_records_dict.get(record_id, None) - # also check if the exported dns record is different to the existing record, - # so we can reduce unnecessary api calls - if exported_record is not None and exported_record != existing_record: - name = ".".join(exported_record["name"].split(".")[:-2]) - self.dns_edit(domain=domain, - record_id=record_id, - record_type=exported_record["type"], - content=exported_record["content"], - name=name, - ttl=exported_record["ttl"], - prio=exported_record["prio"]) - except Exception as e: - print("something went wrong: {}".format(e.__str__())) - self.__handle_error_backup__(existing_dns_records) - print("import failed") - return False - elif restore_mode is DNSRestoreMode.keep: - print("restore mode: keep") - - existing_dns_records_dict = dict() - for record in existing_dns_records: - existing_dns_records_dict[record["id"]] = record - - try: - for _, exported_record in exported_dns_records_dict.items(): - if exported_record["id"] not in existing_dns_records_dict: - name = ".".join(exported_record["name"].split(".")[:-2]) - self.dns_create(domain=domain, - record_type=exported_record["type"], - content=exported_record["content"], - name=name, - ttl=exported_record["ttl"], - prio=exported_record["prio"]) - except Exception as e: - print("something went wrong: {}".format(e.__str__())) - self.__handle_error_backup__(existing_dns_records) - print("import failed") - return False - else: - raise Exception("restore mode not supported") - - print("import successfully completed") - - return True - - @staticmethod - def get_domain_pricing(**kwargs) -> dict: - """ - Get the pricing for porkbun domains - see https://porkbun.com/api/json/v3/documentation#Domain%20Pricing for more info - - :return: dict with pricing - """ - - url = urljoin(API_ENDPOINT, "pricing/get") - r = requests.post(url=url) - - if r.status_code == 200: - return json.loads(r.text) - else: - raise Exception("ERROR: Domain pricing retrieve api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def ssl_retrieve(self, domain, **kwargs) -> Tuple[str, str, str, str]: - """ - API SSL bundle retrieve method: retrieve an SSL bundle for given domain - see https://porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info - - :param domain: the domain for which the SSL bundle should be retrieved - - :return: tuple of intermediate certificate, certificate chain, private key, public key - """ - - assert domain is not None and len(domain) > 0 - - url = urljoin(API_ENDPOINT, "ssl/retrieve/{}".format(domain)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - ssl_bundle = json.loads(r.text) - - intermediate_certificate = ssl_bundle["intermediate_certificate"] - certificate_chain = ssl_bundle["certificate_chain"] - private_key = ssl_bundle["private_key"] - public_key = ssl_bundle["public_key"] - - return intermediate_certificate, certificate_chain, private_key, public_key - else: - raise Exception("ERROR: SSL bundle retrieve api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - @staticmethod - def __handle_error_backup__(dns_records): - # merge the single DNS records into one single dict with the record id as key - dns_records_dict = dict() - for record in dns_records: - dns_records_dict[record["id"]] = record - - # generate filename with incremental suffix - base_backup_filename = "pkb_client_dns_records_backup" - suffix = 0 - backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) - while backup_file_path.exists(): - suffix += 1 - backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) - - with open(backup_file_path, "w") as f: - json.dump(dns_records_dict, f) - - print("a backup of your existing dns records was saved to {}".format(str(backup_file_path))) |
