diff options
| author | 2024-11-20 01:17:41 -0500 | |
|---|---|---|
| committer | 2024-11-20 01:17:41 -0500 | |
| commit | eab0e07b2d9931aa35f243fd24c42c02a4ec8533 (patch) | |
| tree | 767a0da67d0a1cd8ce30db0ad0c800e17380b4cf /tests | |
| parent | 6094eb734bfd2159d81ffea918ea5d31e1e61441 (diff) | |
| parent | 3e3ebe586385a83b10c8f1d0b9ba9b67c8b56d2f (diff) | |
Update upstream source from tag 'upstream/2.0.0'
Update to upstream version '2.0.0'
with Debian dir a538d4b69c2b9e7ec310387046aa40f0e2499b5f
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/bind_file.py | 148 | ||||
| -rw-r--r-- | tests/client.py | 1025 | ||||
| -rw-r--r-- | tests/data/__init__.py | 0 | ||||
| -rw-r--r-- | tests/data/test.bind | 10 | ||||
| -rw-r--r-- | tests/data/test_no_ttl.bind | 9 | ||||
| -rw-r--r-- | tests/pkb_client_tests.py | 569 |
6 files changed, 1192 insertions, 569 deletions
diff --git a/tests/bind_file.py b/tests/bind_file.py new file mode 100644 index 0000000..4d1114f --- /dev/null +++ b/tests/bind_file.py @@ -0,0 +1,148 @@ +import tempfile +import unittest +from importlib import resources + +from pkb_client.client.bind_file import BindFile, BindRecord, RecordClass +from pkb_client.client.dns import DNSRecordType +from tests import data + + +class TestBindFileParsing(unittest.TestCase): + def test_reading_bind_file(self): + with self.subTest("With default TTL"): + with resources.open_text(data, "test.bind") as f: + bind_file = BindFile.from_file(f.name) + + self.assertEqual("test.com.", bind_file.origin) + self.assertEqual(1234, bind_file.ttl) + self.assertEqual(5, len(bind_file.records)) + self.assertEqual( + BindRecord( + "test.com.", 600, RecordClass.IN, DNSRecordType.A, "1.2.3.4" + ), + bind_file.records[0], + ) + self.assertEqual( + BindRecord( + "sub.test.com.", 600, RecordClass.IN, DNSRecordType.A, "4.3.2.1" + ), + bind_file.records[1], + ) + self.assertEqual( + BindRecord( + "test.com.", + 600, + RecordClass.IN, + DNSRecordType.AAAA, + "2001:db8::1", + comment="This is a comment", + ), + bind_file.records[2], + ) + self.assertEqual( + BindRecord( + "test.com.", 1234, RecordClass.IN, DNSRecordType.TXT, "pkb-client" + ), + bind_file.records[3], + ) + self.assertEqual( + BindRecord( + "test.com.", + 600, + RecordClass.IN, + DNSRecordType.MX, + "mail.test.com.", + prio=10, + ), + bind_file.records[4], + ) + + with self.subTest("Without default TTL"): + with resources.open_text(data, "test_no_ttl.bind") as f: + bind_file = BindFile.from_file(f.name) + + self.assertEqual("test.com.", bind_file.origin) + self.assertEqual(None, bind_file.ttl) + self.assertEqual(5, len(bind_file.records)) + self.assertEqual( + BindRecord( + "test.com.", 600, RecordClass.IN, DNSRecordType.A, "1.2.3.4" + ), + bind_file.records[0], + ) + self.assertEqual( + BindRecord( + "sub.test.com.", 600, RecordClass.IN, DNSRecordType.A, "4.3.2.1" + ), + bind_file.records[1], + ) + self.assertEqual( + BindRecord( + "test.com.", + 700, + RecordClass.IN, + DNSRecordType.AAAA, + "2001:db8::1", + comment="This is a comment", + ), + bind_file.records[2], + ) + self.assertEqual( + BindRecord( + "test.com.", 700, RecordClass.IN, DNSRecordType.TXT, "pkb-client" + ), + bind_file.records[3], + ) + self.assertEqual( + BindRecord( + "test.com.", + 600, + RecordClass.IN, + DNSRecordType.MX, + "mail.test.com.", + prio=10, + ), + bind_file.records[4], + ) + + def test_writing_bind_file(self): + records = [ + BindRecord("test.com.", 600, RecordClass.IN, DNSRecordType.A, "1.2.3.4"), + BindRecord( + "sub.test.com.", 700, RecordClass.IN, DNSRecordType.A, "4.3.2.1" + ), + BindRecord( + "test.com.", 600, RecordClass.IN, DNSRecordType.AAAA, "2001:db8::1" + ), + BindRecord( + "test.com.", 600, RecordClass.IN, DNSRecordType.TXT, "pkb-client" + ), + BindRecord( + "test.com.", + 600, + RecordClass.IN, + DNSRecordType.MX, + "mail.test.com.", + prio=10, + ), + ] + bind_file = BindFile("test.com.", 1234, records) + + file_content = ( + "$ORIGIN test.com.\n" + "$TTL 1234\n" + "test.com. 600 IN A 1.2.3.4\n" + "sub.test.com. 700 IN A 4.3.2.1\n" + "test.com. 600 IN AAAA 2001:db8::1\n" + "test.com. 600 IN TXT pkb-client\n" + "test.com. 600 IN MX 10 mail.test.com.\n" + ) + + with tempfile.NamedTemporaryFile() as f: + bind_file.to_file(f.name) + with open(f.name) as f2: + self.assertEqual(file_content.strip(), f2.read().strip()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/client.py b/tests/client.py new file mode 100644 index 0000000..9362be2 --- /dev/null +++ b/tests/client.py @@ -0,0 +1,1025 @@ +import json +import tempfile +import unittest +from pathlib import Path +from urllib.parse import urljoin + +import responses +from responses import matchers +from responses.registries import OrderedRegistry + +from pkb_client.client import ( + PKBClient, + PKBClientException, + API_ENDPOINT, + DNSRestoreMode, +) +from pkb_client.client import SSLCertBundle +from pkb_client.client.dns import DNSRecord, DNSRecordType +from pkb_client.client.forwarding import URLForwarding, URLForwardingType + + +class TestClientAuth(unittest.TestCase): + @responses.activate + def test_valid_auth(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "ping"), + json={"status": "SUCCESS", "yourIp": "127.0.0.1"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + ip_address = pkb_client.ping() + self.assertEqual("127.0.0.1", ip_address) + + @responses.activate + def test_invalid_auth(self): + pkb_client = PKBClient("key" + "s", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "ping"), + json={"status": "ERROR", "message": "Invalid credentials"}, + status=401, + ) + with self.assertRaises(PKBClientException): + pkb_client.ping() + + @responses.activate + def test_ping(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "ping"), + json={"status": "SUCCESS", "yourIp": "127.0.0.1"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + ip_address = pkb_client.ping() + self.assertEqual("127.0.0.1", ip_address) + + @responses.activate(registry=OrderedRegistry) + def test_create_dns_record(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "123456"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": 3600, + "prio": None, + } + ) + ], + ) + assert "123456" == pkb_client.create_dns_record( + "example.com", DNSRecordType.A, "127.0.0.1", "sub.example.com", 3600 + ) + + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "234561"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "sub.example.com", + "type": "MX", + "content": "127.0.0.1", + "ttl": 3600, + "prio": 2, + } + ) + ], + ) + assert "234561" == pkb_client.create_dns_record( + "example.com", DNSRecordType.MX, "127.0.0.1", "sub.example.com", 3600, 2 + ) + + def test_create_dns_record_invalid_prio_record_type(self): + pkb_client = PKBClient("key", "secret") + with self.assertRaises(ValueError): + pkb_client.create_dns_record( + "example.com", DNSRecordType.A, "127.0.0.1", "sub.example.com", 3600, 2 + ) + + @responses.activate(registry=OrderedRegistry) + def test_update_dns_record(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/edit/example.com/123456"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "type": "A", + "content": "127.0.0.1", + "name": "sub.example.com", + "ttl": 3600, + "prio": None, + } + ) + ], + ) + + success = pkb_client.update_dns_record( + "example.com", + "123456", + DNSRecordType.A, + "127.0.0.1", + "sub.example.com", + 3600, + ) + self.assertTrue(success) + + responses.post( + url=urljoin(API_ENDPOINT, "dns/edit/example.com/123456"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "type": "MX", + "content": "127.0.0.1", + "name": "sub.example.com", + "ttl": 3600, + "prio": 2, + } + ) + ], + ) + + success = pkb_client.update_dns_record( + "example.com", + "123456", + DNSRecordType.MX, + "127.0.0.1", + "sub.example.com", + 3600, + 2, + ) + self.assertTrue(success) + + def test_update_dns_records_invalid_prio_record_type(self): + pkb_client = PKBClient("key", "secret") + with self.assertRaises(ValueError): + pkb_client.update_dns_record( + "example.com", + "123456", + DNSRecordType.A, + "127.0.0.1", + "sub.example.com", + 3600, + 2, + ) + + @responses.activate + def test_update_all_dns_records(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/editByNameType/example.com/A/sub"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "type": "A", + "content": "127.0.0.1", + "ttl": 1234, + "prio": None, + } + ) + ], + ) + + success = pkb_client.update_all_dns_records( + "example.com", DNSRecordType.A, "sub", "127.0.0.1", 1234 + ) + + self.assertTrue(success) + + def test_update_all_dns_records_all_invalid_prio_record_type(self): + pkb_client = PKBClient("key", "secret") + + with self.assertRaises(ValueError): + pkb_client.update_all_dns_records( + "example.com", DNSRecordType.A, "sub", "127.0.0.1", 1234, 2 + ) + + @responses.activate + def test_delete_dns_record(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/delete/example.com/123456"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + success = pkb_client.delete_dns_record("example.com", "123456") + + self.assertTrue(success) + + @responses.activate + def test_delete_all_dns_records(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/deleteByNameType/example.com/A/sub"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + success = pkb_client.delete_all_dns_records( + "example.com", DNSRecordType.A, "sub" + ) + + self.assertTrue(success) + + @responses.activate + def test_get_dns_records(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": "600", + "prio": None, + "notes": "", + }, + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 600, + "prio": None, + "notes": "", + }, + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + records = pkb_client.get_dns_records("example.com") + + expected_records = [ + DNSRecord( + "123456", "example.com", DNSRecordType.A, "127.0.0.1", 600, None, "" + ), + DNSRecord( + "1234567", + "sub.example.com", + DNSRecordType.A, + "127.0.0.2", + 600, + None, + "", + ), + ] + self.assertEqual(expected_records, records) + + @responses.activate + def test_get_all_dns_records(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieveByNameType/example.com/A/sub"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 600, + "prio": None, + "notes": "", + } + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + records = pkb_client.get_all_dns_records("example.com", DNSRecordType.A, "sub") + + expected_records = [ + DNSRecord( + "1234567", + "sub.example.com", + DNSRecordType.A, + "127.0.0.2", + 600, + None, + "", + ) + ] + self.assertEqual(expected_records, records) + + @responses.activate + def test_update_dns_servers(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "domain/updateNs/example.com"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "ns": ["ns1.example.com", "ns2.example.com"], + } + ) + ], + ) + + success = pkb_client.update_dns_servers( + "example.com", ["ns1.example.com", "ns2.example.com"] + ) + + self.assertTrue(success) + + @responses.activate + def test_get_url_forwards(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "domain/getUrlForwarding/example.com"), + json={ + "status": "SUCCESS", + "forwards": [ + { + "id": "123456", + "subdomain": "", + "location": "https://example.com", + "type": "temporary", + "includePath": "no", + "wildcard": "yes", + }, + { + "id": "234567", + "subdomain": "sub1", + "location": "https://sub1.example.com", + "type": "permanent", + "includePath": "no", + "wildcard": "yes", + }, + ], + }, + ) + + forwards = pkb_client.get_url_forwards("example.com") + + expected_forwards = [ + URLForwarding( + "123456", + "", + "https://example.com", + URLForwardingType.temporary, + False, + True, + ), + URLForwarding( + "234567", + "sub1", + "https://sub1.example.com", + URLForwardingType.permanent, + False, + True, + ), + ] + + self.assertEqual(expected_forwards, forwards) + + @responses.activate + def test_create_url_forward(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "domain/addUrlForward/example.com"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "subdomain": "sub.example.com", + "location": "https://www.example.com", + "type": "permanent", + "includePath": False, + "wildcard": False, + } + ) + ], + ) + + success = pkb_client.create_url_forward( + "example.com", + "sub.example.com", + "https://www.example.com", + URLForwardingType.permanent, + False, + False, + ) + + self.assertTrue(success) + + @responses.activate + def test_delete_url_forward(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "domain/deleteUrlForward/example.com/123456"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + success = pkb_client.delete_url_forward("example.com", "123456") + + self.assertTrue(success) + + @responses.activate + def test_get_domain_pricing(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "pricing/get"), + json={ + "status": "SUCCESS", + "pricing": { + "com": { + "registration": "42.42", + "renewal": "4.2", + "transfer": "42.2", + "coupons": [], + }, + "test": { + "registration": "4.42", + "renewal": "44.2", + "transfer": "4.2", + "coupons": [], + }, + }, + }, + ) + + pricing = pkb_client.get_domain_pricing() + + expected_pricing = { + "com": { + "registration": "42.42", + "renewal": "4.2", + "transfer": "42.2", + "coupons": [], + }, + "test": { + "registration": "4.42", + "renewal": "44.2", + "transfer": "4.2", + "coupons": [], + }, + } + + self.assertEqual(expected_pricing, pricing) + + @responses.activate + def test_get_ssl_bundle(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "ssl/retrieve/example.com"), + json={ + "status": "SUCCESS", + "certificatechain": "----BEGIN CERTIFICATE-----\nabc1-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc2-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc3-----END CERTIFICATE-----\n", + "privatekey": "-----BEGIN PRIVATE KEY-----\nabc4-----END PRIVATE KEY-----\n", + "publickey": "-----BEGIN PUBLIC KEY-----\nabc5-----END PUBLIC KEY-----\n", + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + ssl_cert_bundle = pkb_client.get_ssl_bundle("example.com") + + expected_ssl_cert_bundle = SSLCertBundle( + certificate_chain="----BEGIN CERTIFICATE-----\nabc1-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc2-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc3-----END CERTIFICATE-----\n", + private_key="-----BEGIN PRIVATE KEY-----\nabc4-----END PRIVATE KEY-----\n", + public_key="-----BEGIN PUBLIC KEY-----\nabc5-----END PUBLIC KEY-----\n", + ) + + self.assertEqual(expected_ssl_cert_bundle, ssl_cert_bundle) + + @responses.activate + def test_export_dns_records(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": "600", + "prio": None, + "notes": "", + }, + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": "1200", + "prio": None, + "notes": "This is a comment", + }, + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + with tempfile.NamedTemporaryFile() as f: + pkb_client.export_dns_records("example.com", f.name) + + with open(f.name, "r") as f: + exported_dns_file = json.load(f) + + expected_exported_dns_file = { + "123456": { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": 600, + "prio": None, + "notes": "", + }, + "1234567": { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 1200, + "prio": None, + "notes": "This is a comment", + }, + } + + self.assertEqual(expected_exported_dns_file, exported_dns_file) + + @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True) + def test_import_dns_records_clear(self): + pkb_client = PKBClient("key", "secret") + + # first all records should be retrieved + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": "600", + "prio": None, + "notes": "", + }, + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 600, + "prio": None, + "notes": "", + }, + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + # then all records should be deleted + responses.post( + url=urljoin(API_ENDPOINT, "dns/delete/example.com/123456"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + responses.post( + url=urljoin(API_ENDPOINT, "dns/delete/example.com/1234567"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + # then all records should be imported / created + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "123456"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "", + "type": "A", + "content": "127.0.0.3", + "ttl": 600, + "prio": None, + } + ) + ], + ) + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "1234567"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "sub", + "type": "A", + "content": "127.0.0.4", + "ttl": 600, + "prio": None, + } + ) + ], + ) + + with tempfile.TemporaryDirectory() as temp_dir: + filename = Path(temp_dir, "records.json") + with open(filename, "w") as f: + json.dump( + { + "123456": { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.3", + "ttl": 600, + "prio": None, + }, + "1234567": { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.4", + "ttl": 600, + "prio": None, + }, + }, + f, + ) + + pkb_client.import_dns_records( + "example.com", str(filename), DNSRestoreMode.clear + ) + + @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True) + def test_import_dns_records_replace(self): + pkb_client = PKBClient("key", "secret") + + # first all records should be retrieved + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": "600", + "prio": None, + "notes": "", + }, + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 600, + "prio": None, + "notes": "", + }, + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + # same record should be updated + responses.post( + url=urljoin(API_ENDPOINT, "dns/edit/example.com/1234567"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "type": "A", + "content": "127.0.0.3", + "name": "sub", + "ttl": 600, + "prio": None, + } + ) + ], + ) + + with tempfile.TemporaryDirectory() as temp_dir: + filename = Path(temp_dir, "records.json") + with open(filename, "w") as f: + json.dump( + { + "123451": { + "id": "123451", + "name": "test.example.com", + "type": "A", + "content": "127.0.0.4", + "ttl": 600, + "prio": None, + }, + "1234562": { + "id": "1234562", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.3", + "ttl": 600, + "prio": None, + }, + }, + f, + ) + + pkb_client.import_dns_records( + "example.com", str(filename), DNSRestoreMode.replace + ) + + @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True) + def test_import_dns_records_keep(self): + pkb_client = PKBClient("key", "secret") + + # first all records should be retrieved + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": "600", + "prio": None, + "notes": "", + }, + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 600, + "prio": None, + "notes": "", + }, + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + # only new records should be created + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "1234562"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "test", + "type": "A", + "content": "127.0.0.4", + "ttl": 600, + "prio": None, + } + ) + ], + ) + + with tempfile.TemporaryDirectory() as temp_dir: + filename = Path(temp_dir, "records.json") + with open(filename, "w") as f: + json.dump( + { + "123451": { + "id": "123451", + "name": "test.example.com", + "type": "A", + "content": "127.0.0.4", + "ttl": 600, + "prio": None, + }, + "1234562": { + "id": "1234562", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.3", + "ttl": 600, + "prio": None, + }, + }, + f, + ) + + pkb_client.import_dns_records( + "example.com", str(filename), DNSRestoreMode.keep + ) + + @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True) + def test_import_bind_dns_records(self): + pkb_client = PKBClient("key", "secret") + + # first all records should be retrieved + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": "600", + "prio": None, + "notes": "", + }, + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 600, + "prio": None, + "notes": "", + }, + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + # then all records should be deleted + responses.post( + url=urljoin(API_ENDPOINT, "dns/delete/example.com/123456"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + responses.post( + url=urljoin(API_ENDPOINT, "dns/delete/example.com/1234567"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + # then all records should be imported / created + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "123456"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "", + "type": "A", + "content": "127.0.0.3", + "ttl": 600, + "prio": None, + } + ) + ], + ) + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "1234567"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "sub", + "type": "A", + "content": "127.0.0.4", + "ttl": 600, + "prio": None, + } + ) + ], + ) + + with tempfile.TemporaryDirectory() as temp_dir: + filename = Path(temp_dir, "records.bind") + with open(filename, "w") as f: + f.write( + ( + "$ORIGIN example.com.\n" + "$TTL 1234\n" + "@ IN SOA dns.example.com. dns2.example.com. (100 300 100 6000 600)\n" + "example.com. IN 600 A 127.0.0.3\n" + "sub.example.com. 600 IN A 127.0.0.4" + ) + ) + + pkb_client.import_bind_dns_records(filename, DNSRestoreMode.clear) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/data/__init__.py b/tests/data/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/data/__init__.py diff --git a/tests/data/test.bind b/tests/data/test.bind new file mode 100644 index 0000000..b99dcf4 --- /dev/null +++ b/tests/data/test.bind @@ -0,0 +1,10 @@ +$ORIGIN test.com. +$TTL 1234 +@ IN SOA dns.example.com. dns2.example.com. (100 300 100 6000 600) +test.com. IN 600 A 1.2.3.4 +test.com. HS 600 A 1.2.3.4 +sub.test.com. 600 IN A 4.3.2.1 +@ IN 600 AAAA 2001:db8::1 ; This is a comment + +test.com. IN TXT pkb-client +test.com. 600 IN MX 10 mail.test.com. diff --git a/tests/data/test_no_ttl.bind b/tests/data/test_no_ttl.bind new file mode 100644 index 0000000..7dac0ff --- /dev/null +++ b/tests/data/test_no_ttl.bind @@ -0,0 +1,9 @@ +$ORIGIN test.com. +@ IN SOA dns.example.com. dns2.example.com. (100 300 100 6000 600) +test.com. IN 600 A 1.2.3.4 +test.com. HS 600 A 1.2.3.4 +sub.test.com. 600 IN A 4.3.2.1 +@ IN 700 AAAA 2001:db8::1 ; This is a comment + +test.com. IN TXT pkb-client +test.com. 600 IN MX 10 mail.test.com. diff --git a/tests/pkb_client_tests.py b/tests/pkb_client_tests.py deleted file mode 100644 index 2668a3c..0000000 --- a/tests/pkb_client_tests.py +++ /dev/null @@ -1,569 +0,0 @@ -import json -import os -import unittest -from pathlib import Path - -import requests - -from pkb_client.client import PKBClient, DNSRestoreMode - -""" -WARNING: DO NOT RUN THIS TEST WITH A PRODUCTION DOMAIN OR IN A PRODUCTION ENVIRONMENT!! - This test sets, edits and deletes dns record entries and if the test fails, - unintended changes to dns entries may result. -""" - -TEST_DOMAIN = os.environ.get("TEST_DOMAIN") -PORKBUN_API_KEY = os.environ.get("PORKBUN_API_KEY") -PORKBUN_API_SECRET = os.environ.get("PORKBUN_API_SECRET") -DNS_RECORDS = os.environ.get("DNS_RECORDS") - -PUBLIC_IP_URL = "https://api64.ipify.org" - - -class DNSTestWithCleanup(unittest.TestCase): - - def tearDown(self): - if hasattr(self, "record_id") and self.record_id is not None: - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - pkb_client.dns_delete(TEST_DOMAIN, self.record_id) - - -class TestClientAuth(unittest.TestCase): - def test_valid_auth(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - ip_address = pkb_client.ping() - - self.assertEqual(ip_address, requests.get(PUBLIC_IP_URL).text) - - def test_invalid_api_key(self): - pkb_client = PKBClient("invalid-api-key", PORKBUN_API_SECRET) - with self.assertRaises(Exception): - pkb_client.ping() - - def test_invalid_api_secret(self): - pkb_client = PKBClient(PORKBUN_API_KEY, "invalid-api-secret") - with self.assertRaises(Exception): - pkb_client.ping() - - def test_invalid_api_key_and_secret(self): - pkb_client = PKBClient("invalid-api-key", "invalid-api-secret") - with self.assertRaises(Exception): - pkb_client.ping() - - -class TestPingMethod(unittest.TestCase): - def test_ping(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - ip_address = pkb_client.ping() - - self.assertEqual(ip_address, requests.get(PUBLIC_IP_URL).text) - - -class TestDNSCreateMethod(DNSTestWithCleanup): - - def test_valid_request(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - ttl = 342 - name = "test_pkb_client" - - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name, ttl=ttl) - records = pkb_client.dns_retrieve(TEST_DOMAIN) - - for record in records: - if record["id"] == self.record_id: - with self.subTest(): - self.assertEqual(txt_content, record["content"]) - with self.subTest(): - self.assertEqual(ttl, int(record["ttl"])) - with self.subTest(): - self.assertEqual("{}.{}".format(name, TEST_DOMAIN), record["name"]) - return - self.assertTrue(False) - - def test_invalid_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - with self.assertRaises(Exception): - self.record_id = pkb_client.dns_create("notvaliddomain", "TXT", "interesting-content", - name="test_pkb_client") - - def test_invalid_record_type(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - with self.assertRaises(AssertionError): - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "ABC", "interesting-content", name="test_pkb_client") - - def test_larger_than_allowed_content_length(self): - # the api call should not fail because the api creates multiple TXT entries which will be concatenated - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - txt_content = "interesting-content-interesting-content-interesting-content-interesting-content-" \ - "interesting-content-interesting-content-interesting-content-interesting-content-" \ - "interesting-content-interesting-content-interesting-content-interesting-content-" \ - "interesting-content" - assert len(txt_content) == 259 - - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client") - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - self.assertEqual(txt_content, record["content"]) - return - self.assertTrue(False) - - def test_largest_allowed_content_length(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - txt_content = "interesting-content-interesting-content-interesting-content-interesting-content-" \ - "interesting-content-interesting-content-interesting-content-interesting-content-" \ - "interesting-content-interesting-content-interesting-content-interesting-content-" \ - "interesting-con" - assert len(txt_content) == 255 - - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client") - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - self.assertEqual(txt_content, record["content"]) - return - self.assertTrue(False) - - def test_empty_content_str(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - txt_content = "" - assert len(txt_content) == 0 - - with self.assertRaises(AssertionError): - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client") - - def test_none_content(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", None, name="test_pkb_client") - - def test_smaller_than_allowed_ttl(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", ttl=299, - name="test_pkb_client") - - def test_negative_ttl(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", ttl=-1, - name="test_pkb_client") - - def test_larger_than_allowed_ttl(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", name="test_pkb_client", - ttl=2147483648) - - def test_largest_allowed_ttl(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - ttl = 2147483647 - - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", ttl=ttl) - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest(): - self.assertEqual(txt_content, record["content"]) - with self.subTest(): - self.assertEqual(ttl, int(record["ttl"])) - return - self.assertTrue(False) - - def test_valid_prio_with_txt(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - prio = 10 - - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", prio=prio) - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest(): - self.assertEqual(txt_content, record["content"]) - with self.subTest(): - self.assertEqual(prio, int(record["prio"])) - return - self.assertTrue(False) - - def test_negative_prio_with_txt(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - prio = -42 - - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", prio=prio) - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest(): - self.assertEqual(txt_content, record["content"]) - with self.subTest(): - self.assertEqual(prio, int(record["prio"])) - return - self.assertTrue(False) - - -class TestDNSEditMethod(DNSTestWithCleanup): - def test_valid_edit_request(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - name = "test_pkb_client" - tll = 342 - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name, ttl=tll) - - edited_txt_content = "more-interesting-content" - edited_name = "more_test_pkb_client" - edited_tll = 423 - pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content, name=edited_name, ttl=edited_tll) - - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest("txt record content is not edited"): - self.assertEqual(edited_txt_content, record["content"]) - with self.subTest("txt record name is not edited"): - self.assertEqual("{}.{}".format(edited_name, TEST_DOMAIN), record["name"]) - with self.subTest("txt record ttl is not edited"): - self.assertEqual(edited_tll, int(record["ttl"])) - return - self.assertTrue(False) - - def test_change_subdomain_to_root_txt_record(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - name = "test_pkb_client" - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) - - edited_txt_content = "more-interesting-content" - edited_name = "" - pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content, name=edited_name) - - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest("txt record content is not edited"): - self.assertEqual(edited_txt_content, record["content"]) - with self.subTest("txt record name is not edited"): - self.assertEqual(TEST_DOMAIN, record["name"]) - return - self.assertTrue(False) - - def test_no_name_change(self): - # the name is required for each edit, otherwise the record will apply for the root domain - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - name = "test_pkb_client" - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) - - edited_txt_content = "more-interesting-content" - pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content) - - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest("txt record content is not edited"): - self.assertEqual(edited_txt_content, record["content"]) - with self.subTest("txt record name is not edited"): - self.assertEqual(TEST_DOMAIN, record["name"]) - return - self.assertTrue(False) - - def test_record_type_change(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - name = "test_pkb_client" - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) - - edited_txt_content = "more-interesting-content" - name = "test_pkb_client" - edited_record_type = "MX" - pkb_client.dns_edit(TEST_DOMAIN, self.record_id, edited_record_type, edited_txt_content, name=name) - - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest("txt record content is not edited"): - self.assertEqual(edited_txt_content, record["content"]) - with self.subTest("record type is not edited"): - self.assertEqual(edited_record_type, record["type"]) - return - self.assertTrue(False) - - -class TestDNSDeleteMethod(DNSTestWithCleanup): - def test_valid_delete_request(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - name = "test_pkb_client" - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) - - records = pkb_client.dns_retrieve(TEST_DOMAIN) - record_exists = False - for record in records: - if record["id"] == self.record_id: - record_exists = True - break - with self.subTest("test txt record setup failed"): - self.assertTrue(record_exists) - - pkb_client.dns_delete(TEST_DOMAIN, self.record_id) - - records = pkb_client.dns_retrieve(TEST_DOMAIN) - record_exists = False - for record in records: - if record["id"] == self.record_id: - record_exists = True - break - if not record_exists: - self.record_id = None - with self.subTest("txt record is not deleted"): - self.assertFalse(record_exists) - - -class TestDNSReceiveMethod(unittest.TestCase): - def test_valid_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - records = pkb_client.dns_retrieve(TEST_DOMAIN) - self.assertEqual(records, DNS_RECORDS) - - def test_invalid_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - with self.assertRaises(Exception): - pkb_client.dns_retrieve("invaliddomain") - - -class TestDNSExport(unittest.TestCase): - def test_valid_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - dns_records = pkb_client.dns_retrieve(domain=TEST_DOMAIN) - # reformat the dns records to a single dict - dns_records_dict = dict() - for record in dns_records: - dns_records_dict[record["id"]] = record - - filepath = Path("dns_backup.json") - if filepath.exists(): - filepath.unlink() - pkb_client.dns_export(domain=TEST_DOMAIN, filename=str(filepath)) - - with open(str("dns_backup.json"), "r") as f: - self.assertEqual(json.load(f), dns_records_dict) - - filepath.unlink() - - def test_invalid_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(Exception): - pkb_client.dns_export(domain="invaliddomain", filename="dns_backup.json") - - def test_empty_str_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_export(domain="", filename="dns_backup.json") - - def test_none_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_export(domain=None, filename="dns_backup.json") - - def test_filename_already_exists(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - filepath = Path("dns_backup.json") - filepath.touch() - - with self.assertRaises(Exception): - pkb_client.dns_export(domain=TEST_DOMAIN, filename=str(filepath)) - - filepath.unlink() - - def test_empty_str_filename(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_export(domain=TEST_DOMAIN, filename="") - - def test_none_filename(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_export(domain=TEST_DOMAIN, filename=None) - - -class TestDNSImport(unittest.TestCase): - def test_valid_clear_import(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - existing_dns_record_ids = set() - dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in dns_records: - existing_dns_record_ids.add(record["id"]) - - filename = "dns_backup_clear.json" - - with open(filename, "r") as f: - file_dns_records = json.load(f) - - pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.clear) - - new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) - for new_record in new_dns_records: - # test if the previous dns records still exists - if new_record["id"] in existing_dns_record_ids: - self.assertTrue(False) - # test if the new dns record was created - new_record_created = False - for _, file_dns_record in file_dns_records.items(): - if file_dns_record["name"] == new_record["name"] \ - and file_dns_record["type"] == new_record["type"] \ - and file_dns_record["content"] == new_record["content"] \ - and file_dns_record["ttl"] == new_record["ttl"] \ - and file_dns_record["prio"] == new_record["prio"]: - new_record_created = True - self.assertTrue(new_record_created) - - def test_valid_replace_import(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - existing_dns_record_ids = set() - dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in dns_records: - existing_dns_record_ids.add(record["id"]) - - filename = "dns_backup_replace.json" - - with open(filename, "r") as f: - file_dns_records = json.load(f) - - pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.replace) - - new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) - for new_record in new_dns_records: - # test if the previous dns records still exists - if new_record["id"] not in existing_dns_record_ids: - self.assertTrue(False) - # test if the dns record was edited - record_edited = False - for _, file_dns_record in file_dns_records.items(): - if file_dns_record["name"] == new_record["name"] \ - and file_dns_record["type"] == new_record["type"] \ - and file_dns_record["content"] == new_record["content"] \ - and file_dns_record["ttl"] == new_record["ttl"] \ - and file_dns_record["prio"] == new_record["prio"]: - record_edited = True - break - self.assertTrue(record_edited) - - def test_valid_keep_import(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - existing_dns_records = dict() - dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in dns_records: - existing_dns_records[record["id"]] = record - - filename = "dns_backup_keep.json" - - with open(filename, "r") as f: - file_dns_records = json.load(f) - - pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.keep) - - new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) - - # test if the all old dns records are kept - for _, existing_record in existing_dns_records.items(): - record_kept = False - for new_record in new_dns_records: - if existing_record["id"] == new_record["id"] \ - and existing_record["name"] == new_record["name"] \ - and existing_record["type"] == new_record["type"] \ - and existing_record["content"] == new_record["content"] \ - and existing_record["ttl"] == new_record["ttl"] \ - and existing_record["prio"] == new_record["prio"]: - record_kept = True - break - with self.subTest(): - self.assertTrue(record_kept) - - # test if the new records are created - for new_record in new_dns_records: - if new_record["id"] not in existing_dns_records: - record_created = False - for _, file_dns_record in file_dns_records.items(): - if file_dns_record["name"] == new_record["name"] \ - and file_dns_record["type"] == new_record["type"] \ - and file_dns_record["content"] == new_record["content"] \ - and file_dns_record["ttl"] == new_record["ttl"] \ - and file_dns_record["prio"] == new_record["prio"]: - record_created = True - break - with self.subTest(): - self.assertTrue(record_created) - - def test_invalid_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(Exception): - pkb_client.dns_import(domain="invaliddomain", filename="dns_backup.json", restore_mode=DNSRestoreMode.clear) - - def test_empty_str_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain="", filename="dns_backup.json", restore_mode=DNSRestoreMode.clear) - - def test_none_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain=None, filename="dns_backup.json", restore_mode=DNSRestoreMode.clear) - - def test_empty_str_filename(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain=TEST_DOMAIN, filename="", restore_mode=DNSRestoreMode.clear) - - def test_none_filename(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain=TEST_DOMAIN, filename=None, restore_mode=DNSRestoreMode.clear) - - def test_invalid_restore_mode(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.subTest("None as restore mode"): - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode=None) - with self.subTest("empty string as restore mode"): - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode="") - with self.subTest("number as restore mode"): - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode=0) - - -if __name__ == '__main__': - unittest.main() |
