diff options
| author | 2025-04-10 05:29:27 -0400 | |
|---|---|---|
| committer | 2025-04-10 05:29:27 -0400 | |
| commit | b10c92e14e6bc28dfd5e7ca235fc4a2a521f8272 (patch) | |
| tree | bd679d9c6618668477647b4b8fc9b3e371231402 /pkb_client/client | |
| parent | 3e3ebe586385a83b10c8f1d0b9ba9b67c8b56d2f (diff) | |
New upstream version 2.1.1.upstream/2.1.1
Diffstat (limited to 'pkb_client/client')
| -rw-r--r-- | pkb_client/client/client.py | 113 | ||||
| -rw-r--r-- | pkb_client/client/dnssec.py | 35 |
2 files changed, 145 insertions, 3 deletions
diff --git a/pkb_client/client/client.py b/pkb_client/client/client.py index 86956a5..81b946b 100644 --- a/pkb_client/client/client.py +++ b/pkb_client/client/client.py @@ -2,7 +2,7 @@ import json import logging from hashlib import sha256 from pathlib import Path -from typing import Optional, List, Union +from typing import List, Optional, Union from urllib.parse import urljoin import dns.resolver @@ -10,11 +10,12 @@ import requests from pkb_client.client import BindFile from pkb_client.client.dns import ( + DNS_RECORDS_WITH_PRIORITY, DNSRecord, - DNSRestoreMode, DNSRecordType, - DNS_RECORDS_WITH_PRIORITY, + DNSRestoreMode, ) +from pkb_client.client.dnssec import DNSSECRecord from pkb_client.client.domain import DomainInfo from pkb_client.client.forwarding import URLForwarding, URLForwardingType from pkb_client.client.ssl_cert import SSLCertBundle @@ -842,6 +843,112 @@ class PKBClient: response_json.get("message", "Unknown message"), ) + def get_dnssec_records(self, domain: str) -> List[DNSSECRecord]: + """ + API DNSSEC retrieve method: retrieve all DNSSEC records for the given domain. + See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Get%20Records for more info. + + :param domain: the domain for which the DNSSEC records should be retrieved + :return: list of :class:`DNSSECRecord` objects + """ + + url = urljoin(self.api_endpoint, f"dns/getDnssecRecords/{domain}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + DNSSECRecord.from_dict(record) + for record in json.loads(r.text).get("records", {}).values() + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def create_dnssec_record( + self, + domain: str, + key_tag: int, + alg: int, + digest_type: int, + digest: str, + max_sig_life: Optional[int] = None, + key_data_flags: Optional[int] = None, + key_data_protocol: Optional[int] = None, + key_data_algo: Optional[int] = None, + key_data_pub_key: Optional[str] = None, + ) -> bool: + """ + API DNSSEC create method: create a new DNSSEC record for the given domain. + See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Create%20Record for more info. + + :param domain: the domain for which the DNSSEC record should be created + :param key_tag: the key tag of the DNSSEC record + :param alg: algorithm of the DNSSEC record + :param digest_type: digest type of the DNSSEC record + :param digest: digest of the DNSSEC record + :param max_sig_life: maximum signature life of the DNSSEC record in seconds + :param key_data_flags: key data flags of the DNSSEC record + :param key_data_protocol: key data protocol of the DNSSEC record + :param key_data_algo: key data algorithm of the DNSSEC record + :param key_data_pub_key: key data public key of the DNSSEC record + + :return: True if everything went well + """ + + if max_sig_life is not None and max_sig_life < 0: + raise ValueError("max_sig_life must be greater than 0") + + url = urljoin(self.api_endpoint, f"dns/createDnssecRecord/{domain}") + req_json = { + **self._get_auth_request_json(), + "keyTag": key_tag, + "alg": alg, + "digestType": digest_type, + "digest": digest, + "maxSigLife": max_sig_life, + "keyDataFlags": key_data_flags, + "keyDataProtocol": key_data_protocol, + "keyDataAlgo": key_data_algo, + "keyDataPubKey": key_data_pub_key, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def delete_dnssec_record(self, domain: str, key_tag: int) -> bool: + """ + API DNSSEC delete method: delete an existing DNSSEC record for the given domain. + See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Delete%20Record for more info. + + :param domain: the domain for which the DNSSEC record should be deleted + :param key_tag: the key tag of the DNSSEC record + :return: True if everything went well + """ + + url = urljoin(self.api_endpoint, f"dns/deleteDnssecRecord/{domain}/{key_tag}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + @staticmethod def __handle_error_backup__(dns_records): # merge the single DNS records into one single dict with the record id as key diff --git a/pkb_client/client/dnssec.py b/pkb_client/client/dnssec.py new file mode 100644 index 0000000..2f68ecb --- /dev/null +++ b/pkb_client/client/dnssec.py @@ -0,0 +1,35 @@ +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class DNSSECRecord: + key_tag: int # The key tag is a 16-bit integer that identifies the DNSKEY record + alg: int # Indicates the algorithm used to generate the public key + digest_type: int # Indicates the type of digest algorithm used + digest: str # The digest of the public key + max_sig_life: Optional[ + int + ] # Indicates the amount of time in seconds the signature is valid + key_data_flags: Optional[ + int + ] # Indicates the key type (Zone-signing or Key-signing) + key_data_protocol: Optional[int] # Indicates the protocol used for the key + key_data_algo: Optional[int] # Indicates the algorithm used for the key + key_data_pub_key: Optional[str] # The public key in base64 format + + @staticmethod + def from_dict(d): + return DNSSECRecord( + key_tag=int(d["keyTag"]), + alg=int(d["alg"]), + digest_type=int(d["digestType"]), + digest=d["digest"], + max_sig_life=int(d["maxSigLife"]) if "maxSigLife" in d else None, + key_data_flags=int(d["keyDataFlags"]) if "keyDataFlags" in d else None, + key_data_protocol=int(d["keyDataProtocol"]) + if "keyDataProtocol" in d + else None, + key_data_algo=int(d["keyDataAlgo"]) if "keyDataAlgo" in d else None, + key_data_pub_key=d["keyDataPubKey"] if "keyDataPubKey" in d else None, + ) |
