summaryrefslogtreecommitdiffstats
path: root/pkb_client/client
diff options
context:
space:
mode:
Diffstat (limited to 'pkb_client/client')
-rw-r--r--pkb_client/client/__init__.py22
-rw-r--r--pkb_client/client/bind_file.py169
-rw-r--r--pkb_client/client/client.py867
-rw-r--r--pkb_client/client/dns.py63
-rw-r--r--pkb_client/client/domain.py29
-rw-r--r--pkb_client/client/forwarding.py28
-rw-r--r--pkb_client/client/ssl_cert.py13
7 files changed, 1191 insertions, 0 deletions
diff --git a/pkb_client/client/__init__.py b/pkb_client/client/__init__.py
new file mode 100644
index 0000000..da8a42b
--- /dev/null
+++ b/pkb_client/client/__init__.py
@@ -0,0 +1,22 @@
+from .bind_file import BindFile, BindRecord, RecordClass
+from .client import PKBClient, PKBClientException, API_ENDPOINT
+from .dns import DNSRecord, DNSRestoreMode, DNSRecordType
+from .domain import DomainInfo
+from .forwarding import URLForwarding, URLForwardingType
+from .ssl_cert import SSLCertBundle
+
+__all__ = [
+ "PKBClient",
+ "PKBClientException",
+ "API_ENDPOINT",
+ "BindFile",
+ "BindRecord",
+ "RecordClass",
+ "DNSRecord",
+ "DNSRestoreMode",
+ "DNSRecordType",
+ "DomainInfo",
+ "URLForwarding",
+ "URLForwardingType",
+ "SSLCertBundle",
+]
diff --git a/pkb_client/client/bind_file.py b/pkb_client/client/bind_file.py
new file mode 100644
index 0000000..af9abe0
--- /dev/null
+++ b/pkb_client/client/bind_file.py
@@ -0,0 +1,169 @@
+import logging
+from dataclasses import dataclass
+from enum import Enum
+from typing import Optional, List
+
+from pkb_client.client.dns import DNSRecordType, DNS_RECORDS_WITH_PRIORITY
+
+
+class RecordClass(str, Enum):
+ IN = "IN"
+
+ def __str__(self):
+ return self.value
+
+
+@dataclass
+class BindRecord:
+ name: str
+ ttl: int
+ record_class: RecordClass
+ record_type: DNSRecordType
+ data: str
+ prio: Optional[int] = None
+ comment: Optional[str] = None
+
+ def __str__(self):
+ record_string = f"{self.name} {self.ttl} {self.record_class} {self.record_type}"
+ if self.prio is not None:
+ record_string += f" {self.prio}"
+ record_string += f" {self.data}"
+ if self.comment:
+ record_string += f" ; {self.comment}"
+ return record_string
+
+
+class BindFile:
+ origin: str
+ ttl: Optional[int] = None
+ records: List[BindRecord]
+
+ def __init__(
+ self,
+ origin: str,
+ ttl: Optional[int] = None,
+ records: Optional[List[BindRecord]] = None,
+ ) -> None:
+ self.origin = origin
+ self.ttl = ttl
+ self.records = records or []
+
+ @staticmethod
+ def from_file(file_path: str) -> "BindFile":
+ with open(file_path, "r") as f:
+ file_data = f.readlines()
+
+ # parse the file line by line
+ origin = None
+ ttl = None
+ records = []
+ for line in file_data:
+ if line.startswith("$ORIGIN"):
+ origin = line.split()[1]
+ elif line.startswith("$TTL"):
+ ttl = int(line.split()[1])
+ else:
+ # parse the records with the two possible formats:
+ # 1: name ttl record-class record-type record-data
+ # 2: name record-class ttl record-type record-data
+ # whereby the ttl is optional
+
+ # drop any right trailing comments
+ line_parts = line.split(";", 1)
+ line = line_parts[0].strip()
+ comment = line_parts[1].strip() if len(line_parts) > 1 else None
+ prio = None
+
+ # skip empty lines
+ if not line:
+ continue
+
+ # find which format the line is
+ record_parts = line.split()
+ if record_parts[1].isdigit():
+ # scheme 1
+ if record_parts[3] not in DNSRecordType.__members__:
+ logging.warning(f"Ignoring unsupported record type: {line}")
+ continue
+ if record_parts[2] not in RecordClass.__members__:
+ logging.warning(f"Ignoring unsupported record class: {line}")
+ continue
+ record_name = record_parts[0]
+ record_ttl = int(record_parts[1])
+ record_class = RecordClass[record_parts[2]]
+ record_type = DNSRecordType[record_parts[3]]
+ if record_type in DNS_RECORDS_WITH_PRIORITY:
+ prio = int(record_parts[4])
+ record_data = " ".join(record_parts[5:])
+ else:
+ record_data = " ".join(record_parts[4:])
+ elif record_parts[2].isdigit():
+ # scheme 2
+ if record_parts[3] not in DNSRecordType.__members__:
+ logging.warning(f"Ignoring unsupported record type: {line}")
+ continue
+ if record_parts[1] not in RecordClass.__members__:
+ logging.warning(f"Ignoring unsupported record class: {line}")
+ continue
+ record_name = record_parts[0]
+ record_ttl = int(record_parts[2])
+ record_class = RecordClass[record_parts[1]]
+ record_type = DNSRecordType[record_parts[3]]
+ if record_type in DNS_RECORDS_WITH_PRIORITY:
+ prio = int(record_parts[4])
+ record_data = " ".join(record_parts[5:])
+ else:
+ record_data = " ".join(record_parts[4:])
+ else:
+ # no ttl, use default or previous
+ if record_parts[2] not in DNSRecordType.__members__:
+ logging.warning(f"Ignoring unsupported record type: {line}")
+ continue
+ if record_parts[1] not in RecordClass.__members__:
+ logging.warning(f"Ignoring unsupported record class: {line}")
+ continue
+ record_name = record_parts[0]
+ if ttl is None and not records:
+ raise ValueError("No TTL found in file")
+ record_ttl = ttl or records[-1].ttl
+ record_class = RecordClass[record_parts[1]]
+ record_type = DNSRecordType[record_parts[2]]
+ if record_type in DNS_RECORDS_WITH_PRIORITY:
+ prio = int(record_parts[3])
+ record_data = " ".join(record_parts[4:])
+ else:
+ record_data = " ".join(record_parts[3:])
+
+ # replace @ in record name with origin
+ record_name = record_name.replace("@", origin)
+
+ records.append(
+ BindRecord(
+ record_name,
+ record_ttl,
+ record_class,
+ record_type,
+ record_data,
+ prio=prio,
+ comment=comment,
+ )
+ )
+
+ if origin is None:
+ raise ValueError("No origin found in file")
+
+ return BindFile(origin, ttl, records)
+
+ def to_file(self, file_path: str) -> None:
+ with open(file_path, "w") as f:
+ f.write(str(self))
+
+ def __str__(self) -> str:
+ bind = f"$ORIGIN {self.origin}\n"
+
+ if self.ttl is not None:
+ bind += f"$TTL {self.ttl}\n"
+
+ for record in self.records:
+ bind += f"{record}\n"
+ return bind
diff --git a/pkb_client/client/client.py b/pkb_client/client/client.py
new file mode 100644
index 0000000..86956a5
--- /dev/null
+++ b/pkb_client/client/client.py
@@ -0,0 +1,867 @@
+import json
+import logging
+from hashlib import sha256
+from pathlib import Path
+from typing import Optional, List, Union
+from urllib.parse import urljoin
+
+import dns.resolver
+import requests
+
+from pkb_client.client import BindFile
+from pkb_client.client.dns import (
+ DNSRecord,
+ DNSRestoreMode,
+ DNSRecordType,
+ DNS_RECORDS_WITH_PRIORITY,
+)
+from pkb_client.client.domain import DomainInfo
+from pkb_client.client.forwarding import URLForwarding, URLForwardingType
+from pkb_client.client.ssl_cert import SSLCertBundle
+
+API_ENDPOINT = "https://api.porkbun.com/api/json/v3/"
+
+logger = logging.getLogger("pkb_client")
+logging.basicConfig(level=logging.INFO)
+
+
+class PKBClientException(Exception):
+ def __init__(self, status, message):
+ super().__init__(f"{status}: {message}")
+
+
+class PKBClient:
+ """
+ API client for Porkbun.
+ """
+
+ default_ttl: int = 300
+
+ def __init__(
+ self,
+ api_key: Optional[str] = None,
+ secret_api_key: Optional[str] = None,
+ api_endpoint: str = API_ENDPOINT,
+ debug: bool = False,
+ ) -> None:
+ """
+ Creates a new PKBClient object.
+
+ :param api_key: the API key used for Porkbun API calls
+ :param secret_api_key: the API secret used for Porkbun API calls
+ :param api_endpoint: the endpoint of the Porkbun API.
+ :param debug: boolean to enable debug logging
+ """
+ self.api_key = api_key
+ self.secret_api_key = secret_api_key
+ self.api_endpoint = api_endpoint
+ self.debug = debug
+ if self.debug:
+ logger.setLevel(logging.DEBUG)
+
+ def _get_auth_request_json(self) -> dict:
+ """
+ Get the request json for the authentication of the Porkbun API calls.
+
+ :return: the request json for the authentication of the Porkbun API calls
+ """
+
+ if self.api_key is None or self.secret_api_key is None:
+ raise ValueError("api_key and secret_api_key must be set")
+
+ return {"apikey": self.api_key, "secretapikey": self.secret_api_key}
+
+ def ping(self) -> str:
+ """
+ API ping method: get the current public ip address of the requesting system; can also be used for auth checking.
+ See https://api.porkbun.com/api/json/v3/documentation#Authentication for more info.
+
+ :return: the current public ip address of the requesting system
+ """
+
+ url = urljoin(self.api_endpoint, "ping")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return json.loads(r.text).get("yourIp", None)
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def create_dns_record(
+ self,
+ domain: str,
+ record_type: DNSRecordType,
+ content: str,
+ name: Optional[str] = None,
+ ttl: int = default_ttl,
+ prio: Optional[int] = None,
+ ) -> str:
+ """
+ API DNS create method: create a new DNS record for given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Create%20Record for more info.
+
+ :param domain: the domain for which the DNS record should be created
+ :param record_type: the type of the new DNS record
+ :param content: the content of the new DNS record
+ :param name: the subdomain for which the new DNS record entry should apply; the * can be used for a
+ wildcard DNS record; if not used, then a DNS record for the root domain will be created
+ :param ttl: the time to live in seconds of the new DNS record; have to be between 300 and 86400
+ :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None
+ :return: the id of the new created DNS record
+ """
+
+ if ttl > 86400 or ttl < self.default_ttl:
+ raise ValueError(f"ttl must be between {self.default_ttl} and 86400")
+
+ if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY:
+ raise ValueError(
+ f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}"
+ )
+
+ url = urljoin(self.api_endpoint, f"dns/create/{domain}")
+ req_json = {
+ **self._get_auth_request_json(),
+ "name": name,
+ "type": record_type,
+ "content": content,
+ "ttl": ttl,
+ "prio": prio,
+ }
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return str(json.loads(r.text).get("id", None))
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def update_dns_record(
+ self,
+ domain: str,
+ record_id: str,
+ record_type: DNSRecordType,
+ content: str,
+ name: Optional[str] = None,
+ ttl: int = default_ttl,
+ prio: Optional[int] = None,
+ ) -> bool:
+ """
+ API DNS edit method: edit an existing DNS record specified by the id for a given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record for more info.
+
+ :param domain: the domain for which the DNS record should be edited
+ :param record_id: the id of the DNS record which should be edited
+ :param record_type: the new type of the DNS record
+ :param content: the new content of the DNS record
+ :param name: the new value of the subdomain for which the DNS record should apply; the * can be used for a
+ wildcard DNS record; if not set, the record will be set for the record domain
+ :param ttl: the new time to live in seconds of the DNS record, have to be between 300 and 86400
+ :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None
+
+ :return: True if the editing was successful
+ """
+
+ if ttl > 86400 or ttl < self.default_ttl:
+ raise ValueError(f"ttl must be between {self.default_ttl} and 86400")
+
+ if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY:
+ raise ValueError(
+ f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}"
+ )
+
+ url = urljoin(self.api_endpoint, f"dns/edit/{domain}/{record_id}")
+ req_json = {
+ **self._get_auth_request_json(),
+ "name": name,
+ "type": record_type,
+ "content": content,
+ "ttl": ttl,
+ "prio": prio,
+ }
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def update_all_dns_records(
+ self,
+ domain: str,
+ record_type: DNSRecordType,
+ subdomain: str,
+ content: str,
+ ttl: int = default_ttl,
+ prio: Optional[int] = None,
+ ) -> bool:
+ """
+ API DNS edit method: edit all existing DNS record matching the domain, record type and subdomain.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record%20by%20Domain,%20Subdomain%20and%20Type for more info.
+
+ :param domain: the domain for which the DNS record should be edited
+ :param record_type: the type of the DNS record
+ :param subdomain: the subdomain of the DNS record can be empty string for root domain
+ :param content: the new content of the DNS record
+ :param ttl: the new time to live in seconds of the DNS record, have to be between 300 and 86400
+ :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None
+
+ :return: True if the editing was successful
+ """
+
+ if ttl > 86400 or ttl < self.default_ttl:
+ raise ValueError(f"ttl must be between {self.default_ttl} and 86400")
+
+ if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY:
+ raise ValueError(
+ f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}"
+ )
+
+ url = urljoin(
+ self.api_endpoint, f"dns/editByNameType/{domain}/{record_type}/{subdomain}"
+ )
+ req_json = {
+ **self._get_auth_request_json(),
+ "type": record_type,
+ "content": content,
+ "ttl": ttl,
+ "prio": prio,
+ }
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def delete_dns_record(self, domain: str, record_id: str) -> bool:
+ """
+ API DNS delete method: delete an existing DNS record specified by the id for a given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Delete%20Record for more info.
+
+ :param domain: the domain for which the DNS record should be deleted
+ :param record_id: the id of the DNS record which should be deleted
+
+ :return: True if the deletion was successful
+ """
+
+ url = urljoin(self.api_endpoint, f"dns/delete/{domain}/{record_id}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def delete_all_dns_records(
+ self, domain: str, record_type: DNSRecordType, subdomain: str
+ ) -> bool:
+ """
+ API DNS delete method: delete all existing DNS record matching the domain, record type and subdomain.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Delete%20Records%20by%20Domain,%20Subdomain%20and%20Type for more info.
+
+ :param domain: the domain for which the DNS record should be deleted
+ :param record_type: the type of the DNS record
+ :param subdomain: the subdomain of the DNS record can be empty string for root domain
+
+ :return: True if the deletion was successful
+ """
+
+ url = urljoin(
+ self.api_endpoint,
+ f"dns/deleteByNameType/{domain}/{record_type}/{subdomain}",
+ )
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_dns_records(
+ self, domain, record_id: Optional[str] = None
+ ) -> List[DNSRecord]:
+ """
+ API DNS retrieve method: retrieve all DNS records for given domain if no record id is specified.
+ Otherwise, retrieve the DNS record of the specified domain with the given record id.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records for more info.
+
+ :param domain: the domain for which the DNS records should be retrieved
+ :param record_id: the id of the DNS record which should be retrieved
+
+ :return: list of DNSRecords objects
+ """
+
+ if record_id is None:
+ url = urljoin(self.api_endpoint, f"dns/retrieve/{domain}")
+ else:
+ url = urljoin(self.api_endpoint, f"dns/retrieve/{domain}/{record_id}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return [
+ DNSRecord.from_dict(record)
+ for record in json.loads(r.text).get("records", [])
+ ]
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_all_dns_records(
+ self, domain: str, record_type: DNSRecordType, subdomain: str
+ ) -> List[DNSRecord]:
+ """
+ API DNS retrieve method: retrieve all DNS records matching the domain, record type and subdomain.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records%20by%20Domain,%20Subdomain%20and%20Type for more info.
+
+ :param domain: the domain for which the DNS records should be retrieved
+ :param record_type: the type of the DNS records
+ :param subdomain: the subdomain of the DNS records can be empty string for root domain
+
+ :return: list of DNSRecords objects
+ """
+
+ url = urljoin(
+ self.api_endpoint,
+ f"dns/retrieveByNameType/{domain}/{record_type}/{subdomain}",
+ )
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return [
+ DNSRecord.from_dict(record)
+ for record in json.loads(r.text).get("records", [])
+ ]
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def export_dns_records(self, domain: str, filepath: Union[Path, str]) -> bool:
+ """
+ Export all DNS record from the given domain to a json file.
+ This method does not represent a Porkbun API method.
+ DNS records with all custom fields like notes are exported.
+
+ :param domain: the domain for which the DNS record should be retrieved and saved
+ :param filepath: the filepath where to save the exported DNS records
+
+ :return: True if everything went well
+ """
+
+ filepath = Path(filepath)
+
+ logger.debug("retrieve current DNS records...")
+ dns_records = self.get_dns_records(domain)
+
+ logger.debug("save DNS records to {} ...".format(filepath))
+ # merge the single DNS records into one single dict with the record id as key
+ dns_records_dict = dict()
+ for record in dns_records:
+ dns_records_dict[record.id] = record
+
+ if filepath.exists():
+ logger.warning("file already exists, overwriting...")
+
+ with open(filepath, "w") as f:
+ json.dump(dns_records_dict, f, default=lambda o: o.__dict__, indent=4)
+
+ logger.info("export finished")
+
+ return True
+
+ def export_bind_dns_records(self, domain: str, filepath: Union[Path, str]) -> bool:
+ """
+ Export all DNS record from the given domain to a BIND file.
+ This method does not represent a Porkbun API method.
+ Porkbun DNS record notes are exported as comments.
+
+ :param domain: the domain for which the DNS record should be retrieved and saved
+ :param filepath: the filepath where to save the exported DNS records
+
+ :return: True if everything went well
+ """
+
+ filepath = Path(filepath)
+
+ logger.debug("retrieve current DNS records...")
+ dns_records = self.get_dns_records(domain)
+
+ logger.debug("save DNS records to {} ...".format(filepath))
+ # merge the single DNS records into one single dict with the record id as key
+ dns_records_dict = dict()
+ for record in dns_records:
+ dns_records_dict[record.id] = record
+
+ if filepath.exists():
+ logger.warning("file already exists, overwriting...")
+
+ # domain header
+ bind_file_content = f"$ORIGIN {domain}"
+
+ # SOA record
+ soa_records = dns.resolver.resolve(domain, "SOA")
+ if soa_records:
+ soa_record = soa_records[0]
+ bind_file_content += f"\n@ IN SOA {soa_record.mname} {soa_record.rname} ({soa_record.serial} {soa_record.refresh} {soa_record.retry} {soa_record.expire} {soa_record.minimum})"
+
+ # records
+ for record in dns_records:
+ # name record class ttl record type record data
+ if record.prio:
+ record_content = f"{record.prio} {record.content}"
+ else:
+ record_content = record.content
+ bind_file_content += (
+ f"\n{record.name} IN {record.ttl} {record.type} {record_content}"
+ )
+
+ if record.notes:
+ bind_file_content += f" ; {record.notes}"
+
+ with open(filepath, "w") as f:
+ f.write(bind_file_content)
+
+ logger.info("export finished")
+
+ return True
+
+ def import_dns_records(
+ self, domain: str, filepath: Union[Path, str], restore_mode: DNSRestoreMode
+ ) -> bool:
+ """
+ Restore all DNS records from a json file to the given domain.
+ This method does not represent a Porkbun API method.
+
+ :param domain: the domain for which the DNS record should be restored
+ :param filepath: the filepath from which the DNS records are to be restored
+ :param restore_mode: The restore mode (DNS records are identified by the record type, name and prio if supported):
+ clear: remove all existing DNS records and restore all DNS records from the provided file
+ replace: replace only existing DNS records with the DNS records from the provided file,
+ but do not create any new DNS records
+ keep: keep the existing DNS records and only create new ones for all DNS records from
+ the specified file if they do not exist
+
+ :return: True if everything went well
+ """
+
+ filepath = Path(filepath)
+
+ existing_dns_records = self.get_dns_records(domain)
+
+ with open(filepath, "r") as f:
+ exported_dns_records_dict = json.load(f)
+
+ if restore_mode is DNSRestoreMode.clear:
+ logger.debug("restore mode: clear")
+
+ try:
+ # delete all existing DNS records
+ for record in existing_dns_records:
+ self.delete_dns_record(domain, record.id)
+
+ # restore all exported records by creating new DNS records
+ for _, exported_record in exported_dns_records_dict.items():
+ name = ".".join(exported_record["name"].split(".")[:-2])
+ self.create_dns_record(
+ domain=domain,
+ record_type=exported_record["type"],
+ content=exported_record["content"],
+ name=name,
+ ttl=exported_record["ttl"],
+ prio=exported_record["prio"],
+ )
+ except Exception as e:
+ logger.error("something went wrong: {}".format(e.__str__()))
+ self.__handle_error_backup__(existing_dns_records)
+ logger.error("import failed")
+ return False
+ elif restore_mode is DNSRestoreMode.replace:
+ logger.debug("restore mode: replace")
+
+ try:
+ existing_dns_record_hashed = {
+ sha256(
+ f"{record.type}{record.name}{record.prio}".encode()
+ ).hexdigest(): record
+ for record in existing_dns_records
+ }
+ for record in exported_dns_records_dict.values():
+ record_hash = sha256(
+ f"{record['type']}{record['name']}{record['prio']}".encode()
+ ).hexdigest()
+ existing_record = existing_dns_record_hashed.get(record_hash, None)
+ # check if the exported dns record is different to the existing record,
+ # so we can reduce unnecessary api calls
+ if existing_record is not None and (
+ record["content"] != existing_record.content
+ or record["ttl"] != existing_record.ttl
+ or record["prio"] != existing_record.prio
+ ):
+ self.update_dns_record(
+ domain=domain,
+ record_id=existing_record.id,
+ record_type=record["type"],
+ content=record["content"],
+ name=record["name"].replace(f".{domain}", ""),
+ ttl=record["ttl"],
+ prio=record["prio"],
+ )
+ except Exception as e:
+ logger.error("something went wrong: {}".format(e.__str__()))
+ self.__handle_error_backup__(existing_dns_records)
+ logger.error("import failed")
+ return False
+ elif restore_mode is DNSRestoreMode.keep:
+ logger.debug("restore mode: keep")
+
+ existing_dns_record_hashed = {
+ sha256(
+ f"{record.type}{record.name}{record.prio}".encode()
+ ).hexdigest(): record
+ for record in existing_dns_records
+ }
+
+ try:
+ for record in exported_dns_records_dict.values():
+ record_hash = sha256(
+ f"{record['type']}{record['name']}{record['prio']}".encode()
+ ).hexdigest()
+ existing_record = existing_dns_record_hashed.get(record_hash, None)
+ if existing_record is None:
+ self.create_dns_record(
+ domain=domain,
+ record_type=record["type"],
+ content=record["content"],
+ name=record["name"].replace(f".{domain}", ""),
+ ttl=record["ttl"],
+ prio=record["prio"],
+ )
+ except Exception as e:
+ logger.error("something went wrong: {}".format(e.__str__()))
+ self.__handle_error_backup__(existing_dns_records)
+ logger.error("import failed")
+ return False
+ else:
+ raise Exception("restore mode not supported")
+
+ logger.info("import successfully completed")
+
+ return True
+
+ def import_bind_dns_records(
+ self, filepath: Union[Path, str], restore_mode: DNSRestoreMode
+ ) -> bool:
+ """
+ Restore all DNS records from a BIND file.
+ This method does not represent a Porkbun API method.
+
+ :param filepath: the bind filepath from which the DNS records are to be restored
+ :param restore_mode: The restore mode:
+ clear: remove all existing DNS records and restore all DNS records from the provided file
+ :return: True if everything went well
+ """
+
+ bind_file = BindFile.from_file(filepath)
+
+ existing_dns_records = self.get_dns_records(bind_file.origin[:-1])
+
+ if restore_mode is DNSRestoreMode.clear:
+ logger.debug("restore mode: clear")
+
+ try:
+ # delete all existing DNS records
+ for record in existing_dns_records:
+ self.delete_dns_record(bind_file.origin[:-1], record.id)
+
+ # restore all records from BIND file by creating new DNS records
+ for record in bind_file.records:
+ # extract subdomain from record name
+ subdomain = record.name.replace(bind_file.origin, "")
+ # replace trailing dot
+ subdomain = subdomain[:-1] if subdomain.endswith(".") else subdomain
+ self.create_dns_record(
+ domain=bind_file.origin[:-1],
+ record_type=record.record_type,
+ content=record.data,
+ name=subdomain,
+ ttl=record.ttl,
+ prio=record.prio,
+ )
+
+ except Exception as e:
+ logger.error("something went wrong: {}".format(e.__str__()))
+ self.__handle_error_backup__(existing_dns_records)
+ logger.error("import failed")
+ return False
+ else:
+ raise Exception(f"restore mode '{restore_mode.value}' not supported")
+
+ logger.info("import successfully completed")
+
+ return True
+
+ def update_dns_servers(self, domain: str, name_servers: List[str]) -> bool:
+ """
+ Update the name servers of the specified domain.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20Update%20Name%20Servers for more info.
+
+ :return: True if everything went well
+ """
+
+ url = urljoin(self.api_endpoint, f"domain/updateNs/{domain}")
+ req_json = {**self._get_auth_request_json(), "ns": name_servers}
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200 and json.loads(r.text).get("status", None) == "SUCCESS":
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_dns_servers(self, domain: str) -> List[str]:
+ """
+ Get the name servers for the given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20Get%20Name%20Servers for more info.
+
+ :return: list of name servers
+ """
+
+ url = urljoin(self.api_endpoint, f"domain/getNs/{domain}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return json.loads(r.text).get("ns", [])
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_domains(self, start: int = 0) -> List[DomainInfo]:
+ """
+ Get all domains for the account in chunks of 1000. If you reach the end of all domains, the list will be empty.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20List%20All for more info.
+
+ :param start: the index of the first domain to retrieve
+
+ :return: list of DomainInfo objects
+ """
+
+ url = urljoin(self.api_endpoint, "domain/listAll")
+
+ req_json = {**self._get_auth_request_json(), "start": start}
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return [
+ DomainInfo.from_dict(domain)
+ for domain in json.loads(r.text).get("domains", [])
+ ]
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_url_forwards(self, domain: str) -> List[URLForwarding]:
+ """
+ Get the url forwarding for the given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20Get%20URL%20Forwarding for more info.
+
+ :return: list of URLForwarding objects
+ """
+
+ url = urljoin(self.api_endpoint, f"domain/getUrlForwarding/{domain}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return [
+ URLForwarding.from_dict(forwarding)
+ for forwarding in json.loads(r.text).get("forwards", [])
+ ]
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def create_url_forward(
+ self,
+ domain: str,
+ subdomain: str,
+ location: str,
+ type: URLForwardingType,
+ include_path: bool,
+ wildcard: bool,
+ ) -> bool:
+ """
+ Add a url forward for the given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20Add%20URL%20Forward for more info.
+
+ :param domain: the domain for which the url forwarding should be added
+ :param subdomain: the subdomain for which the url forwarding should be added, can be empty for root domain
+ :param location: the location to which the url forwarding should redirect
+ :param type: the type of the url forwarding
+ :param include_path: if the path should be included in the url forwarding
+ :param wildcard: if the url forwarding should also be applied to all subdomains
+
+ :return: True if the forwarding was added successfully
+ """
+
+ url = urljoin(self.api_endpoint, f"domain/addUrlForward/{domain}")
+ req_json = {
+ **self._get_auth_request_json(),
+ "subdomain": subdomain,
+ "location": location,
+ "type": type,
+ "includePath": include_path,
+ "wildcard": wildcard,
+ }
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def delete_url_forward(self, domain: str, id: str) -> bool:
+ """
+ Delete an url forward for the given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20Delete%20URL%20Forward for more info.
+
+ :param domain: the domain for which the url forwarding should be deleted
+ :param id: the id of the url forwarding which should be deleted
+
+ :return: True if the deletion was successful
+ """
+
+ url = urljoin(self.api_endpoint, f"domain/deleteUrlForward/{domain}/{id}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_domain_pricing(self) -> dict:
+ """
+ Get the pricing for all Porkbun domains.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20Pricing for more info.
+
+ :return: dict with pricing
+ """
+
+ url = urljoin(self.api_endpoint, "pricing/get")
+ r = requests.post(url=url)
+
+ if r.status_code == 200:
+ return json.loads(r.text)["pricing"]
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_ssl_bundle(self, domain) -> SSLCertBundle:
+ """
+ API SSL bundle retrieve method: retrieve an SSL bundle for the given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info.
+
+ :param domain: the domain for which the SSL bundle should be retrieved
+
+ :return: tuple of intermediate certificate, certificate chain, private key, public key
+ """
+
+ url = urljoin(self.api_endpoint, f"ssl/retrieve/{domain}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ ssl_bundle = json.loads(r.text)
+
+ return SSLCertBundle(
+ certificate_chain=ssl_bundle["certificatechain"],
+ private_key=ssl_bundle["privatekey"],
+ public_key=ssl_bundle["publickey"],
+ )
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ @staticmethod
+ def __handle_error_backup__(dns_records):
+ # merge the single DNS records into one single dict with the record id as key
+ dns_records_dict = dict()
+ for record in dns_records:
+ dns_records_dict[record["id"]] = record
+
+ # generate filename with incremental suffix
+ base_backup_filename = "pkb_client_dns_records_backup"
+ suffix = 0
+ backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix))
+ while backup_file_path.exists():
+ suffix += 1
+ backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix))
+
+ with open(backup_file_path, "w") as f:
+ json.dump(dns_records_dict, f)
+
+ logger.warning(
+ "a backup of your existing dns records was saved to {}".format(
+ str(backup_file_path)
+ )
+ )
diff --git a/pkb_client/client/dns.py b/pkb_client/client/dns.py
new file mode 100644
index 0000000..85ae971
--- /dev/null
+++ b/pkb_client/client/dns.py
@@ -0,0 +1,63 @@
+from dataclasses import dataclass
+from enum import Enum
+from typing import Optional
+
+
+class DNSRecordType(str, Enum):
+ A = "A"
+ AAAA = "AAAA"
+ MX = "MX"
+ CNAME = "CNAME"
+ ALIAS = "ALIAS"
+ TXT = "TXT"
+ NS = "NS"
+ SRV = "SRV"
+ TLSA = "TLSA"
+ CAA = "CAA"
+
+ def __str__(self):
+ return self.value
+
+
+DNS_RECORDS_WITH_PRIORITY = {DNSRecordType.MX, DNSRecordType.SRV}
+
+
+@dataclass
+class DNSRecord:
+ id: str
+ name: str
+ type: DNSRecordType
+ content: str
+ ttl: int
+ prio: Optional[int]
+ notes: str
+
+ @staticmethod
+ def from_dict(d):
+ # only use prio for supported record types since the API returns it for all records with default value 0
+ prio = int(d["prio"]) if d["type"] in DNS_RECORDS_WITH_PRIORITY else None
+ return DNSRecord(
+ id=d["id"],
+ name=d["name"],
+ type=DNSRecordType[d["type"]],
+ content=d["content"],
+ ttl=int(d["ttl"]),
+ prio=prio,
+ notes=d["notes"],
+ )
+
+
+class DNSRestoreMode(Enum):
+ clear = 0
+ replace = 1
+ keep = 2
+
+ def __str__(self):
+ return self.name
+
+ @staticmethod
+ def from_string(a):
+ try:
+ return DNSRestoreMode[a]
+ except KeyError:
+ return a
diff --git a/pkb_client/client/domain.py b/pkb_client/client/domain.py
new file mode 100644
index 0000000..a44a904
--- /dev/null
+++ b/pkb_client/client/domain.py
@@ -0,0 +1,29 @@
+from dataclasses import dataclass
+from datetime import datetime
+
+
+@dataclass
+class DomainInfo:
+ domain: str
+ status: str
+ tld: str
+ create_date: datetime
+ expire_date: datetime
+ security_lock: bool
+ whois_privacy: bool
+ auto_renew: bool
+ not_local: bool
+
+ @staticmethod
+ def from_dict(d):
+ return DomainInfo(
+ domain=d["domain"],
+ status=d["status"],
+ tld=d["tld"],
+ create_date=datetime.fromisoformat(d["createDate"]),
+ expire_date=datetime.fromisoformat(d["expireDate"]),
+ security_lock=bool(d["securityLock"]),
+ whois_privacy=bool(d["whoisPrivacy"]),
+ auto_renew=bool(d["autoRenew"]),
+ not_local=bool(d["notLocal"]),
+ )
diff --git a/pkb_client/client/forwarding.py b/pkb_client/client/forwarding.py
new file mode 100644
index 0000000..64962ca
--- /dev/null
+++ b/pkb_client/client/forwarding.py
@@ -0,0 +1,28 @@
+from dataclasses import dataclass
+from enum import Enum
+
+
+class URLForwardingType(str, Enum):
+ temporary = "temporary"
+ permanent = "permanent"
+
+
+@dataclass
+class URLForwarding:
+ id: str
+ subdomain: str
+ location: str
+ type: URLForwardingType
+ include_path: bool
+ wildcard: bool
+
+ @staticmethod
+ def from_dict(d):
+ return URLForwarding(
+ id=d["id"],
+ subdomain=d["subdomain"],
+ location=d["location"],
+ type=URLForwardingType[d["type"]],
+ include_path=d["includePath"] == "yes",
+ wildcard=d["wildcard"] == "yes",
+ )
diff --git a/pkb_client/client/ssl_cert.py b/pkb_client/client/ssl_cert.py
new file mode 100644
index 0000000..0662ce4
--- /dev/null
+++ b/pkb_client/client/ssl_cert.py
@@ -0,0 +1,13 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class SSLCertBundle:
+ # The complete certificate chain.
+ certificate_chain: str
+
+ # The private key.
+ private_key: str
+
+ # The public key.
+ public_key: str