diff options
Diffstat (limited to 'pkb_client/client')
| -rw-r--r-- | pkb_client/client/__init__.py | 22 | ||||
| -rw-r--r-- | pkb_client/client/bind_file.py | 169 | ||||
| -rw-r--r-- | pkb_client/client/client.py | 867 | ||||
| -rw-r--r-- | pkb_client/client/dns.py | 63 | ||||
| -rw-r--r-- | pkb_client/client/domain.py | 29 | ||||
| -rw-r--r-- | pkb_client/client/forwarding.py | 28 | ||||
| -rw-r--r-- | pkb_client/client/ssl_cert.py | 13 |
7 files changed, 1191 insertions, 0 deletions
diff --git a/pkb_client/client/__init__.py b/pkb_client/client/__init__.py new file mode 100644 index 0000000..da8a42b --- /dev/null +++ b/pkb_client/client/__init__.py @@ -0,0 +1,22 @@ +from .bind_file import BindFile, BindRecord, RecordClass +from .client import PKBClient, PKBClientException, API_ENDPOINT +from .dns import DNSRecord, DNSRestoreMode, DNSRecordType +from .domain import DomainInfo +from .forwarding import URLForwarding, URLForwardingType +from .ssl_cert import SSLCertBundle + +__all__ = [ + "PKBClient", + "PKBClientException", + "API_ENDPOINT", + "BindFile", + "BindRecord", + "RecordClass", + "DNSRecord", + "DNSRestoreMode", + "DNSRecordType", + "DomainInfo", + "URLForwarding", + "URLForwardingType", + "SSLCertBundle", +] diff --git a/pkb_client/client/bind_file.py b/pkb_client/client/bind_file.py new file mode 100644 index 0000000..af9abe0 --- /dev/null +++ b/pkb_client/client/bind_file.py @@ -0,0 +1,169 @@ +import logging +from dataclasses import dataclass +from enum import Enum +from typing import Optional, List + +from pkb_client.client.dns import DNSRecordType, DNS_RECORDS_WITH_PRIORITY + + +class RecordClass(str, Enum): + IN = "IN" + + def __str__(self): + return self.value + + +@dataclass +class BindRecord: + name: str + ttl: int + record_class: RecordClass + record_type: DNSRecordType + data: str + prio: Optional[int] = None + comment: Optional[str] = None + + def __str__(self): + record_string = f"{self.name} {self.ttl} {self.record_class} {self.record_type}" + if self.prio is not None: + record_string += f" {self.prio}" + record_string += f" {self.data}" + if self.comment: + record_string += f" ; {self.comment}" + return record_string + + +class BindFile: + origin: str + ttl: Optional[int] = None + records: List[BindRecord] + + def __init__( + self, + origin: str, + ttl: Optional[int] = None, + records: Optional[List[BindRecord]] = None, + ) -> None: + self.origin = origin + self.ttl = ttl + self.records = records or [] + + @staticmethod + def from_file(file_path: str) -> "BindFile": + with open(file_path, "r") as f: + file_data = f.readlines() + + # parse the file line by line + origin = None + ttl = None + records = [] + for line in file_data: + if line.startswith("$ORIGIN"): + origin = line.split()[1] + elif line.startswith("$TTL"): + ttl = int(line.split()[1]) + else: + # parse the records with the two possible formats: + # 1: name ttl record-class record-type record-data + # 2: name record-class ttl record-type record-data + # whereby the ttl is optional + + # drop any right trailing comments + line_parts = line.split(";", 1) + line = line_parts[0].strip() + comment = line_parts[1].strip() if len(line_parts) > 1 else None + prio = None + + # skip empty lines + if not line: + continue + + # find which format the line is + record_parts = line.split() + if record_parts[1].isdigit(): + # scheme 1 + if record_parts[3] not in DNSRecordType.__members__: + logging.warning(f"Ignoring unsupported record type: {line}") + continue + if record_parts[2] not in RecordClass.__members__: + logging.warning(f"Ignoring unsupported record class: {line}") + continue + record_name = record_parts[0] + record_ttl = int(record_parts[1]) + record_class = RecordClass[record_parts[2]] + record_type = DNSRecordType[record_parts[3]] + if record_type in DNS_RECORDS_WITH_PRIORITY: + prio = int(record_parts[4]) + record_data = " ".join(record_parts[5:]) + else: + record_data = " ".join(record_parts[4:]) + elif record_parts[2].isdigit(): + # scheme 2 + if record_parts[3] not in DNSRecordType.__members__: + logging.warning(f"Ignoring unsupported record type: {line}") + continue + if record_parts[1] not in RecordClass.__members__: + logging.warning(f"Ignoring unsupported record class: {line}") + continue + record_name = record_parts[0] + record_ttl = int(record_parts[2]) + record_class = RecordClass[record_parts[1]] + record_type = DNSRecordType[record_parts[3]] + if record_type in DNS_RECORDS_WITH_PRIORITY: + prio = int(record_parts[4]) + record_data = " ".join(record_parts[5:]) + else: + record_data = " ".join(record_parts[4:]) + else: + # no ttl, use default or previous + if record_parts[2] not in DNSRecordType.__members__: + logging.warning(f"Ignoring unsupported record type: {line}") + continue + if record_parts[1] not in RecordClass.__members__: + logging.warning(f"Ignoring unsupported record class: {line}") + continue + record_name = record_parts[0] + if ttl is None and not records: + raise ValueError("No TTL found in file") + record_ttl = ttl or records[-1].ttl + record_class = RecordClass[record_parts[1]] + record_type = DNSRecordType[record_parts[2]] + if record_type in DNS_RECORDS_WITH_PRIORITY: + prio = int(record_parts[3]) + record_data = " ".join(record_parts[4:]) + else: + record_data = " ".join(record_parts[3:]) + + # replace @ in record name with origin + record_name = record_name.replace("@", origin) + + records.append( + BindRecord( + record_name, + record_ttl, + record_class, + record_type, + record_data, + prio=prio, + comment=comment, + ) + ) + + if origin is None: + raise ValueError("No origin found in file") + + return BindFile(origin, ttl, records) + + def to_file(self, file_path: str) -> None: + with open(file_path, "w") as f: + f.write(str(self)) + + def __str__(self) -> str: + bind = f"$ORIGIN {self.origin}\n" + + if self.ttl is not None: + bind += f"$TTL {self.ttl}\n" + + for record in self.records: + bind += f"{record}\n" + return bind diff --git a/pkb_client/client/client.py b/pkb_client/client/client.py new file mode 100644 index 0000000..86956a5 --- /dev/null +++ b/pkb_client/client/client.py @@ -0,0 +1,867 @@ +import json +import logging +from hashlib import sha256 +from pathlib import Path +from typing import Optional, List, Union +from urllib.parse import urljoin + +import dns.resolver +import requests + +from pkb_client.client import BindFile +from pkb_client.client.dns import ( + DNSRecord, + DNSRestoreMode, + DNSRecordType, + DNS_RECORDS_WITH_PRIORITY, +) +from pkb_client.client.domain import DomainInfo +from pkb_client.client.forwarding import URLForwarding, URLForwardingType +from pkb_client.client.ssl_cert import SSLCertBundle + +API_ENDPOINT = "https://api.porkbun.com/api/json/v3/" + +logger = logging.getLogger("pkb_client") +logging.basicConfig(level=logging.INFO) + + +class PKBClientException(Exception): + def __init__(self, status, message): + super().__init__(f"{status}: {message}") + + +class PKBClient: + """ + API client for Porkbun. + """ + + default_ttl: int = 300 + + def __init__( + self, + api_key: Optional[str] = None, + secret_api_key: Optional[str] = None, + api_endpoint: str = API_ENDPOINT, + debug: bool = False, + ) -> None: + """ + Creates a new PKBClient object. + + :param api_key: the API key used for Porkbun API calls + :param secret_api_key: the API secret used for Porkbun API calls + :param api_endpoint: the endpoint of the Porkbun API. + :param debug: boolean to enable debug logging + """ + self.api_key = api_key + self.secret_api_key = secret_api_key + self.api_endpoint = api_endpoint + self.debug = debug + if self.debug: + logger.setLevel(logging.DEBUG) + + def _get_auth_request_json(self) -> dict: + """ + Get the request json for the authentication of the Porkbun API calls. + + :return: the request json for the authentication of the Porkbun API calls + """ + + if self.api_key is None or self.secret_api_key is None: + raise ValueError("api_key and secret_api_key must be set") + + return {"apikey": self.api_key, "secretapikey": self.secret_api_key} + + def ping(self) -> str: + """ + API ping method: get the current public ip address of the requesting system; can also be used for auth checking. + See https://api.porkbun.com/api/json/v3/documentation#Authentication for more info. + + :return: the current public ip address of the requesting system + """ + + url = urljoin(self.api_endpoint, "ping") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return json.loads(r.text).get("yourIp", None) + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def create_dns_record( + self, + domain: str, + record_type: DNSRecordType, + content: str, + name: Optional[str] = None, + ttl: int = default_ttl, + prio: Optional[int] = None, + ) -> str: + """ + API DNS create method: create a new DNS record for given domain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Create%20Record for more info. + + :param domain: the domain for which the DNS record should be created + :param record_type: the type of the new DNS record + :param content: the content of the new DNS record + :param name: the subdomain for which the new DNS record entry should apply; the * can be used for a + wildcard DNS record; if not used, then a DNS record for the root domain will be created + :param ttl: the time to live in seconds of the new DNS record; have to be between 300 and 86400 + :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None + :return: the id of the new created DNS record + """ + + if ttl > 86400 or ttl < self.default_ttl: + raise ValueError(f"ttl must be between {self.default_ttl} and 86400") + + if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY: + raise ValueError( + f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}" + ) + + url = urljoin(self.api_endpoint, f"dns/create/{domain}") + req_json = { + **self._get_auth_request_json(), + "name": name, + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return str(json.loads(r.text).get("id", None)) + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def update_dns_record( + self, + domain: str, + record_id: str, + record_type: DNSRecordType, + content: str, + name: Optional[str] = None, + ttl: int = default_ttl, + prio: Optional[int] = None, + ) -> bool: + """ + API DNS edit method: edit an existing DNS record specified by the id for a given domain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record for more info. + + :param domain: the domain for which the DNS record should be edited + :param record_id: the id of the DNS record which should be edited + :param record_type: the new type of the DNS record + :param content: the new content of the DNS record + :param name: the new value of the subdomain for which the DNS record should apply; the * can be used for a + wildcard DNS record; if not set, the record will be set for the record domain + :param ttl: the new time to live in seconds of the DNS record, have to be between 300 and 86400 + :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None + + :return: True if the editing was successful + """ + + if ttl > 86400 or ttl < self.default_ttl: + raise ValueError(f"ttl must be between {self.default_ttl} and 86400") + + if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY: + raise ValueError( + f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}" + ) + + url = urljoin(self.api_endpoint, f"dns/edit/{domain}/{record_id}") + req_json = { + **self._get_auth_request_json(), + "name": name, + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def update_all_dns_records( + self, + domain: str, + record_type: DNSRecordType, + subdomain: str, + content: str, + ttl: int = default_ttl, + prio: Optional[int] = None, + ) -> bool: + """ + API DNS edit method: edit all existing DNS record matching the domain, record type and subdomain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record%20by%20Domain,%20Subdomain%20and%20Type for more info. + + :param domain: the domain for which the DNS record should be edited + :param record_type: the type of the DNS record + :param subdomain: the subdomain of the DNS record can be empty string for root domain + :param content: the new content of the DNS record + :param ttl: the new time to live in seconds of the DNS record, have to be between 300 and 86400 + :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None + + :return: True if the editing was successful + """ + + if ttl > 86400 or ttl < self.default_ttl: + raise ValueError(f"ttl must be between {self.default_ttl} and 86400") + + if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY: + raise ValueError( + f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}" + ) + + url = urljoin( + self.api_endpoint, f"dns/editByNameType/{domain}/{record_type}/{subdomain}" + ) + req_json = { + **self._get_auth_request_json(), + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def delete_dns_record(self, domain: str, record_id: str) -> bool: + """ + API DNS delete method: delete an existing DNS record specified by the id for a given domain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Delete%20Record for more info. + + :param domain: the domain for which the DNS record should be deleted + :param record_id: the id of the DNS record which should be deleted + + :return: True if the deletion was successful + """ + + url = urljoin(self.api_endpoint, f"dns/delete/{domain}/{record_id}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def delete_all_dns_records( + self, domain: str, record_type: DNSRecordType, subdomain: str + ) -> bool: + """ + API DNS delete method: delete all existing DNS record matching the domain, record type and subdomain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Delete%20Records%20by%20Domain,%20Subdomain%20and%20Type for more info. + + :param domain: the domain for which the DNS record should be deleted + :param record_type: the type of the DNS record + :param subdomain: the subdomain of the DNS record can be empty string for root domain + + :return: True if the deletion was successful + """ + + url = urljoin( + self.api_endpoint, + f"dns/deleteByNameType/{domain}/{record_type}/{subdomain}", + ) + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_dns_records( + self, domain, record_id: Optional[str] = None + ) -> List[DNSRecord]: + """ + API DNS retrieve method: retrieve all DNS records for given domain if no record id is specified. + Otherwise, retrieve the DNS record of the specified domain with the given record id. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records for more info. + + :param domain: the domain for which the DNS records should be retrieved + :param record_id: the id of the DNS record which should be retrieved + + :return: list of DNSRecords objects + """ + + if record_id is None: + url = urljoin(self.api_endpoint, f"dns/retrieve/{domain}") + else: + url = urljoin(self.api_endpoint, f"dns/retrieve/{domain}/{record_id}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + DNSRecord.from_dict(record) + for record in json.loads(r.text).get("records", []) + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_all_dns_records( + self, domain: str, record_type: DNSRecordType, subdomain: str + ) -> List[DNSRecord]: + """ + API DNS retrieve method: retrieve all DNS records matching the domain, record type and subdomain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records%20by%20Domain,%20Subdomain%20and%20Type for more info. + + :param domain: the domain for which the DNS records should be retrieved + :param record_type: the type of the DNS records + :param subdomain: the subdomain of the DNS records can be empty string for root domain + + :return: list of DNSRecords objects + """ + + url = urljoin( + self.api_endpoint, + f"dns/retrieveByNameType/{domain}/{record_type}/{subdomain}", + ) + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + DNSRecord.from_dict(record) + for record in json.loads(r.text).get("records", []) + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def export_dns_records(self, domain: str, filepath: Union[Path, str]) -> bool: + """ + Export all DNS record from the given domain to a json file. + This method does not represent a Porkbun API method. + DNS records with all custom fields like notes are exported. + + :param domain: the domain for which the DNS record should be retrieved and saved + :param filepath: the filepath where to save the exported DNS records + + :return: True if everything went well + """ + + filepath = Path(filepath) + + logger.debug("retrieve current DNS records...") + dns_records = self.get_dns_records(domain) + + logger.debug("save DNS records to {} ...".format(filepath)) + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record.id] = record + + if filepath.exists(): + logger.warning("file already exists, overwriting...") + + with open(filepath, "w") as f: + json.dump(dns_records_dict, f, default=lambda o: o.__dict__, indent=4) + + logger.info("export finished") + + return True + + def export_bind_dns_records(self, domain: str, filepath: Union[Path, str]) -> bool: + """ + Export all DNS record from the given domain to a BIND file. + This method does not represent a Porkbun API method. + Porkbun DNS record notes are exported as comments. + + :param domain: the domain for which the DNS record should be retrieved and saved + :param filepath: the filepath where to save the exported DNS records + + :return: True if everything went well + """ + + filepath = Path(filepath) + + logger.debug("retrieve current DNS records...") + dns_records = self.get_dns_records(domain) + + logger.debug("save DNS records to {} ...".format(filepath)) + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record.id] = record + + if filepath.exists(): + logger.warning("file already exists, overwriting...") + + # domain header + bind_file_content = f"$ORIGIN {domain}" + + # SOA record + soa_records = dns.resolver.resolve(domain, "SOA") + if soa_records: + soa_record = soa_records[0] + bind_file_content += f"\n@ IN SOA {soa_record.mname} {soa_record.rname} ({soa_record.serial} {soa_record.refresh} {soa_record.retry} {soa_record.expire} {soa_record.minimum})" + + # records + for record in dns_records: + # name record class ttl record type record data + if record.prio: + record_content = f"{record.prio} {record.content}" + else: + record_content = record.content + bind_file_content += ( + f"\n{record.name} IN {record.ttl} {record.type} {record_content}" + ) + + if record.notes: + bind_file_content += f" ; {record.notes}" + + with open(filepath, "w") as f: + f.write(bind_file_content) + + logger.info("export finished") + + return True + + def import_dns_records( + self, domain: str, filepath: Union[Path, str], restore_mode: DNSRestoreMode + ) -> bool: + """ + Restore all DNS records from a json file to the given domain. + This method does not represent a Porkbun API method. + + :param domain: the domain for which the DNS record should be restored + :param filepath: the filepath from which the DNS records are to be restored + :param restore_mode: The restore mode (DNS records are identified by the record type, name and prio if supported): + clear: remove all existing DNS records and restore all DNS records from the provided file + replace: replace only existing DNS records with the DNS records from the provided file, + but do not create any new DNS records + keep: keep the existing DNS records and only create new ones for all DNS records from + the specified file if they do not exist + + :return: True if everything went well + """ + + filepath = Path(filepath) + + existing_dns_records = self.get_dns_records(domain) + + with open(filepath, "r") as f: + exported_dns_records_dict = json.load(f) + + if restore_mode is DNSRestoreMode.clear: + logger.debug("restore mode: clear") + + try: + # delete all existing DNS records + for record in existing_dns_records: + self.delete_dns_record(domain, record.id) + + # restore all exported records by creating new DNS records + for _, exported_record in exported_dns_records_dict.items(): + name = ".".join(exported_record["name"].split(".")[:-2]) + self.create_dns_record( + domain=domain, + record_type=exported_record["type"], + content=exported_record["content"], + name=name, + ttl=exported_record["ttl"], + prio=exported_record["prio"], + ) + except Exception as e: + logger.error("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + logger.error("import failed") + return False + elif restore_mode is DNSRestoreMode.replace: + logger.debug("restore mode: replace") + + try: + existing_dns_record_hashed = { + sha256( + f"{record.type}{record.name}{record.prio}".encode() + ).hexdigest(): record + for record in existing_dns_records + } + for record in exported_dns_records_dict.values(): + record_hash = sha256( + f"{record['type']}{record['name']}{record['prio']}".encode() + ).hexdigest() + existing_record = existing_dns_record_hashed.get(record_hash, None) + # check if the exported dns record is different to the existing record, + # so we can reduce unnecessary api calls + if existing_record is not None and ( + record["content"] != existing_record.content + or record["ttl"] != existing_record.ttl + or record["prio"] != existing_record.prio + ): + self.update_dns_record( + domain=domain, + record_id=existing_record.id, + record_type=record["type"], + content=record["content"], + name=record["name"].replace(f".{domain}", ""), + ttl=record["ttl"], + prio=record["prio"], + ) + except Exception as e: + logger.error("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + logger.error("import failed") + return False + elif restore_mode is DNSRestoreMode.keep: + logger.debug("restore mode: keep") + + existing_dns_record_hashed = { + sha256( + f"{record.type}{record.name}{record.prio}".encode() + ).hexdigest(): record + for record in existing_dns_records + } + + try: + for record in exported_dns_records_dict.values(): + record_hash = sha256( + f"{record['type']}{record['name']}{record['prio']}".encode() + ).hexdigest() + existing_record = existing_dns_record_hashed.get(record_hash, None) + if existing_record is None: + self.create_dns_record( + domain=domain, + record_type=record["type"], + content=record["content"], + name=record["name"].replace(f".{domain}", ""), + ttl=record["ttl"], + prio=record["prio"], + ) + except Exception as e: + logger.error("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + logger.error("import failed") + return False + else: + raise Exception("restore mode not supported") + + logger.info("import successfully completed") + + return True + + def import_bind_dns_records( + self, filepath: Union[Path, str], restore_mode: DNSRestoreMode + ) -> bool: + """ + Restore all DNS records from a BIND file. + This method does not represent a Porkbun API method. + + :param filepath: the bind filepath from which the DNS records are to be restored + :param restore_mode: The restore mode: + clear: remove all existing DNS records and restore all DNS records from the provided file + :return: True if everything went well + """ + + bind_file = BindFile.from_file(filepath) + + existing_dns_records = self.get_dns_records(bind_file.origin[:-1]) + + if restore_mode is DNSRestoreMode.clear: + logger.debug("restore mode: clear") + + try: + # delete all existing DNS records + for record in existing_dns_records: + self.delete_dns_record(bind_file.origin[:-1], record.id) + + # restore all records from BIND file by creating new DNS records + for record in bind_file.records: + # extract subdomain from record name + subdomain = record.name.replace(bind_file.origin, "") + # replace trailing dot + subdomain = subdomain[:-1] if subdomain.endswith(".") else subdomain + self.create_dns_record( + domain=bind_file.origin[:-1], + record_type=record.record_type, + content=record.data, + name=subdomain, + ttl=record.ttl, + prio=record.prio, + ) + + except Exception as e: + logger.error("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + logger.error("import failed") + return False + else: + raise Exception(f"restore mode '{restore_mode.value}' not supported") + + logger.info("import successfully completed") + + return True + + def update_dns_servers(self, domain: str, name_servers: List[str]) -> bool: + """ + Update the name servers of the specified domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Update%20Name%20Servers for more info. + + :return: True if everything went well + """ + + url = urljoin(self.api_endpoint, f"domain/updateNs/{domain}") + req_json = {**self._get_auth_request_json(), "ns": name_servers} + r = requests.post(url=url, json=req_json) + + if r.status_code == 200 and json.loads(r.text).get("status", None) == "SUCCESS": + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_dns_servers(self, domain: str) -> List[str]: + """ + Get the name servers for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Get%20Name%20Servers for more info. + + :return: list of name servers + """ + + url = urljoin(self.api_endpoint, f"domain/getNs/{domain}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return json.loads(r.text).get("ns", []) + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_domains(self, start: int = 0) -> List[DomainInfo]: + """ + Get all domains for the account in chunks of 1000. If you reach the end of all domains, the list will be empty. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20List%20All for more info. + + :param start: the index of the first domain to retrieve + + :return: list of DomainInfo objects + """ + + url = urljoin(self.api_endpoint, "domain/listAll") + + req_json = {**self._get_auth_request_json(), "start": start} + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + DomainInfo.from_dict(domain) + for domain in json.loads(r.text).get("domains", []) + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_url_forwards(self, domain: str) -> List[URLForwarding]: + """ + Get the url forwarding for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Get%20URL%20Forwarding for more info. + + :return: list of URLForwarding objects + """ + + url = urljoin(self.api_endpoint, f"domain/getUrlForwarding/{domain}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + URLForwarding.from_dict(forwarding) + for forwarding in json.loads(r.text).get("forwards", []) + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def create_url_forward( + self, + domain: str, + subdomain: str, + location: str, + type: URLForwardingType, + include_path: bool, + wildcard: bool, + ) -> bool: + """ + Add a url forward for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Add%20URL%20Forward for more info. + + :param domain: the domain for which the url forwarding should be added + :param subdomain: the subdomain for which the url forwarding should be added, can be empty for root domain + :param location: the location to which the url forwarding should redirect + :param type: the type of the url forwarding + :param include_path: if the path should be included in the url forwarding + :param wildcard: if the url forwarding should also be applied to all subdomains + + :return: True if the forwarding was added successfully + """ + + url = urljoin(self.api_endpoint, f"domain/addUrlForward/{domain}") + req_json = { + **self._get_auth_request_json(), + "subdomain": subdomain, + "location": location, + "type": type, + "includePath": include_path, + "wildcard": wildcard, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def delete_url_forward(self, domain: str, id: str) -> bool: + """ + Delete an url forward for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Delete%20URL%20Forward for more info. + + :param domain: the domain for which the url forwarding should be deleted + :param id: the id of the url forwarding which should be deleted + + :return: True if the deletion was successful + """ + + url = urljoin(self.api_endpoint, f"domain/deleteUrlForward/{domain}/{id}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_domain_pricing(self) -> dict: + """ + Get the pricing for all Porkbun domains. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Pricing for more info. + + :return: dict with pricing + """ + + url = urljoin(self.api_endpoint, "pricing/get") + r = requests.post(url=url) + + if r.status_code == 200: + return json.loads(r.text)["pricing"] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_ssl_bundle(self, domain) -> SSLCertBundle: + """ + API SSL bundle retrieve method: retrieve an SSL bundle for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info. + + :param domain: the domain for which the SSL bundle should be retrieved + + :return: tuple of intermediate certificate, certificate chain, private key, public key + """ + + url = urljoin(self.api_endpoint, f"ssl/retrieve/{domain}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + ssl_bundle = json.loads(r.text) + + return SSLCertBundle( + certificate_chain=ssl_bundle["certificatechain"], + private_key=ssl_bundle["privatekey"], + public_key=ssl_bundle["publickey"], + ) + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + @staticmethod + def __handle_error_backup__(dns_records): + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record["id"]] = record + + # generate filename with incremental suffix + base_backup_filename = "pkb_client_dns_records_backup" + suffix = 0 + backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) + while backup_file_path.exists(): + suffix += 1 + backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) + + with open(backup_file_path, "w") as f: + json.dump(dns_records_dict, f) + + logger.warning( + "a backup of your existing dns records was saved to {}".format( + str(backup_file_path) + ) + ) diff --git a/pkb_client/client/dns.py b/pkb_client/client/dns.py new file mode 100644 index 0000000..85ae971 --- /dev/null +++ b/pkb_client/client/dns.py @@ -0,0 +1,63 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Optional + + +class DNSRecordType(str, Enum): + A = "A" + AAAA = "AAAA" + MX = "MX" + CNAME = "CNAME" + ALIAS = "ALIAS" + TXT = "TXT" + NS = "NS" + SRV = "SRV" + TLSA = "TLSA" + CAA = "CAA" + + def __str__(self): + return self.value + + +DNS_RECORDS_WITH_PRIORITY = {DNSRecordType.MX, DNSRecordType.SRV} + + +@dataclass +class DNSRecord: + id: str + name: str + type: DNSRecordType + content: str + ttl: int + prio: Optional[int] + notes: str + + @staticmethod + def from_dict(d): + # only use prio for supported record types since the API returns it for all records with default value 0 + prio = int(d["prio"]) if d["type"] in DNS_RECORDS_WITH_PRIORITY else None + return DNSRecord( + id=d["id"], + name=d["name"], + type=DNSRecordType[d["type"]], + content=d["content"], + ttl=int(d["ttl"]), + prio=prio, + notes=d["notes"], + ) + + +class DNSRestoreMode(Enum): + clear = 0 + replace = 1 + keep = 2 + + def __str__(self): + return self.name + + @staticmethod + def from_string(a): + try: + return DNSRestoreMode[a] + except KeyError: + return a diff --git a/pkb_client/client/domain.py b/pkb_client/client/domain.py new file mode 100644 index 0000000..a44a904 --- /dev/null +++ b/pkb_client/client/domain.py @@ -0,0 +1,29 @@ +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class DomainInfo: + domain: str + status: str + tld: str + create_date: datetime + expire_date: datetime + security_lock: bool + whois_privacy: bool + auto_renew: bool + not_local: bool + + @staticmethod + def from_dict(d): + return DomainInfo( + domain=d["domain"], + status=d["status"], + tld=d["tld"], + create_date=datetime.fromisoformat(d["createDate"]), + expire_date=datetime.fromisoformat(d["expireDate"]), + security_lock=bool(d["securityLock"]), + whois_privacy=bool(d["whoisPrivacy"]), + auto_renew=bool(d["autoRenew"]), + not_local=bool(d["notLocal"]), + ) diff --git a/pkb_client/client/forwarding.py b/pkb_client/client/forwarding.py new file mode 100644 index 0000000..64962ca --- /dev/null +++ b/pkb_client/client/forwarding.py @@ -0,0 +1,28 @@ +from dataclasses import dataclass +from enum import Enum + + +class URLForwardingType(str, Enum): + temporary = "temporary" + permanent = "permanent" + + +@dataclass +class URLForwarding: + id: str + subdomain: str + location: str + type: URLForwardingType + include_path: bool + wildcard: bool + + @staticmethod + def from_dict(d): + return URLForwarding( + id=d["id"], + subdomain=d["subdomain"], + location=d["location"], + type=URLForwardingType[d["type"]], + include_path=d["includePath"] == "yes", + wildcard=d["wildcard"] == "yes", + ) diff --git a/pkb_client/client/ssl_cert.py b/pkb_client/client/ssl_cert.py new file mode 100644 index 0000000..0662ce4 --- /dev/null +++ b/pkb_client/client/ssl_cert.py @@ -0,0 +1,13 @@ +from dataclasses import dataclass + + +@dataclass +class SSLCertBundle: + # The complete certificate chain. + certificate_chain: str + + # The private key. + private_key: str + + # The public key. + public_key: str |
