diff options
| author | 2023-12-26 19:39:06 -0500 | |
|---|---|---|
| committer | 2023-12-26 19:39:06 -0500 | |
| commit | fa197fe27b8a03bbf4504476f842956ece2c76c9 (patch) | |
| tree | 5a75b92e4c731a4b2ced68eadb9581a8c922d82e /tests/pkb_client_tests.py | |
Import Upstream version 1.2upstream/1.2
Diffstat (limited to 'tests/pkb_client_tests.py')
| -rw-r--r-- | tests/pkb_client_tests.py | 569 |
1 files changed, 569 insertions, 0 deletions
diff --git a/tests/pkb_client_tests.py b/tests/pkb_client_tests.py new file mode 100644 index 0000000..2668a3c --- /dev/null +++ b/tests/pkb_client_tests.py @@ -0,0 +1,569 @@ +import json +import os +import unittest +from pathlib import Path + +import requests + +from pkb_client.client import PKBClient, DNSRestoreMode + +""" +WARNING: DO NOT RUN THIS TEST WITH A PRODUCTION DOMAIN OR IN A PRODUCTION ENVIRONMENT!! + This test sets, edits and deletes dns record entries and if the test fails, + unintended changes to dns entries may result. +""" + +TEST_DOMAIN = os.environ.get("TEST_DOMAIN") +PORKBUN_API_KEY = os.environ.get("PORKBUN_API_KEY") +PORKBUN_API_SECRET = os.environ.get("PORKBUN_API_SECRET") +DNS_RECORDS = os.environ.get("DNS_RECORDS") + +PUBLIC_IP_URL = "https://api64.ipify.org" + + +class DNSTestWithCleanup(unittest.TestCase): + + def tearDown(self): + if hasattr(self, "record_id") and self.record_id is not None: + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + pkb_client.dns_delete(TEST_DOMAIN, self.record_id) + + +class TestClientAuth(unittest.TestCase): + def test_valid_auth(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + ip_address = pkb_client.ping() + + self.assertEqual(ip_address, requests.get(PUBLIC_IP_URL).text) + + def test_invalid_api_key(self): + pkb_client = PKBClient("invalid-api-key", PORKBUN_API_SECRET) + with self.assertRaises(Exception): + pkb_client.ping() + + def test_invalid_api_secret(self): + pkb_client = PKBClient(PORKBUN_API_KEY, "invalid-api-secret") + with self.assertRaises(Exception): + pkb_client.ping() + + def test_invalid_api_key_and_secret(self): + pkb_client = PKBClient("invalid-api-key", "invalid-api-secret") + with self.assertRaises(Exception): + pkb_client.ping() + + +class TestPingMethod(unittest.TestCase): + def test_ping(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + ip_address = pkb_client.ping() + + self.assertEqual(ip_address, requests.get(PUBLIC_IP_URL).text) + + +class TestDNSCreateMethod(DNSTestWithCleanup): + + def test_valid_request(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + ttl = 342 + name = "test_pkb_client" + + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name, ttl=ttl) + records = pkb_client.dns_retrieve(TEST_DOMAIN) + + for record in records: + if record["id"] == self.record_id: + with self.subTest(): + self.assertEqual(txt_content, record["content"]) + with self.subTest(): + self.assertEqual(ttl, int(record["ttl"])) + with self.subTest(): + self.assertEqual("{}.{}".format(name, TEST_DOMAIN), record["name"]) + return + self.assertTrue(False) + + def test_invalid_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + with self.assertRaises(Exception): + self.record_id = pkb_client.dns_create("notvaliddomain", "TXT", "interesting-content", + name="test_pkb_client") + + def test_invalid_record_type(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + with self.assertRaises(AssertionError): + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "ABC", "interesting-content", name="test_pkb_client") + + def test_larger_than_allowed_content_length(self): + # the api call should not fail because the api creates multiple TXT entries which will be concatenated + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + txt_content = "interesting-content-interesting-content-interesting-content-interesting-content-" \ + "interesting-content-interesting-content-interesting-content-interesting-content-" \ + "interesting-content-interesting-content-interesting-content-interesting-content-" \ + "interesting-content" + assert len(txt_content) == 259 + + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client") + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + self.assertEqual(txt_content, record["content"]) + return + self.assertTrue(False) + + def test_largest_allowed_content_length(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + txt_content = "interesting-content-interesting-content-interesting-content-interesting-content-" \ + "interesting-content-interesting-content-interesting-content-interesting-content-" \ + "interesting-content-interesting-content-interesting-content-interesting-content-" \ + "interesting-con" + assert len(txt_content) == 255 + + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client") + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + self.assertEqual(txt_content, record["content"]) + return + self.assertTrue(False) + + def test_empty_content_str(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + txt_content = "" + assert len(txt_content) == 0 + + with self.assertRaises(AssertionError): + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client") + + def test_none_content(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", None, name="test_pkb_client") + + def test_smaller_than_allowed_ttl(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", ttl=299, + name="test_pkb_client") + + def test_negative_ttl(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", ttl=-1, + name="test_pkb_client") + + def test_larger_than_allowed_ttl(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", name="test_pkb_client", + ttl=2147483648) + + def test_largest_allowed_ttl(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + ttl = 2147483647 + + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", ttl=ttl) + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest(): + self.assertEqual(txt_content, record["content"]) + with self.subTest(): + self.assertEqual(ttl, int(record["ttl"])) + return + self.assertTrue(False) + + def test_valid_prio_with_txt(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + prio = 10 + + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", prio=prio) + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest(): + self.assertEqual(txt_content, record["content"]) + with self.subTest(): + self.assertEqual(prio, int(record["prio"])) + return + self.assertTrue(False) + + def test_negative_prio_with_txt(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + prio = -42 + + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", prio=prio) + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest(): + self.assertEqual(txt_content, record["content"]) + with self.subTest(): + self.assertEqual(prio, int(record["prio"])) + return + self.assertTrue(False) + + +class TestDNSEditMethod(DNSTestWithCleanup): + def test_valid_edit_request(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + name = "test_pkb_client" + tll = 342 + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name, ttl=tll) + + edited_txt_content = "more-interesting-content" + edited_name = "more_test_pkb_client" + edited_tll = 423 + pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content, name=edited_name, ttl=edited_tll) + + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest("txt record content is not edited"): + self.assertEqual(edited_txt_content, record["content"]) + with self.subTest("txt record name is not edited"): + self.assertEqual("{}.{}".format(edited_name, TEST_DOMAIN), record["name"]) + with self.subTest("txt record ttl is not edited"): + self.assertEqual(edited_tll, int(record["ttl"])) + return + self.assertTrue(False) + + def test_change_subdomain_to_root_txt_record(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + name = "test_pkb_client" + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) + + edited_txt_content = "more-interesting-content" + edited_name = "" + pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content, name=edited_name) + + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest("txt record content is not edited"): + self.assertEqual(edited_txt_content, record["content"]) + with self.subTest("txt record name is not edited"): + self.assertEqual(TEST_DOMAIN, record["name"]) + return + self.assertTrue(False) + + def test_no_name_change(self): + # the name is required for each edit, otherwise the record will apply for the root domain + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + name = "test_pkb_client" + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) + + edited_txt_content = "more-interesting-content" + pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content) + + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest("txt record content is not edited"): + self.assertEqual(edited_txt_content, record["content"]) + with self.subTest("txt record name is not edited"): + self.assertEqual(TEST_DOMAIN, record["name"]) + return + self.assertTrue(False) + + def test_record_type_change(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + name = "test_pkb_client" + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) + + edited_txt_content = "more-interesting-content" + name = "test_pkb_client" + edited_record_type = "MX" + pkb_client.dns_edit(TEST_DOMAIN, self.record_id, edited_record_type, edited_txt_content, name=name) + + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest("txt record content is not edited"): + self.assertEqual(edited_txt_content, record["content"]) + with self.subTest("record type is not edited"): + self.assertEqual(edited_record_type, record["type"]) + return + self.assertTrue(False) + + +class TestDNSDeleteMethod(DNSTestWithCleanup): + def test_valid_delete_request(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + name = "test_pkb_client" + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) + + records = pkb_client.dns_retrieve(TEST_DOMAIN) + record_exists = False + for record in records: + if record["id"] == self.record_id: + record_exists = True + break + with self.subTest("test txt record setup failed"): + self.assertTrue(record_exists) + + pkb_client.dns_delete(TEST_DOMAIN, self.record_id) + + records = pkb_client.dns_retrieve(TEST_DOMAIN) + record_exists = False + for record in records: + if record["id"] == self.record_id: + record_exists = True + break + if not record_exists: + self.record_id = None + with self.subTest("txt record is not deleted"): + self.assertFalse(record_exists) + + +class TestDNSReceiveMethod(unittest.TestCase): + def test_valid_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + records = pkb_client.dns_retrieve(TEST_DOMAIN) + self.assertEqual(records, DNS_RECORDS) + + def test_invalid_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + with self.assertRaises(Exception): + pkb_client.dns_retrieve("invaliddomain") + + +class TestDNSExport(unittest.TestCase): + def test_valid_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + dns_records = pkb_client.dns_retrieve(domain=TEST_DOMAIN) + # reformat the dns records to a single dict + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record["id"]] = record + + filepath = Path("dns_backup.json") + if filepath.exists(): + filepath.unlink() + pkb_client.dns_export(domain=TEST_DOMAIN, filename=str(filepath)) + + with open(str("dns_backup.json"), "r") as f: + self.assertEqual(json.load(f), dns_records_dict) + + filepath.unlink() + + def test_invalid_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(Exception): + pkb_client.dns_export(domain="invaliddomain", filename="dns_backup.json") + + def test_empty_str_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_export(domain="", filename="dns_backup.json") + + def test_none_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_export(domain=None, filename="dns_backup.json") + + def test_filename_already_exists(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + filepath = Path("dns_backup.json") + filepath.touch() + + with self.assertRaises(Exception): + pkb_client.dns_export(domain=TEST_DOMAIN, filename=str(filepath)) + + filepath.unlink() + + def test_empty_str_filename(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_export(domain=TEST_DOMAIN, filename="") + + def test_none_filename(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_export(domain=TEST_DOMAIN, filename=None) + + +class TestDNSImport(unittest.TestCase): + def test_valid_clear_import(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + existing_dns_record_ids = set() + dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in dns_records: + existing_dns_record_ids.add(record["id"]) + + filename = "dns_backup_clear.json" + + with open(filename, "r") as f: + file_dns_records = json.load(f) + + pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.clear) + + new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) + for new_record in new_dns_records: + # test if the previous dns records still exists + if new_record["id"] in existing_dns_record_ids: + self.assertTrue(False) + # test if the new dns record was created + new_record_created = False + for _, file_dns_record in file_dns_records.items(): + if file_dns_record["name"] == new_record["name"] \ + and file_dns_record["type"] == new_record["type"] \ + and file_dns_record["content"] == new_record["content"] \ + and file_dns_record["ttl"] == new_record["ttl"] \ + and file_dns_record["prio"] == new_record["prio"]: + new_record_created = True + self.assertTrue(new_record_created) + + def test_valid_replace_import(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + existing_dns_record_ids = set() + dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in dns_records: + existing_dns_record_ids.add(record["id"]) + + filename = "dns_backup_replace.json" + + with open(filename, "r") as f: + file_dns_records = json.load(f) + + pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.replace) + + new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) + for new_record in new_dns_records: + # test if the previous dns records still exists + if new_record["id"] not in existing_dns_record_ids: + self.assertTrue(False) + # test if the dns record was edited + record_edited = False + for _, file_dns_record in file_dns_records.items(): + if file_dns_record["name"] == new_record["name"] \ + and file_dns_record["type"] == new_record["type"] \ + and file_dns_record["content"] == new_record["content"] \ + and file_dns_record["ttl"] == new_record["ttl"] \ + and file_dns_record["prio"] == new_record["prio"]: + record_edited = True + break + self.assertTrue(record_edited) + + def test_valid_keep_import(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + existing_dns_records = dict() + dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in dns_records: + existing_dns_records[record["id"]] = record + + filename = "dns_backup_keep.json" + + with open(filename, "r") as f: + file_dns_records = json.load(f) + + pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.keep) + + new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) + + # test if the all old dns records are kept + for _, existing_record in existing_dns_records.items(): + record_kept = False + for new_record in new_dns_records: + if existing_record["id"] == new_record["id"] \ + and existing_record["name"] == new_record["name"] \ + and existing_record["type"] == new_record["type"] \ + and existing_record["content"] == new_record["content"] \ + and existing_record["ttl"] == new_record["ttl"] \ + and existing_record["prio"] == new_record["prio"]: + record_kept = True + break + with self.subTest(): + self.assertTrue(record_kept) + + # test if the new records are created + for new_record in new_dns_records: + if new_record["id"] not in existing_dns_records: + record_created = False + for _, file_dns_record in file_dns_records.items(): + if file_dns_record["name"] == new_record["name"] \ + and file_dns_record["type"] == new_record["type"] \ + and file_dns_record["content"] == new_record["content"] \ + and file_dns_record["ttl"] == new_record["ttl"] \ + and file_dns_record["prio"] == new_record["prio"]: + record_created = True + break + with self.subTest(): + self.assertTrue(record_created) + + def test_invalid_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(Exception): + pkb_client.dns_import(domain="invaliddomain", filename="dns_backup.json", restore_mode=DNSRestoreMode.clear) + + def test_empty_str_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain="", filename="dns_backup.json", restore_mode=DNSRestoreMode.clear) + + def test_none_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain=None, filename="dns_backup.json", restore_mode=DNSRestoreMode.clear) + + def test_empty_str_filename(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain=TEST_DOMAIN, filename="", restore_mode=DNSRestoreMode.clear) + + def test_none_filename(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain=TEST_DOMAIN, filename=None, restore_mode=DNSRestoreMode.clear) + + def test_invalid_restore_mode(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.subTest("None as restore mode"): + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode=None) + with self.subTest("empty string as restore mode"): + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode="") + with self.subTest("number as restore mode"): + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode=0) + + +if __name__ == '__main__': + unittest.main() |
