summaryrefslogtreecommitdiffstats
path: root/pkb_client/client/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'pkb_client/client/client.py')
-rw-r--r--pkb_client/client/client.py113
1 files changed, 110 insertions, 3 deletions
diff --git a/pkb_client/client/client.py b/pkb_client/client/client.py
index 86956a5..81b946b 100644
--- a/pkb_client/client/client.py
+++ b/pkb_client/client/client.py
@@ -2,7 +2,7 @@ import json
import logging
from hashlib import sha256
from pathlib import Path
-from typing import Optional, List, Union
+from typing import List, Optional, Union
from urllib.parse import urljoin
import dns.resolver
@@ -10,11 +10,12 @@ import requests
from pkb_client.client import BindFile
from pkb_client.client.dns import (
+ DNS_RECORDS_WITH_PRIORITY,
DNSRecord,
- DNSRestoreMode,
DNSRecordType,
- DNS_RECORDS_WITH_PRIORITY,
+ DNSRestoreMode,
)
+from pkb_client.client.dnssec import DNSSECRecord
from pkb_client.client.domain import DomainInfo
from pkb_client.client.forwarding import URLForwarding, URLForwardingType
from pkb_client.client.ssl_cert import SSLCertBundle
@@ -842,6 +843,112 @@ class PKBClient:
response_json.get("message", "Unknown message"),
)
+ def get_dnssec_records(self, domain: str) -> List[DNSSECRecord]:
+ """
+ API DNSSEC retrieve method: retrieve all DNSSEC records for the given domain.
+ See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Get%20Records for more info.
+
+ :param domain: the domain for which the DNSSEC records should be retrieved
+ :return: list of :class:`DNSSECRecord` objects
+ """
+
+ url = urljoin(self.api_endpoint, f"dns/getDnssecRecords/{domain}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return [
+ DNSSECRecord.from_dict(record)
+ for record in json.loads(r.text).get("records", {}).values()
+ ]
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def create_dnssec_record(
+ self,
+ domain: str,
+ key_tag: int,
+ alg: int,
+ digest_type: int,
+ digest: str,
+ max_sig_life: Optional[int] = None,
+ key_data_flags: Optional[int] = None,
+ key_data_protocol: Optional[int] = None,
+ key_data_algo: Optional[int] = None,
+ key_data_pub_key: Optional[str] = None,
+ ) -> bool:
+ """
+ API DNSSEC create method: create a new DNSSEC record for the given domain.
+ See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Create%20Record for more info.
+
+ :param domain: the domain for which the DNSSEC record should be created
+ :param key_tag: the key tag of the DNSSEC record
+ :param alg: algorithm of the DNSSEC record
+ :param digest_type: digest type of the DNSSEC record
+ :param digest: digest of the DNSSEC record
+ :param max_sig_life: maximum signature life of the DNSSEC record in seconds
+ :param key_data_flags: key data flags of the DNSSEC record
+ :param key_data_protocol: key data protocol of the DNSSEC record
+ :param key_data_algo: key data algorithm of the DNSSEC record
+ :param key_data_pub_key: key data public key of the DNSSEC record
+
+ :return: True if everything went well
+ """
+
+ if max_sig_life is not None and max_sig_life < 0:
+ raise ValueError("max_sig_life must be greater than 0")
+
+ url = urljoin(self.api_endpoint, f"dns/createDnssecRecord/{domain}")
+ req_json = {
+ **self._get_auth_request_json(),
+ "keyTag": key_tag,
+ "alg": alg,
+ "digestType": digest_type,
+ "digest": digest,
+ "maxSigLife": max_sig_life,
+ "keyDataFlags": key_data_flags,
+ "keyDataProtocol": key_data_protocol,
+ "keyDataAlgo": key_data_algo,
+ "keyDataPubKey": key_data_pub_key,
+ }
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def delete_dnssec_record(self, domain: str, key_tag: int) -> bool:
+ """
+ API DNSSEC delete method: delete an existing DNSSEC record for the given domain.
+ See https://porkbun.com/api/json/v3/documentation#DNSSEC%20Delete%20Record for more info.
+
+ :param domain: the domain for which the DNSSEC record should be deleted
+ :param key_tag: the key tag of the DNSSEC record
+ :return: True if everything went well
+ """
+
+ url = urljoin(self.api_endpoint, f"dns/deleteDnssecRecord/{domain}/{key_tag}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
@staticmethod
def __handle_error_backup__(dns_records):
# merge the single DNS records into one single dict with the record id as key