diff options
| author | 2023-12-26 19:39:06 -0500 | |
|---|---|---|
| committer | 2023-12-26 19:39:06 -0500 | |
| commit | fa197fe27b8a03bbf4504476f842956ece2c76c9 (patch) | |
| tree | 5a75b92e4c731a4b2ced68eadb9581a8c922d82e /pkb_client | |
Import Upstream version 1.2upstream/1.2
Diffstat (limited to 'pkb_client')
| -rw-r--r-- | pkb_client/__init__.py | 1 | ||||
| -rw-r--r-- | pkb_client/cli.py | 134 | ||||
| -rw-r--r-- | pkb_client/client.py | 455 | ||||
| -rw-r--r-- | pkb_client/helper.py | 15 |
4 files changed, 605 insertions, 0 deletions
diff --git a/pkb_client/__init__.py b/pkb_client/__init__.py new file mode 100644 index 0000000..17a5bda --- /dev/null +++ b/pkb_client/__init__.py @@ -0,0 +1 @@ +__version__ = "v1.2" diff --git a/pkb_client/cli.py b/pkb_client/cli.py new file mode 100644 index 0000000..4e87373 --- /dev/null +++ b/pkb_client/cli.py @@ -0,0 +1,134 @@ +import argparse +import pprint +import textwrap + +from pkb_client.client import PKBClient, SUPPORTED_DNS_RECORD_TYPES, DNSRestoreMode + + +def main(): + parser = argparse.ArgumentParser( + description="Unofficial client for the Porkbun API", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=textwrap.dedent(""" + License: + MIT - Copyright (c) Marvin Heptner + + Copyright notices: + requests: + Project: https://github.com/psf/requests + License: Apache-2.0 https://github.com/psf/requests/blob/master/LICENSE + setuptools: + Project: https://github.com/pypa/setuptools + License: MIT https://raw.githubusercontent.com/pypa/setuptools/main/LICENSE + """) + ) + + parser.add_argument("-k", "--key", help="The API key used for Porkbun API calls (usually starts with \"pk\").") + parser.add_argument("-s", "--secret", + help="The API secret used for Porkbun API calls (usually starts with \"sk\").") + + subparsers = parser.add_subparsers(help="Supported API methods") + + parser_ping = subparsers.add_parser("ping", help="Ping the API Endpoint") + parser_ping.set_defaults(func=PKBClient.ping) + + parser_dns_create = subparsers.add_parser("dns-create", help="Create a new DNS record.") + parser_dns_create.set_defaults(func=PKBClient.dns_create) + parser_dns_create.add_argument("domain", help="The domain for which the new DNS record should be created.") + parser_dns_create.add_argument("record_type", help="The type of the new DNS record.", + choices=SUPPORTED_DNS_RECORD_TYPES) + parser_dns_create.add_argument("content", help="The content of the new DNS record.") + parser_dns_create.add_argument("--name", + help="The subdomain for which the new DNS record should be created." + "The * can be used for a wildcard DNS record." + "If not used, then a DNS record for the root domain will be created", + required=False) + parser_dns_create.add_argument("--ttl", type=int, help="The ttl of the new DNS record.", required=False) + parser_dns_create.add_argument("--prio", type=int, help="The priority of the new DNS record.", required=False) + + parser_dns_edit = subparsers.add_parser("dns-edit", help="Edit an existing DNS record.") + parser_dns_edit.set_defaults(func=PKBClient.dns_edit) + parser_dns_edit.add_argument("domain", help="The domain for which the DNS record should be edited.") + parser_dns_edit.add_argument("record_id", help="The id of the DNS record which should be edited.") + parser_dns_edit.add_argument("record_type", help="The new type of the DNS record.", + choices=SUPPORTED_DNS_RECORD_TYPES) + parser_dns_edit.add_argument("content", help="The new content of the DNS record.") + parser_dns_edit.add_argument("--name", + help="The new value of the subdomain for which the DNS record should apply. " + "The * can be used for a wildcard DNS record. If not set, the record will " + "be set for the root domain.", + required=False) + parser_dns_edit.add_argument("--ttl", type=int, help="The new ttl of the DNS record.", required=False) + parser_dns_edit.add_argument("--prio", type=int, help="The new priority of the DNS record.", required=False) + + parser_dns_delete = subparsers.add_parser("dns-delete", help="Delete an existing DNS record.") + parser_dns_delete.set_defaults(func=PKBClient.dns_delete) + parser_dns_delete.add_argument("domain", help="The domain for which the DNS record should be deleted.") + parser_dns_delete.add_argument("record_id", help="The id of the DNS record which should be deleted.") + + parser_dns_receive = subparsers.add_parser("dns-retrieve", help="Get all DNS records.") + parser_dns_receive.set_defaults(func=PKBClient.dns_retrieve) + parser_dns_receive.add_argument("domain", help="The domain for which the DNS record should be retrieved.") + + parser_dns_export = subparsers.add_parser("dns-export", help="Save all DNS records to a local file as json.") + parser_dns_export.set_defaults(func=PKBClient.dns_export) + parser_dns_export.add_argument("domain", + help="The domain for which the DNS record should be retrieved and saved.") + parser_dns_export.add_argument("filename", help="The filename where to save the exported DNS records.") + + parser_dns_import = subparsers.add_parser("dns-import", help="Restore all DNS records from a local file.", + formatter_class=argparse.RawTextHelpFormatter) + parser_dns_import.set_defaults(func=PKBClient.dns_import) + parser_dns_import.add_argument("domain", help="The domain for which the DNS record should be restored.") + parser_dns_import.add_argument("filename", help="The filename from which the DNS records are to be restored.") + parser_dns_import.add_argument("restore_mode", help="""The restore mode (DNS records are identified by the record id): + clean: remove all existing DNS records and restore all DNS records from the provided file + replace: replace only existing DNS records with the DNS records from the provided file, but do not create any new DNS records + keep: keep the existing DNS records and only create new ones for all DNS records from the specified file if they do not exist + """, type=DNSRestoreMode.from_string, choices=list(DNSRestoreMode)) + + parser_domain_pricing = subparsers.add_parser("domain-pricing", help="Get the pricing for porkbun domains.") + parser_domain_pricing.set_defaults(func=PKBClient.get_domain_pricing) + + parser_ssl_retrieve = subparsers.add_parser("ssl-retrieve", help="Retrieve an SSL bundle for given domain.") + parser_ssl_retrieve.set_defaults(func=PKBClient.ssl_retrieve) + parser_ssl_retrieve.add_argument("domain", help="The domain for which the SSL bundle should be retrieve.") + + args = parser.parse_args() + + if not hasattr(args, "func"): + raise argparse.ArgumentError(None, "No method specified. Please provide a method and try again.") + + pp = pprint.PrettyPrinter(indent=4) + + # call the static methods + if args.func == PKBClient.get_domain_pricing: + pp.pprint(args.func(**vars(args))) + exit(0) + + if args.key is None: + while True: + api_key = input("Please enter your API key you got from Porkbun (usually starts with \"pk\"): ") + if len(api_key) == 0: + print("The api key can not be empty.") + else: + break + else: + api_key = args.key + + if args.secret is None: + while True: + api_secret = input("Please enter your API key secret you got from Porkbun (usually starts with \"sk\"): ") + if len(api_secret) == 0: + print("The api key secret can not be empty.") + else: + break + else: + api_secret = args.secret + + pkb_client = PKBClient(api_key, api_secret) + pp.pprint(args.func(pkb_client, **vars(args))) + + +if __name__ == "__main__": + main() diff --git a/pkb_client/client.py b/pkb_client/client.py new file mode 100644 index 0000000..48c0d82 --- /dev/null +++ b/pkb_client/client.py @@ -0,0 +1,455 @@ +import json +import logging +from enum import Enum +from pathlib import Path +from typing import Optional, Tuple +from urllib.parse import urljoin + +import requests + +from pkb_client.helper import parse_dns_record + +API_ENDPOINT = "https://porkbun.com/api/json/v3/" +SUPPORTED_DNS_RECORD_TYPES = ["A", "AAAA", "MX", "CNAME", "ALIAS", "TXT", "NS", "SRV", "TLSA", "CAA"] + +# prevent urllib3 to log request with the api key and secret +logging.getLogger("urllib3").setLevel(logging.WARNING) + + +class DNSRestoreMode(Enum): + clear = 0 + replace = 1 + keep = 2 + + def __str__(self): + return self.name + + @staticmethod + def from_string(a): + try: + return DNSRestoreMode[a] + except KeyError: + return a + + +class PKBClient: + """ + API client for Porkbun. + """ + + def __init__(self, api_key: str, secret_api_key: str) -> None: + """ + Creates a new PKBClient object. + + :param api_key: the API key used for Porkbun API calls + :param secret_api_key: the API secret used for Porkbun API calls + """ + + assert api_key is not None and len(api_key) > 0 + assert secret_api_key is not None and len(secret_api_key) > 0 + + self.api_key = api_key + self.secret_api_key = secret_api_key + + def ping(self, **kwargs) -> str: + """ + API ping method: get the current public ip address of the requesting system; can also be used for auth checking + see https://porkbun.com/api/json/v3/documentation#Authentication for more info + + :return: the current public ip address of the requesting system + """ + + url = urljoin(API_ENDPOINT, "ping") + req_json = { + "apikey": self.api_key, + "secretapikey": self.secret_api_key + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return json.loads(r.text).get("yourIp", None) + else: + raise Exception("ERROR: ping api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + def dns_create(self, + domain: str, + record_type: str, + content: str, + name: Optional[str] = None, + ttl: Optional[int] = 300, + prio: Optional[int] = None, **kwargs) -> str: + """ + API DNS create method: create a new DNS record for given domain + see https://porkbun.com/api/json/v3/documentation#DNS%20Create%20Record for more info + + :param domain: the domain for which the DNS record should be created + :param record_type: the type of the new DNS record; + supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA + :param content: the content of the new DNS record + :param name: the subdomain for which the new DNS record entry should apply; the * can be used for a + wildcard DNS record; if not used, then a DNS record for the root domain will be created + :param ttl: the time to live in seconds of the new DNS record; have to be between 0 and 2147483647 + :param prio: the priority of the new DNS record + + :return: the id of the new created DNS record + """ + + assert domain is not None and len(domain) > 0 + assert record_type in SUPPORTED_DNS_RECORD_TYPES + assert content is not None and len(content) > 0 + assert ttl is None or 300 <= ttl <= 2147483647 + + url = urljoin(API_ENDPOINT, "dns/create/{}".format(domain)) + req_json = { + "apikey": self.api_key, + "secretapikey": self.secret_api_key, + "name": name, + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return str(json.loads(r.text).get("id", None)) + else: + raise Exception("ERROR: DNS create api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + def dns_edit(self, + domain: str, + record_id: str, + record_type: str, + content: str, + name: str = None, + ttl: int = 300, + prio: int = None, + **kwargs) -> bool: + """ + API DNS edit method: edit an existing DNS record specified by the id for a given domain + see https://porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record for more info + + :param domain: the domain for which the DNS record should be edited + :param record_id: the id of the DNS record which should be edited + :param record_type: the new type of the DNS record; + supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA + :param content: the new content of the DNS record + :param name: the new value of the subdomain for which the DNS record should apply; the * can be used for a + wildcard DNS record; if not set, the record will be set for the record domain + :param ttl: the new time to live in seconds of the DNS record, have to be between 0 and 2147483647 + :param prio: the new priority of the DNS record + + :return: True if the editing was successful + """ + + assert domain is not None and len(domain) > 0 + assert record_id is not None and len(record_id) > 0 + assert record_type in SUPPORTED_DNS_RECORD_TYPES + assert content is not None and len(content) > 0 + assert ttl is None or 300 <= ttl <= 2147483647 + + url = urljoin(API_ENDPOINT, "dns/edit/{}/{}".format(domain, record_id)) + req_json = { + "apikey": self.api_key, + "secretapikey": self.secret_api_key, + "name": name, + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + raise Exception("ERROR: DNS edit api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + def dns_delete(self, + domain: str, + record_id: str, + **kwargs) -> bool: + """ + API DNS delete method: delete an existing DNS record specified by the id for a given domain + see https://porkbun.com/api/json/v3/documentation#DNS%20Delete%20Record for more info + + :param domain: the domain for which the DNS record should be deleted + :param record_id: the id of the DNS record which should be deleted + + :return: True if the deletion was successful + """ + + assert domain is not None and len(domain) > 0 + assert record_id is not None and len(record_id) > 0 + + url = urljoin(API_ENDPOINT, "dns/delete/{}/{}".format(domain, record_id)) + req_json = { + "apikey": self.api_key, + "secretapikey": self.secret_api_key + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + raise Exception("ERROR: DNS delete api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + def dns_retrieve(self, domain, **kwargs) -> list: + """ + API DNS retrieve method: retrieve all DNS records for given domain + see https://porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records for more info + + :param domain: the domain for which the DNS records should be retrieved + + :return: list of DNS records as dicts + + The list structure will be: + [ + { + "id": "123456789", + "name": "example.com", + "type": "TXT", + "content": "this is a nice text", + "ttl": "300", + "prio": None, + "notes": "" + }, + { + "id": "234567890", + "name": "example.com", + "type": "A", + "content": "0.0.0.0", + "ttl": "300", + "prio": 0, + "notes": "" + } + ] + """ + + assert domain is not None and len(domain) > 0 + + url = urljoin(API_ENDPOINT, "dns/retrieve/{}".format(domain)) + req_json = { + "apikey": self.api_key, + "secretapikey": self.secret_api_key + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [parse_dns_record(record) for record in json.loads(r.text).get("records", [])] + else: + raise Exception("ERROR: DNS retrieve api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + def dns_export(self, domain: str, filename: str, **kwargs) -> bool: + """ + Export all DNS record from the given domain as json to a file. + This method does not not represent a Porkbun API method. + + :param domain: the domain for which the DNS record should be retrieved and saved + :param filename: the filename where to save the exported DNS records + + :return: True if everything went well + """ + + assert domain is not None and len(domain) > 0 + assert filename is not None and len(filename) > 0 + + print("retrieve current DNS records...") + dns_records = self.dns_retrieve(domain) + + print("save DNS records to {} ...".format(filename)) + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record["id"]] = record + + filepath = Path(filename) + if filepath.exists(): + raise Exception("File already exists. Please try another filename") + with open(filepath, "w") as f: + json.dump(dns_records_dict, f) + print("export finished") + + return True + + def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, **kwargs) -> bool: + """ + Restore + This method does not not represent a Porkbun API method. + + :param domain: the domain for which the DNS record should be restored + :param filename: the filename from which the DNS records are to be restored + :param restore_mode: The restore mode (DNS records are identified by the record id) + clean: remove all existing DNS records and restore all DNS records from the provided file + replace: replace only existing DNS records with the DNS records from the provided file, + but do not create any new DNS records + keep: keep the existing DNS records and only create new ones for all DNS records from + the specified file if they do not exist + + :return: True if everything went well + """ + + assert domain is not None and len(domain) > 0 + assert filename is not None and len(filename) > 0 + assert isinstance(restore_mode, DNSRestoreMode) + + existing_dns_records = self.dns_retrieve(domain) + + with open(filename, "r") as f: + exported_dns_records_dict = json.load(f) + + if restore_mode is DNSRestoreMode.clear: + print("restore mode: clear") + + try: + # delete all existing DNS records + for record in existing_dns_records: + self.dns_delete(domain, record["id"]) + + # restore all exported records by creating new DNS records + for _, exported_record in exported_dns_records_dict.items(): + name = ".".join(exported_record["name"].split(".")[:-2]) + self.dns_create(domain=domain, + record_type=exported_record["type"], + content=exported_record["content"], + name=name, + ttl=exported_record["ttl"], + prio=exported_record["prio"]) + except Exception as e: + print("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + print("import failed") + return False + elif restore_mode is DNSRestoreMode.replace: + print("restore mode: replace") + + try: + for existing_record in existing_dns_records: + record_id = existing_record["id"] + exported_record = exported_dns_records_dict.get(record_id, None) + # also check if the exported dns record is different to the existing record, + # so we can reduce unnecessary api calls + if exported_record is not None and exported_record != existing_record: + name = ".".join(exported_record["name"].split(".")[:-2]) + self.dns_edit(domain=domain, + record_id=record_id, + record_type=exported_record["type"], + content=exported_record["content"], + name=name, + ttl=exported_record["ttl"], + prio=exported_record["prio"]) + except Exception as e: + print("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + print("import failed") + return False + elif restore_mode is DNSRestoreMode.keep: + print("restore mode: keep") + + existing_dns_records_dict = dict() + for record in existing_dns_records: + existing_dns_records_dict[record["id"]] = record + + try: + for _, exported_record in exported_dns_records_dict.items(): + if exported_record["id"] not in existing_dns_records_dict: + name = ".".join(exported_record["name"].split(".")[:-2]) + self.dns_create(domain=domain, + record_type=exported_record["type"], + content=exported_record["content"], + name=name, + ttl=exported_record["ttl"], + prio=exported_record["prio"]) + except Exception as e: + print("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + print("import failed") + return False + else: + raise Exception("restore mode not supported") + + print("import successfully completed") + + return True + + @staticmethod + def get_domain_pricing(**kwargs) -> dict: + """ + Get the pricing for porkbun domains + see https://porkbun.com/api/json/v3/documentation#Domain%20Pricing for more info + + :return: dict with pricing + """ + + url = urljoin(API_ENDPOINT, "pricing/get") + r = requests.post(url=url) + + if r.status_code == 200: + return json.loads(r.text) + else: + raise Exception("ERROR: Domain pricing retrieve api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + def ssl_retrieve(self, domain, **kwargs) -> Tuple[str, str, str, str]: + """ + API SSL bundle retrieve method: retrieve an SSL bundle for given domain + see https://porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info + + :param domain: the domain for which the SSL bundle should be retrieved + + :return: tuple of intermediate certificate, certificate chain, private key, public key + """ + + assert domain is not None and len(domain) > 0 + + url = urljoin(API_ENDPOINT, "ssl/retrieve/{}".format(domain)) + req_json = { + "apikey": self.api_key, + "secretapikey": self.secret_api_key + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + ssl_bundle = json.loads(r.text) + + intermediate_certificate = ssl_bundle["intermediate_certificate"] + certificate_chain = ssl_bundle["certificate_chain"] + private_key = ssl_bundle["private_key"] + public_key = ssl_bundle["public_key"] + + return intermediate_certificate, certificate_chain, private_key, public_key + else: + raise Exception("ERROR: SSL bundle retrieve api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + @staticmethod + def __handle_error_backup__(dns_records): + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record["id"]] = record + + # generate filename with incremental suffix + base_backup_filename = "pkb_client_dns_records_backup" + suffix = 0 + backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) + while backup_file_path.exists(): + suffix += 1 + backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) + + with open(backup_file_path, "w") as f: + json.dump(dns_records_dict, f) + + print("a backup of your existing dns records was saved to {}".format(str(backup_file_path))) diff --git a/pkb_client/helper.py b/pkb_client/helper.py new file mode 100644 index 0000000..0c08167 --- /dev/null +++ b/pkb_client/helper.py @@ -0,0 +1,15 @@ +def parse_dns_record(record: dict) -> dict: + """ + Parse the DNS record. + Replace the ttl and prio string values with the int values. + + :param record: the unparsed DNS record dict + + :return: the parsed dns record dict + """ + if record.get("ttl", None) is not None: + record["ttl"] = int(record["ttl"]) + if record.get("prio", None) is not None: + record["prio"] = int(record["prio"]) + + return record |
