diff options
Diffstat (limited to 'pkb_client/client/client.py')
| -rw-r--r-- | pkb_client/client/client.py | 113 |
1 files changed, 110 insertions, 3 deletions
diff --git a/pkb_client/client/client.py b/pkb_client/client/client.py index 86956a5..81b946b 100644 --- a/pkb_client/client/client.py +++ b/pkb_client/client/client.py @@ -2,7 +2,7 @@ import json import logging from hashlib import sha256 from pathlib import Path -from typing import Optional, List, Union +from typing import List, Optional, Union from urllib.parse import urljoin import dns.resolver @@ -10,11 +10,12 @@ import requests from pkb_client.client import BindFile from pkb_client.client.dns import ( + DNS_RECORDS_WITH_PRIORITY, DNSRecord, - DNSRestoreMode, DNSRecordType, - DNS_RECORDS_WITH_PRIORITY, + DNSRestoreMode, ) +from pkb_client.client.dnssec import DNSSECRecord from pkb_client.client.domain import DomainInfo from pkb_client.client.forwarding import URLForwarding, URLForwardingType from pkb_client.client.ssl_cert import SSLCertBundle @@ -842,6 +843,112 @@ class PKBClient: response_json.get("message", "Unknown message"), ) + def get_dnssec_records(self, domain: str) -> List[DNSSECRecord]: + """ + API DNSSEC retrieve method: retrieve all DNSSEC records for the given domain. + See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Get%20Records for more info. + + :param domain: the domain for which the DNSSEC records should be retrieved + :return: list of :class:`DNSSECRecord` objects + """ + + url = urljoin(self.api_endpoint, f"dns/getDnssecRecords/{domain}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + DNSSECRecord.from_dict(record) + for record in json.loads(r.text).get("records", {}).values() + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def create_dnssec_record( + self, + domain: str, + key_tag: int, + alg: int, + digest_type: int, + digest: str, + max_sig_life: Optional[int] = None, + key_data_flags: Optional[int] = None, + key_data_protocol: Optional[int] = None, + key_data_algo: Optional[int] = None, + key_data_pub_key: Optional[str] = None, + ) -> bool: + """ + API DNSSEC create method: create a new DNSSEC record for the given domain. + See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Create%20Record for more info. + + :param domain: the domain for which the DNSSEC record should be created + :param key_tag: the key tag of the DNSSEC record + :param alg: algorithm of the DNSSEC record + :param digest_type: digest type of the DNSSEC record + :param digest: digest of the DNSSEC record + :param max_sig_life: maximum signature life of the DNSSEC record in seconds + :param key_data_flags: key data flags of the DNSSEC record + :param key_data_protocol: key data protocol of the DNSSEC record + :param key_data_algo: key data algorithm of the DNSSEC record + :param key_data_pub_key: key data public key of the DNSSEC record + + :return: True if everything went well + """ + + if max_sig_life is not None and max_sig_life < 0: + raise ValueError("max_sig_life must be greater than 0") + + url = urljoin(self.api_endpoint, f"dns/createDnssecRecord/{domain}") + req_json = { + **self._get_auth_request_json(), + "keyTag": key_tag, + "alg": alg, + "digestType": digest_type, + "digest": digest, + "maxSigLife": max_sig_life, + "keyDataFlags": key_data_flags, + "keyDataProtocol": key_data_protocol, + "keyDataAlgo": key_data_algo, + "keyDataPubKey": key_data_pub_key, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def delete_dnssec_record(self, domain: str, key_tag: int) -> bool: + """ + API DNSSEC delete method: delete an existing DNSSEC record for the given domain. + See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Delete%20Record for more info. + + :param domain: the domain for which the DNSSEC record should be deleted + :param key_tag: the key tag of the DNSSEC record + :return: True if everything went well + """ + + url = urljoin(self.api_endpoint, f"dns/deleteDnssecRecord/{domain}/{key_tag}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + @staticmethod def __handle_error_backup__(dns_records): # merge the single DNS records into one single dict with the record id as key |
