summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/bind_file.py148
-rw-r--r--tests/client.py1025
-rw-r--r--tests/data/__init__.py0
-rw-r--r--tests/data/test.bind10
-rw-r--r--tests/data/test_no_ttl.bind9
-rw-r--r--tests/pkb_client_tests.py569
6 files changed, 1192 insertions, 569 deletions
diff --git a/tests/bind_file.py b/tests/bind_file.py
new file mode 100644
index 0000000..4d1114f
--- /dev/null
+++ b/tests/bind_file.py
@@ -0,0 +1,148 @@
+import tempfile
+import unittest
+from importlib import resources
+
+from pkb_client.client.bind_file import BindFile, BindRecord, RecordClass
+from pkb_client.client.dns import DNSRecordType
+from tests import data
+
+
+class TestBindFileParsing(unittest.TestCase):
+ def test_reading_bind_file(self):
+ with self.subTest("With default TTL"):
+ with resources.open_text(data, "test.bind") as f:
+ bind_file = BindFile.from_file(f.name)
+
+ self.assertEqual("test.com.", bind_file.origin)
+ self.assertEqual(1234, bind_file.ttl)
+ self.assertEqual(5, len(bind_file.records))
+ self.assertEqual(
+ BindRecord(
+ "test.com.", 600, RecordClass.IN, DNSRecordType.A, "1.2.3.4"
+ ),
+ bind_file.records[0],
+ )
+ self.assertEqual(
+ BindRecord(
+ "sub.test.com.", 600, RecordClass.IN, DNSRecordType.A, "4.3.2.1"
+ ),
+ bind_file.records[1],
+ )
+ self.assertEqual(
+ BindRecord(
+ "test.com.",
+ 600,
+ RecordClass.IN,
+ DNSRecordType.AAAA,
+ "2001:db8::1",
+ comment="This is a comment",
+ ),
+ bind_file.records[2],
+ )
+ self.assertEqual(
+ BindRecord(
+ "test.com.", 1234, RecordClass.IN, DNSRecordType.TXT, "pkb-client"
+ ),
+ bind_file.records[3],
+ )
+ self.assertEqual(
+ BindRecord(
+ "test.com.",
+ 600,
+ RecordClass.IN,
+ DNSRecordType.MX,
+ "mail.test.com.",
+ prio=10,
+ ),
+ bind_file.records[4],
+ )
+
+ with self.subTest("Without default TTL"):
+ with resources.open_text(data, "test_no_ttl.bind") as f:
+ bind_file = BindFile.from_file(f.name)
+
+ self.assertEqual("test.com.", bind_file.origin)
+ self.assertEqual(None, bind_file.ttl)
+ self.assertEqual(5, len(bind_file.records))
+ self.assertEqual(
+ BindRecord(
+ "test.com.", 600, RecordClass.IN, DNSRecordType.A, "1.2.3.4"
+ ),
+ bind_file.records[0],
+ )
+ self.assertEqual(
+ BindRecord(
+ "sub.test.com.", 600, RecordClass.IN, DNSRecordType.A, "4.3.2.1"
+ ),
+ bind_file.records[1],
+ )
+ self.assertEqual(
+ BindRecord(
+ "test.com.",
+ 700,
+ RecordClass.IN,
+ DNSRecordType.AAAA,
+ "2001:db8::1",
+ comment="This is a comment",
+ ),
+ bind_file.records[2],
+ )
+ self.assertEqual(
+ BindRecord(
+ "test.com.", 700, RecordClass.IN, DNSRecordType.TXT, "pkb-client"
+ ),
+ bind_file.records[3],
+ )
+ self.assertEqual(
+ BindRecord(
+ "test.com.",
+ 600,
+ RecordClass.IN,
+ DNSRecordType.MX,
+ "mail.test.com.",
+ prio=10,
+ ),
+ bind_file.records[4],
+ )
+
+ def test_writing_bind_file(self):
+ records = [
+ BindRecord("test.com.", 600, RecordClass.IN, DNSRecordType.A, "1.2.3.4"),
+ BindRecord(
+ "sub.test.com.", 700, RecordClass.IN, DNSRecordType.A, "4.3.2.1"
+ ),
+ BindRecord(
+ "test.com.", 600, RecordClass.IN, DNSRecordType.AAAA, "2001:db8::1"
+ ),
+ BindRecord(
+ "test.com.", 600, RecordClass.IN, DNSRecordType.TXT, "pkb-client"
+ ),
+ BindRecord(
+ "test.com.",
+ 600,
+ RecordClass.IN,
+ DNSRecordType.MX,
+ "mail.test.com.",
+ prio=10,
+ ),
+ ]
+ bind_file = BindFile("test.com.", 1234, records)
+
+ file_content = (
+ "$ORIGIN test.com.\n"
+ "$TTL 1234\n"
+ "test.com. 600 IN A 1.2.3.4\n"
+ "sub.test.com. 700 IN A 4.3.2.1\n"
+ "test.com. 600 IN AAAA 2001:db8::1\n"
+ "test.com. 600 IN TXT pkb-client\n"
+ "test.com. 600 IN MX 10 mail.test.com.\n"
+ )
+
+ with tempfile.NamedTemporaryFile() as f:
+ bind_file.to_file(f.name)
+ with open(f.name) as f2:
+ self.assertEqual(file_content.strip(), f2.read().strip())
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/client.py b/tests/client.py
new file mode 100644
index 0000000..9362be2
--- /dev/null
+++ b/tests/client.py
@@ -0,0 +1,1025 @@
+import json
+import tempfile
+import unittest
+from pathlib import Path
+from urllib.parse import urljoin
+
+import responses
+from responses import matchers
+from responses.registries import OrderedRegistry
+
+from pkb_client.client import (
+ PKBClient,
+ PKBClientException,
+ API_ENDPOINT,
+ DNSRestoreMode,
+)
+from pkb_client.client import SSLCertBundle
+from pkb_client.client.dns import DNSRecord, DNSRecordType
+from pkb_client.client.forwarding import URLForwarding, URLForwardingType
+
+
+class TestClientAuth(unittest.TestCase):
+ @responses.activate
+ def test_valid_auth(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "ping"),
+ json={"status": "SUCCESS", "yourIp": "127.0.0.1"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ ip_address = pkb_client.ping()
+ self.assertEqual("127.0.0.1", ip_address)
+
+ @responses.activate
+ def test_invalid_auth(self):
+ pkb_client = PKBClient("key" + "s", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "ping"),
+ json={"status": "ERROR", "message": "Invalid credentials"},
+ status=401,
+ )
+ with self.assertRaises(PKBClientException):
+ pkb_client.ping()
+
+ @responses.activate
+ def test_ping(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "ping"),
+ json={"status": "SUCCESS", "yourIp": "127.0.0.1"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ ip_address = pkb_client.ping()
+ self.assertEqual("127.0.0.1", ip_address)
+
+ @responses.activate(registry=OrderedRegistry)
+ def test_create_dns_record(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "123456"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": 3600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+ assert "123456" == pkb_client.create_dns_record(
+ "example.com", DNSRecordType.A, "127.0.0.1", "sub.example.com", 3600
+ )
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "234561"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "sub.example.com",
+ "type": "MX",
+ "content": "127.0.0.1",
+ "ttl": 3600,
+ "prio": 2,
+ }
+ )
+ ],
+ )
+ assert "234561" == pkb_client.create_dns_record(
+ "example.com", DNSRecordType.MX, "127.0.0.1", "sub.example.com", 3600, 2
+ )
+
+ def test_create_dns_record_invalid_prio_record_type(self):
+ pkb_client = PKBClient("key", "secret")
+ with self.assertRaises(ValueError):
+ pkb_client.create_dns_record(
+ "example.com", DNSRecordType.A, "127.0.0.1", "sub.example.com", 3600, 2
+ )
+
+ @responses.activate(registry=OrderedRegistry)
+ def test_update_dns_record(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/edit/example.com/123456"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "type": "A",
+ "content": "127.0.0.1",
+ "name": "sub.example.com",
+ "ttl": 3600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+
+ success = pkb_client.update_dns_record(
+ "example.com",
+ "123456",
+ DNSRecordType.A,
+ "127.0.0.1",
+ "sub.example.com",
+ 3600,
+ )
+ self.assertTrue(success)
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/edit/example.com/123456"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "type": "MX",
+ "content": "127.0.0.1",
+ "name": "sub.example.com",
+ "ttl": 3600,
+ "prio": 2,
+ }
+ )
+ ],
+ )
+
+ success = pkb_client.update_dns_record(
+ "example.com",
+ "123456",
+ DNSRecordType.MX,
+ "127.0.0.1",
+ "sub.example.com",
+ 3600,
+ 2,
+ )
+ self.assertTrue(success)
+
+ def test_update_dns_records_invalid_prio_record_type(self):
+ pkb_client = PKBClient("key", "secret")
+ with self.assertRaises(ValueError):
+ pkb_client.update_dns_record(
+ "example.com",
+ "123456",
+ DNSRecordType.A,
+ "127.0.0.1",
+ "sub.example.com",
+ 3600,
+ 2,
+ )
+
+ @responses.activate
+ def test_update_all_dns_records(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/editByNameType/example.com/A/sub"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": 1234,
+ "prio": None,
+ }
+ )
+ ],
+ )
+
+ success = pkb_client.update_all_dns_records(
+ "example.com", DNSRecordType.A, "sub", "127.0.0.1", 1234
+ )
+
+ self.assertTrue(success)
+
+ def test_update_all_dns_records_all_invalid_prio_record_type(self):
+ pkb_client = PKBClient("key", "secret")
+
+ with self.assertRaises(ValueError):
+ pkb_client.update_all_dns_records(
+ "example.com", DNSRecordType.A, "sub", "127.0.0.1", 1234, 2
+ )
+
+ @responses.activate
+ def test_delete_dns_record(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/delete/example.com/123456"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ success = pkb_client.delete_dns_record("example.com", "123456")
+
+ self.assertTrue(success)
+
+ @responses.activate
+ def test_delete_all_dns_records(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/deleteByNameType/example.com/A/sub"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ success = pkb_client.delete_all_dns_records(
+ "example.com", DNSRecordType.A, "sub"
+ )
+
+ self.assertTrue(success)
+
+ @responses.activate
+ def test_get_dns_records(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": "600",
+ "prio": None,
+ "notes": "",
+ },
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ },
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ records = pkb_client.get_dns_records("example.com")
+
+ expected_records = [
+ DNSRecord(
+ "123456", "example.com", DNSRecordType.A, "127.0.0.1", 600, None, ""
+ ),
+ DNSRecord(
+ "1234567",
+ "sub.example.com",
+ DNSRecordType.A,
+ "127.0.0.2",
+ 600,
+ None,
+ "",
+ ),
+ ]
+ self.assertEqual(expected_records, records)
+
+ @responses.activate
+ def test_get_all_dns_records(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieveByNameType/example.com/A/sub"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ }
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ records = pkb_client.get_all_dns_records("example.com", DNSRecordType.A, "sub")
+
+ expected_records = [
+ DNSRecord(
+ "1234567",
+ "sub.example.com",
+ DNSRecordType.A,
+ "127.0.0.2",
+ 600,
+ None,
+ "",
+ )
+ ]
+ self.assertEqual(expected_records, records)
+
+ @responses.activate
+ def test_update_dns_servers(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "domain/updateNs/example.com"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "ns": ["ns1.example.com", "ns2.example.com"],
+ }
+ )
+ ],
+ )
+
+ success = pkb_client.update_dns_servers(
+ "example.com", ["ns1.example.com", "ns2.example.com"]
+ )
+
+ self.assertTrue(success)
+
+ @responses.activate
+ def test_get_url_forwards(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "domain/getUrlForwarding/example.com"),
+ json={
+ "status": "SUCCESS",
+ "forwards": [
+ {
+ "id": "123456",
+ "subdomain": "",
+ "location": "https://example.com",
+ "type": "temporary",
+ "includePath": "no",
+ "wildcard": "yes",
+ },
+ {
+ "id": "234567",
+ "subdomain": "sub1",
+ "location": "https://sub1.example.com",
+ "type": "permanent",
+ "includePath": "no",
+ "wildcard": "yes",
+ },
+ ],
+ },
+ )
+
+ forwards = pkb_client.get_url_forwards("example.com")
+
+ expected_forwards = [
+ URLForwarding(
+ "123456",
+ "",
+ "https://example.com",
+ URLForwardingType.temporary,
+ False,
+ True,
+ ),
+ URLForwarding(
+ "234567",
+ "sub1",
+ "https://sub1.example.com",
+ URLForwardingType.permanent,
+ False,
+ True,
+ ),
+ ]
+
+ self.assertEqual(expected_forwards, forwards)
+
+ @responses.activate
+ def test_create_url_forward(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "domain/addUrlForward/example.com"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "subdomain": "sub.example.com",
+ "location": "https://www.example.com",
+ "type": "permanent",
+ "includePath": False,
+ "wildcard": False,
+ }
+ )
+ ],
+ )
+
+ success = pkb_client.create_url_forward(
+ "example.com",
+ "sub.example.com",
+ "https://www.example.com",
+ URLForwardingType.permanent,
+ False,
+ False,
+ )
+
+ self.assertTrue(success)
+
+ @responses.activate
+ def test_delete_url_forward(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "domain/deleteUrlForward/example.com/123456"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ success = pkb_client.delete_url_forward("example.com", "123456")
+
+ self.assertTrue(success)
+
+ @responses.activate
+ def test_get_domain_pricing(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "pricing/get"),
+ json={
+ "status": "SUCCESS",
+ "pricing": {
+ "com": {
+ "registration": "42.42",
+ "renewal": "4.2",
+ "transfer": "42.2",
+ "coupons": [],
+ },
+ "test": {
+ "registration": "4.42",
+ "renewal": "44.2",
+ "transfer": "4.2",
+ "coupons": [],
+ },
+ },
+ },
+ )
+
+ pricing = pkb_client.get_domain_pricing()
+
+ expected_pricing = {
+ "com": {
+ "registration": "42.42",
+ "renewal": "4.2",
+ "transfer": "42.2",
+ "coupons": [],
+ },
+ "test": {
+ "registration": "4.42",
+ "renewal": "44.2",
+ "transfer": "4.2",
+ "coupons": [],
+ },
+ }
+
+ self.assertEqual(expected_pricing, pricing)
+
+ @responses.activate
+ def test_get_ssl_bundle(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "ssl/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "certificatechain": "----BEGIN CERTIFICATE-----\nabc1-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc2-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc3-----END CERTIFICATE-----\n",
+ "privatekey": "-----BEGIN PRIVATE KEY-----\nabc4-----END PRIVATE KEY-----\n",
+ "publickey": "-----BEGIN PUBLIC KEY-----\nabc5-----END PUBLIC KEY-----\n",
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ ssl_cert_bundle = pkb_client.get_ssl_bundle("example.com")
+
+ expected_ssl_cert_bundle = SSLCertBundle(
+ certificate_chain="----BEGIN CERTIFICATE-----\nabc1-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc2-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc3-----END CERTIFICATE-----\n",
+ private_key="-----BEGIN PRIVATE KEY-----\nabc4-----END PRIVATE KEY-----\n",
+ public_key="-----BEGIN PUBLIC KEY-----\nabc5-----END PUBLIC KEY-----\n",
+ )
+
+ self.assertEqual(expected_ssl_cert_bundle, ssl_cert_bundle)
+
+ @responses.activate
+ def test_export_dns_records(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": "600",
+ "prio": None,
+ "notes": "",
+ },
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": "1200",
+ "prio": None,
+ "notes": "This is a comment",
+ },
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ with tempfile.NamedTemporaryFile() as f:
+ pkb_client.export_dns_records("example.com", f.name)
+
+ with open(f.name, "r") as f:
+ exported_dns_file = json.load(f)
+
+ expected_exported_dns_file = {
+ "123456": {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ },
+ "1234567": {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 1200,
+ "prio": None,
+ "notes": "This is a comment",
+ },
+ }
+
+ self.assertEqual(expected_exported_dns_file, exported_dns_file)
+
+ @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True)
+ def test_import_dns_records_clear(self):
+ pkb_client = PKBClient("key", "secret")
+
+ # first all records should be retrieved
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": "600",
+ "prio": None,
+ "notes": "",
+ },
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ },
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ # then all records should be deleted
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/delete/example.com/123456"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/delete/example.com/1234567"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ # then all records should be imported / created
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "123456"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "",
+ "type": "A",
+ "content": "127.0.0.3",
+ "ttl": 600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "1234567"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "sub",
+ "type": "A",
+ "content": "127.0.0.4",
+ "ttl": 600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+
+ with tempfile.TemporaryDirectory() as temp_dir:
+ filename = Path(temp_dir, "records.json")
+ with open(filename, "w") as f:
+ json.dump(
+ {
+ "123456": {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.3",
+ "ttl": 600,
+ "prio": None,
+ },
+ "1234567": {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.4",
+ "ttl": 600,
+ "prio": None,
+ },
+ },
+ f,
+ )
+
+ pkb_client.import_dns_records(
+ "example.com", str(filename), DNSRestoreMode.clear
+ )
+
+ @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True)
+ def test_import_dns_records_replace(self):
+ pkb_client = PKBClient("key", "secret")
+
+ # first all records should be retrieved
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": "600",
+ "prio": None,
+ "notes": "",
+ },
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ },
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ # same record should be updated
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/edit/example.com/1234567"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "type": "A",
+ "content": "127.0.0.3",
+ "name": "sub",
+ "ttl": 600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+
+ with tempfile.TemporaryDirectory() as temp_dir:
+ filename = Path(temp_dir, "records.json")
+ with open(filename, "w") as f:
+ json.dump(
+ {
+ "123451": {
+ "id": "123451",
+ "name": "test.example.com",
+ "type": "A",
+ "content": "127.0.0.4",
+ "ttl": 600,
+ "prio": None,
+ },
+ "1234562": {
+ "id": "1234562",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.3",
+ "ttl": 600,
+ "prio": None,
+ },
+ },
+ f,
+ )
+
+ pkb_client.import_dns_records(
+ "example.com", str(filename), DNSRestoreMode.replace
+ )
+
+ @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True)
+ def test_import_dns_records_keep(self):
+ pkb_client = PKBClient("key", "secret")
+
+ # first all records should be retrieved
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": "600",
+ "prio": None,
+ "notes": "",
+ },
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ },
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ # only new records should be created
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "1234562"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "test",
+ "type": "A",
+ "content": "127.0.0.4",
+ "ttl": 600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+
+ with tempfile.TemporaryDirectory() as temp_dir:
+ filename = Path(temp_dir, "records.json")
+ with open(filename, "w") as f:
+ json.dump(
+ {
+ "123451": {
+ "id": "123451",
+ "name": "test.example.com",
+ "type": "A",
+ "content": "127.0.0.4",
+ "ttl": 600,
+ "prio": None,
+ },
+ "1234562": {
+ "id": "1234562",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.3",
+ "ttl": 600,
+ "prio": None,
+ },
+ },
+ f,
+ )
+
+ pkb_client.import_dns_records(
+ "example.com", str(filename), DNSRestoreMode.keep
+ )
+
+ @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True)
+ def test_import_bind_dns_records(self):
+ pkb_client = PKBClient("key", "secret")
+
+ # first all records should be retrieved
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": "600",
+ "prio": None,
+ "notes": "",
+ },
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ },
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ # then all records should be deleted
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/delete/example.com/123456"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/delete/example.com/1234567"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ # then all records should be imported / created
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "123456"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "",
+ "type": "A",
+ "content": "127.0.0.3",
+ "ttl": 600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "1234567"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "sub",
+ "type": "A",
+ "content": "127.0.0.4",
+ "ttl": 600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+
+ with tempfile.TemporaryDirectory() as temp_dir:
+ filename = Path(temp_dir, "records.bind")
+ with open(filename, "w") as f:
+ f.write(
+ (
+ "$ORIGIN example.com.\n"
+ "$TTL 1234\n"
+ "@ IN SOA dns.example.com. dns2.example.com. (100 300 100 6000 600)\n"
+ "example.com. IN 600 A 127.0.0.3\n"
+ "sub.example.com. 600 IN A 127.0.0.4"
+ )
+ )
+
+ pkb_client.import_bind_dns_records(filename, DNSRestoreMode.clear)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/data/__init__.py b/tests/data/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/data/__init__.py
diff --git a/tests/data/test.bind b/tests/data/test.bind
new file mode 100644
index 0000000..b99dcf4
--- /dev/null
+++ b/tests/data/test.bind
@@ -0,0 +1,10 @@
+$ORIGIN test.com.
+$TTL 1234
+@ IN SOA dns.example.com. dns2.example.com. (100 300 100 6000 600)
+test.com. IN 600 A 1.2.3.4
+test.com. HS 600 A 1.2.3.4
+sub.test.com. 600 IN A 4.3.2.1
+@ IN 600 AAAA 2001:db8::1 ; This is a comment
+
+test.com. IN TXT pkb-client
+test.com. 600 IN MX 10 mail.test.com.
diff --git a/tests/data/test_no_ttl.bind b/tests/data/test_no_ttl.bind
new file mode 100644
index 0000000..7dac0ff
--- /dev/null
+++ b/tests/data/test_no_ttl.bind
@@ -0,0 +1,9 @@
+$ORIGIN test.com.
+@ IN SOA dns.example.com. dns2.example.com. (100 300 100 6000 600)
+test.com. IN 600 A 1.2.3.4
+test.com. HS 600 A 1.2.3.4
+sub.test.com. 600 IN A 4.3.2.1
+@ IN 700 AAAA 2001:db8::1 ; This is a comment
+
+test.com. IN TXT pkb-client
+test.com. 600 IN MX 10 mail.test.com.
diff --git a/tests/pkb_client_tests.py b/tests/pkb_client_tests.py
deleted file mode 100644
index 2668a3c..0000000
--- a/tests/pkb_client_tests.py
+++ /dev/null
@@ -1,569 +0,0 @@
-import json
-import os
-import unittest
-from pathlib import Path
-
-import requests
-
-from pkb_client.client import PKBClient, DNSRestoreMode
-
-"""
-WARNING: DO NOT RUN THIS TEST WITH A PRODUCTION DOMAIN OR IN A PRODUCTION ENVIRONMENT!!
- This test sets, edits and deletes dns record entries and if the test fails,
- unintended changes to dns entries may result.
-"""
-
-TEST_DOMAIN = os.environ.get("TEST_DOMAIN")
-PORKBUN_API_KEY = os.environ.get("PORKBUN_API_KEY")
-PORKBUN_API_SECRET = os.environ.get("PORKBUN_API_SECRET")
-DNS_RECORDS = os.environ.get("DNS_RECORDS")
-
-PUBLIC_IP_URL = "https://api64.ipify.org"
-
-
-class DNSTestWithCleanup(unittest.TestCase):
-
- def tearDown(self):
- if hasattr(self, "record_id") and self.record_id is not None:
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- pkb_client.dns_delete(TEST_DOMAIN, self.record_id)
-
-
-class TestClientAuth(unittest.TestCase):
- def test_valid_auth(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- ip_address = pkb_client.ping()
-
- self.assertEqual(ip_address, requests.get(PUBLIC_IP_URL).text)
-
- def test_invalid_api_key(self):
- pkb_client = PKBClient("invalid-api-key", PORKBUN_API_SECRET)
- with self.assertRaises(Exception):
- pkb_client.ping()
-
- def test_invalid_api_secret(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, "invalid-api-secret")
- with self.assertRaises(Exception):
- pkb_client.ping()
-
- def test_invalid_api_key_and_secret(self):
- pkb_client = PKBClient("invalid-api-key", "invalid-api-secret")
- with self.assertRaises(Exception):
- pkb_client.ping()
-
-
-class TestPingMethod(unittest.TestCase):
- def test_ping(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- ip_address = pkb_client.ping()
-
- self.assertEqual(ip_address, requests.get(PUBLIC_IP_URL).text)
-
-
-class TestDNSCreateMethod(DNSTestWithCleanup):
-
- def test_valid_request(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- ttl = 342
- name = "test_pkb_client"
-
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name, ttl=ttl)
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
-
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest():
- self.assertEqual(txt_content, record["content"])
- with self.subTest():
- self.assertEqual(ttl, int(record["ttl"]))
- with self.subTest():
- self.assertEqual("{}.{}".format(name, TEST_DOMAIN), record["name"])
- return
- self.assertTrue(False)
-
- def test_invalid_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- with self.assertRaises(Exception):
- self.record_id = pkb_client.dns_create("notvaliddomain", "TXT", "interesting-content",
- name="test_pkb_client")
-
- def test_invalid_record_type(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- with self.assertRaises(AssertionError):
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "ABC", "interesting-content", name="test_pkb_client")
-
- def test_larger_than_allowed_content_length(self):
- # the api call should not fail because the api creates multiple TXT entries which will be concatenated
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- txt_content = "interesting-content-interesting-content-interesting-content-interesting-content-" \
- "interesting-content-interesting-content-interesting-content-interesting-content-" \
- "interesting-content-interesting-content-interesting-content-interesting-content-" \
- "interesting-content"
- assert len(txt_content) == 259
-
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client")
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- self.assertEqual(txt_content, record["content"])
- return
- self.assertTrue(False)
-
- def test_largest_allowed_content_length(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- txt_content = "interesting-content-interesting-content-interesting-content-interesting-content-" \
- "interesting-content-interesting-content-interesting-content-interesting-content-" \
- "interesting-content-interesting-content-interesting-content-interesting-content-" \
- "interesting-con"
- assert len(txt_content) == 255
-
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client")
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- self.assertEqual(txt_content, record["content"])
- return
- self.assertTrue(False)
-
- def test_empty_content_str(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- txt_content = ""
- assert len(txt_content) == 0
-
- with self.assertRaises(AssertionError):
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client")
-
- def test_none_content(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", None, name="test_pkb_client")
-
- def test_smaller_than_allowed_ttl(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", ttl=299,
- name="test_pkb_client")
-
- def test_negative_ttl(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", ttl=-1,
- name="test_pkb_client")
-
- def test_larger_than_allowed_ttl(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", name="test_pkb_client",
- ttl=2147483648)
-
- def test_largest_allowed_ttl(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- ttl = 2147483647
-
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", ttl=ttl)
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest():
- self.assertEqual(txt_content, record["content"])
- with self.subTest():
- self.assertEqual(ttl, int(record["ttl"]))
- return
- self.assertTrue(False)
-
- def test_valid_prio_with_txt(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- prio = 10
-
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", prio=prio)
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest():
- self.assertEqual(txt_content, record["content"])
- with self.subTest():
- self.assertEqual(prio, int(record["prio"]))
- return
- self.assertTrue(False)
-
- def test_negative_prio_with_txt(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- prio = -42
-
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", prio=prio)
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest():
- self.assertEqual(txt_content, record["content"])
- with self.subTest():
- self.assertEqual(prio, int(record["prio"]))
- return
- self.assertTrue(False)
-
-
-class TestDNSEditMethod(DNSTestWithCleanup):
- def test_valid_edit_request(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- name = "test_pkb_client"
- tll = 342
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name, ttl=tll)
-
- edited_txt_content = "more-interesting-content"
- edited_name = "more_test_pkb_client"
- edited_tll = 423
- pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content, name=edited_name, ttl=edited_tll)
-
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest("txt record content is not edited"):
- self.assertEqual(edited_txt_content, record["content"])
- with self.subTest("txt record name is not edited"):
- self.assertEqual("{}.{}".format(edited_name, TEST_DOMAIN), record["name"])
- with self.subTest("txt record ttl is not edited"):
- self.assertEqual(edited_tll, int(record["ttl"]))
- return
- self.assertTrue(False)
-
- def test_change_subdomain_to_root_txt_record(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- name = "test_pkb_client"
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name)
-
- edited_txt_content = "more-interesting-content"
- edited_name = ""
- pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content, name=edited_name)
-
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest("txt record content is not edited"):
- self.assertEqual(edited_txt_content, record["content"])
- with self.subTest("txt record name is not edited"):
- self.assertEqual(TEST_DOMAIN, record["name"])
- return
- self.assertTrue(False)
-
- def test_no_name_change(self):
- # the name is required for each edit, otherwise the record will apply for the root domain
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- name = "test_pkb_client"
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name)
-
- edited_txt_content = "more-interesting-content"
- pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content)
-
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest("txt record content is not edited"):
- self.assertEqual(edited_txt_content, record["content"])
- with self.subTest("txt record name is not edited"):
- self.assertEqual(TEST_DOMAIN, record["name"])
- return
- self.assertTrue(False)
-
- def test_record_type_change(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- name = "test_pkb_client"
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name)
-
- edited_txt_content = "more-interesting-content"
- name = "test_pkb_client"
- edited_record_type = "MX"
- pkb_client.dns_edit(TEST_DOMAIN, self.record_id, edited_record_type, edited_txt_content, name=name)
-
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest("txt record content is not edited"):
- self.assertEqual(edited_txt_content, record["content"])
- with self.subTest("record type is not edited"):
- self.assertEqual(edited_record_type, record["type"])
- return
- self.assertTrue(False)
-
-
-class TestDNSDeleteMethod(DNSTestWithCleanup):
- def test_valid_delete_request(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- name = "test_pkb_client"
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name)
-
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- record_exists = False
- for record in records:
- if record["id"] == self.record_id:
- record_exists = True
- break
- with self.subTest("test txt record setup failed"):
- self.assertTrue(record_exists)
-
- pkb_client.dns_delete(TEST_DOMAIN, self.record_id)
-
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- record_exists = False
- for record in records:
- if record["id"] == self.record_id:
- record_exists = True
- break
- if not record_exists:
- self.record_id = None
- with self.subTest("txt record is not deleted"):
- self.assertFalse(record_exists)
-
-
-class TestDNSReceiveMethod(unittest.TestCase):
- def test_valid_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- self.assertEqual(records, DNS_RECORDS)
-
- def test_invalid_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- with self.assertRaises(Exception):
- pkb_client.dns_retrieve("invaliddomain")
-
-
-class TestDNSExport(unittest.TestCase):
- def test_valid_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- dns_records = pkb_client.dns_retrieve(domain=TEST_DOMAIN)
- # reformat the dns records to a single dict
- dns_records_dict = dict()
- for record in dns_records:
- dns_records_dict[record["id"]] = record
-
- filepath = Path("dns_backup.json")
- if filepath.exists():
- filepath.unlink()
- pkb_client.dns_export(domain=TEST_DOMAIN, filename=str(filepath))
-
- with open(str("dns_backup.json"), "r") as f:
- self.assertEqual(json.load(f), dns_records_dict)
-
- filepath.unlink()
-
- def test_invalid_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(Exception):
- pkb_client.dns_export(domain="invaliddomain", filename="dns_backup.json")
-
- def test_empty_str_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_export(domain="", filename="dns_backup.json")
-
- def test_none_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_export(domain=None, filename="dns_backup.json")
-
- def test_filename_already_exists(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- filepath = Path("dns_backup.json")
- filepath.touch()
-
- with self.assertRaises(Exception):
- pkb_client.dns_export(domain=TEST_DOMAIN, filename=str(filepath))
-
- filepath.unlink()
-
- def test_empty_str_filename(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_export(domain=TEST_DOMAIN, filename="")
-
- def test_none_filename(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_export(domain=TEST_DOMAIN, filename=None)
-
-
-class TestDNSImport(unittest.TestCase):
- def test_valid_clear_import(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- existing_dns_record_ids = set()
- dns_records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in dns_records:
- existing_dns_record_ids.add(record["id"])
-
- filename = "dns_backup_clear.json"
-
- with open(filename, "r") as f:
- file_dns_records = json.load(f)
-
- pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.clear)
-
- new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for new_record in new_dns_records:
- # test if the previous dns records still exists
- if new_record["id"] in existing_dns_record_ids:
- self.assertTrue(False)
- # test if the new dns record was created
- new_record_created = False
- for _, file_dns_record in file_dns_records.items():
- if file_dns_record["name"] == new_record["name"] \
- and file_dns_record["type"] == new_record["type"] \
- and file_dns_record["content"] == new_record["content"] \
- and file_dns_record["ttl"] == new_record["ttl"] \
- and file_dns_record["prio"] == new_record["prio"]:
- new_record_created = True
- self.assertTrue(new_record_created)
-
- def test_valid_replace_import(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- existing_dns_record_ids = set()
- dns_records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in dns_records:
- existing_dns_record_ids.add(record["id"])
-
- filename = "dns_backup_replace.json"
-
- with open(filename, "r") as f:
- file_dns_records = json.load(f)
-
- pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.replace)
-
- new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for new_record in new_dns_records:
- # test if the previous dns records still exists
- if new_record["id"] not in existing_dns_record_ids:
- self.assertTrue(False)
- # test if the dns record was edited
- record_edited = False
- for _, file_dns_record in file_dns_records.items():
- if file_dns_record["name"] == new_record["name"] \
- and file_dns_record["type"] == new_record["type"] \
- and file_dns_record["content"] == new_record["content"] \
- and file_dns_record["ttl"] == new_record["ttl"] \
- and file_dns_record["prio"] == new_record["prio"]:
- record_edited = True
- break
- self.assertTrue(record_edited)
-
- def test_valid_keep_import(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- existing_dns_records = dict()
- dns_records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in dns_records:
- existing_dns_records[record["id"]] = record
-
- filename = "dns_backup_keep.json"
-
- with open(filename, "r") as f:
- file_dns_records = json.load(f)
-
- pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.keep)
-
- new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN)
-
- # test if the all old dns records are kept
- for _, existing_record in existing_dns_records.items():
- record_kept = False
- for new_record in new_dns_records:
- if existing_record["id"] == new_record["id"] \
- and existing_record["name"] == new_record["name"] \
- and existing_record["type"] == new_record["type"] \
- and existing_record["content"] == new_record["content"] \
- and existing_record["ttl"] == new_record["ttl"] \
- and existing_record["prio"] == new_record["prio"]:
- record_kept = True
- break
- with self.subTest():
- self.assertTrue(record_kept)
-
- # test if the new records are created
- for new_record in new_dns_records:
- if new_record["id"] not in existing_dns_records:
- record_created = False
- for _, file_dns_record in file_dns_records.items():
- if file_dns_record["name"] == new_record["name"] \
- and file_dns_record["type"] == new_record["type"] \
- and file_dns_record["content"] == new_record["content"] \
- and file_dns_record["ttl"] == new_record["ttl"] \
- and file_dns_record["prio"] == new_record["prio"]:
- record_created = True
- break
- with self.subTest():
- self.assertTrue(record_created)
-
- def test_invalid_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(Exception):
- pkb_client.dns_import(domain="invaliddomain", filename="dns_backup.json", restore_mode=DNSRestoreMode.clear)
-
- def test_empty_str_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain="", filename="dns_backup.json", restore_mode=DNSRestoreMode.clear)
-
- def test_none_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain=None, filename="dns_backup.json", restore_mode=DNSRestoreMode.clear)
-
- def test_empty_str_filename(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain=TEST_DOMAIN, filename="", restore_mode=DNSRestoreMode.clear)
-
- def test_none_filename(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain=TEST_DOMAIN, filename=None, restore_mode=DNSRestoreMode.clear)
-
- def test_invalid_restore_mode(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.subTest("None as restore mode"):
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode=None)
- with self.subTest("empty string as restore mode"):
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode="")
- with self.subTest("number as restore mode"):
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode=0)
-
-
-if __name__ == '__main__':
- unittest.main()