diff options
| author | 2024-11-20 01:17:41 -0500 | |
|---|---|---|
| committer | 2024-11-20 01:17:41 -0500 | |
| commit | eab0e07b2d9931aa35f243fd24c42c02a4ec8533 (patch) | |
| tree | 767a0da67d0a1cd8ce30db0ad0c800e17380b4cf /pkb_client | |
| parent | 6094eb734bfd2159d81ffea918ea5d31e1e61441 (diff) | |
| parent | 3e3ebe586385a83b10c8f1d0b9ba9b67c8b56d2f (diff) | |
Update upstream source from tag 'upstream/2.0.0'
Update to upstream version '2.0.0'
with Debian dir a538d4b69c2b9e7ec310387046aa40f0e2499b5f
Diffstat (limited to 'pkb_client')
| -rw-r--r-- | pkb_client/__init__.py | 2 | ||||
| -rw-r--r-- | pkb_client/cli.py | 134 | ||||
| -rw-r--r-- | pkb_client/cli/__init__.py | 3 | ||||
| -rw-r--r-- | pkb_client/cli/cli.py | 347 | ||||
| -rw-r--r-- | pkb_client/client.py | 455 | ||||
| -rw-r--r-- | pkb_client/client/__init__.py | 22 | ||||
| -rw-r--r-- | pkb_client/client/bind_file.py | 169 | ||||
| -rw-r--r-- | pkb_client/client/client.py | 867 | ||||
| -rw-r--r-- | pkb_client/client/dns.py | 63 | ||||
| -rw-r--r-- | pkb_client/client/domain.py | 29 | ||||
| -rw-r--r-- | pkb_client/client/forwarding.py | 28 | ||||
| -rw-r--r-- | pkb_client/client/ssl_cert.py | 13 | ||||
| -rw-r--r-- | pkb_client/helper.py | 15 |
13 files changed, 1542 insertions, 605 deletions
diff --git a/pkb_client/__init__.py b/pkb_client/__init__.py index 17a5bda..d5096ef 100644 --- a/pkb_client/__init__.py +++ b/pkb_client/__init__.py @@ -1 +1 @@ -__version__ = "v1.2" +__version__ = "v2.0.0" diff --git a/pkb_client/cli.py b/pkb_client/cli.py deleted file mode 100644 index 4e87373..0000000 --- a/pkb_client/cli.py +++ /dev/null @@ -1,134 +0,0 @@ -import argparse -import pprint -import textwrap - -from pkb_client.client import PKBClient, SUPPORTED_DNS_RECORD_TYPES, DNSRestoreMode - - -def main(): - parser = argparse.ArgumentParser( - description="Unofficial client for the Porkbun API", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=textwrap.dedent(""" - License: - MIT - Copyright (c) Marvin Heptner - - Copyright notices: - requests: - Project: https://github.com/psf/requests - License: Apache-2.0 https://github.com/psf/requests/blob/master/LICENSE - setuptools: - Project: https://github.com/pypa/setuptools - License: MIT https://raw.githubusercontent.com/pypa/setuptools/main/LICENSE - """) - ) - - parser.add_argument("-k", "--key", help="The API key used for Porkbun API calls (usually starts with \"pk\").") - parser.add_argument("-s", "--secret", - help="The API secret used for Porkbun API calls (usually starts with \"sk\").") - - subparsers = parser.add_subparsers(help="Supported API methods") - - parser_ping = subparsers.add_parser("ping", help="Ping the API Endpoint") - parser_ping.set_defaults(func=PKBClient.ping) - - parser_dns_create = subparsers.add_parser("dns-create", help="Create a new DNS record.") - parser_dns_create.set_defaults(func=PKBClient.dns_create) - parser_dns_create.add_argument("domain", help="The domain for which the new DNS record should be created.") - parser_dns_create.add_argument("record_type", help="The type of the new DNS record.", - choices=SUPPORTED_DNS_RECORD_TYPES) - parser_dns_create.add_argument("content", help="The content of the new DNS record.") - parser_dns_create.add_argument("--name", - help="The subdomain for which the new DNS record should be created." - "The * can be used for a wildcard DNS record." - "If not used, then a DNS record for the root domain will be created", - required=False) - parser_dns_create.add_argument("--ttl", type=int, help="The ttl of the new DNS record.", required=False) - parser_dns_create.add_argument("--prio", type=int, help="The priority of the new DNS record.", required=False) - - parser_dns_edit = subparsers.add_parser("dns-edit", help="Edit an existing DNS record.") - parser_dns_edit.set_defaults(func=PKBClient.dns_edit) - parser_dns_edit.add_argument("domain", help="The domain for which the DNS record should be edited.") - parser_dns_edit.add_argument("record_id", help="The id of the DNS record which should be edited.") - parser_dns_edit.add_argument("record_type", help="The new type of the DNS record.", - choices=SUPPORTED_DNS_RECORD_TYPES) - parser_dns_edit.add_argument("content", help="The new content of the DNS record.") - parser_dns_edit.add_argument("--name", - help="The new value of the subdomain for which the DNS record should apply. " - "The * can be used for a wildcard DNS record. If not set, the record will " - "be set for the root domain.", - required=False) - parser_dns_edit.add_argument("--ttl", type=int, help="The new ttl of the DNS record.", required=False) - parser_dns_edit.add_argument("--prio", type=int, help="The new priority of the DNS record.", required=False) - - parser_dns_delete = subparsers.add_parser("dns-delete", help="Delete an existing DNS record.") - parser_dns_delete.set_defaults(func=PKBClient.dns_delete) - parser_dns_delete.add_argument("domain", help="The domain for which the DNS record should be deleted.") - parser_dns_delete.add_argument("record_id", help="The id of the DNS record which should be deleted.") - - parser_dns_receive = subparsers.add_parser("dns-retrieve", help="Get all DNS records.") - parser_dns_receive.set_defaults(func=PKBClient.dns_retrieve) - parser_dns_receive.add_argument("domain", help="The domain for which the DNS record should be retrieved.") - - parser_dns_export = subparsers.add_parser("dns-export", help="Save all DNS records to a local file as json.") - parser_dns_export.set_defaults(func=PKBClient.dns_export) - parser_dns_export.add_argument("domain", - help="The domain for which the DNS record should be retrieved and saved.") - parser_dns_export.add_argument("filename", help="The filename where to save the exported DNS records.") - - parser_dns_import = subparsers.add_parser("dns-import", help="Restore all DNS records from a local file.", - formatter_class=argparse.RawTextHelpFormatter) - parser_dns_import.set_defaults(func=PKBClient.dns_import) - parser_dns_import.add_argument("domain", help="The domain for which the DNS record should be restored.") - parser_dns_import.add_argument("filename", help="The filename from which the DNS records are to be restored.") - parser_dns_import.add_argument("restore_mode", help="""The restore mode (DNS records are identified by the record id): - clean: remove all existing DNS records and restore all DNS records from the provided file - replace: replace only existing DNS records with the DNS records from the provided file, but do not create any new DNS records - keep: keep the existing DNS records and only create new ones for all DNS records from the specified file if they do not exist - """, type=DNSRestoreMode.from_string, choices=list(DNSRestoreMode)) - - parser_domain_pricing = subparsers.add_parser("domain-pricing", help="Get the pricing for porkbun domains.") - parser_domain_pricing.set_defaults(func=PKBClient.get_domain_pricing) - - parser_ssl_retrieve = subparsers.add_parser("ssl-retrieve", help="Retrieve an SSL bundle for given domain.") - parser_ssl_retrieve.set_defaults(func=PKBClient.ssl_retrieve) - parser_ssl_retrieve.add_argument("domain", help="The domain for which the SSL bundle should be retrieve.") - - args = parser.parse_args() - - if not hasattr(args, "func"): - raise argparse.ArgumentError(None, "No method specified. Please provide a method and try again.") - - pp = pprint.PrettyPrinter(indent=4) - - # call the static methods - if args.func == PKBClient.get_domain_pricing: - pp.pprint(args.func(**vars(args))) - exit(0) - - if args.key is None: - while True: - api_key = input("Please enter your API key you got from Porkbun (usually starts with \"pk\"): ") - if len(api_key) == 0: - print("The api key can not be empty.") - else: - break - else: - api_key = args.key - - if args.secret is None: - while True: - api_secret = input("Please enter your API key secret you got from Porkbun (usually starts with \"sk\"): ") - if len(api_secret) == 0: - print("The api key secret can not be empty.") - else: - break - else: - api_secret = args.secret - - pkb_client = PKBClient(api_key, api_secret) - pp.pprint(args.func(pkb_client, **vars(args))) - - -if __name__ == "__main__": - main() diff --git a/pkb_client/cli/__init__.py b/pkb_client/cli/__init__.py new file mode 100644 index 0000000..ed32c05 --- /dev/null +++ b/pkb_client/cli/__init__.py @@ -0,0 +1,3 @@ +from .cli import main + +__all__ = ["main"] diff --git a/pkb_client/cli/cli.py b/pkb_client/cli/cli.py new file mode 100644 index 0000000..4bf53cd --- /dev/null +++ b/pkb_client/cli/cli.py @@ -0,0 +1,347 @@ +import argparse +import dataclasses +import json +import os +import textwrap +from datetime import datetime + +from pkb_client.client import PKBClient, API_ENDPOINT +from pkb_client.client.dns import DNSRecordType, DNSRestoreMode +from pkb_client.client.forwarding import URLForwardingType + + +class CustomJSONEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, datetime): + return o.isoformat() + if dataclasses.is_dataclass(o): + return dataclasses.asdict(o) + return super().default(o) + + +def main(): + parser = argparse.ArgumentParser( + description="Python client for the Porkbun API", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=textwrap.dedent(""" + License: + MIT - Copyright (c) Marvin Heptner + """), + ) + + parser.add_argument( + "-k", + "--key", + help='The API key used for Porkbun API calls (usually starts with "pk").', + ) + parser.add_argument( + "-s", + "--secret", + help='The API secret used for Porkbun API calls (usually starts with "sk").', + ) + parser.add_argument("--debug", help="Enable debug mode.", action="store_true") + parser.add_argument( + "--endpoint", help="The API endpoint to use.", default=API_ENDPOINT + ) + + subparsers = parser.add_subparsers(help="Supported API methods") + + parser_ping = subparsers.add_parser("ping", help="Ping the API Endpoint") + parser_ping.set_defaults(func=PKBClient.ping) + + parser_dns_create = subparsers.add_parser( + "create-dns-record", help="Create a new DNS record." + ) + parser_dns_create.set_defaults(func=PKBClient.create_dns_record) + parser_dns_create.add_argument( + "domain", help="The domain for which the new DNS record should be created." + ) + parser_dns_create.add_argument( + "record_type", + help="The type of the new DNS record.", + choices=list(DNSRecordType), + ) + parser_dns_create.add_argument("content", help="The content of the new DNS record.") + parser_dns_create.add_argument( + "--name", + help="The subdomain for which the new DNS record should be created." + "The * can be used for a wildcard DNS record." + "If not used, then a DNS record for the root domain will be created", + required=False, + ) + parser_dns_create.add_argument( + "--ttl", type=int, help="The ttl of the new DNS record.", required=False + ) + parser_dns_create.add_argument( + "--prio", type=int, help="The priority of the new DNS record.", required=False + ) + + parser_dns_edit = subparsers.add_parser( + "update-dns-record", help="Edit an existing DNS record." + ) + parser_dns_edit.set_defaults(func=PKBClient.update_dns_record) + parser_dns_edit.add_argument( + "domain", help="The domain for which the DNS record should be edited." + ) + parser_dns_edit.add_argument( + "record_id", help="The id of the DNS record which should be edited." + ) + parser_dns_edit.add_argument( + "record_type", + help="The new type of the DNS record.", + choices=list(DNSRecordType), + ) + parser_dns_edit.add_argument("content", help="The new content of the DNS record.") + parser_dns_edit.add_argument( + "--name", + help="The new value of the subdomain for which the DNS record should apply. " + "The * can be used for a wildcard DNS record. If not set, the record will " + "be set for the root domain.", + required=False, + ) + parser_dns_edit.add_argument( + "--ttl", type=int, help="The new ttl of the DNS record.", required=False + ) + parser_dns_edit.add_argument( + "--prio", type=int, help="The new priority of the DNS record.", required=False + ) + + parser_dns_delete = subparsers.add_parser( + "delete-dns-records", help="Delete an existing DNS record." + ) + parser_dns_delete.set_defaults(func=PKBClient.delete_dns_record) + parser_dns_delete.add_argument( + "domain", help="The domain for which the DNS record should be deleted." + ) + parser_dns_delete.add_argument( + "record_id", help="The id of the DNS record which should be deleted." + ) + + parser_dns_receive = subparsers.add_parser( + "get-dns-records", help="Get all DNS records." + ) + parser_dns_receive.set_defaults(func=PKBClient.get_dns_records) + parser_dns_receive.add_argument( + "domain", help="The domain for which the DNS record should be retrieved." + ) + + parser_dns_export = subparsers.add_parser( + "export-dns-records", help="Save all DNS records to a local json file." + ) + parser_dns_export.set_defaults(func=PKBClient.export_dns_records) + parser_dns_export.add_argument( + "domain", + help="The domain for which the DNS record should be retrieved and saved.", + ) + parser_dns_export.add_argument( + "filepath", help="The filepath where to save the exported DNS records." + ) + + parser_dns_export_bind = subparsers.add_parser( + "export-bind-dns-records", help="Save all DNS records to a local BIND file." + ) + parser_dns_export_bind.set_defaults(func=PKBClient.export_bind_dns_records) + parser_dns_export_bind.add_argument( + "domain", + help="The domain for which the DNS record should be retrieved and saved.", + ) + parser_dns_export_bind.add_argument( + "filepath", help="The filepath where to save the exported DNS records." + ) + + parser_dns_import = subparsers.add_parser( + "import-dns-records", + help="Restore all DNS records from a local json file.", + formatter_class=argparse.RawTextHelpFormatter, + ) + parser_dns_import.set_defaults(func=PKBClient.import_dns_records) + parser_dns_import.add_argument( + "domain", help="The domain for which the DNS record should be restored." + ) + parser_dns_import.add_argument( + "filepath", help="The filepath from which the DNS records are to be restored." + ) + parser_dns_import.add_argument( + "restore_mode", + help="""The restore mode (DNS records are identified by the record type, name and prio if supported): + clear: remove all existing DNS records and restore all DNS records from the provided file + replace: replace only existing DNS records with the DNS records from the provided file, but do not create any new DNS records + keep: keep the existing DNS records and only create new ones for all DNS records from the specified file if they do not exist + """, + type=DNSRestoreMode.from_string, + choices=list(DNSRestoreMode), + ) + + parser_dns_import_bind = subparsers.add_parser( + "import-bind-dns-records", + help="Restore all DNS records from a local BIND file.", + formatter_class=argparse.RawTextHelpFormatter, + ) + parser_dns_import_bind.set_defaults(func=PKBClient.import_bind_dns_records) + parser_dns_import_bind.add_argument( + "filepath", help="The filepath from which the DNS records are to be restored." + ) + parser_dns_import_bind.add_argument( + "restore_mode", + help="""The restore mode (DNS records are identified by the record id): + clear: remove all existing DNS records and restore all DNS records from the provided file + """, + type=DNSRestoreMode.from_string, + choices=[DNSRestoreMode.clear], + ) + + parser_domain_pricing = subparsers.add_parser( + "get-domain-pricing", help="Get the pricing for Porkbun domains." + ) + parser_domain_pricing.set_defaults(func=PKBClient.get_domain_pricing) + + parser_ssl_retrieve = subparsers.add_parser( + "get-ssl-bundle", help="Retrieve an SSL bundle for given domain." + ) + parser_ssl_retrieve.set_defaults(func=PKBClient.get_ssl_bundle) + parser_ssl_retrieve.add_argument( + "domain", help="The domain for which the SSL bundle should be retrieve." + ) + + parser_update_dns_server = subparsers.add_parser( + "update-dns-servers", help="Update the DNS servers for a domain." + ) + parser_update_dns_server.set_defaults(func=PKBClient.update_dns_servers) + parser_update_dns_server.add_argument( + "domain", help="The domain for which the DNS servers should be set." + ) + parser_update_dns_server.add_argument( + "dns_servers", nargs="+", help="The DNS servers to be set." + ) + + parser_get_dns_server = subparsers.add_parser( + "get-dns-servers", help="Retrieve the DNS servers for a domain." + ) + parser_get_dns_server.set_defaults(func=PKBClient.get_dns_servers) + parser_get_dns_server.add_argument( + "domain", help="The domain for which the DNS servers should be retrieved." + ) + + parser_list_domains = subparsers.add_parser( + "get-domains", help="List all domains in this account in chunks of 1000." + ) + parser_list_domains.set_defaults(func=PKBClient.get_domains) + parser_list_domains.add_argument( + "--start", + type=int, + help="The start index of the list.", + default=0, + required=False, + ) + + parser_get_url_forward = subparsers.add_parser( + "get-url-forwards", help="Retrieve all URL forwards." + ) + parser_get_url_forward.set_defaults(func=PKBClient.get_url_forwards) + parser_get_url_forward.add_argument( + "domain", help="The domain for which the URL forwards should be retrieved." + ) + + parser_add_url_forward = subparsers.add_parser( + "create-url-forward", help="Create a new URL forward." + ) + parser_add_url_forward.set_defaults(func=PKBClient.create_url_forward) + parser_add_url_forward.add_argument( + "domain", help="The domain for which the new URL forward should be created." + ) + parser_add_url_forward.add_argument( + "location", + help="The location to which the url forwarding should redirect.", + ) + parser_add_url_forward.add_argument( + "type", help="The type of the url forwarding.", choices=list(URLForwardingType) + ) + parser_add_url_forward.add_argument( + "--subdomain", + help="The subdomain for which the url forwarding should be added.", + required=False, + default="", + ) + parser_add_url_forward.add_argument( + "--include-path", + help="Whether the path should be included in the url forwarding.", + action="store_true", + default=False, + ) + parser_add_url_forward.add_argument( + "--wildcard", + help="Whether the url forwarding should be also applied to subdomains.", + action="store_true", + default=False, + ) + + parser_delete_url_forward = subparsers.add_parser( + "delete-url-forward", help="Delete an existing URL forward." + ) + parser_delete_url_forward.set_defaults(func=PKBClient.delete_url_forward) + parser_delete_url_forward.add_argument( + "domain", help="The domain for which the URL forward should be deleted." + ) + parser_delete_url_forward.add_argument( + "id", help="The id of the URL forward which should be deleted." + ) + + args = vars(parser.parse_args()) + + debug = args.pop("debug", False) + + func = args.pop("func", None) + if not func: + raise argparse.ArgumentError( + None, "No method specified. Please provide a method and try again." + ) + + endpoint = args.pop("endpoint") + api_key = args.pop("key") + api_secret = args.pop("secret") + + # call the api methods which do not require authentication + if func == PKBClient.get_domain_pricing: + pkb_client = PKBClient(api_endpoint=endpoint, debug=debug) + ret = func(pkb_client, **args) + + print(json.dumps(ret, cls=CustomJSONEncoder, indent=4)) + exit(0) + + if api_key is None: + # try to get the api key from the environment variable or fallback to user input + api_key = os.environ.get("PKB_API_KEY", "") + if len(api_key.strip()) == 0: + while True: + api_key = input( + 'Please enter your API key you got from Porkbun (usually starts with "pk"): ' + ) + if len(api_key.strip()) == 0: + print("The api key can not be empty.") + else: + break + + if api_secret is None: + # try to get the api secret from the environment variable or fallback to user input + api_secret = os.environ.get("PKB_API_SECRET", "") + if len(api_secret.strip()) == 0: + while True: + api_secret = input( + 'Please enter your API key secret you got from Porkbun (usually starts with "sk"): ' + ) + if len(api_secret.strip()) == 0: + print("The api key secret can not be empty.") + else: + break + + pkb_client = PKBClient( + api_key=api_key, secret_api_key=api_secret, api_endpoint=endpoint, debug=debug + ) + + ret = func(pkb_client, **args) + + print(json.dumps(ret, cls=CustomJSONEncoder, indent=4)) + + +if __name__ == "__main__": + main() diff --git a/pkb_client/client.py b/pkb_client/client.py deleted file mode 100644 index 48c0d82..0000000 --- a/pkb_client/client.py +++ /dev/null @@ -1,455 +0,0 @@ -import json -import logging -from enum import Enum -from pathlib import Path -from typing import Optional, Tuple -from urllib.parse import urljoin - -import requests - -from pkb_client.helper import parse_dns_record - -API_ENDPOINT = "https://porkbun.com/api/json/v3/" -SUPPORTED_DNS_RECORD_TYPES = ["A", "AAAA", "MX", "CNAME", "ALIAS", "TXT", "NS", "SRV", "TLSA", "CAA"] - -# prevent urllib3 to log request with the api key and secret -logging.getLogger("urllib3").setLevel(logging.WARNING) - - -class DNSRestoreMode(Enum): - clear = 0 - replace = 1 - keep = 2 - - def __str__(self): - return self.name - - @staticmethod - def from_string(a): - try: - return DNSRestoreMode[a] - except KeyError: - return a - - -class PKBClient: - """ - API client for Porkbun. - """ - - def __init__(self, api_key: str, secret_api_key: str) -> None: - """ - Creates a new PKBClient object. - - :param api_key: the API key used for Porkbun API calls - :param secret_api_key: the API secret used for Porkbun API calls - """ - - assert api_key is not None and len(api_key) > 0 - assert secret_api_key is not None and len(secret_api_key) > 0 - - self.api_key = api_key - self.secret_api_key = secret_api_key - - def ping(self, **kwargs) -> str: - """ - API ping method: get the current public ip address of the requesting system; can also be used for auth checking - see https://porkbun.com/api/json/v3/documentation#Authentication for more info - - :return: the current public ip address of the requesting system - """ - - url = urljoin(API_ENDPOINT, "ping") - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return json.loads(r.text).get("yourIp", None) - else: - raise Exception("ERROR: ping api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_create(self, - domain: str, - record_type: str, - content: str, - name: Optional[str] = None, - ttl: Optional[int] = 300, - prio: Optional[int] = None, **kwargs) -> str: - """ - API DNS create method: create a new DNS record for given domain - see https://porkbun.com/api/json/v3/documentation#DNS%20Create%20Record for more info - - :param domain: the domain for which the DNS record should be created - :param record_type: the type of the new DNS record; - supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA - :param content: the content of the new DNS record - :param name: the subdomain for which the new DNS record entry should apply; the * can be used for a - wildcard DNS record; if not used, then a DNS record for the root domain will be created - :param ttl: the time to live in seconds of the new DNS record; have to be between 0 and 2147483647 - :param prio: the priority of the new DNS record - - :return: the id of the new created DNS record - """ - - assert domain is not None and len(domain) > 0 - assert record_type in SUPPORTED_DNS_RECORD_TYPES - assert content is not None and len(content) > 0 - assert ttl is None or 300 <= ttl <= 2147483647 - - url = urljoin(API_ENDPOINT, "dns/create/{}".format(domain)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key, - "name": name, - "type": record_type, - "content": content, - "ttl": ttl, - "prio": prio - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return str(json.loads(r.text).get("id", None)) - else: - raise Exception("ERROR: DNS create api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_edit(self, - domain: str, - record_id: str, - record_type: str, - content: str, - name: str = None, - ttl: int = 300, - prio: int = None, - **kwargs) -> bool: - """ - API DNS edit method: edit an existing DNS record specified by the id for a given domain - see https://porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record for more info - - :param domain: the domain for which the DNS record should be edited - :param record_id: the id of the DNS record which should be edited - :param record_type: the new type of the DNS record; - supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA - :param content: the new content of the DNS record - :param name: the new value of the subdomain for which the DNS record should apply; the * can be used for a - wildcard DNS record; if not set, the record will be set for the record domain - :param ttl: the new time to live in seconds of the DNS record, have to be between 0 and 2147483647 - :param prio: the new priority of the DNS record - - :return: True if the editing was successful - """ - - assert domain is not None and len(domain) > 0 - assert record_id is not None and len(record_id) > 0 - assert record_type in SUPPORTED_DNS_RECORD_TYPES - assert content is not None and len(content) > 0 - assert ttl is None or 300 <= ttl <= 2147483647 - - url = urljoin(API_ENDPOINT, "dns/edit/{}/{}".format(domain, record_id)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key, - "name": name, - "type": record_type, - "content": content, - "ttl": ttl, - "prio": prio - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return True - else: - raise Exception("ERROR: DNS edit api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_delete(self, - domain: str, - record_id: str, - **kwargs) -> bool: - """ - API DNS delete method: delete an existing DNS record specified by the id for a given domain - see https://porkbun.com/api/json/v3/documentation#DNS%20Delete%20Record for more info - - :param domain: the domain for which the DNS record should be deleted - :param record_id: the id of the DNS record which should be deleted - - :return: True if the deletion was successful - """ - - assert domain is not None and len(domain) > 0 - assert record_id is not None and len(record_id) > 0 - - url = urljoin(API_ENDPOINT, "dns/delete/{}/{}".format(domain, record_id)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return True - else: - raise Exception("ERROR: DNS delete api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_retrieve(self, domain, **kwargs) -> list: - """ - API DNS retrieve method: retrieve all DNS records for given domain - see https://porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records for more info - - :param domain: the domain for which the DNS records should be retrieved - - :return: list of DNS records as dicts - - The list structure will be: - [ - { - "id": "123456789", - "name": "example.com", - "type": "TXT", - "content": "this is a nice text", - "ttl": "300", - "prio": None, - "notes": "" - }, - { - "id": "234567890", - "name": "example.com", - "type": "A", - "content": "0.0.0.0", - "ttl": "300", - "prio": 0, - "notes": "" - } - ] - """ - - assert domain is not None and len(domain) > 0 - - url = urljoin(API_ENDPOINT, "dns/retrieve/{}".format(domain)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return [parse_dns_record(record) for record in json.loads(r.text).get("records", [])] - else: - raise Exception("ERROR: DNS retrieve api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_export(self, domain: str, filename: str, **kwargs) -> bool: - """ - Export all DNS record from the given domain as json to a file. - This method does not not represent a Porkbun API method. - - :param domain: the domain for which the DNS record should be retrieved and saved - :param filename: the filename where to save the exported DNS records - - :return: True if everything went well - """ - - assert domain is not None and len(domain) > 0 - assert filename is not None and len(filename) > 0 - - print("retrieve current DNS records...") - dns_records = self.dns_retrieve(domain) - - print("save DNS records to {} ...".format(filename)) - # merge the single DNS records into one single dict with the record id as key - dns_records_dict = dict() - for record in dns_records: - dns_records_dict[record["id"]] = record - - filepath = Path(filename) - if filepath.exists(): - raise Exception("File already exists. Please try another filename") - with open(filepath, "w") as f: - json.dump(dns_records_dict, f) - print("export finished") - - return True - - def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, **kwargs) -> bool: - """ - Restore - This method does not not represent a Porkbun API method. - - :param domain: the domain for which the DNS record should be restored - :param filename: the filename from which the DNS records are to be restored - :param restore_mode: The restore mode (DNS records are identified by the record id) - clean: remove all existing DNS records and restore all DNS records from the provided file - replace: replace only existing DNS records with the DNS records from the provided file, - but do not create any new DNS records - keep: keep the existing DNS records and only create new ones for all DNS records from - the specified file if they do not exist - - :return: True if everything went well - """ - - assert domain is not None and len(domain) > 0 - assert filename is not None and len(filename) > 0 - assert isinstance(restore_mode, DNSRestoreMode) - - existing_dns_records = self.dns_retrieve(domain) - - with open(filename, "r") as f: - exported_dns_records_dict = json.load(f) - - if restore_mode is DNSRestoreMode.clear: - print("restore mode: clear") - - try: - # delete all existing DNS records - for record in existing_dns_records: - self.dns_delete(domain, record["id"]) - - # restore all exported records by creating new DNS records - for _, exported_record in exported_dns_records_dict.items(): - name = ".".join(exported_record["name"].split(".")[:-2]) - self.dns_create(domain=domain, - record_type=exported_record["type"], - content=exported_record["content"], - name=name, - ttl=exported_record["ttl"], - prio=exported_record["prio"]) - except Exception as e: - print("something went wrong: {}".format(e.__str__())) - self.__handle_error_backup__(existing_dns_records) - print("import failed") - return False - elif restore_mode is DNSRestoreMode.replace: - print("restore mode: replace") - - try: - for existing_record in existing_dns_records: - record_id = existing_record["id"] - exported_record = exported_dns_records_dict.get(record_id, None) - # also check if the exported dns record is different to the existing record, - # so we can reduce unnecessary api calls - if exported_record is not None and exported_record != existing_record: - name = ".".join(exported_record["name"].split(".")[:-2]) - self.dns_edit(domain=domain, - record_id=record_id, - record_type=exported_record["type"], - content=exported_record["content"], - name=name, - ttl=exported_record["ttl"], - prio=exported_record["prio"]) - except Exception as e: - print("something went wrong: {}".format(e.__str__())) - self.__handle_error_backup__(existing_dns_records) - print("import failed") - return False - elif restore_mode is DNSRestoreMode.keep: - print("restore mode: keep") - - existing_dns_records_dict = dict() - for record in existing_dns_records: - existing_dns_records_dict[record["id"]] = record - - try: - for _, exported_record in exported_dns_records_dict.items(): - if exported_record["id"] not in existing_dns_records_dict: - name = ".".join(exported_record["name"].split(".")[:-2]) - self.dns_create(domain=domain, - record_type=exported_record["type"], - content=exported_record["content"], - name=name, - ttl=exported_record["ttl"], - prio=exported_record["prio"]) - except Exception as e: - print("something went wrong: {}".format(e.__str__())) - self.__handle_error_backup__(existing_dns_records) - print("import failed") - return False - else: - raise Exception("restore mode not supported") - - print("import successfully completed") - - return True - - @staticmethod - def get_domain_pricing(**kwargs) -> dict: - """ - Get the pricing for porkbun domains - see https://porkbun.com/api/json/v3/documentation#Domain%20Pricing for more info - - :return: dict with pricing - """ - - url = urljoin(API_ENDPOINT, "pricing/get") - r = requests.post(url=url) - - if r.status_code == 200: - return json.loads(r.text) - else: - raise Exception("ERROR: Domain pricing retrieve api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def ssl_retrieve(self, domain, **kwargs) -> Tuple[str, str, str, str]: - """ - API SSL bundle retrieve method: retrieve an SSL bundle for given domain - see https://porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info - - :param domain: the domain for which the SSL bundle should be retrieved - - :return: tuple of intermediate certificate, certificate chain, private key, public key - """ - - assert domain is not None and len(domain) > 0 - - url = urljoin(API_ENDPOINT, "ssl/retrieve/{}".format(domain)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - ssl_bundle = json.loads(r.text) - - intermediate_certificate = ssl_bundle["intermediate_certificate"] - certificate_chain = ssl_bundle["certificate_chain"] - private_key = ssl_bundle["private_key"] - public_key = ssl_bundle["public_key"] - - return intermediate_certificate, certificate_chain, private_key, public_key - else: - raise Exception("ERROR: SSL bundle retrieve api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - @staticmethod - def __handle_error_backup__(dns_records): - # merge the single DNS records into one single dict with the record id as key - dns_records_dict = dict() - for record in dns_records: - dns_records_dict[record["id"]] = record - - # generate filename with incremental suffix - base_backup_filename = "pkb_client_dns_records_backup" - suffix = 0 - backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) - while backup_file_path.exists(): - suffix += 1 - backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) - - with open(backup_file_path, "w") as f: - json.dump(dns_records_dict, f) - - print("a backup of your existing dns records was saved to {}".format(str(backup_file_path))) diff --git a/pkb_client/client/__init__.py b/pkb_client/client/__init__.py new file mode 100644 index 0000000..da8a42b --- /dev/null +++ b/pkb_client/client/__init__.py @@ -0,0 +1,22 @@ +from .bind_file import BindFile, BindRecord, RecordClass +from .client import PKBClient, PKBClientException, API_ENDPOINT +from .dns import DNSRecord, DNSRestoreMode, DNSRecordType +from .domain import DomainInfo +from .forwarding import URLForwarding, URLForwardingType +from .ssl_cert import SSLCertBundle + +__all__ = [ + "PKBClient", + "PKBClientException", + "API_ENDPOINT", + "BindFile", + "BindRecord", + "RecordClass", + "DNSRecord", + "DNSRestoreMode", + "DNSRecordType", + "DomainInfo", + "URLForwarding", + "URLForwardingType", + "SSLCertBundle", +] diff --git a/pkb_client/client/bind_file.py b/pkb_client/client/bind_file.py new file mode 100644 index 0000000..af9abe0 --- /dev/null +++ b/pkb_client/client/bind_file.py @@ -0,0 +1,169 @@ +import logging +from dataclasses import dataclass +from enum import Enum +from typing import Optional, List + +from pkb_client.client.dns import DNSRecordType, DNS_RECORDS_WITH_PRIORITY + + +class RecordClass(str, Enum): + IN = "IN" + + def __str__(self): + return self.value + + +@dataclass +class BindRecord: + name: str + ttl: int + record_class: RecordClass + record_type: DNSRecordType + data: str + prio: Optional[int] = None + comment: Optional[str] = None + + def __str__(self): + record_string = f"{self.name} {self.ttl} {self.record_class} {self.record_type}" + if self.prio is not None: + record_string += f" {self.prio}" + record_string += f" {self.data}" + if self.comment: + record_string += f" ; {self.comment}" + return record_string + + +class BindFile: + origin: str + ttl: Optional[int] = None + records: List[BindRecord] + + def __init__( + self, + origin: str, + ttl: Optional[int] = None, + records: Optional[List[BindRecord]] = None, + ) -> None: + self.origin = origin + self.ttl = ttl + self.records = records or [] + + @staticmethod + def from_file(file_path: str) -> "BindFile": + with open(file_path, "r") as f: + file_data = f.readlines() + + # parse the file line by line + origin = None + ttl = None + records = [] + for line in file_data: + if line.startswith("$ORIGIN"): + origin = line.split()[1] + elif line.startswith("$TTL"): + ttl = int(line.split()[1]) + else: + # parse the records with the two possible formats: + # 1: name ttl record-class record-type record-data + # 2: name record-class ttl record-type record-data + # whereby the ttl is optional + + # drop any right trailing comments + line_parts = line.split(";", 1) + line = line_parts[0].strip() + comment = line_parts[1].strip() if len(line_parts) > 1 else None + prio = None + + # skip empty lines + if not line: + continue + + # find which format the line is + record_parts = line.split() + if record_parts[1].isdigit(): + # scheme 1 + if record_parts[3] not in DNSRecordType.__members__: + logging.warning(f"Ignoring unsupported record type: {line}") + continue + if record_parts[2] not in RecordClass.__members__: + logging.warning(f"Ignoring unsupported record class: {line}") + continue + record_name = record_parts[0] + record_ttl = int(record_parts[1]) + record_class = RecordClass[record_parts[2]] + record_type = DNSRecordType[record_parts[3]] + if record_type in DNS_RECORDS_WITH_PRIORITY: + prio = int(record_parts[4]) + record_data = " ".join(record_parts[5:]) + else: + record_data = " ".join(record_parts[4:]) + elif record_parts[2].isdigit(): + # scheme 2 + if record_parts[3] not in DNSRecordType.__members__: + logging.warning(f"Ignoring unsupported record type: {line}") + continue + if record_parts[1] not in RecordClass.__members__: + logging.warning(f"Ignoring unsupported record class: {line}") + continue + record_name = record_parts[0] + record_ttl = int(record_parts[2]) + record_class = RecordClass[record_parts[1]] + record_type = DNSRecordType[record_parts[3]] + if record_type in DNS_RECORDS_WITH_PRIORITY: + prio = int(record_parts[4]) + record_data = " ".join(record_parts[5:]) + else: + record_data = " ".join(record_parts[4:]) + else: + # no ttl, use default or previous + if record_parts[2] not in DNSRecordType.__members__: + logging.warning(f"Ignoring unsupported record type: {line}") + continue + if record_parts[1] not in RecordClass.__members__: + logging.warning(f"Ignoring unsupported record class: {line}") + continue + record_name = record_parts[0] + if ttl is None and not records: + raise ValueError("No TTL found in file") + record_ttl = ttl or records[-1].ttl + record_class = RecordClass[record_parts[1]] + record_type = DNSRecordType[record_parts[2]] + if record_type in DNS_RECORDS_WITH_PRIORITY: + prio = int(record_parts[3]) + record_data = " ".join(record_parts[4:]) + else: + record_data = " ".join(record_parts[3:]) + + # replace @ in record name with origin + record_name = record_name.replace("@", origin) + + records.append( + BindRecord( + record_name, + record_ttl, + record_class, + record_type, + record_data, + prio=prio, + comment=comment, + ) + ) + + if origin is None: + raise ValueError("No origin found in file") + + return BindFile(origin, ttl, records) + + def to_file(self, file_path: str) -> None: + with open(file_path, "w") as f: + f.write(str(self)) + + def __str__(self) -> str: + bind = f"$ORIGIN {self.origin}\n" + + if self.ttl is not None: + bind += f"$TTL {self.ttl}\n" + + for record in self.records: + bind += f"{record}\n" + return bind diff --git a/pkb_client/client/client.py b/pkb_client/client/client.py new file mode 100644 index 0000000..86956a5 --- /dev/null +++ b/pkb_client/client/client.py @@ -0,0 +1,867 @@ +import json +import logging +from hashlib import sha256 +from pathlib import Path +from typing import Optional, List, Union +from urllib.parse import urljoin + +import dns.resolver +import requests + +from pkb_client.client import BindFile +from pkb_client.client.dns import ( + DNSRecord, + DNSRestoreMode, + DNSRecordType, + DNS_RECORDS_WITH_PRIORITY, +) +from pkb_client.client.domain import DomainInfo +from pkb_client.client.forwarding import URLForwarding, URLForwardingType +from pkb_client.client.ssl_cert import SSLCertBundle + +API_ENDPOINT = "https://api.porkbun.com/api/json/v3/" + +logger = logging.getLogger("pkb_client") +logging.basicConfig(level=logging.INFO) + + +class PKBClientException(Exception): + def __init__(self, status, message): + super().__init__(f"{status}: {message}") + + +class PKBClient: + """ + API client for Porkbun. + """ + + default_ttl: int = 300 + + def __init__( + self, + api_key: Optional[str] = None, + secret_api_key: Optional[str] = None, + api_endpoint: str = API_ENDPOINT, + debug: bool = False, + ) -> None: + """ + Creates a new PKBClient object. + + :param api_key: the API key used for Porkbun API calls + :param secret_api_key: the API secret used for Porkbun API calls + :param api_endpoint: the endpoint of the Porkbun API. + :param debug: boolean to enable debug logging + """ + self.api_key = api_key + self.secret_api_key = secret_api_key + self.api_endpoint = api_endpoint + self.debug = debug + if self.debug: + logger.setLevel(logging.DEBUG) + + def _get_auth_request_json(self) -> dict: + """ + Get the request json for the authentication of the Porkbun API calls. + + :return: the request json for the authentication of the Porkbun API calls + """ + + if self.api_key is None or self.secret_api_key is None: + raise ValueError("api_key and secret_api_key must be set") + + return {"apikey": self.api_key, "secretapikey": self.secret_api_key} + + def ping(self) -> str: + """ + API ping method: get the current public ip address of the requesting system; can also be used for auth checking. + See https://api.porkbun.com/api/json/v3/documentation#Authentication for more info. + + :return: the current public ip address of the requesting system + """ + + url = urljoin(self.api_endpoint, "ping") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return json.loads(r.text).get("yourIp", None) + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def create_dns_record( + self, + domain: str, + record_type: DNSRecordType, + content: str, + name: Optional[str] = None, + ttl: int = default_ttl, + prio: Optional[int] = None, + ) -> str: + """ + API DNS create method: create a new DNS record for given domain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Create%20Record for more info. + + :param domain: the domain for which the DNS record should be created + :param record_type: the type of the new DNS record + :param content: the content of the new DNS record + :param name: the subdomain for which the new DNS record entry should apply; the * can be used for a + wildcard DNS record; if not used, then a DNS record for the root domain will be created + :param ttl: the time to live in seconds of the new DNS record; have to be between 300 and 86400 + :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None + :return: the id of the new created DNS record + """ + + if ttl > 86400 or ttl < self.default_ttl: + raise ValueError(f"ttl must be between {self.default_ttl} and 86400") + + if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY: + raise ValueError( + f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}" + ) + + url = urljoin(self.api_endpoint, f"dns/create/{domain}") + req_json = { + **self._get_auth_request_json(), + "name": name, + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return str(json.loads(r.text).get("id", None)) + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def update_dns_record( + self, + domain: str, + record_id: str, + record_type: DNSRecordType, + content: str, + name: Optional[str] = None, + ttl: int = default_ttl, + prio: Optional[int] = None, + ) -> bool: + """ + API DNS edit method: edit an existing DNS record specified by the id for a given domain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record for more info. + + :param domain: the domain for which the DNS record should be edited + :param record_id: the id of the DNS record which should be edited + :param record_type: the new type of the DNS record + :param content: the new content of the DNS record + :param name: the new value of the subdomain for which the DNS record should apply; the * can be used for a + wildcard DNS record; if not set, the record will be set for the record domain + :param ttl: the new time to live in seconds of the DNS record, have to be between 300 and 86400 + :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None + + :return: True if the editing was successful + """ + + if ttl > 86400 or ttl < self.default_ttl: + raise ValueError(f"ttl must be between {self.default_ttl} and 86400") + + if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY: + raise ValueError( + f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}" + ) + + url = urljoin(self.api_endpoint, f"dns/edit/{domain}/{record_id}") + req_json = { + **self._get_auth_request_json(), + "name": name, + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def update_all_dns_records( + self, + domain: str, + record_type: DNSRecordType, + subdomain: str, + content: str, + ttl: int = default_ttl, + prio: Optional[int] = None, + ) -> bool: + """ + API DNS edit method: edit all existing DNS record matching the domain, record type and subdomain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record%20by%20Domain,%20Subdomain%20and%20Type for more info. + + :param domain: the domain for which the DNS record should be edited + :param record_type: the type of the DNS record + :param subdomain: the subdomain of the DNS record can be empty string for root domain + :param content: the new content of the DNS record + :param ttl: the new time to live in seconds of the DNS record, have to be between 300 and 86400 + :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None + + :return: True if the editing was successful + """ + + if ttl > 86400 or ttl < self.default_ttl: + raise ValueError(f"ttl must be between {self.default_ttl} and 86400") + + if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY: + raise ValueError( + f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}" + ) + + url = urljoin( + self.api_endpoint, f"dns/editByNameType/{domain}/{record_type}/{subdomain}" + ) + req_json = { + **self._get_auth_request_json(), + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def delete_dns_record(self, domain: str, record_id: str) -> bool: + """ + API DNS delete method: delete an existing DNS record specified by the id for a given domain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Delete%20Record for more info. + + :param domain: the domain for which the DNS record should be deleted + :param record_id: the id of the DNS record which should be deleted + + :return: True if the deletion was successful + """ + + url = urljoin(self.api_endpoint, f"dns/delete/{domain}/{record_id}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def delete_all_dns_records( + self, domain: str, record_type: DNSRecordType, subdomain: str + ) -> bool: + """ + API DNS delete method: delete all existing DNS record matching the domain, record type and subdomain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Delete%20Records%20by%20Domain,%20Subdomain%20and%20Type for more info. + + :param domain: the domain for which the DNS record should be deleted + :param record_type: the type of the DNS record + :param subdomain: the subdomain of the DNS record can be empty string for root domain + + :return: True if the deletion was successful + """ + + url = urljoin( + self.api_endpoint, + f"dns/deleteByNameType/{domain}/{record_type}/{subdomain}", + ) + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_dns_records( + self, domain, record_id: Optional[str] = None + ) -> List[DNSRecord]: + """ + API DNS retrieve method: retrieve all DNS records for given domain if no record id is specified. + Otherwise, retrieve the DNS record of the specified domain with the given record id. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records for more info. + + :param domain: the domain for which the DNS records should be retrieved + :param record_id: the id of the DNS record which should be retrieved + + :return: list of DNSRecords objects + """ + + if record_id is None: + url = urljoin(self.api_endpoint, f"dns/retrieve/{domain}") + else: + url = urljoin(self.api_endpoint, f"dns/retrieve/{domain}/{record_id}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + DNSRecord.from_dict(record) + for record in json.loads(r.text).get("records", []) + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_all_dns_records( + self, domain: str, record_type: DNSRecordType, subdomain: str + ) -> List[DNSRecord]: + """ + API DNS retrieve method: retrieve all DNS records matching the domain, record type and subdomain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records%20by%20Domain,%20Subdomain%20and%20Type for more info. + + :param domain: the domain for which the DNS records should be retrieved + :param record_type: the type of the DNS records + :param subdomain: the subdomain of the DNS records can be empty string for root domain + + :return: list of DNSRecords objects + """ + + url = urljoin( + self.api_endpoint, + f"dns/retrieveByNameType/{domain}/{record_type}/{subdomain}", + ) + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + DNSRecord.from_dict(record) + for record in json.loads(r.text).get("records", []) + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def export_dns_records(self, domain: str, filepath: Union[Path, str]) -> bool: + """ + Export all DNS record from the given domain to a json file. + This method does not represent a Porkbun API method. + DNS records with all custom fields like notes are exported. + + :param domain: the domain for which the DNS record should be retrieved and saved + :param filepath: the filepath where to save the exported DNS records + + :return: True if everything went well + """ + + filepath = Path(filepath) + + logger.debug("retrieve current DNS records...") + dns_records = self.get_dns_records(domain) + + logger.debug("save DNS records to {} ...".format(filepath)) + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record.id] = record + + if filepath.exists(): + logger.warning("file already exists, overwriting...") + + with open(filepath, "w") as f: + json.dump(dns_records_dict, f, default=lambda o: o.__dict__, indent=4) + + logger.info("export finished") + + return True + + def export_bind_dns_records(self, domain: str, filepath: Union[Path, str]) -> bool: + """ + Export all DNS record from the given domain to a BIND file. + This method does not represent a Porkbun API method. + Porkbun DNS record notes are exported as comments. + + :param domain: the domain for which the DNS record should be retrieved and saved + :param filepath: the filepath where to save the exported DNS records + + :return: True if everything went well + """ + + filepath = Path(filepath) + + logger.debug("retrieve current DNS records...") + dns_records = self.get_dns_records(domain) + + logger.debug("save DNS records to {} ...".format(filepath)) + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record.id] = record + + if filepath.exists(): + logger.warning("file already exists, overwriting...") + + # domain header + bind_file_content = f"$ORIGIN {domain}" + + # SOA record + soa_records = dns.resolver.resolve(domain, "SOA") + if soa_records: + soa_record = soa_records[0] + bind_file_content += f"\n@ IN SOA {soa_record.mname} {soa_record.rname} ({soa_record.serial} {soa_record.refresh} {soa_record.retry} {soa_record.expire} {soa_record.minimum})" + + # records + for record in dns_records: + # name record class ttl record type record data + if record.prio: + record_content = f"{record.prio} {record.content}" + else: + record_content = record.content + bind_file_content += ( + f"\n{record.name} IN {record.ttl} {record.type} {record_content}" + ) + + if record.notes: + bind_file_content += f" ; {record.notes}" + + with open(filepath, "w") as f: + f.write(bind_file_content) + + logger.info("export finished") + + return True + + def import_dns_records( + self, domain: str, filepath: Union[Path, str], restore_mode: DNSRestoreMode + ) -> bool: + """ + Restore all DNS records from a json file to the given domain. + This method does not represent a Porkbun API method. + + :param domain: the domain for which the DNS record should be restored + :param filepath: the filepath from which the DNS records are to be restored + :param restore_mode: The restore mode (DNS records are identified by the record type, name and prio if supported): + clear: remove all existing DNS records and restore all DNS records from the provided file + replace: replace only existing DNS records with the DNS records from the provided file, + but do not create any new DNS records + keep: keep the existing DNS records and only create new ones for all DNS records from + the specified file if they do not exist + + :return: True if everything went well + """ + + filepath = Path(filepath) + + existing_dns_records = self.get_dns_records(domain) + + with open(filepath, "r") as f: + exported_dns_records_dict = json.load(f) + + if restore_mode is DNSRestoreMode.clear: + logger.debug("restore mode: clear") + + try: + # delete all existing DNS records + for record in existing_dns_records: + self.delete_dns_record(domain, record.id) + + # restore all exported records by creating new DNS records + for _, exported_record in exported_dns_records_dict.items(): + name = ".".join(exported_record["name"].split(".")[:-2]) + self.create_dns_record( + domain=domain, + record_type=exported_record["type"], + content=exported_record["content"], + name=name, + ttl=exported_record["ttl"], + prio=exported_record["prio"], + ) + except Exception as e: + logger.error("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + logger.error("import failed") + return False + elif restore_mode is DNSRestoreMode.replace: + logger.debug("restore mode: replace") + + try: + existing_dns_record_hashed = { + sha256( + f"{record.type}{record.name}{record.prio}".encode() + ).hexdigest(): record + for record in existing_dns_records + } + for record in exported_dns_records_dict.values(): + record_hash = sha256( + f"{record['type']}{record['name']}{record['prio']}".encode() + ).hexdigest() + existing_record = existing_dns_record_hashed.get(record_hash, None) + # check if the exported dns record is different to the existing record, + # so we can reduce unnecessary api calls + if existing_record is not None and ( + record["content"] != existing_record.content + or record["ttl"] != existing_record.ttl + or record["prio"] != existing_record.prio + ): + self.update_dns_record( + domain=domain, + record_id=existing_record.id, + record_type=record["type"], + content=record["content"], + name=record["name"].replace(f".{domain}", ""), + ttl=record["ttl"], + prio=record["prio"], + ) + except Exception as e: + logger.error("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + logger.error("import failed") + return False + elif restore_mode is DNSRestoreMode.keep: + logger.debug("restore mode: keep") + + existing_dns_record_hashed = { + sha256( + f"{record.type}{record.name}{record.prio}".encode() + ).hexdigest(): record + for record in existing_dns_records + } + + try: + for record in exported_dns_records_dict.values(): + record_hash = sha256( + f"{record['type']}{record['name']}{record['prio']}".encode() + ).hexdigest() + existing_record = existing_dns_record_hashed.get(record_hash, None) + if existing_record is None: + self.create_dns_record( + domain=domain, + record_type=record["type"], + content=record["content"], + name=record["name"].replace(f".{domain}", ""), + ttl=record["ttl"], + prio=record["prio"], + ) + except Exception as e: + logger.error("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + logger.error("import failed") + return False + else: + raise Exception("restore mode not supported") + + logger.info("import successfully completed") + + return True + + def import_bind_dns_records( + self, filepath: Union[Path, str], restore_mode: DNSRestoreMode + ) -> bool: + """ + Restore all DNS records from a BIND file. + This method does not represent a Porkbun API method. + + :param filepath: the bind filepath from which the DNS records are to be restored + :param restore_mode: The restore mode: + clear: remove all existing DNS records and restore all DNS records from the provided file + :return: True if everything went well + """ + + bind_file = BindFile.from_file(filepath) + + existing_dns_records = self.get_dns_records(bind_file.origin[:-1]) + + if restore_mode is DNSRestoreMode.clear: + logger.debug("restore mode: clear") + + try: + # delete all existing DNS records + for record in existing_dns_records: + self.delete_dns_record(bind_file.origin[:-1], record.id) + + # restore all records from BIND file by creating new DNS records + for record in bind_file.records: + # extract subdomain from record name + subdomain = record.name.replace(bind_file.origin, "") + # replace trailing dot + subdomain = subdomain[:-1] if subdomain.endswith(".") else subdomain + self.create_dns_record( + domain=bind_file.origin[:-1], + record_type=record.record_type, + content=record.data, + name=subdomain, + ttl=record.ttl, + prio=record.prio, + ) + + except Exception as e: + logger.error("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + logger.error("import failed") + return False + else: + raise Exception(f"restore mode '{restore_mode.value}' not supported") + + logger.info("import successfully completed") + + return True + + def update_dns_servers(self, domain: str, name_servers: List[str]) -> bool: + """ + Update the name servers of the specified domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Update%20Name%20Servers for more info. + + :return: True if everything went well + """ + + url = urljoin(self.api_endpoint, f"domain/updateNs/{domain}") + req_json = {**self._get_auth_request_json(), "ns": name_servers} + r = requests.post(url=url, json=req_json) + + if r.status_code == 200 and json.loads(r.text).get("status", None) == "SUCCESS": + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_dns_servers(self, domain: str) -> List[str]: + """ + Get the name servers for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Get%20Name%20Servers for more info. + + :return: list of name servers + """ + + url = urljoin(self.api_endpoint, f"domain/getNs/{domain}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return json.loads(r.text).get("ns", []) + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_domains(self, start: int = 0) -> List[DomainInfo]: + """ + Get all domains for the account in chunks of 1000. If you reach the end of all domains, the list will be empty. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20List%20All for more info. + + :param start: the index of the first domain to retrieve + + :return: list of DomainInfo objects + """ + + url = urljoin(self.api_endpoint, "domain/listAll") + + req_json = {**self._get_auth_request_json(), "start": start} + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + DomainInfo.from_dict(domain) + for domain in json.loads(r.text).get("domains", []) + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_url_forwards(self, domain: str) -> List[URLForwarding]: + """ + Get the url forwarding for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Get%20URL%20Forwarding for more info. + + :return: list of URLForwarding objects + """ + + url = urljoin(self.api_endpoint, f"domain/getUrlForwarding/{domain}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + URLForwarding.from_dict(forwarding) + for forwarding in json.loads(r.text).get("forwards", []) + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def create_url_forward( + self, + domain: str, + subdomain: str, + location: str, + type: URLForwardingType, + include_path: bool, + wildcard: bool, + ) -> bool: + """ + Add a url forward for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Add%20URL%20Forward for more info. + + :param domain: the domain for which the url forwarding should be added + :param subdomain: the subdomain for which the url forwarding should be added, can be empty for root domain + :param location: the location to which the url forwarding should redirect + :param type: the type of the url forwarding + :param include_path: if the path should be included in the url forwarding + :param wildcard: if the url forwarding should also be applied to all subdomains + + :return: True if the forwarding was added successfully + """ + + url = urljoin(self.api_endpoint, f"domain/addUrlForward/{domain}") + req_json = { + **self._get_auth_request_json(), + "subdomain": subdomain, + "location": location, + "type": type, + "includePath": include_path, + "wildcard": wildcard, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def delete_url_forward(self, domain: str, id: str) -> bool: + """ + Delete an url forward for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Delete%20URL%20Forward for more info. + + :param domain: the domain for which the url forwarding should be deleted + :param id: the id of the url forwarding which should be deleted + + :return: True if the deletion was successful + """ + + url = urljoin(self.api_endpoint, f"domain/deleteUrlForward/{domain}/{id}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_domain_pricing(self) -> dict: + """ + Get the pricing for all Porkbun domains. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Pricing for more info. + + :return: dict with pricing + """ + + url = urljoin(self.api_endpoint, "pricing/get") + r = requests.post(url=url) + + if r.status_code == 200: + return json.loads(r.text)["pricing"] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_ssl_bundle(self, domain) -> SSLCertBundle: + """ + API SSL bundle retrieve method: retrieve an SSL bundle for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info. + + :param domain: the domain for which the SSL bundle should be retrieved + + :return: tuple of intermediate certificate, certificate chain, private key, public key + """ + + url = urljoin(self.api_endpoint, f"ssl/retrieve/{domain}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + ssl_bundle = json.loads(r.text) + + return SSLCertBundle( + certificate_chain=ssl_bundle["certificatechain"], + private_key=ssl_bundle["privatekey"], + public_key=ssl_bundle["publickey"], + ) + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + @staticmethod + def __handle_error_backup__(dns_records): + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record["id"]] = record + + # generate filename with incremental suffix + base_backup_filename = "pkb_client_dns_records_backup" + suffix = 0 + backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) + while backup_file_path.exists(): + suffix += 1 + backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) + + with open(backup_file_path, "w") as f: + json.dump(dns_records_dict, f) + + logger.warning( + "a backup of your existing dns records was saved to {}".format( + str(backup_file_path) + ) + ) diff --git a/pkb_client/client/dns.py b/pkb_client/client/dns.py new file mode 100644 index 0000000..85ae971 --- /dev/null +++ b/pkb_client/client/dns.py @@ -0,0 +1,63 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Optional + + +class DNSRecordType(str, Enum): + A = "A" + AAAA = "AAAA" + MX = "MX" + CNAME = "CNAME" + ALIAS = "ALIAS" + TXT = "TXT" + NS = "NS" + SRV = "SRV" + TLSA = "TLSA" + CAA = "CAA" + + def __str__(self): + return self.value + + +DNS_RECORDS_WITH_PRIORITY = {DNSRecordType.MX, DNSRecordType.SRV} + + +@dataclass +class DNSRecord: + id: str + name: str + type: DNSRecordType + content: str + ttl: int + prio: Optional[int] + notes: str + + @staticmethod + def from_dict(d): + # only use prio for supported record types since the API returns it for all records with default value 0 + prio = int(d["prio"]) if d["type"] in DNS_RECORDS_WITH_PRIORITY else None + return DNSRecord( + id=d["id"], + name=d["name"], + type=DNSRecordType[d["type"]], + content=d["content"], + ttl=int(d["ttl"]), + prio=prio, + notes=d["notes"], + ) + + +class DNSRestoreMode(Enum): + clear = 0 + replace = 1 + keep = 2 + + def __str__(self): + return self.name + + @staticmethod + def from_string(a): + try: + return DNSRestoreMode[a] + except KeyError: + return a diff --git a/pkb_client/client/domain.py b/pkb_client/client/domain.py new file mode 100644 index 0000000..a44a904 --- /dev/null +++ b/pkb_client/client/domain.py @@ -0,0 +1,29 @@ +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class DomainInfo: + domain: str + status: str + tld: str + create_date: datetime + expire_date: datetime + security_lock: bool + whois_privacy: bool + auto_renew: bool + not_local: bool + + @staticmethod + def from_dict(d): + return DomainInfo( + domain=d["domain"], + status=d["status"], + tld=d["tld"], + create_date=datetime.fromisoformat(d["createDate"]), + expire_date=datetime.fromisoformat(d["expireDate"]), + security_lock=bool(d["securityLock"]), + whois_privacy=bool(d["whoisPrivacy"]), + auto_renew=bool(d["autoRenew"]), + not_local=bool(d["notLocal"]), + ) diff --git a/pkb_client/client/forwarding.py b/pkb_client/client/forwarding.py new file mode 100644 index 0000000..64962ca --- /dev/null +++ b/pkb_client/client/forwarding.py @@ -0,0 +1,28 @@ +from dataclasses import dataclass +from enum import Enum + + +class URLForwardingType(str, Enum): + temporary = "temporary" + permanent = "permanent" + + +@dataclass +class URLForwarding: + id: str + subdomain: str + location: str + type: URLForwardingType + include_path: bool + wildcard: bool + + @staticmethod + def from_dict(d): + return URLForwarding( + id=d["id"], + subdomain=d["subdomain"], + location=d["location"], + type=URLForwardingType[d["type"]], + include_path=d["includePath"] == "yes", + wildcard=d["wildcard"] == "yes", + ) diff --git a/pkb_client/client/ssl_cert.py b/pkb_client/client/ssl_cert.py new file mode 100644 index 0000000..0662ce4 --- /dev/null +++ b/pkb_client/client/ssl_cert.py @@ -0,0 +1,13 @@ +from dataclasses import dataclass + + +@dataclass +class SSLCertBundle: + # The complete certificate chain. + certificate_chain: str + + # The private key. + private_key: str + + # The public key. + public_key: str diff --git a/pkb_client/helper.py b/pkb_client/helper.py deleted file mode 100644 index 0c08167..0000000 --- a/pkb_client/helper.py +++ /dev/null @@ -1,15 +0,0 @@ -def parse_dns_record(record: dict) -> dict: - """ - Parse the DNS record. - Replace the ttl and prio string values with the int values. - - :param record: the unparsed DNS record dict - - :return: the parsed dns record dict - """ - if record.get("ttl", None) is not None: - record["ttl"] = int(record["ttl"]) - if record.get("prio", None) is not None: - record["prio"] = int(record["prio"]) - - return record |
