summaryrefslogtreecommitdiffstats
path: root/pkb_client/client
diff options
context:
space:
mode:
authorLibravatarUnit 193 <unit193@unit193.net>2025-04-10 05:29:27 -0400
committerLibravatarUnit 193 <unit193@unit193.net>2025-04-10 05:29:27 -0400
commitb10c92e14e6bc28dfd5e7ca235fc4a2a521f8272 (patch)
treebd679d9c6618668477647b4b8fc9b3e371231402 /pkb_client/client
parent3e3ebe586385a83b10c8f1d0b9ba9b67c8b56d2f (diff)
New upstream version 2.1.1.upstream/2.1.1
Diffstat (limited to 'pkb_client/client')
-rw-r--r--pkb_client/client/client.py113
-rw-r--r--pkb_client/client/dnssec.py35
2 files changed, 145 insertions, 3 deletions
diff --git a/pkb_client/client/client.py b/pkb_client/client/client.py
index 86956a5..81b946b 100644
--- a/pkb_client/client/client.py
+++ b/pkb_client/client/client.py
@@ -2,7 +2,7 @@ import json
import logging
from hashlib import sha256
from pathlib import Path
-from typing import Optional, List, Union
+from typing import List, Optional, Union
from urllib.parse import urljoin
import dns.resolver
@@ -10,11 +10,12 @@ import requests
from pkb_client.client import BindFile
from pkb_client.client.dns import (
+ DNS_RECORDS_WITH_PRIORITY,
DNSRecord,
- DNSRestoreMode,
DNSRecordType,
- DNS_RECORDS_WITH_PRIORITY,
+ DNSRestoreMode,
)
+from pkb_client.client.dnssec import DNSSECRecord
from pkb_client.client.domain import DomainInfo
from pkb_client.client.forwarding import URLForwarding, URLForwardingType
from pkb_client.client.ssl_cert import SSLCertBundle
@@ -842,6 +843,112 @@ class PKBClient:
response_json.get("message", "Unknown message"),
)
+ def get_dnssec_records(self, domain: str) -> List[DNSSECRecord]:
+ """
+ API DNSSEC retrieve method: retrieve all DNSSEC records for the given domain.
+ See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Get%20Records for more info.
+
+ :param domain: the domain for which the DNSSEC records should be retrieved
+ :return: list of :class:`DNSSECRecord` objects
+ """
+
+ url = urljoin(self.api_endpoint, f"dns/getDnssecRecords/{domain}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return [
+ DNSSECRecord.from_dict(record)
+ for record in json.loads(r.text).get("records", {}).values()
+ ]
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def create_dnssec_record(
+ self,
+ domain: str,
+ key_tag: int,
+ alg: int,
+ digest_type: int,
+ digest: str,
+ max_sig_life: Optional[int] = None,
+ key_data_flags: Optional[int] = None,
+ key_data_protocol: Optional[int] = None,
+ key_data_algo: Optional[int] = None,
+ key_data_pub_key: Optional[str] = None,
+ ) -> bool:
+ """
+ API DNSSEC create method: create a new DNSSEC record for the given domain.
+ See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Create%20Record for more info.
+
+ :param domain: the domain for which the DNSSEC record should be created
+ :param key_tag: the key tag of the DNSSEC record
+ :param alg: algorithm of the DNSSEC record
+ :param digest_type: digest type of the DNSSEC record
+ :param digest: digest of the DNSSEC record
+ :param max_sig_life: maximum signature life of the DNSSEC record in seconds
+ :param key_data_flags: key data flags of the DNSSEC record
+ :param key_data_protocol: key data protocol of the DNSSEC record
+ :param key_data_algo: key data algorithm of the DNSSEC record
+ :param key_data_pub_key: key data public key of the DNSSEC record
+
+ :return: True if everything went well
+ """
+
+ if max_sig_life is not None and max_sig_life < 0:
+ raise ValueError("max_sig_life must be greater than 0")
+
+ url = urljoin(self.api_endpoint, f"dns/createDnssecRecord/{domain}")
+ req_json = {
+ **self._get_auth_request_json(),
+ "keyTag": key_tag,
+ "alg": alg,
+ "digestType": digest_type,
+ "digest": digest,
+ "maxSigLife": max_sig_life,
+ "keyDataFlags": key_data_flags,
+ "keyDataProtocol": key_data_protocol,
+ "keyDataAlgo": key_data_algo,
+ "keyDataPubKey": key_data_pub_key,
+ }
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def delete_dnssec_record(self, domain: str, key_tag: int) -> bool:
+ """
+ API DNSSEC delete method: delete an existing DNSSEC record for the given domain.
+ See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Delete%20Record for more info.
+
+ :param domain: the domain for which the DNSSEC record should be deleted
+ :param key_tag: the key tag of the DNSSEC record
+ :return: True if everything went well
+ """
+
+ url = urljoin(self.api_endpoint, f"dns/deleteDnssecRecord/{domain}/{key_tag}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
@staticmethod
def __handle_error_backup__(dns_records):
# merge the single DNS records into one single dict with the record id as key
diff --git a/pkb_client/client/dnssec.py b/pkb_client/client/dnssec.py
new file mode 100644
index 0000000..2f68ecb
--- /dev/null
+++ b/pkb_client/client/dnssec.py
@@ -0,0 +1,35 @@
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class DNSSECRecord:
+ key_tag: int # The key tag is a 16-bit integer that identifies the DNSKEY record
+ alg: int # Indicates the algorithm used to generate the public key
+ digest_type: int # Indicates the type of digest algorithm used
+ digest: str # The digest of the public key
+ max_sig_life: Optional[
+ int
+ ] # Indicates the amount of time in seconds the signature is valid
+ key_data_flags: Optional[
+ int
+ ] # Indicates the key type (Zone-signing or Key-signing)
+ key_data_protocol: Optional[int] # Indicates the protocol used for the key
+ key_data_algo: Optional[int] # Indicates the algorithm used for the key
+ key_data_pub_key: Optional[str] # The public key in base64 format
+
+ @staticmethod
+ def from_dict(d):
+ return DNSSECRecord(
+ key_tag=int(d["keyTag"]),
+ alg=int(d["alg"]),
+ digest_type=int(d["digestType"]),
+ digest=d["digest"],
+ max_sig_life=int(d["maxSigLife"]) if "maxSigLife" in d else None,
+ key_data_flags=int(d["keyDataFlags"]) if "keyDataFlags" in d else None,
+ key_data_protocol=int(d["keyDataProtocol"])
+ if "keyDataProtocol" in d
+ else None,
+ key_data_algo=int(d["keyDataAlgo"]) if "keyDataAlgo" in d else None,
+ key_data_pub_key=d["keyDataPubKey"] if "keyDataPubKey" in d else None,
+ )