diff options
| author | 2023-12-26 19:39:06 -0500 | |
|---|---|---|
| committer | 2023-12-26 19:39:06 -0500 | |
| commit | fa197fe27b8a03bbf4504476f842956ece2c76c9 (patch) | |
| tree | 5a75b92e4c731a4b2ced68eadb9581a8c922d82e | |
Import Upstream version 1.2upstream/1.2
| -rw-r--r-- | .github/ISSUE_TEMPLATE/bug_report.md | 29 | ||||
| -rw-r--r-- | .github/stale.yml | 20 | ||||
| -rw-r--r-- | .github/workflows/pypi-publish-release.yml | 56 | ||||
| -rw-r--r-- | .gitignore | 263 | ||||
| -rw-r--r-- | License | 21 | ||||
| -rw-r--r-- | Readme.md | 140 | ||||
| -rw-r--r-- | pkb_client/__init__.py | 1 | ||||
| -rw-r--r-- | pkb_client/cli.py | 134 | ||||
| -rw-r--r-- | pkb_client/client.py | 455 | ||||
| -rw-r--r-- | pkb_client/helper.py | 15 | ||||
| -rw-r--r-- | pyproject.toml | 3 | ||||
| -rw-r--r-- | requirements.txt | 2 | ||||
| -rw-r--r-- | setup.py | 41 | ||||
| -rw-r--r-- | tests/__init__.py | 0 | ||||
| -rw-r--r-- | tests/pkb_client_tests.py | 569 | ||||
| -rw-r--r-- | third-party-notices | 216 |
16 files changed, 1965 insertions, 0 deletions
diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 0000000..8a0d833 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,29 @@ +--- +name: Bug report +about: Create a report to help us improve pkb_client +title: '' +labels: bug +assignees: '' + +--- + +**Describe the bug** +A clear and concise description of what the bug is. + +**To Reproduce** +Steps to reproduce the behavior. + +**Expected behavior** +A clear and concise description of what you expected to happen. + +**pkb-client Command** +Specify the exact command of pkb-client. **Make sure to anonymize your Porkbun API key and secret and your Porkbun domain/subdomain.** + +**Versions (please complete the following version information):** + - pkb_client: [you can use `pip show pkb_client` to get the version] + +**Error message** +The returned error message. + +**Additional context** +Add any other context about the problem here. diff --git a/.github/stale.yml b/.github/stale.yml new file mode 100644 index 0000000..55ff86b --- /dev/null +++ b/.github/stale.yml @@ -0,0 +1,20 @@ +# Number of days of inactivity before an issue becomes stale +daysUntilStale: 30 +# Number of days of inactivity before a stale issue is closed +daysUntilClose: 14 +# Only issues or pull requests with all of these labels are check if stale +onlyLabels: + - "needs info" +# Issues with these labels will never be considered stale +exemptLabels: + - pinned + - security +# Label to use when marking an issue as stale +staleLabel: stale +# Comment to post when marking an issue as stale. Set to `false` to disable +markComment: > + This issue has been automatically marked as stale because it has not had + recent activity. It will be closed if no further activity occurs. Thank you + for your contributions. +# Limit to only `issues` or `pulls` +only: issues diff --git a/.github/workflows/pypi-publish-release.yml b/.github/workflows/pypi-publish-release.yml new file mode 100644 index 0000000..2071faa --- /dev/null +++ b/.github/workflows/pypi-publish-release.yml @@ -0,0 +1,56 @@ +name: Publish release distribution to PyPI + +on: + push: + tags: + - "v*" + +jobs: + build: + name: Build distribution + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@master + + - name: Set up Python 3.6 + uses: actions/setup-python@v2 + with: + python-version: 3.6 + + - name: Install pep517 + run: >- + python -m + pip install + pep517 + --user + + - name: Build a binary wheel and a source tarball + run: >- + python -m + pep517.build + --source + --binary + --out-dir dist/ + . + + - name: Upload distribution artifact for other jobs + uses: actions/upload-artifact@v2 + with: + name: pkb_client_dist + path: dist/ + + publish: + name: Publish distribution to PyPI + runs-on: ubuntu-latest + needs: build + steps: + - name: Download distribution from build job + uses: actions/download-artifact@v2 + with: + name: pkb_client_dist + path: dist/ + + - name: Publish distribution to PyPI + uses: pypa/gh-action-pypi-publish@master + with: + password: ${{ secrets.pypi_api_key }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..39a2d2a --- /dev/null +++ b/.gitignore @@ -0,0 +1,263 @@ +# Created by https://www.toptal.com/developers/gitignore/api/pycharm+all,linux,python,git +# Edit at https://www.toptal.com/developers/gitignore?templates=pycharm+all,linux,python,git + +### Git ### +# Created by git for backups. To disable backups in Git: +# $ git config --global mergetool.keepBackup false +*.orig + +# Created by git when using merge tools for conflicts +*.BACKUP.* +*.BASE.* +*.LOCAL.* +*.REMOTE.* +*_BACKUP_*.txt +*_BASE_*.txt +*_LOCAL_*.txt +*_REMOTE_*.txt + +### Linux ### +*~ + +# temporary files which can be created if a process still has a handle open of a deleted file +.fuse_hidden* + +# KDE directory preferences +.directory + +# Linux trash folder which might appear on any partition or disk +.Trash-* + +# .nfs files are created when an open file is removed but is still being accessed +.nfs* + +### PyCharm+all ### +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + +### PyCharm+all Patch ### +# Ignores the whole .idea folder and all .iml files +# See https://github.com/joeblau/gitignore.io/issues/186 and https://github.com/joeblau/gitignore.io/issues/360 + +.idea/ + +# Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-249601023 + +*.iml +modules.xml +.idea/misc.xml +*.ipr + +# Sonarlint plugin +.idea/sonarlint + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +pytestdebug.log + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ +doc/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ +pythonenv* + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# profiling data +.prof + +# End of https://www.toptal.com/developers/gitignore/api/pycharm+all,linux,python,git @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) Marvin Heptner + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..c0058df --- /dev/null +++ b/Readme.md @@ -0,0 +1,140 @@ +# pkb_client + +Unofficial client for the Porkbun API + +--- +[](https://pypi.org/project/pkb-client/)  [](https://pepy.tech/project/pkb-client)   +--- + +### Table of Contents + +1. [About](#about) +2. [Installation](#installation) + 1. [With pip (recommend)](#with-pip-recommend) + 2. [From source](#from-source) +3. [Usage](#usage) +4. [Third party notices](#third-party-notices) +5. [License](#license) + +--- + +### About + +*pkb_client* is an unofficial client for the [Porkbun](https://porkbun.com) API. It supports the v3 of the API. You can +find the official documentation of the Porkbun API [here](https://porkbun.com/api/json/v3/documentation). + +### Installation + +This project only works with Python 3, make sure you have at least Python 3.6 installed. + +#### With pip (recommend) + +Use the following command to install *pkb_client* with pip: + +```commandline +pip3 install pkb_client +``` + +You can also very easily update to a newer version: + +```commandline +pip3 install pkb_client -U +``` + +#### From source + +```commandline +git clone https://github.com/infinityofspace/pkb_client.git +cd pkb_client +pip install . +``` + +### Usage + +Each request must be made with the API key and secret. You can easily create them at Porkbun. Just follow +the [official instructions](https://porkbun.com/api/json/v3/documentation#Authentication). Make sure that you explicitly +activate the API usage for your domain at the end. + +After installation *pkb_client* is available under the command `pkb-client`. + +You have to specify your API key and secret each time as follows: + +```commandline +pkb-client -k <YOUR-API-KEY> -s <YOUR-API-SECRET> ping +``` + +If you don't want to specify the key and secret in the program call, because for example the command line calls are +logged and you don't want to log the API access, then you can also omit both arguments and *pkb-client* asks for a user +input. + +You can see an overview of all usable API methods via the help: + +```commandline +pkb-client -h +``` + +If you need more help on a supported API method, you can use the following command, for example for the ping method: + +```commandline +pkb-client ping -h +``` + +#### Here are a few usage examples: + +Create a new TXT record for the subdomain `test` of the domain `example.com` with the value `porkbun is cool` and a TTL +of `500`: + +```commandline +pkb-client -k <YOUR-API-KEY> -s <YOUR-API-KEY-SECRET> dns-create example.com TXT "porkbun is cool" --name test --ttl 500 +``` + +The call returns the DNS record id. The record DNS ids are used to distinguish the DNS records and can be used for +editing or deleting records. The ID is only a Porkbun internal identifier and is not publicly available. + +Delete the DNS record with the ID `12345` of the domain `example.com`: + +```commandline +pkb-client -k <YOUR-API-KEY> -s <YOUR-API-SECRET> dns-delete example.com 12345 +``` + +Get all DNS records of the domain `example.com`: + +```commandline +pkb-client -k <YOUR-API-KEY> -s <YOUR-API-SECRET> dns-retrieve example.com +``` + +Change the TXT DNS record content with the ID `456789` of the domain `example.com` to `the answer is 42`: + +```commandline +pkb-client -k <YOUR-API-KEY> -s <YOUR-API-SECRET> dns-edit example.com 456789 TXT "the answer is 42" +``` + +Exporting all current DNS records of the domain `example.com` to the file `dns_recods.json`: + +```commandline +pkb-client -k <YOUR-API-KEY> -s <YOUR-API-SECRET> dns-export example.com dns_recods.json +``` + +Remove all existing DNS records of the domain `example.com` and restore the DNS records from the file `dns_recods.json`: + +```commandline +pkb-client -k <YOUR-API-KEY> -s <YOUR-API-SECRET> dns-import example.com dns_recods.json clear +``` + +*Note:* The import function uses the record ID to distinguish DNS records. + +### Third party notices + +All modules used by this project are listed below: + +| Name | License| +|:---:|:---:| +| [requests](https://github.com/psf/requests) | [Apache 2.0](https://raw.githubusercontent.com/psf/requests/master/LICENSE) | +| [setuptools](https://github.com/pypa/setuptools) | [MIT](https://raw.githubusercontent.com/pypa/setuptools/main/LICENSE) | + +Furthermore, this readme file contains embeddings of [Shields.io](https://github.com/badges/shields) +and [PePy](https://github.com/psincraian/pepy). The tests use [ipify](https://github.com/rdegges/ipify-api). + +### License + +[MIT](https://github.com/infinityofspace/pkb_client/blob/master/License) - Copyright (c) Marvin Heptner diff --git a/pkb_client/__init__.py b/pkb_client/__init__.py new file mode 100644 index 0000000..17a5bda --- /dev/null +++ b/pkb_client/__init__.py @@ -0,0 +1 @@ +__version__ = "v1.2" diff --git a/pkb_client/cli.py b/pkb_client/cli.py new file mode 100644 index 0000000..4e87373 --- /dev/null +++ b/pkb_client/cli.py @@ -0,0 +1,134 @@ +import argparse +import pprint +import textwrap + +from pkb_client.client import PKBClient, SUPPORTED_DNS_RECORD_TYPES, DNSRestoreMode + + +def main(): + parser = argparse.ArgumentParser( + description="Unofficial client for the Porkbun API", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=textwrap.dedent(""" + License: + MIT - Copyright (c) Marvin Heptner + + Copyright notices: + requests: + Project: https://github.com/psf/requests + License: Apache-2.0 https://github.com/psf/requests/blob/master/LICENSE + setuptools: + Project: https://github.com/pypa/setuptools + License: MIT https://raw.githubusercontent.com/pypa/setuptools/main/LICENSE + """) + ) + + parser.add_argument("-k", "--key", help="The API key used for Porkbun API calls (usually starts with \"pk\").") + parser.add_argument("-s", "--secret", + help="The API secret used for Porkbun API calls (usually starts with \"sk\").") + + subparsers = parser.add_subparsers(help="Supported API methods") + + parser_ping = subparsers.add_parser("ping", help="Ping the API Endpoint") + parser_ping.set_defaults(func=PKBClient.ping) + + parser_dns_create = subparsers.add_parser("dns-create", help="Create a new DNS record.") + parser_dns_create.set_defaults(func=PKBClient.dns_create) + parser_dns_create.add_argument("domain", help="The domain for which the new DNS record should be created.") + parser_dns_create.add_argument("record_type", help="The type of the new DNS record.", + choices=SUPPORTED_DNS_RECORD_TYPES) + parser_dns_create.add_argument("content", help="The content of the new DNS record.") + parser_dns_create.add_argument("--name", + help="The subdomain for which the new DNS record should be created." + "The * can be used for a wildcard DNS record." + "If not used, then a DNS record for the root domain will be created", + required=False) + parser_dns_create.add_argument("--ttl", type=int, help="The ttl of the new DNS record.", required=False) + parser_dns_create.add_argument("--prio", type=int, help="The priority of the new DNS record.", required=False) + + parser_dns_edit = subparsers.add_parser("dns-edit", help="Edit an existing DNS record.") + parser_dns_edit.set_defaults(func=PKBClient.dns_edit) + parser_dns_edit.add_argument("domain", help="The domain for which the DNS record should be edited.") + parser_dns_edit.add_argument("record_id", help="The id of the DNS record which should be edited.") + parser_dns_edit.add_argument("record_type", help="The new type of the DNS record.", + choices=SUPPORTED_DNS_RECORD_TYPES) + parser_dns_edit.add_argument("content", help="The new content of the DNS record.") + parser_dns_edit.add_argument("--name", + help="The new value of the subdomain for which the DNS record should apply. " + "The * can be used for a wildcard DNS record. If not set, the record will " + "be set for the root domain.", + required=False) + parser_dns_edit.add_argument("--ttl", type=int, help="The new ttl of the DNS record.", required=False) + parser_dns_edit.add_argument("--prio", type=int, help="The new priority of the DNS record.", required=False) + + parser_dns_delete = subparsers.add_parser("dns-delete", help="Delete an existing DNS record.") + parser_dns_delete.set_defaults(func=PKBClient.dns_delete) + parser_dns_delete.add_argument("domain", help="The domain for which the DNS record should be deleted.") + parser_dns_delete.add_argument("record_id", help="The id of the DNS record which should be deleted.") + + parser_dns_receive = subparsers.add_parser("dns-retrieve", help="Get all DNS records.") + parser_dns_receive.set_defaults(func=PKBClient.dns_retrieve) + parser_dns_receive.add_argument("domain", help="The domain for which the DNS record should be retrieved.") + + parser_dns_export = subparsers.add_parser("dns-export", help="Save all DNS records to a local file as json.") + parser_dns_export.set_defaults(func=PKBClient.dns_export) + parser_dns_export.add_argument("domain", + help="The domain for which the DNS record should be retrieved and saved.") + parser_dns_export.add_argument("filename", help="The filename where to save the exported DNS records.") + + parser_dns_import = subparsers.add_parser("dns-import", help="Restore all DNS records from a local file.", + formatter_class=argparse.RawTextHelpFormatter) + parser_dns_import.set_defaults(func=PKBClient.dns_import) + parser_dns_import.add_argument("domain", help="The domain for which the DNS record should be restored.") + parser_dns_import.add_argument("filename", help="The filename from which the DNS records are to be restored.") + parser_dns_import.add_argument("restore_mode", help="""The restore mode (DNS records are identified by the record id): + clean: remove all existing DNS records and restore all DNS records from the provided file + replace: replace only existing DNS records with the DNS records from the provided file, but do not create any new DNS records + keep: keep the existing DNS records and only create new ones for all DNS records from the specified file if they do not exist + """, type=DNSRestoreMode.from_string, choices=list(DNSRestoreMode)) + + parser_domain_pricing = subparsers.add_parser("domain-pricing", help="Get the pricing for porkbun domains.") + parser_domain_pricing.set_defaults(func=PKBClient.get_domain_pricing) + + parser_ssl_retrieve = subparsers.add_parser("ssl-retrieve", help="Retrieve an SSL bundle for given domain.") + parser_ssl_retrieve.set_defaults(func=PKBClient.ssl_retrieve) + parser_ssl_retrieve.add_argument("domain", help="The domain for which the SSL bundle should be retrieve.") + + args = parser.parse_args() + + if not hasattr(args, "func"): + raise argparse.ArgumentError(None, "No method specified. Please provide a method and try again.") + + pp = pprint.PrettyPrinter(indent=4) + + # call the static methods + if args.func == PKBClient.get_domain_pricing: + pp.pprint(args.func(**vars(args))) + exit(0) + + if args.key is None: + while True: + api_key = input("Please enter your API key you got from Porkbun (usually starts with \"pk\"): ") + if len(api_key) == 0: + print("The api key can not be empty.") + else: + break + else: + api_key = args.key + + if args.secret is None: + while True: + api_secret = input("Please enter your API key secret you got from Porkbun (usually starts with \"sk\"): ") + if len(api_secret) == 0: + print("The api key secret can not be empty.") + else: + break + else: + api_secret = args.secret + + pkb_client = PKBClient(api_key, api_secret) + pp.pprint(args.func(pkb_client, **vars(args))) + + +if __name__ == "__main__": + main() diff --git a/pkb_client/client.py b/pkb_client/client.py new file mode 100644 index 0000000..48c0d82 --- /dev/null +++ b/pkb_client/client.py @@ -0,0 +1,455 @@ +import json +import logging +from enum import Enum +from pathlib import Path +from typing import Optional, Tuple +from urllib.parse import urljoin + +import requests + +from pkb_client.helper import parse_dns_record + +API_ENDPOINT = "https://porkbun.com/api/json/v3/" +SUPPORTED_DNS_RECORD_TYPES = ["A", "AAAA", "MX", "CNAME", "ALIAS", "TXT", "NS", "SRV", "TLSA", "CAA"] + +# prevent urllib3 to log request with the api key and secret +logging.getLogger("urllib3").setLevel(logging.WARNING) + + +class DNSRestoreMode(Enum): + clear = 0 + replace = 1 + keep = 2 + + def __str__(self): + return self.name + + @staticmethod + def from_string(a): + try: + return DNSRestoreMode[a] + except KeyError: + return a + + +class PKBClient: + """ + API client for Porkbun. + """ + + def __init__(self, api_key: str, secret_api_key: str) -> None: + """ + Creates a new PKBClient object. + + :param api_key: the API key used for Porkbun API calls + :param secret_api_key: the API secret used for Porkbun API calls + """ + + assert api_key is not None and len(api_key) > 0 + assert secret_api_key is not None and len(secret_api_key) > 0 + + self.api_key = api_key + self.secret_api_key = secret_api_key + + def ping(self, **kwargs) -> str: + """ + API ping method: get the current public ip address of the requesting system; can also be used for auth checking + see https://porkbun.com/api/json/v3/documentation#Authentication for more info + + :return: the current public ip address of the requesting system + """ + + url = urljoin(API_ENDPOINT, "ping") + req_json = { + "apikey": self.api_key, + "secretapikey": self.secret_api_key + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return json.loads(r.text).get("yourIp", None) + else: + raise Exception("ERROR: ping api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + def dns_create(self, + domain: str, + record_type: str, + content: str, + name: Optional[str] = None, + ttl: Optional[int] = 300, + prio: Optional[int] = None, **kwargs) -> str: + """ + API DNS create method: create a new DNS record for given domain + see https://porkbun.com/api/json/v3/documentation#DNS%20Create%20Record for more info + + :param domain: the domain for which the DNS record should be created + :param record_type: the type of the new DNS record; + supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA + :param content: the content of the new DNS record + :param name: the subdomain for which the new DNS record entry should apply; the * can be used for a + wildcard DNS record; if not used, then a DNS record for the root domain will be created + :param ttl: the time to live in seconds of the new DNS record; have to be between 0 and 2147483647 + :param prio: the priority of the new DNS record + + :return: the id of the new created DNS record + """ + + assert domain is not None and len(domain) > 0 + assert record_type in SUPPORTED_DNS_RECORD_TYPES + assert content is not None and len(content) > 0 + assert ttl is None or 300 <= ttl <= 2147483647 + + url = urljoin(API_ENDPOINT, "dns/create/{}".format(domain)) + req_json = { + "apikey": self.api_key, + "secretapikey": self.secret_api_key, + "name": name, + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return str(json.loads(r.text).get("id", None)) + else: + raise Exception("ERROR: DNS create api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + def dns_edit(self, + domain: str, + record_id: str, + record_type: str, + content: str, + name: str = None, + ttl: int = 300, + prio: int = None, + **kwargs) -> bool: + """ + API DNS edit method: edit an existing DNS record specified by the id for a given domain + see https://porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record for more info + + :param domain: the domain for which the DNS record should be edited + :param record_id: the id of the DNS record which should be edited + :param record_type: the new type of the DNS record; + supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA + :param content: the new content of the DNS record + :param name: the new value of the subdomain for which the DNS record should apply; the * can be used for a + wildcard DNS record; if not set, the record will be set for the record domain + :param ttl: the new time to live in seconds of the DNS record, have to be between 0 and 2147483647 + :param prio: the new priority of the DNS record + + :return: True if the editing was successful + """ + + assert domain is not None and len(domain) > 0 + assert record_id is not None and len(record_id) > 0 + assert record_type in SUPPORTED_DNS_RECORD_TYPES + assert content is not None and len(content) > 0 + assert ttl is None or 300 <= ttl <= 2147483647 + + url = urljoin(API_ENDPOINT, "dns/edit/{}/{}".format(domain, record_id)) + req_json = { + "apikey": self.api_key, + "secretapikey": self.secret_api_key, + "name": name, + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + raise Exception("ERROR: DNS edit api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + def dns_delete(self, + domain: str, + record_id: str, + **kwargs) -> bool: + """ + API DNS delete method: delete an existing DNS record specified by the id for a given domain + see https://porkbun.com/api/json/v3/documentation#DNS%20Delete%20Record for more info + + :param domain: the domain for which the DNS record should be deleted + :param record_id: the id of the DNS record which should be deleted + + :return: True if the deletion was successful + """ + + assert domain is not None and len(domain) > 0 + assert record_id is not None and len(record_id) > 0 + + url = urljoin(API_ENDPOINT, "dns/delete/{}/{}".format(domain, record_id)) + req_json = { + "apikey": self.api_key, + "secretapikey": self.secret_api_key + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + raise Exception("ERROR: DNS delete api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + def dns_retrieve(self, domain, **kwargs) -> list: + """ + API DNS retrieve method: retrieve all DNS records for given domain + see https://porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records for more info + + :param domain: the domain for which the DNS records should be retrieved + + :return: list of DNS records as dicts + + The list structure will be: + [ + { + "id": "123456789", + "name": "example.com", + "type": "TXT", + "content": "this is a nice text", + "ttl": "300", + "prio": None, + "notes": "" + }, + { + "id": "234567890", + "name": "example.com", + "type": "A", + "content": "0.0.0.0", + "ttl": "300", + "prio": 0, + "notes": "" + } + ] + """ + + assert domain is not None and len(domain) > 0 + + url = urljoin(API_ENDPOINT, "dns/retrieve/{}".format(domain)) + req_json = { + "apikey": self.api_key, + "secretapikey": self.secret_api_key + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [parse_dns_record(record) for record in json.loads(r.text).get("records", [])] + else: + raise Exception("ERROR: DNS retrieve api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + def dns_export(self, domain: str, filename: str, **kwargs) -> bool: + """ + Export all DNS record from the given domain as json to a file. + This method does not not represent a Porkbun API method. + + :param domain: the domain for which the DNS record should be retrieved and saved + :param filename: the filename where to save the exported DNS records + + :return: True if everything went well + """ + + assert domain is not None and len(domain) > 0 + assert filename is not None and len(filename) > 0 + + print("retrieve current DNS records...") + dns_records = self.dns_retrieve(domain) + + print("save DNS records to {} ...".format(filename)) + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record["id"]] = record + + filepath = Path(filename) + if filepath.exists(): + raise Exception("File already exists. Please try another filename") + with open(filepath, "w") as f: + json.dump(dns_records_dict, f) + print("export finished") + + return True + + def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, **kwargs) -> bool: + """ + Restore + This method does not not represent a Porkbun API method. + + :param domain: the domain for which the DNS record should be restored + :param filename: the filename from which the DNS records are to be restored + :param restore_mode: The restore mode (DNS records are identified by the record id) + clean: remove all existing DNS records and restore all DNS records from the provided file + replace: replace only existing DNS records with the DNS records from the provided file, + but do not create any new DNS records + keep: keep the existing DNS records and only create new ones for all DNS records from + the specified file if they do not exist + + :return: True if everything went well + """ + + assert domain is not None and len(domain) > 0 + assert filename is not None and len(filename) > 0 + assert isinstance(restore_mode, DNSRestoreMode) + + existing_dns_records = self.dns_retrieve(domain) + + with open(filename, "r") as f: + exported_dns_records_dict = json.load(f) + + if restore_mode is DNSRestoreMode.clear: + print("restore mode: clear") + + try: + # delete all existing DNS records + for record in existing_dns_records: + self.dns_delete(domain, record["id"]) + + # restore all exported records by creating new DNS records + for _, exported_record in exported_dns_records_dict.items(): + name = ".".join(exported_record["name"].split(".")[:-2]) + self.dns_create(domain=domain, + record_type=exported_record["type"], + content=exported_record["content"], + name=name, + ttl=exported_record["ttl"], + prio=exported_record["prio"]) + except Exception as e: + print("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + print("import failed") + return False + elif restore_mode is DNSRestoreMode.replace: + print("restore mode: replace") + + try: + for existing_record in existing_dns_records: + record_id = existing_record["id"] + exported_record = exported_dns_records_dict.get(record_id, None) + # also check if the exported dns record is different to the existing record, + # so we can reduce unnecessary api calls + if exported_record is not None and exported_record != existing_record: + name = ".".join(exported_record["name"].split(".")[:-2]) + self.dns_edit(domain=domain, + record_id=record_id, + record_type=exported_record["type"], + content=exported_record["content"], + name=name, + ttl=exported_record["ttl"], + prio=exported_record["prio"]) + except Exception as e: + print("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + print("import failed") + return False + elif restore_mode is DNSRestoreMode.keep: + print("restore mode: keep") + + existing_dns_records_dict = dict() + for record in existing_dns_records: + existing_dns_records_dict[record["id"]] = record + + try: + for _, exported_record in exported_dns_records_dict.items(): + if exported_record["id"] not in existing_dns_records_dict: + name = ".".join(exported_record["name"].split(".")[:-2]) + self.dns_create(domain=domain, + record_type=exported_record["type"], + content=exported_record["content"], + name=name, + ttl=exported_record["ttl"], + prio=exported_record["prio"]) + except Exception as e: + print("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + print("import failed") + return False + else: + raise Exception("restore mode not supported") + + print("import successfully completed") + + return True + + @staticmethod + def get_domain_pricing(**kwargs) -> dict: + """ + Get the pricing for porkbun domains + see https://porkbun.com/api/json/v3/documentation#Domain%20Pricing for more info + + :return: dict with pricing + """ + + url = urljoin(API_ENDPOINT, "pricing/get") + r = requests.post(url=url) + + if r.status_code == 200: + return json.loads(r.text) + else: + raise Exception("ERROR: Domain pricing retrieve api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + def ssl_retrieve(self, domain, **kwargs) -> Tuple[str, str, str, str]: + """ + API SSL bundle retrieve method: retrieve an SSL bundle for given domain + see https://porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info + + :param domain: the domain for which the SSL bundle should be retrieved + + :return: tuple of intermediate certificate, certificate chain, private key, public key + """ + + assert domain is not None and len(domain) > 0 + + url = urljoin(API_ENDPOINT, "ssl/retrieve/{}".format(domain)) + req_json = { + "apikey": self.api_key, + "secretapikey": self.secret_api_key + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + ssl_bundle = json.loads(r.text) + + intermediate_certificate = ssl_bundle["intermediate_certificate"] + certificate_chain = ssl_bundle["certificate_chain"] + private_key = ssl_bundle["private_key"] + public_key = ssl_bundle["public_key"] + + return intermediate_certificate, certificate_chain, private_key, public_key + else: + raise Exception("ERROR: SSL bundle retrieve api call was not successfully\n" + "Status code: {}\n" + "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) + + @staticmethod + def __handle_error_backup__(dns_records): + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record["id"]] = record + + # generate filename with incremental suffix + base_backup_filename = "pkb_client_dns_records_backup" + suffix = 0 + backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) + while backup_file_path.exists(): + suffix += 1 + backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) + + with open(backup_file_path, "w") as f: + json.dump(dns_records_dict, f) + + print("a backup of your existing dns records was saved to {}".format(str(backup_file_path))) diff --git a/pkb_client/helper.py b/pkb_client/helper.py new file mode 100644 index 0000000..0c08167 --- /dev/null +++ b/pkb_client/helper.py @@ -0,0 +1,15 @@ +def parse_dns_record(record: dict) -> dict: + """ + Parse the DNS record. + Replace the ttl and prio string values with the int values. + + :param record: the unparsed DNS record dict + + :return: the parsed dns record dict + """ + if record.get("ttl", None) is not None: + record["ttl"] = int(record["ttl"]) + if record.get("prio", None) is not None: + record["prio"] = int(record["prio"]) + + return record diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..1e3acd2 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools>=50.3.0", "wheel"] +build-backend = "setuptools.build_meta:__legacy__" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2c81c10 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +setuptools>=39.0.1 +requests>=2.20.0 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..fcc4a15 --- /dev/null +++ b/setup.py @@ -0,0 +1,41 @@ +from setuptools import setup, find_packages + +import pkb_client + +with open("Readme.md") as f: + long_description = f.read() + +setup( + name="pkb_client", + version=pkb_client.__version__, + author="infinityofspace", + url="https://github.com/infinityofspace/pkb_client", + description="Unofficial client for the Porkbun API", + long_description=long_description, + long_description_content_type="text/markdown", + license="MIT", + classifiers=[ + "Development Status :: 4 - Beta", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "License :: OSI Approved :: MIT License", + "Topic :: Internet :: WWW/HTTP", + "Topic :: Internet :: Name Service (DNS)", + "Topic :: Utilities", + "Topic :: System :: Systems Administration", + ], + packages=find_packages(), + python_requires=">=3.6", + install_requires=[ + "setuptools>=39.0.1", + "requests>=2.20.0" + ], + entry_points={ + "console_scripts": [ + "pkb-client = pkb_client.cli:main", + ] + } +) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/__init__.py diff --git a/tests/pkb_client_tests.py b/tests/pkb_client_tests.py new file mode 100644 index 0000000..2668a3c --- /dev/null +++ b/tests/pkb_client_tests.py @@ -0,0 +1,569 @@ +import json +import os +import unittest +from pathlib import Path + +import requests + +from pkb_client.client import PKBClient, DNSRestoreMode + +""" +WARNING: DO NOT RUN THIS TEST WITH A PRODUCTION DOMAIN OR IN A PRODUCTION ENVIRONMENT!! + This test sets, edits and deletes dns record entries and if the test fails, + unintended changes to dns entries may result. +""" + +TEST_DOMAIN = os.environ.get("TEST_DOMAIN") +PORKBUN_API_KEY = os.environ.get("PORKBUN_API_KEY") +PORKBUN_API_SECRET = os.environ.get("PORKBUN_API_SECRET") +DNS_RECORDS = os.environ.get("DNS_RECORDS") + +PUBLIC_IP_URL = "https://api64.ipify.org" + + +class DNSTestWithCleanup(unittest.TestCase): + + def tearDown(self): + if hasattr(self, "record_id") and self.record_id is not None: + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + pkb_client.dns_delete(TEST_DOMAIN, self.record_id) + + +class TestClientAuth(unittest.TestCase): + def test_valid_auth(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + ip_address = pkb_client.ping() + + self.assertEqual(ip_address, requests.get(PUBLIC_IP_URL).text) + + def test_invalid_api_key(self): + pkb_client = PKBClient("invalid-api-key", PORKBUN_API_SECRET) + with self.assertRaises(Exception): + pkb_client.ping() + + def test_invalid_api_secret(self): + pkb_client = PKBClient(PORKBUN_API_KEY, "invalid-api-secret") + with self.assertRaises(Exception): + pkb_client.ping() + + def test_invalid_api_key_and_secret(self): + pkb_client = PKBClient("invalid-api-key", "invalid-api-secret") + with self.assertRaises(Exception): + pkb_client.ping() + + +class TestPingMethod(unittest.TestCase): + def test_ping(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + ip_address = pkb_client.ping() + + self.assertEqual(ip_address, requests.get(PUBLIC_IP_URL).text) + + +class TestDNSCreateMethod(DNSTestWithCleanup): + + def test_valid_request(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + ttl = 342 + name = "test_pkb_client" + + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name, ttl=ttl) + records = pkb_client.dns_retrieve(TEST_DOMAIN) + + for record in records: + if record["id"] == self.record_id: + with self.subTest(): + self.assertEqual(txt_content, record["content"]) + with self.subTest(): + self.assertEqual(ttl, int(record["ttl"])) + with self.subTest(): + self.assertEqual("{}.{}".format(name, TEST_DOMAIN), record["name"]) + return + self.assertTrue(False) + + def test_invalid_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + with self.assertRaises(Exception): + self.record_id = pkb_client.dns_create("notvaliddomain", "TXT", "interesting-content", + name="test_pkb_client") + + def test_invalid_record_type(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + with self.assertRaises(AssertionError): + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "ABC", "interesting-content", name="test_pkb_client") + + def test_larger_than_allowed_content_length(self): + # the api call should not fail because the api creates multiple TXT entries which will be concatenated + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + txt_content = "interesting-content-interesting-content-interesting-content-interesting-content-" \ + "interesting-content-interesting-content-interesting-content-interesting-content-" \ + "interesting-content-interesting-content-interesting-content-interesting-content-" \ + "interesting-content" + assert len(txt_content) == 259 + + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client") + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + self.assertEqual(txt_content, record["content"]) + return + self.assertTrue(False) + + def test_largest_allowed_content_length(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + txt_content = "interesting-content-interesting-content-interesting-content-interesting-content-" \ + "interesting-content-interesting-content-interesting-content-interesting-content-" \ + "interesting-content-interesting-content-interesting-content-interesting-content-" \ + "interesting-con" + assert len(txt_content) == 255 + + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client") + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + self.assertEqual(txt_content, record["content"]) + return + self.assertTrue(False) + + def test_empty_content_str(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + txt_content = "" + assert len(txt_content) == 0 + + with self.assertRaises(AssertionError): + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client") + + def test_none_content(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", None, name="test_pkb_client") + + def test_smaller_than_allowed_ttl(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", ttl=299, + name="test_pkb_client") + + def test_negative_ttl(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", ttl=-1, + name="test_pkb_client") + + def test_larger_than_allowed_ttl(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", name="test_pkb_client", + ttl=2147483648) + + def test_largest_allowed_ttl(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + ttl = 2147483647 + + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", ttl=ttl) + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest(): + self.assertEqual(txt_content, record["content"]) + with self.subTest(): + self.assertEqual(ttl, int(record["ttl"])) + return + self.assertTrue(False) + + def test_valid_prio_with_txt(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + prio = 10 + + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", prio=prio) + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest(): + self.assertEqual(txt_content, record["content"]) + with self.subTest(): + self.assertEqual(prio, int(record["prio"])) + return + self.assertTrue(False) + + def test_negative_prio_with_txt(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + prio = -42 + + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", prio=prio) + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest(): + self.assertEqual(txt_content, record["content"]) + with self.subTest(): + self.assertEqual(prio, int(record["prio"])) + return + self.assertTrue(False) + + +class TestDNSEditMethod(DNSTestWithCleanup): + def test_valid_edit_request(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + name = "test_pkb_client" + tll = 342 + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name, ttl=tll) + + edited_txt_content = "more-interesting-content" + edited_name = "more_test_pkb_client" + edited_tll = 423 + pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content, name=edited_name, ttl=edited_tll) + + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest("txt record content is not edited"): + self.assertEqual(edited_txt_content, record["content"]) + with self.subTest("txt record name is not edited"): + self.assertEqual("{}.{}".format(edited_name, TEST_DOMAIN), record["name"]) + with self.subTest("txt record ttl is not edited"): + self.assertEqual(edited_tll, int(record["ttl"])) + return + self.assertTrue(False) + + def test_change_subdomain_to_root_txt_record(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + name = "test_pkb_client" + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) + + edited_txt_content = "more-interesting-content" + edited_name = "" + pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content, name=edited_name) + + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest("txt record content is not edited"): + self.assertEqual(edited_txt_content, record["content"]) + with self.subTest("txt record name is not edited"): + self.assertEqual(TEST_DOMAIN, record["name"]) + return + self.assertTrue(False) + + def test_no_name_change(self): + # the name is required for each edit, otherwise the record will apply for the root domain + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + name = "test_pkb_client" + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) + + edited_txt_content = "more-interesting-content" + pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content) + + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest("txt record content is not edited"): + self.assertEqual(edited_txt_content, record["content"]) + with self.subTest("txt record name is not edited"): + self.assertEqual(TEST_DOMAIN, record["name"]) + return + self.assertTrue(False) + + def test_record_type_change(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + name = "test_pkb_client" + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) + + edited_txt_content = "more-interesting-content" + name = "test_pkb_client" + edited_record_type = "MX" + pkb_client.dns_edit(TEST_DOMAIN, self.record_id, edited_record_type, edited_txt_content, name=name) + + records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in records: + if record["id"] == self.record_id: + with self.subTest("txt record content is not edited"): + self.assertEqual(edited_txt_content, record["content"]) + with self.subTest("record type is not edited"): + self.assertEqual(edited_record_type, record["type"]) + return + self.assertTrue(False) + + +class TestDNSDeleteMethod(DNSTestWithCleanup): + def test_valid_delete_request(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + txt_content = "interesting-content" + name = "test_pkb_client" + self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) + + records = pkb_client.dns_retrieve(TEST_DOMAIN) + record_exists = False + for record in records: + if record["id"] == self.record_id: + record_exists = True + break + with self.subTest("test txt record setup failed"): + self.assertTrue(record_exists) + + pkb_client.dns_delete(TEST_DOMAIN, self.record_id) + + records = pkb_client.dns_retrieve(TEST_DOMAIN) + record_exists = False + for record in records: + if record["id"] == self.record_id: + record_exists = True + break + if not record_exists: + self.record_id = None + with self.subTest("txt record is not deleted"): + self.assertFalse(record_exists) + + +class TestDNSReceiveMethod(unittest.TestCase): + def test_valid_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + records = pkb_client.dns_retrieve(TEST_DOMAIN) + self.assertEqual(records, DNS_RECORDS) + + def test_invalid_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + with self.assertRaises(Exception): + pkb_client.dns_retrieve("invaliddomain") + + +class TestDNSExport(unittest.TestCase): + def test_valid_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + dns_records = pkb_client.dns_retrieve(domain=TEST_DOMAIN) + # reformat the dns records to a single dict + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record["id"]] = record + + filepath = Path("dns_backup.json") + if filepath.exists(): + filepath.unlink() + pkb_client.dns_export(domain=TEST_DOMAIN, filename=str(filepath)) + + with open(str("dns_backup.json"), "r") as f: + self.assertEqual(json.load(f), dns_records_dict) + + filepath.unlink() + + def test_invalid_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(Exception): + pkb_client.dns_export(domain="invaliddomain", filename="dns_backup.json") + + def test_empty_str_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_export(domain="", filename="dns_backup.json") + + def test_none_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_export(domain=None, filename="dns_backup.json") + + def test_filename_already_exists(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + filepath = Path("dns_backup.json") + filepath.touch() + + with self.assertRaises(Exception): + pkb_client.dns_export(domain=TEST_DOMAIN, filename=str(filepath)) + + filepath.unlink() + + def test_empty_str_filename(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_export(domain=TEST_DOMAIN, filename="") + + def test_none_filename(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_export(domain=TEST_DOMAIN, filename=None) + + +class TestDNSImport(unittest.TestCase): + def test_valid_clear_import(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + existing_dns_record_ids = set() + dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in dns_records: + existing_dns_record_ids.add(record["id"]) + + filename = "dns_backup_clear.json" + + with open(filename, "r") as f: + file_dns_records = json.load(f) + + pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.clear) + + new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) + for new_record in new_dns_records: + # test if the previous dns records still exists + if new_record["id"] in existing_dns_record_ids: + self.assertTrue(False) + # test if the new dns record was created + new_record_created = False + for _, file_dns_record in file_dns_records.items(): + if file_dns_record["name"] == new_record["name"] \ + and file_dns_record["type"] == new_record["type"] \ + and file_dns_record["content"] == new_record["content"] \ + and file_dns_record["ttl"] == new_record["ttl"] \ + and file_dns_record["prio"] == new_record["prio"]: + new_record_created = True + self.assertTrue(new_record_created) + + def test_valid_replace_import(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + existing_dns_record_ids = set() + dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in dns_records: + existing_dns_record_ids.add(record["id"]) + + filename = "dns_backup_replace.json" + + with open(filename, "r") as f: + file_dns_records = json.load(f) + + pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.replace) + + new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) + for new_record in new_dns_records: + # test if the previous dns records still exists + if new_record["id"] not in existing_dns_record_ids: + self.assertTrue(False) + # test if the dns record was edited + record_edited = False + for _, file_dns_record in file_dns_records.items(): + if file_dns_record["name"] == new_record["name"] \ + and file_dns_record["type"] == new_record["type"] \ + and file_dns_record["content"] == new_record["content"] \ + and file_dns_record["ttl"] == new_record["ttl"] \ + and file_dns_record["prio"] == new_record["prio"]: + record_edited = True + break + self.assertTrue(record_edited) + + def test_valid_keep_import(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + existing_dns_records = dict() + dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) + for record in dns_records: + existing_dns_records[record["id"]] = record + + filename = "dns_backup_keep.json" + + with open(filename, "r") as f: + file_dns_records = json.load(f) + + pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.keep) + + new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) + + # test if the all old dns records are kept + for _, existing_record in existing_dns_records.items(): + record_kept = False + for new_record in new_dns_records: + if existing_record["id"] == new_record["id"] \ + and existing_record["name"] == new_record["name"] \ + and existing_record["type"] == new_record["type"] \ + and existing_record["content"] == new_record["content"] \ + and existing_record["ttl"] == new_record["ttl"] \ + and existing_record["prio"] == new_record["prio"]: + record_kept = True + break + with self.subTest(): + self.assertTrue(record_kept) + + # test if the new records are created + for new_record in new_dns_records: + if new_record["id"] not in existing_dns_records: + record_created = False + for _, file_dns_record in file_dns_records.items(): + if file_dns_record["name"] == new_record["name"] \ + and file_dns_record["type"] == new_record["type"] \ + and file_dns_record["content"] == new_record["content"] \ + and file_dns_record["ttl"] == new_record["ttl"] \ + and file_dns_record["prio"] == new_record["prio"]: + record_created = True + break + with self.subTest(): + self.assertTrue(record_created) + + def test_invalid_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(Exception): + pkb_client.dns_import(domain="invaliddomain", filename="dns_backup.json", restore_mode=DNSRestoreMode.clear) + + def test_empty_str_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain="", filename="dns_backup.json", restore_mode=DNSRestoreMode.clear) + + def test_none_domain(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain=None, filename="dns_backup.json", restore_mode=DNSRestoreMode.clear) + + def test_empty_str_filename(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain=TEST_DOMAIN, filename="", restore_mode=DNSRestoreMode.clear) + + def test_none_filename(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain=TEST_DOMAIN, filename=None, restore_mode=DNSRestoreMode.clear) + + def test_invalid_restore_mode(self): + pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) + + with self.subTest("None as restore mode"): + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode=None) + with self.subTest("empty string as restore mode"): + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode="") + with self.subTest("number as restore mode"): + with self.assertRaises(AssertionError): + pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode=0) + + +if __name__ == '__main__': + unittest.main() diff --git a/third-party-notices b/third-party-notices new file mode 100644 index 0000000..85cdfbc --- /dev/null +++ b/third-party-notices @@ -0,0 +1,216 @@ +This project uses other Python modules released under different license agreement than +pkb_client. Below are the used modules and their license and notice: + +########################################################################################### +## requests: ## + +License: + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +Notice: + +Requests +Copyright 2019 Kenneth Reitz + +########################################################################################### + +########################################################################################### +## setuptools: ## + +License: + +Copyright Jason R. Coombs + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to +deal in the Software without restriction, including without limitation the +rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +sell copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +IN THE SOFTWARE. + +###########################################################################################
\ No newline at end of file |
