summaryrefslogtreecommitdiffstats
path: root/pkb_client/client.py
diff options
context:
space:
mode:
authorLibravatarUnit 193 <unit193@unit193.net>2024-11-20 01:17:40 -0500
committerLibravatarUnit 193 <unit193@unit193.net>2024-11-20 01:17:40 -0500
commit3e3ebe586385a83b10c8f1d0b9ba9b67c8b56d2f (patch)
tree5682f748fc9867166043734aad44e1734d16abeb /pkb_client/client.py
parentfa197fe27b8a03bbf4504476f842956ece2c76c9 (diff)
New upstream version 2.0.0.upstream/2.0.0
Diffstat (limited to 'pkb_client/client.py')
-rw-r--r--pkb_client/client.py455
1 files changed, 0 insertions, 455 deletions
diff --git a/pkb_client/client.py b/pkb_client/client.py
deleted file mode 100644
index 48c0d82..0000000
--- a/pkb_client/client.py
+++ /dev/null
@@ -1,455 +0,0 @@
-import json
-import logging
-from enum import Enum
-from pathlib import Path
-from typing import Optional, Tuple
-from urllib.parse import urljoin
-
-import requests
-
-from pkb_client.helper import parse_dns_record
-
-API_ENDPOINT = "https://porkbun.com/api/json/v3/"
-SUPPORTED_DNS_RECORD_TYPES = ["A", "AAAA", "MX", "CNAME", "ALIAS", "TXT", "NS", "SRV", "TLSA", "CAA"]
-
-# prevent urllib3 to log request with the api key and secret
-logging.getLogger("urllib3").setLevel(logging.WARNING)
-
-
-class DNSRestoreMode(Enum):
- clear = 0
- replace = 1
- keep = 2
-
- def __str__(self):
- return self.name
-
- @staticmethod
- def from_string(a):
- try:
- return DNSRestoreMode[a]
- except KeyError:
- return a
-
-
-class PKBClient:
- """
- API client for Porkbun.
- """
-
- def __init__(self, api_key: str, secret_api_key: str) -> None:
- """
- Creates a new PKBClient object.
-
- :param api_key: the API key used for Porkbun API calls
- :param secret_api_key: the API secret used for Porkbun API calls
- """
-
- assert api_key is not None and len(api_key) > 0
- assert secret_api_key is not None and len(secret_api_key) > 0
-
- self.api_key = api_key
- self.secret_api_key = secret_api_key
-
- def ping(self, **kwargs) -> str:
- """
- API ping method: get the current public ip address of the requesting system; can also be used for auth checking
- see https://porkbun.com/api/json/v3/documentation#Authentication for more info
-
- :return: the current public ip address of the requesting system
- """
-
- url = urljoin(API_ENDPOINT, "ping")
- req_json = {
- "apikey": self.api_key,
- "secretapikey": self.secret_api_key
- }
- r = requests.post(url=url, json=req_json)
-
- if r.status_code == 200:
- return json.loads(r.text).get("yourIp", None)
- else:
- raise Exception("ERROR: ping api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- def dns_create(self,
- domain: str,
- record_type: str,
- content: str,
- name: Optional[str] = None,
- ttl: Optional[int] = 300,
- prio: Optional[int] = None, **kwargs) -> str:
- """
- API DNS create method: create a new DNS record for given domain
- see https://porkbun.com/api/json/v3/documentation#DNS%20Create%20Record for more info
-
- :param domain: the domain for which the DNS record should be created
- :param record_type: the type of the new DNS record;
- supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA
- :param content: the content of the new DNS record
- :param name: the subdomain for which the new DNS record entry should apply; the * can be used for a
- wildcard DNS record; if not used, then a DNS record for the root domain will be created
- :param ttl: the time to live in seconds of the new DNS record; have to be between 0 and 2147483647
- :param prio: the priority of the new DNS record
-
- :return: the id of the new created DNS record
- """
-
- assert domain is not None and len(domain) > 0
- assert record_type in SUPPORTED_DNS_RECORD_TYPES
- assert content is not None and len(content) > 0
- assert ttl is None or 300 <= ttl <= 2147483647
-
- url = urljoin(API_ENDPOINT, "dns/create/{}".format(domain))
- req_json = {
- "apikey": self.api_key,
- "secretapikey": self.secret_api_key,
- "name": name,
- "type": record_type,
- "content": content,
- "ttl": ttl,
- "prio": prio
- }
- r = requests.post(url=url, json=req_json)
-
- if r.status_code == 200:
- return str(json.loads(r.text).get("id", None))
- else:
- raise Exception("ERROR: DNS create api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- def dns_edit(self,
- domain: str,
- record_id: str,
- record_type: str,
- content: str,
- name: str = None,
- ttl: int = 300,
- prio: int = None,
- **kwargs) -> bool:
- """
- API DNS edit method: edit an existing DNS record specified by the id for a given domain
- see https://porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record for more info
-
- :param domain: the domain for which the DNS record should be edited
- :param record_id: the id of the DNS record which should be edited
- :param record_type: the new type of the DNS record;
- supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA
- :param content: the new content of the DNS record
- :param name: the new value of the subdomain for which the DNS record should apply; the * can be used for a
- wildcard DNS record; if not set, the record will be set for the record domain
- :param ttl: the new time to live in seconds of the DNS record, have to be between 0 and 2147483647
- :param prio: the new priority of the DNS record
-
- :return: True if the editing was successful
- """
-
- assert domain is not None and len(domain) > 0
- assert record_id is not None and len(record_id) > 0
- assert record_type in SUPPORTED_DNS_RECORD_TYPES
- assert content is not None and len(content) > 0
- assert ttl is None or 300 <= ttl <= 2147483647
-
- url = urljoin(API_ENDPOINT, "dns/edit/{}/{}".format(domain, record_id))
- req_json = {
- "apikey": self.api_key,
- "secretapikey": self.secret_api_key,
- "name": name,
- "type": record_type,
- "content": content,
- "ttl": ttl,
- "prio": prio
- }
- r = requests.post(url=url, json=req_json)
-
- if r.status_code == 200:
- return True
- else:
- raise Exception("ERROR: DNS edit api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- def dns_delete(self,
- domain: str,
- record_id: str,
- **kwargs) -> bool:
- """
- API DNS delete method: delete an existing DNS record specified by the id for a given domain
- see https://porkbun.com/api/json/v3/documentation#DNS%20Delete%20Record for more info
-
- :param domain: the domain for which the DNS record should be deleted
- :param record_id: the id of the DNS record which should be deleted
-
- :return: True if the deletion was successful
- """
-
- assert domain is not None and len(domain) > 0
- assert record_id is not None and len(record_id) > 0
-
- url = urljoin(API_ENDPOINT, "dns/delete/{}/{}".format(domain, record_id))
- req_json = {
- "apikey": self.api_key,
- "secretapikey": self.secret_api_key
- }
- r = requests.post(url=url, json=req_json)
-
- if r.status_code == 200:
- return True
- else:
- raise Exception("ERROR: DNS delete api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- def dns_retrieve(self, domain, **kwargs) -> list:
- """
- API DNS retrieve method: retrieve all DNS records for given domain
- see https://porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records for more info
-
- :param domain: the domain for which the DNS records should be retrieved
-
- :return: list of DNS records as dicts
-
- The list structure will be:
- [
- {
- "id": "123456789",
- "name": "example.com",
- "type": "TXT",
- "content": "this is a nice text",
- "ttl": "300",
- "prio": None,
- "notes": ""
- },
- {
- "id": "234567890",
- "name": "example.com",
- "type": "A",
- "content": "0.0.0.0",
- "ttl": "300",
- "prio": 0,
- "notes": ""
- }
- ]
- """
-
- assert domain is not None and len(domain) > 0
-
- url = urljoin(API_ENDPOINT, "dns/retrieve/{}".format(domain))
- req_json = {
- "apikey": self.api_key,
- "secretapikey": self.secret_api_key
- }
- r = requests.post(url=url, json=req_json)
-
- if r.status_code == 200:
- return [parse_dns_record(record) for record in json.loads(r.text).get("records", [])]
- else:
- raise Exception("ERROR: DNS retrieve api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- def dns_export(self, domain: str, filename: str, **kwargs) -> bool:
- """
- Export all DNS record from the given domain as json to a file.
- This method does not not represent a Porkbun API method.
-
- :param domain: the domain for which the DNS record should be retrieved and saved
- :param filename: the filename where to save the exported DNS records
-
- :return: True if everything went well
- """
-
- assert domain is not None and len(domain) > 0
- assert filename is not None and len(filename) > 0
-
- print("retrieve current DNS records...")
- dns_records = self.dns_retrieve(domain)
-
- print("save DNS records to {} ...".format(filename))
- # merge the single DNS records into one single dict with the record id as key
- dns_records_dict = dict()
- for record in dns_records:
- dns_records_dict[record["id"]] = record
-
- filepath = Path(filename)
- if filepath.exists():
- raise Exception("File already exists. Please try another filename")
- with open(filepath, "w") as f:
- json.dump(dns_records_dict, f)
- print("export finished")
-
- return True
-
- def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, **kwargs) -> bool:
- """
- Restore
- This method does not not represent a Porkbun API method.
-
- :param domain: the domain for which the DNS record should be restored
- :param filename: the filename from which the DNS records are to be restored
- :param restore_mode: The restore mode (DNS records are identified by the record id)
- clean: remove all existing DNS records and restore all DNS records from the provided file
- replace: replace only existing DNS records with the DNS records from the provided file,
- but do not create any new DNS records
- keep: keep the existing DNS records and only create new ones for all DNS records from
- the specified file if they do not exist
-
- :return: True if everything went well
- """
-
- assert domain is not None and len(domain) > 0
- assert filename is not None and len(filename) > 0
- assert isinstance(restore_mode, DNSRestoreMode)
-
- existing_dns_records = self.dns_retrieve(domain)
-
- with open(filename, "r") as f:
- exported_dns_records_dict = json.load(f)
-
- if restore_mode is DNSRestoreMode.clear:
- print("restore mode: clear")
-
- try:
- # delete all existing DNS records
- for record in existing_dns_records:
- self.dns_delete(domain, record["id"])
-
- # restore all exported records by creating new DNS records
- for _, exported_record in exported_dns_records_dict.items():
- name = ".".join(exported_record["name"].split(".")[:-2])
- self.dns_create(domain=domain,
- record_type=exported_record["type"],
- content=exported_record["content"],
- name=name,
- ttl=exported_record["ttl"],
- prio=exported_record["prio"])
- except Exception as e:
- print("something went wrong: {}".format(e.__str__()))
- self.__handle_error_backup__(existing_dns_records)
- print("import failed")
- return False
- elif restore_mode is DNSRestoreMode.replace:
- print("restore mode: replace")
-
- try:
- for existing_record in existing_dns_records:
- record_id = existing_record["id"]
- exported_record = exported_dns_records_dict.get(record_id, None)
- # also check if the exported dns record is different to the existing record,
- # so we can reduce unnecessary api calls
- if exported_record is not None and exported_record != existing_record:
- name = ".".join(exported_record["name"].split(".")[:-2])
- self.dns_edit(domain=domain,
- record_id=record_id,
- record_type=exported_record["type"],
- content=exported_record["content"],
- name=name,
- ttl=exported_record["ttl"],
- prio=exported_record["prio"])
- except Exception as e:
- print("something went wrong: {}".format(e.__str__()))
- self.__handle_error_backup__(existing_dns_records)
- print("import failed")
- return False
- elif restore_mode is DNSRestoreMode.keep:
- print("restore mode: keep")
-
- existing_dns_records_dict = dict()
- for record in existing_dns_records:
- existing_dns_records_dict[record["id"]] = record
-
- try:
- for _, exported_record in exported_dns_records_dict.items():
- if exported_record["id"] not in existing_dns_records_dict:
- name = ".".join(exported_record["name"].split(".")[:-2])
- self.dns_create(domain=domain,
- record_type=exported_record["type"],
- content=exported_record["content"],
- name=name,
- ttl=exported_record["ttl"],
- prio=exported_record["prio"])
- except Exception as e:
- print("something went wrong: {}".format(e.__str__()))
- self.__handle_error_backup__(existing_dns_records)
- print("import failed")
- return False
- else:
- raise Exception("restore mode not supported")
-
- print("import successfully completed")
-
- return True
-
- @staticmethod
- def get_domain_pricing(**kwargs) -> dict:
- """
- Get the pricing for porkbun domains
- see https://porkbun.com/api/json/v3/documentation#Domain%20Pricing for more info
-
- :return: dict with pricing
- """
-
- url = urljoin(API_ENDPOINT, "pricing/get")
- r = requests.post(url=url)
-
- if r.status_code == 200:
- return json.loads(r.text)
- else:
- raise Exception("ERROR: Domain pricing retrieve api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- def ssl_retrieve(self, domain, **kwargs) -> Tuple[str, str, str, str]:
- """
- API SSL bundle retrieve method: retrieve an SSL bundle for given domain
- see https://porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info
-
- :param domain: the domain for which the SSL bundle should be retrieved
-
- :return: tuple of intermediate certificate, certificate chain, private key, public key
- """
-
- assert domain is not None and len(domain) > 0
-
- url = urljoin(API_ENDPOINT, "ssl/retrieve/{}".format(domain))
- req_json = {
- "apikey": self.api_key,
- "secretapikey": self.secret_api_key
- }
- r = requests.post(url=url, json=req_json)
-
- if r.status_code == 200:
- ssl_bundle = json.loads(r.text)
-
- intermediate_certificate = ssl_bundle["intermediate_certificate"]
- certificate_chain = ssl_bundle["certificate_chain"]
- private_key = ssl_bundle["private_key"]
- public_key = ssl_bundle["public_key"]
-
- return intermediate_certificate, certificate_chain, private_key, public_key
- else:
- raise Exception("ERROR: SSL bundle retrieve api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- @staticmethod
- def __handle_error_backup__(dns_records):
- # merge the single DNS records into one single dict with the record id as key
- dns_records_dict = dict()
- for record in dns_records:
- dns_records_dict[record["id"]] = record
-
- # generate filename with incremental suffix
- base_backup_filename = "pkb_client_dns_records_backup"
- suffix = 0
- backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix))
- while backup_file_path.exists():
- suffix += 1
- backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix))
-
- with open(backup_file_path, "w") as f:
- json.dump(dns_records_dict, f)
-
- print("a backup of your existing dns records was saved to {}".format(str(backup_file_path)))