diff options
46 files changed, 3587 insertions, 1456 deletions
diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md deleted file mode 100644 index 8a0d833..0000000 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ /dev/null @@ -1,29 +0,0 @@ ---- -name: Bug report -about: Create a report to help us improve pkb_client -title: '' -labels: bug -assignees: '' - ---- - -**Describe the bug** -A clear and concise description of what the bug is. - -**To Reproduce** -Steps to reproduce the behavior. - -**Expected behavior** -A clear and concise description of what you expected to happen. - -**pkb-client Command** -Specify the exact command of pkb-client. **Make sure to anonymize your Porkbun API key and secret and your Porkbun domain/subdomain.** - -**Versions (please complete the following version information):** - - pkb_client: [you can use `pip show pkb_client` to get the version] - -**Error message** -The returned error message. - -**Additional context** -Add any other context about the problem here. diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml new file mode 100644 index 0000000..5b12aa8 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -0,0 +1,83 @@ +name: Bug Report +description: File a bug report +title: "[Bug]: " +labels: ["bug", "triage"] + +body: + - type: markdown + attributes: + value: "Thanks for taking the time to fill out this bug report!" + + - type: textarea + id: system-os-info + attributes: + label: OS info + description: The operating system and version you are using. + placeholder: eg. Debian 12 , Ubuntu 23.10, Arch 6.6.3, Windows 10, MacOS 12 + validations: + required: true + + - type: textarea + id: version-info + attributes: + label: pkb_client version + description: The pkb_client version you are using. + placeholder: eg. v1.2 + validations: + required: true + + - type: textarea + id: bug-description + attributes: + label: Bug description + description: A clear and precise description of what the bug is. + placeholder: What happend? + validations: + required: true + + - type: textarea + id: expected-description + attributes: + label: Expected behaviour description + description: A simple and precise description of the expected behavior. + placeholder: What should happend? + validations: + required: true + + - type: textarea + id: logs + attributes: + label: Relevant log output + description: Please copy and paste any relevant log output. + render: shell + validations: + required: false + + - type: textarea + id: reproduce-steps + attributes: + label: Steps to reproduce + description: Steps to reproduce the behavior. + placeholder: How can the error be reproduced? + validations: + required: true + + - type: textarea + id: additional-context + attributes: + label: Additional context + description: All further information on the context of the bug that does not belong to the other sections, such as a workaround or already tested approaches to a solution. + placeholder: Is there any further context? + validations: + required: false + + - type: checkboxes + id: checklist + attributes: + label: Checklist + description: Please check off the following checklist after you have performed the corresponding actions + options: + - label: I have checked for [existing Github issues](https://github.com/infinityofspace/pkb_client/issues) for the same bug. + required: true + - label: I have checked to see if there is newer current version that already fixes this error. + required: true diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml new file mode 100644 index 0000000..2b82445 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.yml @@ -0,0 +1,36 @@ +name: Feature Request +description: File a feature request +title: "[Enhancement] " +labels: ["enhancement"] + +body: + - type: markdown + attributes: + value: "Thanks for taking the time to fill out this feature request!" + + - type: textarea + id: problem-description + attributes: + label: Problem description + description: Description of the problem that needs to be solved. + placeholder: What problem should be solved? + validations: + required: true + + - type: textarea + id: solution-description + attributes: + label: Solution description + description: Description of a possible problem solution. + placeholder: Ideas for a solution? + validations: + required: false + + - type: checkboxes + id: checklist + attributes: + label: Checklist + description: Please check off the following checklist after you have performed the corresponding actions + options: + - label: I have checked for [existing Github issues](https://github.com/infinityofspace/pkb_client/issues) for the same feature request. + required: true diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..670670d --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,11 @@ +version: 2 +updates: + - package-ecosystem: "pip" + directory: "/" + schedule: + interval: "daily" + + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "daily" diff --git a/.github/workflows/docs_publish.yml b/.github/workflows/docs_publish.yml new file mode 100644 index 0000000..af146b2 --- /dev/null +++ b/.github/workflows/docs_publish.yml @@ -0,0 +1,32 @@ +name: build and publish docs + +on: + push: + tags: + - "v*" + branches: # for testing + - main + +jobs: + build: + name: build and publish docs + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python 3.12 + uses: actions/setup-python@v5.3.0 + with: + python-version: 3.12 + + - name: Install sphinx + run: pip install sphinx + + - name: Build docs + run: cd docs && make html + + - name: Publish docs to GitHub Pages + uses: peaceiris/actions-gh-pages@v4 + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_dir: docs/build/html diff --git a/.github/workflows/docs_source_update.yml b/.github/workflows/docs_source_update.yml new file mode 100644 index 0000000..c80ee0b --- /dev/null +++ b/.github/workflows/docs_source_update.yml @@ -0,0 +1,37 @@ +name: update docs source + +on: + push: + branches: + - main + paths: + - pkb_client/** + +jobs: + update: + name: update docs source + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python 3.12 + uses: actions/setup-python@v5.3.0 + with: + python-version: 3.12 + + - name: Install sphinx + run: pip install sphinx + + - name: Build docs source + run: sphinx-apidoc -f -o docs/source pkb_client + + - name: Open PR with changes + uses: peter-evans/create-pull-request@v7 + with: + token: ${{ secrets.GITHUB_TOKEN }} + title: "[Docs]: update docs source" + commit-message: "update docs source" + branch: "docs_source_update" + delete-branch: true + label: "docs-update" + add-paths: docs/source diff --git a/.github/workflows/formatting_check.yml b/.github/workflows/formatting_check.yml new file mode 100644 index 0000000..fadbadd --- /dev/null +++ b/.github/workflows/formatting_check.yml @@ -0,0 +1,26 @@ +name: formatting check + +on: + push: + pull_request: + +jobs: + formatting-check: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ "3.13" ] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5.3.0 + with: + python-version: ${{ matrix.python-version }} + + - name: Install requirements + run: pip install -r requirements.txt + + - name: Check formatting + run: ruff format --check diff --git a/.github/workflows/linting_check.yml b/.github/workflows/linting_check.yml new file mode 100644 index 0000000..e223b36 --- /dev/null +++ b/.github/workflows/linting_check.yml @@ -0,0 +1,26 @@ +name: linting check + +on: + push: + pull_request: + +jobs: + linting-check: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ "3.13" ] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5.3.0 + with: + python-version: ${{ matrix.python-version }} + + - name: Install requirements + run: pip install -r requirements.txt + + - name: Check formatting + run: ruff check diff --git a/.github/workflows/pypi-publish-release.yml b/.github/workflows/pypi-publish-release.yml index 2071faa..5150277 100644 --- a/.github/workflows/pypi-publish-release.yml +++ b/.github/workflows/pypi-publish-release.yml @@ -10,12 +10,12 @@ jobs: name: Build distribution runs-on: ubuntu-latest steps: - - uses: actions/checkout@master + - uses: actions/checkout@v4 - - name: Set up Python 3.6 - uses: actions/setup-python@v2 + - name: Set up Python 3.9 + uses: actions/setup-python@v5.3.0 with: - python-version: 3.6 + python-version: 3.9 - name: Install pep517 run: >- @@ -34,7 +34,7 @@ jobs: . - name: Upload distribution artifact for other jobs - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: pkb_client_dist path: dist/ @@ -45,7 +45,7 @@ jobs: needs: build steps: - name: Download distribution from build job - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v4 with: name: pkb_client_dist path: dist/ diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml new file mode 100644 index 0000000..53e60e1 --- /dev/null +++ b/.github/workflows/unit_tests.yml @@ -0,0 +1,25 @@ +name: unit tests + +on: + push: + +jobs: + tests: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13" ] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5.3.0 + with: + python-version: ${{ matrix.python-version }} + + - name: Install requirements + run: pip install -r requirements.txt + + - name: Run unit tests + run: python -m unittest tests/client.py diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6d2ca3d --- /dev/null +++ b/Makefile @@ -0,0 +1,90 @@ +VERSION = 2.0.0 +RELEASE = 1 + +# system paths +RESULT_PATH = target +RPMBUILD_PATH = ~/rpmbuild + +all: help + +help: + @printf '\nusuage make ...\n' + @printf ' clean -> remove results\n' + @printf ' package -> package archive for deploy .tar.xz\n' + @printf ' build-spec -> build python3-pkb-client.spec\n' + @printf ' build-srpm -> build python3-pkb-client-'${VERSION}-${RELEASE}'.src.rpm\n' + @printf ' build-rpm -> build python3-pkb-client-'${VERSION}-${RELEASE}'.noarch.rpm\n' + +# helper commands + +clean: + @printf '[INFO] removing '${RESULT_PATH}/'\n' + @rm -rf python3-pkb-client.spec ${RESULT_PATH}/ + +package: ${RESULT_PATH}/python3-pkb-client-${VERSION}.tar.xz + +build-spec: python3-pkb-client.spec + +build-srpm: ${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.src.rpm + +build-rpm: ${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.noarch.rpm + +# file generators + +python3-pkb-client.spec: + @mkdir -p ${RESULT_PATH}/ + @printf '[INFO] generating python3-pkb-client.spec\n' | tee -a ${RESULT_PATH}/build.log + @printf '%%global modname pkb_client\n\n' > python3-pkb-client.spec + @printf 'Name: python3-pkb-client\n' >> python3-pkb-client.spec + @printf 'Version: '${VERSION}'\n' >> python3-pkb-client.spec + @printf 'Release: '${RELEASE}'\n' >> python3-pkb-client.spec + @printf 'Obsoletes: %%{name} <= %%{version}\n' >> python3-pkb-client.spec + @printf 'Summary: Python client for the Porkbun API\n\n' >> python3-pkb-client.spec + @printf 'License: MIT License\n' >> python3-pkb-client.spec + @printf 'URL: https://github.com/infinityofspace/pkb_client/\n' >> python3-pkb-client.spec + @printf 'Source0: %%{name}-%%{version}.tar.xz\n\n' >> python3-pkb-client.spec + @printf 'BuildArch: noarch\n' >> python3-pkb-client.spec + @printf 'BuildRequires: python3-setuptools\n' >> python3-pkb-client.spec + @printf 'BuildRequires: python3-rpm-macros\n' >> python3-pkb-client.spec + @printf 'BuildRequires: python3-py\n\n' >> python3-pkb-client.spec + @printf '%%?python_enable_dependency_generator\n\n' >> python3-pkb-client.spec + @printf '%%description\n' >> python3-pkb-client.spec + @printf 'Python client for the Porkbun API\n\n' >> python3-pkb-client.spec + @printf '%%prep\n' >> python3-pkb-client.spec + @printf '%%autosetup -n %%{modname}_v%%{version}\n\n' >> python3-pkb-client.spec + @printf '%%build\n' >> python3-pkb-client.spec + @printf '%%py3_build\n\n' >> python3-pkb-client.spec + @printf '%%install\n' >> python3-pkb-client.spec + @printf '%%py3_install\n\n' >> python3-pkb-client.spec + @printf '%%files\n' >> python3-pkb-client.spec + @printf '%%doc Readme.md\n' >> python3-pkb-client.spec + @printf '%%license License\n' >> python3-pkb-client.spec + @printf '%%{_bindir}/pkb-client\n' >> python3-pkb-client.spec + @printf '%%{python3_sitelib}/%%{modname}/\n' >> python3-pkb-client.spec + @printf '%%{python3_sitelib}/%%{modname}-%%{version}*\n\n' >> python3-pkb-client.spec + @printf '%%changelog\n' >> python3-pkb-client.spec + @printf '...\n' >> python3-pkb-client.spec + @printf '\n' >> python3-pkb-client.spec + +${RESULT_PATH}/python3-pkb-client-${VERSION}.tar.xz: + @mkdir -p ${RESULT_PATH}/ + @printf '[INFO] packing python3-pkb-client-'${VERSION}'.tar.xz\n' | tee -a ${RESULT_PATH}/build.log + @mkdir -p ${RESULT_PATH}/pkb_client_v${VERSION} + @cp -r pkb_client requirements.txt setup.py License Readme.md \ + ${RESULT_PATH}/pkb_client_v${VERSION}/ + @cd ${RESULT_PATH}; tar -I "pxz -9" -cf python3-pkb-client-${VERSION}.tar.xz pkb_client_v${VERSION} + +${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.src.rpm: ${RESULT_PATH}/python3-pkb-client-${VERSION}.tar.xz python3-pkb-client.spec + @printf '[INFO] building python3-pkb-client-'${VERSION}-${RELEASE}'.src.rpm\n' | tee -a ${RESULT_PATH}/build.log + @mkdir -p ${RPMBUILD_PATH}/SOURCES/ + @cp ${RESULT_PATH}/python3-pkb-client-${VERSION}.tar.xz ${RPMBUILD_PATH}/SOURCES/ + @rpmbuild -bs python3-pkb-client.spec &>> ${RESULT_PATH}/build.log + @mv ${RPMBUILD_PATH}/SRPMS/python3-pkb-client-${VERSION}-${RELEASE}.src.rpm ${RESULT_PATH}/ + +${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.noarch.rpm: ${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.src.rpm + @printf '[INFO] building python3-pkb-client-'${VERSION}-${RELEASE}'.noarch.rpm\n' | tee -a ${RESULT_PATH}/build.log + @mkdir -p ${RPMBUILD_PATH}/SRPMS/ + @cp ${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.src.rpm ${RPMBUILD_PATH}/SRPMS/ + @rpmbuild --rebuild ${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.src.rpm &>> ${RESULT_PATH}/build.log + @mv ${RPMBUILD_PATH}/RPMS/noarch/python3-pkb-client-${VERSION}-${RELEASE}.noarch.rpm ${RESULT_PATH}/ + @@ -1,9 +1,9 @@ # pkb_client -Unofficial client for the Porkbun API +Python client for the Porkbun API --- -[](https://pypi.org/project/pkb-client/)  [](https://pepy.tech/project/pkb-client)   +[](https://pypi.org/project/pkb-client/)  [](https://pepy.tech/project/pkb-client)   --- ### Table of Contents @@ -13,19 +13,24 @@ Unofficial client for the Porkbun API 1. [With pip (recommend)](#with-pip-recommend) 2. [From source](#from-source) 3. [Usage](#usage) -4. [Third party notices](#third-party-notices) -5. [License](#license) +4. [Notes](#notes) +5. [Third party notices](#third-party-notices) +6. [Development](#development) + 1. [Setup environment](#setup-environment) + 2. [Tests](#tests) + 3. [Documentation](#documentation) +7. [License](#license) --- ### About -*pkb_client* is an unofficial client for the [Porkbun](https://porkbun.com) API. It supports the v3 of the API. You can -find the official documentation of the Porkbun API [here](https://porkbun.com/api/json/v3/documentation). +*pkb_client* is a python client for the [Porkbun](https://porkbun.com) API. It supports the v3 of the API. You can +find the official documentation of the Porkbun API [here](https://api.porkbun.com/api/json/v3/documentation). ### Installation -This project only works with Python 3, make sure you have at least Python 3.6 installed. +This project only works with Python 3, make sure you have at least Python 3.9 installed. #### With pip (recommend) @@ -46,14 +51,16 @@ pip3 install pkb_client -U ```commandline git clone https://github.com/infinityofspace/pkb_client.git cd pkb_client -pip install . +pip3 install . ``` ### Usage Each request must be made with the API key and secret. You can easily create them at Porkbun. Just follow -the [official instructions](https://porkbun.com/api/json/v3/documentation#Authentication). Make sure that you explicitly -activate the API usage for your domain at the end. +the [official instructions](https://api.porkbun.com/api/json/v3/documentation#Authentication). Make sure that you explicitly +activate the API usage for your domain at the end. There are two ways to use `pkb_client`. The first way is to use it as +a Python module. See the [module documentation](https://infinityofspace.github.io/pkb_client) for more information. The +second way is to use the module from the command line, see below for more information. After installation *pkb_client* is available under the command `pkb-client`. @@ -64,10 +71,11 @@ pkb-client -k <YOUR-API-KEY> -s <YOUR-API-SECRET> ping ``` If you don't want to specify the key and secret in the program call, because for example the command line calls are -logged and you don't want to log the API access, then you can also omit both arguments and *pkb-client* asks for a user -input. +logged, and you don't want to log the API access, then you can also set the environment variables `PKB_API_KEY` and +`PKB_API_SECRET`. If you not specify API key and secret in any way, *pkb-client* asks for a user input. The command line +arguments of the API key and secret have the highest priority. -You can see an overview of all usable API methods via the help: +You can see an overview of all usable cli methods via the help: ```commandline pkb-client -h @@ -121,19 +129,77 @@ Remove all existing DNS records of the domain `example.com` and restore the DNS pkb-client -k <YOUR-API-KEY> -s <YOUR-API-SECRET> dns-import example.com dns_recods.json clear ``` -*Note:* The import function uses the record ID to distinguish DNS records. +*Note:* The `dns-import` function uses the record ID to distinguish DNS records. + +### Notes + +Currently, TTL smaller than `600` are ignored by the Porkbun API and the minimum value is `600`, although a minimum +value of `300` is [supported](https://api.porkbun.com/api/json/v3/documentation) and allowed by the RFC standard. However, +you can do TTL smaller than `600` via the web dashboard. ### Third party notices All modules used by this project are listed below: -| Name | License| -|:---:|:---:| -| [requests](https://github.com/psf/requests) | [Apache 2.0](https://raw.githubusercontent.com/psf/requests/master/LICENSE) | -| [setuptools](https://github.com/pypa/setuptools) | [MIT](https://raw.githubusercontent.com/pypa/setuptools/main/LICENSE) | +| Name | License | +|:-----------------------------------------------------:|:-------------------------------------------------------------------------------------------------:| +| [requests](https://github.com/psf/requests) | [Apache 2.0](https://raw.githubusercontent.com/psf/requests/master/LICENSE) | +| [setuptools](https://github.com/pypa/setuptools) | [MIT](https://raw.githubusercontent.com/pypa/setuptools/main/LICENSE) | +| [sphinx](https://github.com/sphinx-doc/sphinx) | [BSD 2 Clause](https://raw.githubusercontent.com/sphinx-doc/sphinx/refs/heads/master/LICENSE.rst) | +| [dnspython](https://github.com/rthalley/dnspython) | [ISC](https://raw.githubusercontent.com/rthalley/dnspython/refs/heads/main/LICENSEc) | +| [responses](https://github.com/getsentry/responses) | [Apache 2.0](https://raw.githubusercontent.com/getsentry/responses/refs/heads/master/LICENSE) | +| [ruff](https://github.com/astral-sh/ruff) | [MIT](https://raw.githubusercontent.com/astral-sh/ruff/refs/heads/main/LICENSE) | Furthermore, this readme file contains embeddings of [Shields.io](https://github.com/badges/shields) -and [PePy](https://github.com/psincraian/pepy). The tests use [ipify](https://github.com/rdegges/ipify-api). +and [PePy](https://github.com/psincraian/pepy) images. + +_This project is not associated with Porkbun LLC._ + +### Development + +#### Setup environment + +First get the source code: + +```commandline +git clone https://github.com/infinityofspace/pkb_client.git +cd pkb_client +``` + +Now create a virtual environment, activate it and install all dependencies with the following commands: + +```commandline +python3 -m venv venv +source venv/bin/activate +pip3 install -r requirements.txt +``` + +Now you can start developing. + +Feel free to contribute to this project by creating a pull request. +Before you create a pull request, make sure that you code meets the following requirements (you can use the specified +commands to check/fulfill the requirements): + +- check unit tests: `python -m unittest tests/*.py` +- format the code: `ruff format` +- check linting errors: `ruff check` + +#### Tests + +You can run the tests with the following command: + +```commandline +python -m unittest tests/*.py +``` + +#### Documentation + +To build the documentation you can use the following commands: + +```commandline +sphinx-apidoc -f -o docs/source pkb_client +cd docs && make html +``` ### License diff --git a/docs/.gitignore b/docs/.gitignore new file mode 100644 index 0000000..378eac2 --- /dev/null +++ b/docs/.gitignore @@ -0,0 +1 @@ +build diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..d0c3cbf --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = source +BUILDDIR = build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..dc1312a --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=source +set BUILDDIR=build + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.https://www.sphinx-doc.org/ + exit /b 1 +) + +if "%1" == "" goto help + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/docs/source/conf.py b/docs/source/conf.py new file mode 100644 index 0000000..7a07b51 --- /dev/null +++ b/docs/source/conf.py @@ -0,0 +1,30 @@ +# Configuration file for the Sphinx documentation builder. +# +# For the full list of built-in configuration values, see the documentation: +# https://www.sphinx-doc.org/en/master/usage/configuration.html + +# -- Project information ----------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information + +import sys + +sys.path.append("..") + +project = "pkb_client" +copyright = "2023-2024, infinityofspace" +author = "infinityofspace" +release = "v2.0.0" + +# -- General configuration --------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration + +extensions = ["sphinx.ext.autodoc", "sphinx.ext.viewcode", "sphinx.ext.githubpages"] + +templates_path = ["_templates"] +exclude_patterns = [] + +# -- Options for HTML output ------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output + +html_theme = "alabaster" +html_static_path = ["_static"] diff --git a/docs/source/index.rst b/docs/source/index.rst new file mode 100644 index 0000000..46f8eb1 --- /dev/null +++ b/docs/source/index.rst @@ -0,0 +1,29 @@ +Welcome to pkb_client's documentation! +====================================== + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + installation + usage + modules + migration_guide + +About ++++++ + +*pkb_client* is a python client for the `Porkbun <https://porkbun.com>`_ API. It supports the v3 of the API. You can +find the official documentation of the Porkbun API `here <https://api.porkbun.com/api/json/v3/documentation>`_. + +Link to the source code: `Github <https://github.com/infinityofspace/pkb_client>`_ + +*Note:* This project is not associated with Porkbun LLC. + + +Indices and tables +++++++++++++++++++ + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` diff --git a/docs/source/installation.rst b/docs/source/installation.rst new file mode 100644 index 0000000..6e52bbf --- /dev/null +++ b/docs/source/installation.rst @@ -0,0 +1,17 @@ +Installation +============ + +You can either install the package from the Python Package Index (PyPI) using pip: + +.. code-block:: bash + + pip3 install pkb_client + + +or you can install the package from the source code: + +.. code-block:: bash + + git clone https://github.com/infinityofspace/pkb_client.git + cd pkb_client + pip3 install . diff --git a/docs/source/migration_guide.rst b/docs/source/migration_guide.rst new file mode 100644 index 0000000..0fea446 --- /dev/null +++ b/docs/source/migration_guide.rst @@ -0,0 +1,54 @@ +Migration Guide +=============== + +From v1 to v2 ++++++++++++++ + +The version 2 of the package is a major release that introduces a lot of changes. The main changes are: + +- support for new API methods +- package is now a proper Python package with focus on usage as a library (in general more object oriented): + - return types are now objects instead of tuples or dictionaries (except domain pricing method) + - improved and more consistent error handling + - fixed method signatures/no more additional keyworded arguments + +These changes are not backward compatible with the version 1 of the package. If you are using the version 1 of the +package, you will need to update your code to work with the version 2. + +To migrate your code from the version 1 to the version 2, follow these steps: + +1. Update the package to the version 2 or higher: + - if you are using the package from PyPI, run the following command: + .. code-block:: bash + + pip3 install --upgrade pkb_client + - if you are using the package from the source code, run the following commands: + .. code-block:: bash + + git fetch + git checkout v2.0.0 # or any later tag + pip3 install . +2. Remove any additional keyworded arguments from all `PKBClient` methods. The methods now have fixed signatures. +3. Refactor the usage of the following methods: + - `PKBClient.dns_create`: + - renamed to `PKBClient.create_dns_record` + - the method argument `record_type` needs to be enum of :class:`DNSRecordType <pkb_client.client.dns.DNSRecordType>` + - `PKBClient.dns_edit`: + - renamed to `PKBClient.update_dns_record` + - the method argument `record_type` needs to be enum of :class:`DNSRecordType <pkb_client.client.dns.DNSRecordType>` + - `PKBClient.dns_delete`: + - renamed to `PKBClient.delete_dns_records` + - `PKBClient.dns_retrieve`: + - renamed to `PKBClient.get_dns_records` + - return type is now a list of :class:`DNSRecord <pkb_client.client.dns.DNSRecord>` + - `PKBClient.dns_export`: + - renamed to `PKBClient.export_dns_records` + - the methods argument `filename` is renamed to `filepath` + - `PKBClient.dns_import`: + - renamed to `PKBClient.import_dns_records` + - the methods argument `filename` is renamed to `filepath` + - `PKBClient.get_domain_pricing`: + - method is not static anymore, you need to create an instance of `PKBClient` to use it + - `PKBClient.ssl_retrieve`: + - renamed to `PKBClient.get_ssl_bundle` + - return type is now :class:`SSLCertificate <pkb_client.client.ssl_cert.SSLCertBundle>` diff --git a/docs/source/modules.rst b/docs/source/modules.rst new file mode 100644 index 0000000..c1ae0f3 --- /dev/null +++ b/docs/source/modules.rst @@ -0,0 +1,7 @@ +pkb_client +========== + +.. toctree:: + :maxdepth: 4 + + pkb_client diff --git a/docs/source/pkb_client.cli.rst b/docs/source/pkb_client.cli.rst new file mode 100644 index 0000000..29fc20f --- /dev/null +++ b/docs/source/pkb_client.cli.rst @@ -0,0 +1,21 @@ +pkb\_client.cli package +======================= + +Submodules +---------- + +pkb\_client.cli.cli module +-------------------------- + +.. automodule:: pkb_client.cli.cli + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: pkb_client.cli + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/pkb_client.client.rst b/docs/source/pkb_client.client.rst new file mode 100644 index 0000000..fd677b1 --- /dev/null +++ b/docs/source/pkb_client.client.rst @@ -0,0 +1,61 @@ +pkb\_client.client package +========================== + +Submodules +---------- + +pkb\_client.client.bind\_file module +------------------------------------ + +.. automodule:: pkb_client.client.bind_file + :members: + :undoc-members: + :show-inheritance: + +pkb\_client.client.client module +-------------------------------- + +.. automodule:: pkb_client.client.client + :members: + :undoc-members: + :show-inheritance: + +pkb\_client.client.dns module +----------------------------- + +.. automodule:: pkb_client.client.dns + :members: + :undoc-members: + :show-inheritance: + +pkb\_client.client.domain module +-------------------------------- + +.. automodule:: pkb_client.client.domain + :members: + :undoc-members: + :show-inheritance: + +pkb\_client.client.forwarding module +------------------------------------ + +.. automodule:: pkb_client.client.forwarding + :members: + :undoc-members: + :show-inheritance: + +pkb\_client.client.ssl\_cert module +----------------------------------- + +.. automodule:: pkb_client.client.ssl_cert + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: pkb_client.client + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/pkb_client.rst b/docs/source/pkb_client.rst new file mode 100644 index 0000000..161588e --- /dev/null +++ b/docs/source/pkb_client.rst @@ -0,0 +1,19 @@ +pkb\_client package +=================== + +Subpackages +----------- + +.. toctree:: + :maxdepth: 4 + + pkb_client.cli + pkb_client.client + +Module contents +--------------- + +.. automodule:: pkb_client + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/source/usage.rst b/docs/source/usage.rst new file mode 100644 index 0000000..89abc1e --- /dev/null +++ b/docs/source/usage.rst @@ -0,0 +1,89 @@ +Usage +===== + +Module +++++++ + +The module provides the :class:`PKBClient <pkb_client.client.client.PKBClient>` class, which is used to interact with the PKB API. +To use the PKB client, you need to create an instance of the :class:`PKBClient <pkb_client.client.client.PKBClient>` class: + +.. code-block:: python + + from pkb_client.client import PKBClient + + pkb = PKBClient( + api_key="<your-api-key>", + secret_api_key="<your-secret-api-key>", + api_endpoint="https://api.porkbun.com/api/json/v3", + ) + +Whereby the `api_key` and `secret_api_key` are optional and only required if you want to use the PKB API with API endpoints +that require authentication (e.g. to manage dns records of your domains). Moreover the api_endpoint is also optional and +defaults to the latest version of the official PKB API endpoint. + +For example to get the domain pricing, which does not require authentication, you can use the +:func:`get_domain_pricing <pkb_client.client.client.PKBClient.get_domain_pricing>` method: + +.. code-block:: python + + from pkb_client.client import PKBClient + + pkb = PKBClient() + domain_pricing = pkb.get_domain_pricing() + print(domain_pricing) + +You can find all available methods in the :class:`PKBClient <pkb_client.client.client.PKBClient>` class documentation. + +CLI ++++ + +The module also provides a CLI to interact with the PKB API. For example to get the domain pricing, you can use the `get-domain-pricing` command: + +.. code-block:: bash + + pkb-client domain-pricing + +All available commands can be listed with the `--help` option: + +.. code-block:: bash + + pkb-client --help + +.. code-block:: bash + + usage: pkb-client [-h] [-k KEY] [-s SECRET] [--debug] [--endpoint ENDPOINT] + {ping,create-dns-record,update-dns-record,delete-dns-records,get-dns-records,export-dns-records,export-bind-dns-records,import-dns-records,import-bind-dns-records,get-domain-pricing,get-ssl-bundle,update-dns-servers,get-dns-servers,get-domains,get-url-forwards,create-url-forward,delete-url-forward} + ... + + Python client for the Porkbun API + + positional arguments: + {ping,create-dns-record,update-dns-record,delete-dns-records,get-dns-records,export-dns-records,export-bind-dns-records,import-dns-records,import-bind-dns-records,get-domain-pricing,get-ssl-bundle,update-dns-servers,get-dns-servers,get-domains,get-url-forwards,create-url-forward,delete-url-forward} + Supported API methods + ping Ping the API Endpoint + create-dns-record Create a new DNS record. + update-dns-record Edit an existing DNS record. + delete-dns-records Delete an existing DNS record. + get-dns-records Get all DNS records. + export-dns-records Save all DNS records to a local json file. + export-bind-dns-records + Save all DNS records to a local BIND file. + import-dns-records Restore all DNS records from a local json file. + import-bind-dns-records + Restore all DNS records from a local BIND file. + get-domain-pricing Get the pricing for Porkbun domains. + get-ssl-bundle Retrieve an SSL bundle for given domain. + update-dns-servers Update the DNS servers for a domain. + get-dns-servers Retrieve the DNS servers for a domain. + get-domains List all domains in this account in chunks of 1000. + get-url-forwards Retrieve all URL forwards. + create-url-forward Create a new URL forward. + delete-url-forward Delete an existing URL forward. + + options: + -h, --help show this help message and exit + -k KEY, --key KEY The API key used for Porkbun API calls (usually starts with "pk"). + -s SECRET, --secret SECRET + The API secret used for Porkbun API calls (usually starts with "sk"). + --debug Enable debug mode. + --endpoint ENDPOINT The API endpoint to use. diff --git a/pkb_client/__init__.py b/pkb_client/__init__.py index 17a5bda..d5096ef 100644 --- a/pkb_client/__init__.py +++ b/pkb_client/__init__.py @@ -1 +1 @@ -__version__ = "v1.2" +__version__ = "v2.0.0" diff --git a/pkb_client/cli.py b/pkb_client/cli.py deleted file mode 100644 index 4e87373..0000000 --- a/pkb_client/cli.py +++ /dev/null @@ -1,134 +0,0 @@ -import argparse -import pprint -import textwrap - -from pkb_client.client import PKBClient, SUPPORTED_DNS_RECORD_TYPES, DNSRestoreMode - - -def main(): - parser = argparse.ArgumentParser( - description="Unofficial client for the Porkbun API", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=textwrap.dedent(""" - License: - MIT - Copyright (c) Marvin Heptner - - Copyright notices: - requests: - Project: https://github.com/psf/requests - License: Apache-2.0 https://github.com/psf/requests/blob/master/LICENSE - setuptools: - Project: https://github.com/pypa/setuptools - License: MIT https://raw.githubusercontent.com/pypa/setuptools/main/LICENSE - """) - ) - - parser.add_argument("-k", "--key", help="The API key used for Porkbun API calls (usually starts with \"pk\").") - parser.add_argument("-s", "--secret", - help="The API secret used for Porkbun API calls (usually starts with \"sk\").") - - subparsers = parser.add_subparsers(help="Supported API methods") - - parser_ping = subparsers.add_parser("ping", help="Ping the API Endpoint") - parser_ping.set_defaults(func=PKBClient.ping) - - parser_dns_create = subparsers.add_parser("dns-create", help="Create a new DNS record.") - parser_dns_create.set_defaults(func=PKBClient.dns_create) - parser_dns_create.add_argument("domain", help="The domain for which the new DNS record should be created.") - parser_dns_create.add_argument("record_type", help="The type of the new DNS record.", - choices=SUPPORTED_DNS_RECORD_TYPES) - parser_dns_create.add_argument("content", help="The content of the new DNS record.") - parser_dns_create.add_argument("--name", - help="The subdomain for which the new DNS record should be created." - "The * can be used for a wildcard DNS record." - "If not used, then a DNS record for the root domain will be created", - required=False) - parser_dns_create.add_argument("--ttl", type=int, help="The ttl of the new DNS record.", required=False) - parser_dns_create.add_argument("--prio", type=int, help="The priority of the new DNS record.", required=False) - - parser_dns_edit = subparsers.add_parser("dns-edit", help="Edit an existing DNS record.") - parser_dns_edit.set_defaults(func=PKBClient.dns_edit) - parser_dns_edit.add_argument("domain", help="The domain for which the DNS record should be edited.") - parser_dns_edit.add_argument("record_id", help="The id of the DNS record which should be edited.") - parser_dns_edit.add_argument("record_type", help="The new type of the DNS record.", - choices=SUPPORTED_DNS_RECORD_TYPES) - parser_dns_edit.add_argument("content", help="The new content of the DNS record.") - parser_dns_edit.add_argument("--name", - help="The new value of the subdomain for which the DNS record should apply. " - "The * can be used for a wildcard DNS record. If not set, the record will " - "be set for the root domain.", - required=False) - parser_dns_edit.add_argument("--ttl", type=int, help="The new ttl of the DNS record.", required=False) - parser_dns_edit.add_argument("--prio", type=int, help="The new priority of the DNS record.", required=False) - - parser_dns_delete = subparsers.add_parser("dns-delete", help="Delete an existing DNS record.") - parser_dns_delete.set_defaults(func=PKBClient.dns_delete) - parser_dns_delete.add_argument("domain", help="The domain for which the DNS record should be deleted.") - parser_dns_delete.add_argument("record_id", help="The id of the DNS record which should be deleted.") - - parser_dns_receive = subparsers.add_parser("dns-retrieve", help="Get all DNS records.") - parser_dns_receive.set_defaults(func=PKBClient.dns_retrieve) - parser_dns_receive.add_argument("domain", help="The domain for which the DNS record should be retrieved.") - - parser_dns_export = subparsers.add_parser("dns-export", help="Save all DNS records to a local file as json.") - parser_dns_export.set_defaults(func=PKBClient.dns_export) - parser_dns_export.add_argument("domain", - help="The domain for which the DNS record should be retrieved and saved.") - parser_dns_export.add_argument("filename", help="The filename where to save the exported DNS records.") - - parser_dns_import = subparsers.add_parser("dns-import", help="Restore all DNS records from a local file.", - formatter_class=argparse.RawTextHelpFormatter) - parser_dns_import.set_defaults(func=PKBClient.dns_import) - parser_dns_import.add_argument("domain", help="The domain for which the DNS record should be restored.") - parser_dns_import.add_argument("filename", help="The filename from which the DNS records are to be restored.") - parser_dns_import.add_argument("restore_mode", help="""The restore mode (DNS records are identified by the record id): - clean: remove all existing DNS records and restore all DNS records from the provided file - replace: replace only existing DNS records with the DNS records from the provided file, but do not create any new DNS records - keep: keep the existing DNS records and only create new ones for all DNS records from the specified file if they do not exist - """, type=DNSRestoreMode.from_string, choices=list(DNSRestoreMode)) - - parser_domain_pricing = subparsers.add_parser("domain-pricing", help="Get the pricing for porkbun domains.") - parser_domain_pricing.set_defaults(func=PKBClient.get_domain_pricing) - - parser_ssl_retrieve = subparsers.add_parser("ssl-retrieve", help="Retrieve an SSL bundle for given domain.") - parser_ssl_retrieve.set_defaults(func=PKBClient.ssl_retrieve) - parser_ssl_retrieve.add_argument("domain", help="The domain for which the SSL bundle should be retrieve.") - - args = parser.parse_args() - - if not hasattr(args, "func"): - raise argparse.ArgumentError(None, "No method specified. Please provide a method and try again.") - - pp = pprint.PrettyPrinter(indent=4) - - # call the static methods - if args.func == PKBClient.get_domain_pricing: - pp.pprint(args.func(**vars(args))) - exit(0) - - if args.key is None: - while True: - api_key = input("Please enter your API key you got from Porkbun (usually starts with \"pk\"): ") - if len(api_key) == 0: - print("The api key can not be empty.") - else: - break - else: - api_key = args.key - - if args.secret is None: - while True: - api_secret = input("Please enter your API key secret you got from Porkbun (usually starts with \"sk\"): ") - if len(api_secret) == 0: - print("The api key secret can not be empty.") - else: - break - else: - api_secret = args.secret - - pkb_client = PKBClient(api_key, api_secret) - pp.pprint(args.func(pkb_client, **vars(args))) - - -if __name__ == "__main__": - main() diff --git a/pkb_client/cli/__init__.py b/pkb_client/cli/__init__.py new file mode 100644 index 0000000..ed32c05 --- /dev/null +++ b/pkb_client/cli/__init__.py @@ -0,0 +1,3 @@ +from .cli import main + +__all__ = ["main"] diff --git a/pkb_client/cli/cli.py b/pkb_client/cli/cli.py new file mode 100644 index 0000000..4bf53cd --- /dev/null +++ b/pkb_client/cli/cli.py @@ -0,0 +1,347 @@ +import argparse +import dataclasses +import json +import os +import textwrap +from datetime import datetime + +from pkb_client.client import PKBClient, API_ENDPOINT +from pkb_client.client.dns import DNSRecordType, DNSRestoreMode +from pkb_client.client.forwarding import URLForwardingType + + +class CustomJSONEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, datetime): + return o.isoformat() + if dataclasses.is_dataclass(o): + return dataclasses.asdict(o) + return super().default(o) + + +def main(): + parser = argparse.ArgumentParser( + description="Python client for the Porkbun API", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=textwrap.dedent(""" + License: + MIT - Copyright (c) Marvin Heptner + """), + ) + + parser.add_argument( + "-k", + "--key", + help='The API key used for Porkbun API calls (usually starts with "pk").', + ) + parser.add_argument( + "-s", + "--secret", + help='The API secret used for Porkbun API calls (usually starts with "sk").', + ) + parser.add_argument("--debug", help="Enable debug mode.", action="store_true") + parser.add_argument( + "--endpoint", help="The API endpoint to use.", default=API_ENDPOINT + ) + + subparsers = parser.add_subparsers(help="Supported API methods") + + parser_ping = subparsers.add_parser("ping", help="Ping the API Endpoint") + parser_ping.set_defaults(func=PKBClient.ping) + + parser_dns_create = subparsers.add_parser( + "create-dns-record", help="Create a new DNS record." + ) + parser_dns_create.set_defaults(func=PKBClient.create_dns_record) + parser_dns_create.add_argument( + "domain", help="The domain for which the new DNS record should be created." + ) + parser_dns_create.add_argument( + "record_type", + help="The type of the new DNS record.", + choices=list(DNSRecordType), + ) + parser_dns_create.add_argument("content", help="The content of the new DNS record.") + parser_dns_create.add_argument( + "--name", + help="The subdomain for which the new DNS record should be created." + "The * can be used for a wildcard DNS record." + "If not used, then a DNS record for the root domain will be created", + required=False, + ) + parser_dns_create.add_argument( + "--ttl", type=int, help="The ttl of the new DNS record.", required=False + ) + parser_dns_create.add_argument( + "--prio", type=int, help="The priority of the new DNS record.", required=False + ) + + parser_dns_edit = subparsers.add_parser( + "update-dns-record", help="Edit an existing DNS record." + ) + parser_dns_edit.set_defaults(func=PKBClient.update_dns_record) + parser_dns_edit.add_argument( + "domain", help="The domain for which the DNS record should be edited." + ) + parser_dns_edit.add_argument( + "record_id", help="The id of the DNS record which should be edited." + ) + parser_dns_edit.add_argument( + "record_type", + help="The new type of the DNS record.", + choices=list(DNSRecordType), + ) + parser_dns_edit.add_argument("content", help="The new content of the DNS record.") + parser_dns_edit.add_argument( + "--name", + help="The new value of the subdomain for which the DNS record should apply. " + "The * can be used for a wildcard DNS record. If not set, the record will " + "be set for the root domain.", + required=False, + ) + parser_dns_edit.add_argument( + "--ttl", type=int, help="The new ttl of the DNS record.", required=False + ) + parser_dns_edit.add_argument( + "--prio", type=int, help="The new priority of the DNS record.", required=False + ) + + parser_dns_delete = subparsers.add_parser( + "delete-dns-records", help="Delete an existing DNS record." + ) + parser_dns_delete.set_defaults(func=PKBClient.delete_dns_record) + parser_dns_delete.add_argument( + "domain", help="The domain for which the DNS record should be deleted." + ) + parser_dns_delete.add_argument( + "record_id", help="The id of the DNS record which should be deleted." + ) + + parser_dns_receive = subparsers.add_parser( + "get-dns-records", help="Get all DNS records." + ) + parser_dns_receive.set_defaults(func=PKBClient.get_dns_records) + parser_dns_receive.add_argument( + "domain", help="The domain for which the DNS record should be retrieved." + ) + + parser_dns_export = subparsers.add_parser( + "export-dns-records", help="Save all DNS records to a local json file." + ) + parser_dns_export.set_defaults(func=PKBClient.export_dns_records) + parser_dns_export.add_argument( + "domain", + help="The domain for which the DNS record should be retrieved and saved.", + ) + parser_dns_export.add_argument( + "filepath", help="The filepath where to save the exported DNS records." + ) + + parser_dns_export_bind = subparsers.add_parser( + "export-bind-dns-records", help="Save all DNS records to a local BIND file." + ) + parser_dns_export_bind.set_defaults(func=PKBClient.export_bind_dns_records) + parser_dns_export_bind.add_argument( + "domain", + help="The domain for which the DNS record should be retrieved and saved.", + ) + parser_dns_export_bind.add_argument( + "filepath", help="The filepath where to save the exported DNS records." + ) + + parser_dns_import = subparsers.add_parser( + "import-dns-records", + help="Restore all DNS records from a local json file.", + formatter_class=argparse.RawTextHelpFormatter, + ) + parser_dns_import.set_defaults(func=PKBClient.import_dns_records) + parser_dns_import.add_argument( + "domain", help="The domain for which the DNS record should be restored." + ) + parser_dns_import.add_argument( + "filepath", help="The filepath from which the DNS records are to be restored." + ) + parser_dns_import.add_argument( + "restore_mode", + help="""The restore mode (DNS records are identified by the record type, name and prio if supported): + clear: remove all existing DNS records and restore all DNS records from the provided file + replace: replace only existing DNS records with the DNS records from the provided file, but do not create any new DNS records + keep: keep the existing DNS records and only create new ones for all DNS records from the specified file if they do not exist + """, + type=DNSRestoreMode.from_string, + choices=list(DNSRestoreMode), + ) + + parser_dns_import_bind = subparsers.add_parser( + "import-bind-dns-records", + help="Restore all DNS records from a local BIND file.", + formatter_class=argparse.RawTextHelpFormatter, + ) + parser_dns_import_bind.set_defaults(func=PKBClient.import_bind_dns_records) + parser_dns_import_bind.add_argument( + "filepath", help="The filepath from which the DNS records are to be restored." + ) + parser_dns_import_bind.add_argument( + "restore_mode", + help="""The restore mode (DNS records are identified by the record id): + clear: remove all existing DNS records and restore all DNS records from the provided file + """, + type=DNSRestoreMode.from_string, + choices=[DNSRestoreMode.clear], + ) + + parser_domain_pricing = subparsers.add_parser( + "get-domain-pricing", help="Get the pricing for Porkbun domains." + ) + parser_domain_pricing.set_defaults(func=PKBClient.get_domain_pricing) + + parser_ssl_retrieve = subparsers.add_parser( + "get-ssl-bundle", help="Retrieve an SSL bundle for given domain." + ) + parser_ssl_retrieve.set_defaults(func=PKBClient.get_ssl_bundle) + parser_ssl_retrieve.add_argument( + "domain", help="The domain for which the SSL bundle should be retrieve." + ) + + parser_update_dns_server = subparsers.add_parser( + "update-dns-servers", help="Update the DNS servers for a domain." + ) + parser_update_dns_server.set_defaults(func=PKBClient.update_dns_servers) + parser_update_dns_server.add_argument( + "domain", help="The domain for which the DNS servers should be set." + ) + parser_update_dns_server.add_argument( + "dns_servers", nargs="+", help="The DNS servers to be set." + ) + + parser_get_dns_server = subparsers.add_parser( + "get-dns-servers", help="Retrieve the DNS servers for a domain." + ) + parser_get_dns_server.set_defaults(func=PKBClient.get_dns_servers) + parser_get_dns_server.add_argument( + "domain", help="The domain for which the DNS servers should be retrieved." + ) + + parser_list_domains = subparsers.add_parser( + "get-domains", help="List all domains in this account in chunks of 1000." + ) + parser_list_domains.set_defaults(func=PKBClient.get_domains) + parser_list_domains.add_argument( + "--start", + type=int, + help="The start index of the list.", + default=0, + required=False, + ) + + parser_get_url_forward = subparsers.add_parser( + "get-url-forwards", help="Retrieve all URL forwards." + ) + parser_get_url_forward.set_defaults(func=PKBClient.get_url_forwards) + parser_get_url_forward.add_argument( + "domain", help="The domain for which the URL forwards should be retrieved." + ) + + parser_add_url_forward = subparsers.add_parser( + "create-url-forward", help="Create a new URL forward." + ) + parser_add_url_forward.set_defaults(func=PKBClient.create_url_forward) + parser_add_url_forward.add_argument( + "domain", help="The domain for which the new URL forward should be created." + ) + parser_add_url_forward.add_argument( + "location", + help="The location to which the url forwarding should redirect.", + ) + parser_add_url_forward.add_argument( + "type", help="The type of the url forwarding.", choices=list(URLForwardingType) + ) + parser_add_url_forward.add_argument( + "--subdomain", + help="The subdomain for which the url forwarding should be added.", + required=False, + default="", + ) + parser_add_url_forward.add_argument( + "--include-path", + help="Whether the path should be included in the url forwarding.", + action="store_true", + default=False, + ) + parser_add_url_forward.add_argument( + "--wildcard", + help="Whether the url forwarding should be also applied to subdomains.", + action="store_true", + default=False, + ) + + parser_delete_url_forward = subparsers.add_parser( + "delete-url-forward", help="Delete an existing URL forward." + ) + parser_delete_url_forward.set_defaults(func=PKBClient.delete_url_forward) + parser_delete_url_forward.add_argument( + "domain", help="The domain for which the URL forward should be deleted." + ) + parser_delete_url_forward.add_argument( + "id", help="The id of the URL forward which should be deleted." + ) + + args = vars(parser.parse_args()) + + debug = args.pop("debug", False) + + func = args.pop("func", None) + if not func: + raise argparse.ArgumentError( + None, "No method specified. Please provide a method and try again." + ) + + endpoint = args.pop("endpoint") + api_key = args.pop("key") + api_secret = args.pop("secret") + + # call the api methods which do not require authentication + if func == PKBClient.get_domain_pricing: + pkb_client = PKBClient(api_endpoint=endpoint, debug=debug) + ret = func(pkb_client, **args) + + print(json.dumps(ret, cls=CustomJSONEncoder, indent=4)) + exit(0) + + if api_key is None: + # try to get the api key from the environment variable or fallback to user input + api_key = os.environ.get("PKB_API_KEY", "") + if len(api_key.strip()) == 0: + while True: + api_key = input( + 'Please enter your API key you got from Porkbun (usually starts with "pk"): ' + ) + if len(api_key.strip()) == 0: + print("The api key can not be empty.") + else: + break + + if api_secret is None: + # try to get the api secret from the environment variable or fallback to user input + api_secret = os.environ.get("PKB_API_SECRET", "") + if len(api_secret.strip()) == 0: + while True: + api_secret = input( + 'Please enter your API key secret you got from Porkbun (usually starts with "sk"): ' + ) + if len(api_secret.strip()) == 0: + print("The api key secret can not be empty.") + else: + break + + pkb_client = PKBClient( + api_key=api_key, secret_api_key=api_secret, api_endpoint=endpoint, debug=debug + ) + + ret = func(pkb_client, **args) + + print(json.dumps(ret, cls=CustomJSONEncoder, indent=4)) + + +if __name__ == "__main__": + main() diff --git a/pkb_client/client.py b/pkb_client/client.py deleted file mode 100644 index 48c0d82..0000000 --- a/pkb_client/client.py +++ /dev/null @@ -1,455 +0,0 @@ -import json -import logging -from enum import Enum -from pathlib import Path -from typing import Optional, Tuple -from urllib.parse import urljoin - -import requests - -from pkb_client.helper import parse_dns_record - -API_ENDPOINT = "https://porkbun.com/api/json/v3/" -SUPPORTED_DNS_RECORD_TYPES = ["A", "AAAA", "MX", "CNAME", "ALIAS", "TXT", "NS", "SRV", "TLSA", "CAA"] - -# prevent urllib3 to log request with the api key and secret -logging.getLogger("urllib3").setLevel(logging.WARNING) - - -class DNSRestoreMode(Enum): - clear = 0 - replace = 1 - keep = 2 - - def __str__(self): - return self.name - - @staticmethod - def from_string(a): - try: - return DNSRestoreMode[a] - except KeyError: - return a - - -class PKBClient: - """ - API client for Porkbun. - """ - - def __init__(self, api_key: str, secret_api_key: str) -> None: - """ - Creates a new PKBClient object. - - :param api_key: the API key used for Porkbun API calls - :param secret_api_key: the API secret used for Porkbun API calls - """ - - assert api_key is not None and len(api_key) > 0 - assert secret_api_key is not None and len(secret_api_key) > 0 - - self.api_key = api_key - self.secret_api_key = secret_api_key - - def ping(self, **kwargs) -> str: - """ - API ping method: get the current public ip address of the requesting system; can also be used for auth checking - see https://porkbun.com/api/json/v3/documentation#Authentication for more info - - :return: the current public ip address of the requesting system - """ - - url = urljoin(API_ENDPOINT, "ping") - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return json.loads(r.text).get("yourIp", None) - else: - raise Exception("ERROR: ping api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_create(self, - domain: str, - record_type: str, - content: str, - name: Optional[str] = None, - ttl: Optional[int] = 300, - prio: Optional[int] = None, **kwargs) -> str: - """ - API DNS create method: create a new DNS record for given domain - see https://porkbun.com/api/json/v3/documentation#DNS%20Create%20Record for more info - - :param domain: the domain for which the DNS record should be created - :param record_type: the type of the new DNS record; - supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA - :param content: the content of the new DNS record - :param name: the subdomain for which the new DNS record entry should apply; the * can be used for a - wildcard DNS record; if not used, then a DNS record for the root domain will be created - :param ttl: the time to live in seconds of the new DNS record; have to be between 0 and 2147483647 - :param prio: the priority of the new DNS record - - :return: the id of the new created DNS record - """ - - assert domain is not None and len(domain) > 0 - assert record_type in SUPPORTED_DNS_RECORD_TYPES - assert content is not None and len(content) > 0 - assert ttl is None or 300 <= ttl <= 2147483647 - - url = urljoin(API_ENDPOINT, "dns/create/{}".format(domain)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key, - "name": name, - "type": record_type, - "content": content, - "ttl": ttl, - "prio": prio - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return str(json.loads(r.text).get("id", None)) - else: - raise Exception("ERROR: DNS create api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_edit(self, - domain: str, - record_id: str, - record_type: str, - content: str, - name: str = None, - ttl: int = 300, - prio: int = None, - **kwargs) -> bool: - """ - API DNS edit method: edit an existing DNS record specified by the id for a given domain - see https://porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record for more info - - :param domain: the domain for which the DNS record should be edited - :param record_id: the id of the DNS record which should be edited - :param record_type: the new type of the DNS record; - supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA - :param content: the new content of the DNS record - :param name: the new value of the subdomain for which the DNS record should apply; the * can be used for a - wildcard DNS record; if not set, the record will be set for the record domain - :param ttl: the new time to live in seconds of the DNS record, have to be between 0 and 2147483647 - :param prio: the new priority of the DNS record - - :return: True if the editing was successful - """ - - assert domain is not None and len(domain) > 0 - assert record_id is not None and len(record_id) > 0 - assert record_type in SUPPORTED_DNS_RECORD_TYPES - assert content is not None and len(content) > 0 - assert ttl is None or 300 <= ttl <= 2147483647 - - url = urljoin(API_ENDPOINT, "dns/edit/{}/{}".format(domain, record_id)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key, - "name": name, - "type": record_type, - "content": content, - "ttl": ttl, - "prio": prio - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return True - else: - raise Exception("ERROR: DNS edit api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_delete(self, - domain: str, - record_id: str, - **kwargs) -> bool: - """ - API DNS delete method: delete an existing DNS record specified by the id for a given domain - see https://porkbun.com/api/json/v3/documentation#DNS%20Delete%20Record for more info - - :param domain: the domain for which the DNS record should be deleted - :param record_id: the id of the DNS record which should be deleted - - :return: True if the deletion was successful - """ - - assert domain is not None and len(domain) > 0 - assert record_id is not None and len(record_id) > 0 - - url = urljoin(API_ENDPOINT, "dns/delete/{}/{}".format(domain, record_id)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return True - else: - raise Exception("ERROR: DNS delete api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_retrieve(self, domain, **kwargs) -> list: - """ - API DNS retrieve method: retrieve all DNS records for given domain - see https://porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records for more info - - :param domain: the domain for which the DNS records should be retrieved - - :return: list of DNS records as dicts - - The list structure will be: - [ - { - "id": "123456789", - "name": "example.com", - "type": "TXT", - "content": "this is a nice text", - "ttl": "300", - "prio": None, - "notes": "" - }, - { - "id": "234567890", - "name": "example.com", - "type": "A", - "content": "0.0.0.0", - "ttl": "300", - "prio": 0, - "notes": "" - } - ] - """ - - assert domain is not None and len(domain) > 0 - - url = urljoin(API_ENDPOINT, "dns/retrieve/{}".format(domain)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - return [parse_dns_record(record) for record in json.loads(r.text).get("records", [])] - else: - raise Exception("ERROR: DNS retrieve api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def dns_export(self, domain: str, filename: str, **kwargs) -> bool: - """ - Export all DNS record from the given domain as json to a file. - This method does not not represent a Porkbun API method. - - :param domain: the domain for which the DNS record should be retrieved and saved - :param filename: the filename where to save the exported DNS records - - :return: True if everything went well - """ - - assert domain is not None and len(domain) > 0 - assert filename is not None and len(filename) > 0 - - print("retrieve current DNS records...") - dns_records = self.dns_retrieve(domain) - - print("save DNS records to {} ...".format(filename)) - # merge the single DNS records into one single dict with the record id as key - dns_records_dict = dict() - for record in dns_records: - dns_records_dict[record["id"]] = record - - filepath = Path(filename) - if filepath.exists(): - raise Exception("File already exists. Please try another filename") - with open(filepath, "w") as f: - json.dump(dns_records_dict, f) - print("export finished") - - return True - - def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, **kwargs) -> bool: - """ - Restore - This method does not not represent a Porkbun API method. - - :param domain: the domain for which the DNS record should be restored - :param filename: the filename from which the DNS records are to be restored - :param restore_mode: The restore mode (DNS records are identified by the record id) - clean: remove all existing DNS records and restore all DNS records from the provided file - replace: replace only existing DNS records with the DNS records from the provided file, - but do not create any new DNS records - keep: keep the existing DNS records and only create new ones for all DNS records from - the specified file if they do not exist - - :return: True if everything went well - """ - - assert domain is not None and len(domain) > 0 - assert filename is not None and len(filename) > 0 - assert isinstance(restore_mode, DNSRestoreMode) - - existing_dns_records = self.dns_retrieve(domain) - - with open(filename, "r") as f: - exported_dns_records_dict = json.load(f) - - if restore_mode is DNSRestoreMode.clear: - print("restore mode: clear") - - try: - # delete all existing DNS records - for record in existing_dns_records: - self.dns_delete(domain, record["id"]) - - # restore all exported records by creating new DNS records - for _, exported_record in exported_dns_records_dict.items(): - name = ".".join(exported_record["name"].split(".")[:-2]) - self.dns_create(domain=domain, - record_type=exported_record["type"], - content=exported_record["content"], - name=name, - ttl=exported_record["ttl"], - prio=exported_record["prio"]) - except Exception as e: - print("something went wrong: {}".format(e.__str__())) - self.__handle_error_backup__(existing_dns_records) - print("import failed") - return False - elif restore_mode is DNSRestoreMode.replace: - print("restore mode: replace") - - try: - for existing_record in existing_dns_records: - record_id = existing_record["id"] - exported_record = exported_dns_records_dict.get(record_id, None) - # also check if the exported dns record is different to the existing record, - # so we can reduce unnecessary api calls - if exported_record is not None and exported_record != existing_record: - name = ".".join(exported_record["name"].split(".")[:-2]) - self.dns_edit(domain=domain, - record_id=record_id, - record_type=exported_record["type"], - content=exported_record["content"], - name=name, - ttl=exported_record["ttl"], - prio=exported_record["prio"]) - except Exception as e: - print("something went wrong: {}".format(e.__str__())) - self.__handle_error_backup__(existing_dns_records) - print("import failed") - return False - elif restore_mode is DNSRestoreMode.keep: - print("restore mode: keep") - - existing_dns_records_dict = dict() - for record in existing_dns_records: - existing_dns_records_dict[record["id"]] = record - - try: - for _, exported_record in exported_dns_records_dict.items(): - if exported_record["id"] not in existing_dns_records_dict: - name = ".".join(exported_record["name"].split(".")[:-2]) - self.dns_create(domain=domain, - record_type=exported_record["type"], - content=exported_record["content"], - name=name, - ttl=exported_record["ttl"], - prio=exported_record["prio"]) - except Exception as e: - print("something went wrong: {}".format(e.__str__())) - self.__handle_error_backup__(existing_dns_records) - print("import failed") - return False - else: - raise Exception("restore mode not supported") - - print("import successfully completed") - - return True - - @staticmethod - def get_domain_pricing(**kwargs) -> dict: - """ - Get the pricing for porkbun domains - see https://porkbun.com/api/json/v3/documentation#Domain%20Pricing for more info - - :return: dict with pricing - """ - - url = urljoin(API_ENDPOINT, "pricing/get") - r = requests.post(url=url) - - if r.status_code == 200: - return json.loads(r.text) - else: - raise Exception("ERROR: Domain pricing retrieve api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - def ssl_retrieve(self, domain, **kwargs) -> Tuple[str, str, str, str]: - """ - API SSL bundle retrieve method: retrieve an SSL bundle for given domain - see https://porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info - - :param domain: the domain for which the SSL bundle should be retrieved - - :return: tuple of intermediate certificate, certificate chain, private key, public key - """ - - assert domain is not None and len(domain) > 0 - - url = urljoin(API_ENDPOINT, "ssl/retrieve/{}".format(domain)) - req_json = { - "apikey": self.api_key, - "secretapikey": self.secret_api_key - } - r = requests.post(url=url, json=req_json) - - if r.status_code == 200: - ssl_bundle = json.loads(r.text) - - intermediate_certificate = ssl_bundle["intermediate_certificate"] - certificate_chain = ssl_bundle["certificate_chain"] - private_key = ssl_bundle["private_key"] - public_key = ssl_bundle["public_key"] - - return intermediate_certificate, certificate_chain, private_key, public_key - else: - raise Exception("ERROR: SSL bundle retrieve api call was not successfully\n" - "Status code: {}\n" - "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found"))) - - @staticmethod - def __handle_error_backup__(dns_records): - # merge the single DNS records into one single dict with the record id as key - dns_records_dict = dict() - for record in dns_records: - dns_records_dict[record["id"]] = record - - # generate filename with incremental suffix - base_backup_filename = "pkb_client_dns_records_backup" - suffix = 0 - backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) - while backup_file_path.exists(): - suffix += 1 - backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) - - with open(backup_file_path, "w") as f: - json.dump(dns_records_dict, f) - - print("a backup of your existing dns records was saved to {}".format(str(backup_file_path))) diff --git a/pkb_client/client/__init__.py b/pkb_client/client/__init__.py new file mode 100644 index 0000000..da8a42b --- /dev/null +++ b/pkb_client/client/__init__.py @@ -0,0 +1,22 @@ +from .bind_file import BindFile, BindRecord, RecordClass +from .client import PKBClient, PKBClientException, API_ENDPOINT +from .dns import DNSRecord, DNSRestoreMode, DNSRecordType +from .domain import DomainInfo +from .forwarding import URLForwarding, URLForwardingType +from .ssl_cert import SSLCertBundle + +__all__ = [ + "PKBClient", + "PKBClientException", + "API_ENDPOINT", + "BindFile", + "BindRecord", + "RecordClass", + "DNSRecord", + "DNSRestoreMode", + "DNSRecordType", + "DomainInfo", + "URLForwarding", + "URLForwardingType", + "SSLCertBundle", +] diff --git a/pkb_client/client/bind_file.py b/pkb_client/client/bind_file.py new file mode 100644 index 0000000..af9abe0 --- /dev/null +++ b/pkb_client/client/bind_file.py @@ -0,0 +1,169 @@ +import logging +from dataclasses import dataclass +from enum import Enum +from typing import Optional, List + +from pkb_client.client.dns import DNSRecordType, DNS_RECORDS_WITH_PRIORITY + + +class RecordClass(str, Enum): + IN = "IN" + + def __str__(self): + return self.value + + +@dataclass +class BindRecord: + name: str + ttl: int + record_class: RecordClass + record_type: DNSRecordType + data: str + prio: Optional[int] = None + comment: Optional[str] = None + + def __str__(self): + record_string = f"{self.name} {self.ttl} {self.record_class} {self.record_type}" + if self.prio is not None: + record_string += f" {self.prio}" + record_string += f" {self.data}" + if self.comment: + record_string += f" ; {self.comment}" + return record_string + + +class BindFile: + origin: str + ttl: Optional[int] = None + records: List[BindRecord] + + def __init__( + self, + origin: str, + ttl: Optional[int] = None, + records: Optional[List[BindRecord]] = None, + ) -> None: + self.origin = origin + self.ttl = ttl + self.records = records or [] + + @staticmethod + def from_file(file_path: str) -> "BindFile": + with open(file_path, "r") as f: + file_data = f.readlines() + + # parse the file line by line + origin = None + ttl = None + records = [] + for line in file_data: + if line.startswith("$ORIGIN"): + origin = line.split()[1] + elif line.startswith("$TTL"): + ttl = int(line.split()[1]) + else: + # parse the records with the two possible formats: + # 1: name ttl record-class record-type record-data + # 2: name record-class ttl record-type record-data + # whereby the ttl is optional + + # drop any right trailing comments + line_parts = line.split(";", 1) + line = line_parts[0].strip() + comment = line_parts[1].strip() if len(line_parts) > 1 else None + prio = None + + # skip empty lines + if not line: + continue + + # find which format the line is + record_parts = line.split() + if record_parts[1].isdigit(): + # scheme 1 + if record_parts[3] not in DNSRecordType.__members__: + logging.warning(f"Ignoring unsupported record type: {line}") + continue + if record_parts[2] not in RecordClass.__members__: + logging.warning(f"Ignoring unsupported record class: {line}") + continue + record_name = record_parts[0] + record_ttl = int(record_parts[1]) + record_class = RecordClass[record_parts[2]] + record_type = DNSRecordType[record_parts[3]] + if record_type in DNS_RECORDS_WITH_PRIORITY: + prio = int(record_parts[4]) + record_data = " ".join(record_parts[5:]) + else: + record_data = " ".join(record_parts[4:]) + elif record_parts[2].isdigit(): + # scheme 2 + if record_parts[3] not in DNSRecordType.__members__: + logging.warning(f"Ignoring unsupported record type: {line}") + continue + if record_parts[1] not in RecordClass.__members__: + logging.warning(f"Ignoring unsupported record class: {line}") + continue + record_name = record_parts[0] + record_ttl = int(record_parts[2]) + record_class = RecordClass[record_parts[1]] + record_type = DNSRecordType[record_parts[3]] + if record_type in DNS_RECORDS_WITH_PRIORITY: + prio = int(record_parts[4]) + record_data = " ".join(record_parts[5:]) + else: + record_data = " ".join(record_parts[4:]) + else: + # no ttl, use default or previous + if record_parts[2] not in DNSRecordType.__members__: + logging.warning(f"Ignoring unsupported record type: {line}") + continue + if record_parts[1] not in RecordClass.__members__: + logging.warning(f"Ignoring unsupported record class: {line}") + continue + record_name = record_parts[0] + if ttl is None and not records: + raise ValueError("No TTL found in file") + record_ttl = ttl or records[-1].ttl + record_class = RecordClass[record_parts[1]] + record_type = DNSRecordType[record_parts[2]] + if record_type in DNS_RECORDS_WITH_PRIORITY: + prio = int(record_parts[3]) + record_data = " ".join(record_parts[4:]) + else: + record_data = " ".join(record_parts[3:]) + + # replace @ in record name with origin + record_name = record_name.replace("@", origin) + + records.append( + BindRecord( + record_name, + record_ttl, + record_class, + record_type, + record_data, + prio=prio, + comment=comment, + ) + ) + + if origin is None: + raise ValueError("No origin found in file") + + return BindFile(origin, ttl, records) + + def to_file(self, file_path: str) -> None: + with open(file_path, "w") as f: + f.write(str(self)) + + def __str__(self) -> str: + bind = f"$ORIGIN {self.origin}\n" + + if self.ttl is not None: + bind += f"$TTL {self.ttl}\n" + + for record in self.records: + bind += f"{record}\n" + return bind diff --git a/pkb_client/client/client.py b/pkb_client/client/client.py new file mode 100644 index 0000000..86956a5 --- /dev/null +++ b/pkb_client/client/client.py @@ -0,0 +1,867 @@ +import json +import logging +from hashlib import sha256 +from pathlib import Path +from typing import Optional, List, Union +from urllib.parse import urljoin + +import dns.resolver +import requests + +from pkb_client.client import BindFile +from pkb_client.client.dns import ( + DNSRecord, + DNSRestoreMode, + DNSRecordType, + DNS_RECORDS_WITH_PRIORITY, +) +from pkb_client.client.domain import DomainInfo +from pkb_client.client.forwarding import URLForwarding, URLForwardingType +from pkb_client.client.ssl_cert import SSLCertBundle + +API_ENDPOINT = "https://api.porkbun.com/api/json/v3/" + +logger = logging.getLogger("pkb_client") +logging.basicConfig(level=logging.INFO) + + +class PKBClientException(Exception): + def __init__(self, status, message): + super().__init__(f"{status}: {message}") + + +class PKBClient: + """ + API client for Porkbun. + """ + + default_ttl: int = 300 + + def __init__( + self, + api_key: Optional[str] = None, + secret_api_key: Optional[str] = None, + api_endpoint: str = API_ENDPOINT, + debug: bool = False, + ) -> None: + """ + Creates a new PKBClient object. + + :param api_key: the API key used for Porkbun API calls + :param secret_api_key: the API secret used for Porkbun API calls + :param api_endpoint: the endpoint of the Porkbun API. + :param debug: boolean to enable debug logging + """ + self.api_key = api_key + self.secret_api_key = secret_api_key + self.api_endpoint = api_endpoint + self.debug = debug + if self.debug: + logger.setLevel(logging.DEBUG) + + def _get_auth_request_json(self) -> dict: + """ + Get the request json for the authentication of the Porkbun API calls. + + :return: the request json for the authentication of the Porkbun API calls + """ + + if self.api_key is None or self.secret_api_key is None: + raise ValueError("api_key and secret_api_key must be set") + + return {"apikey": self.api_key, "secretapikey": self.secret_api_key} + + def ping(self) -> str: + """ + API ping method: get the current public ip address of the requesting system; can also be used for auth checking. + See https://api.porkbun.com/api/json/v3/documentation#Authentication for more info. + + :return: the current public ip address of the requesting system + """ + + url = urljoin(self.api_endpoint, "ping") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return json.loads(r.text).get("yourIp", None) + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def create_dns_record( + self, + domain: str, + record_type: DNSRecordType, + content: str, + name: Optional[str] = None, + ttl: int = default_ttl, + prio: Optional[int] = None, + ) -> str: + """ + API DNS create method: create a new DNS record for given domain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Create%20Record for more info. + + :param domain: the domain for which the DNS record should be created + :param record_type: the type of the new DNS record + :param content: the content of the new DNS record + :param name: the subdomain for which the new DNS record entry should apply; the * can be used for a + wildcard DNS record; if not used, then a DNS record for the root domain will be created + :param ttl: the time to live in seconds of the new DNS record; have to be between 300 and 86400 + :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None + :return: the id of the new created DNS record + """ + + if ttl > 86400 or ttl < self.default_ttl: + raise ValueError(f"ttl must be between {self.default_ttl} and 86400") + + if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY: + raise ValueError( + f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}" + ) + + url = urljoin(self.api_endpoint, f"dns/create/{domain}") + req_json = { + **self._get_auth_request_json(), + "name": name, + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return str(json.loads(r.text).get("id", None)) + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def update_dns_record( + self, + domain: str, + record_id: str, + record_type: DNSRecordType, + content: str, + name: Optional[str] = None, + ttl: int = default_ttl, + prio: Optional[int] = None, + ) -> bool: + """ + API DNS edit method: edit an existing DNS record specified by the id for a given domain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record for more info. + + :param domain: the domain for which the DNS record should be edited + :param record_id: the id of the DNS record which should be edited + :param record_type: the new type of the DNS record + :param content: the new content of the DNS record + :param name: the new value of the subdomain for which the DNS record should apply; the * can be used for a + wildcard DNS record; if not set, the record will be set for the record domain + :param ttl: the new time to live in seconds of the DNS record, have to be between 300 and 86400 + :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None + + :return: True if the editing was successful + """ + + if ttl > 86400 or ttl < self.default_ttl: + raise ValueError(f"ttl must be between {self.default_ttl} and 86400") + + if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY: + raise ValueError( + f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}" + ) + + url = urljoin(self.api_endpoint, f"dns/edit/{domain}/{record_id}") + req_json = { + **self._get_auth_request_json(), + "name": name, + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def update_all_dns_records( + self, + domain: str, + record_type: DNSRecordType, + subdomain: str, + content: str, + ttl: int = default_ttl, + prio: Optional[int] = None, + ) -> bool: + """ + API DNS edit method: edit all existing DNS record matching the domain, record type and subdomain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record%20by%20Domain,%20Subdomain%20and%20Type for more info. + + :param domain: the domain for which the DNS record should be edited + :param record_type: the type of the DNS record + :param subdomain: the subdomain of the DNS record can be empty string for root domain + :param content: the new content of the DNS record + :param ttl: the new time to live in seconds of the DNS record, have to be between 300 and 86400 + :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None + + :return: True if the editing was successful + """ + + if ttl > 86400 or ttl < self.default_ttl: + raise ValueError(f"ttl must be between {self.default_ttl} and 86400") + + if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY: + raise ValueError( + f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}" + ) + + url = urljoin( + self.api_endpoint, f"dns/editByNameType/{domain}/{record_type}/{subdomain}" + ) + req_json = { + **self._get_auth_request_json(), + "type": record_type, + "content": content, + "ttl": ttl, + "prio": prio, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def delete_dns_record(self, domain: str, record_id: str) -> bool: + """ + API DNS delete method: delete an existing DNS record specified by the id for a given domain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Delete%20Record for more info. + + :param domain: the domain for which the DNS record should be deleted + :param record_id: the id of the DNS record which should be deleted + + :return: True if the deletion was successful + """ + + url = urljoin(self.api_endpoint, f"dns/delete/{domain}/{record_id}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def delete_all_dns_records( + self, domain: str, record_type: DNSRecordType, subdomain: str + ) -> bool: + """ + API DNS delete method: delete all existing DNS record matching the domain, record type and subdomain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Delete%20Records%20by%20Domain,%20Subdomain%20and%20Type for more info. + + :param domain: the domain for which the DNS record should be deleted + :param record_type: the type of the DNS record + :param subdomain: the subdomain of the DNS record can be empty string for root domain + + :return: True if the deletion was successful + """ + + url = urljoin( + self.api_endpoint, + f"dns/deleteByNameType/{domain}/{record_type}/{subdomain}", + ) + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_dns_records( + self, domain, record_id: Optional[str] = None + ) -> List[DNSRecord]: + """ + API DNS retrieve method: retrieve all DNS records for given domain if no record id is specified. + Otherwise, retrieve the DNS record of the specified domain with the given record id. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records for more info. + + :param domain: the domain for which the DNS records should be retrieved + :param record_id: the id of the DNS record which should be retrieved + + :return: list of DNSRecords objects + """ + + if record_id is None: + url = urljoin(self.api_endpoint, f"dns/retrieve/{domain}") + else: + url = urljoin(self.api_endpoint, f"dns/retrieve/{domain}/{record_id}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + DNSRecord.from_dict(record) + for record in json.loads(r.text).get("records", []) + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_all_dns_records( + self, domain: str, record_type: DNSRecordType, subdomain: str + ) -> List[DNSRecord]: + """ + API DNS retrieve method: retrieve all DNS records matching the domain, record type and subdomain. + See https://api.porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records%20by%20Domain,%20Subdomain%20and%20Type for more info. + + :param domain: the domain for which the DNS records should be retrieved + :param record_type: the type of the DNS records + :param subdomain: the subdomain of the DNS records can be empty string for root domain + + :return: list of DNSRecords objects + """ + + url = urljoin( + self.api_endpoint, + f"dns/retrieveByNameType/{domain}/{record_type}/{subdomain}", + ) + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + DNSRecord.from_dict(record) + for record in json.loads(r.text).get("records", []) + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def export_dns_records(self, domain: str, filepath: Union[Path, str]) -> bool: + """ + Export all DNS record from the given domain to a json file. + This method does not represent a Porkbun API method. + DNS records with all custom fields like notes are exported. + + :param domain: the domain for which the DNS record should be retrieved and saved + :param filepath: the filepath where to save the exported DNS records + + :return: True if everything went well + """ + + filepath = Path(filepath) + + logger.debug("retrieve current DNS records...") + dns_records = self.get_dns_records(domain) + + logger.debug("save DNS records to {} ...".format(filepath)) + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record.id] = record + + if filepath.exists(): + logger.warning("file already exists, overwriting...") + + with open(filepath, "w") as f: + json.dump(dns_records_dict, f, default=lambda o: o.__dict__, indent=4) + + logger.info("export finished") + + return True + + def export_bind_dns_records(self, domain: str, filepath: Union[Path, str]) -> bool: + """ + Export all DNS record from the given domain to a BIND file. + This method does not represent a Porkbun API method. + Porkbun DNS record notes are exported as comments. + + :param domain: the domain for which the DNS record should be retrieved and saved + :param filepath: the filepath where to save the exported DNS records + + :return: True if everything went well + """ + + filepath = Path(filepath) + + logger.debug("retrieve current DNS records...") + dns_records = self.get_dns_records(domain) + + logger.debug("save DNS records to {} ...".format(filepath)) + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record.id] = record + + if filepath.exists(): + logger.warning("file already exists, overwriting...") + + # domain header + bind_file_content = f"$ORIGIN {domain}" + + # SOA record + soa_records = dns.resolver.resolve(domain, "SOA") + if soa_records: + soa_record = soa_records[0] + bind_file_content += f"\n@ IN SOA {soa_record.mname} {soa_record.rname} ({soa_record.serial} {soa_record.refresh} {soa_record.retry} {soa_record.expire} {soa_record.minimum})" + + # records + for record in dns_records: + # name record class ttl record type record data + if record.prio: + record_content = f"{record.prio} {record.content}" + else: + record_content = record.content + bind_file_content += ( + f"\n{record.name} IN {record.ttl} {record.type} {record_content}" + ) + + if record.notes: + bind_file_content += f" ; {record.notes}" + + with open(filepath, "w") as f: + f.write(bind_file_content) + + logger.info("export finished") + + return True + + def import_dns_records( + self, domain: str, filepath: Union[Path, str], restore_mode: DNSRestoreMode + ) -> bool: + """ + Restore all DNS records from a json file to the given domain. + This method does not represent a Porkbun API method. + + :param domain: the domain for which the DNS record should be restored + :param filepath: the filepath from which the DNS records are to be restored + :param restore_mode: The restore mode (DNS records are identified by the record type, name and prio if supported): + clear: remove all existing DNS records and restore all DNS records from the provided file + replace: replace only existing DNS records with the DNS records from the provided file, + but do not create any new DNS records + keep: keep the existing DNS records and only create new ones for all DNS records from + the specified file if they do not exist + + :return: True if everything went well + """ + + filepath = Path(filepath) + + existing_dns_records = self.get_dns_records(domain) + + with open(filepath, "r") as f: + exported_dns_records_dict = json.load(f) + + if restore_mode is DNSRestoreMode.clear: + logger.debug("restore mode: clear") + + try: + # delete all existing DNS records + for record in existing_dns_records: + self.delete_dns_record(domain, record.id) + + # restore all exported records by creating new DNS records + for _, exported_record in exported_dns_records_dict.items(): + name = ".".join(exported_record["name"].split(".")[:-2]) + self.create_dns_record( + domain=domain, + record_type=exported_record["type"], + content=exported_record["content"], + name=name, + ttl=exported_record["ttl"], + prio=exported_record["prio"], + ) + except Exception as e: + logger.error("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + logger.error("import failed") + return False + elif restore_mode is DNSRestoreMode.replace: + logger.debug("restore mode: replace") + + try: + existing_dns_record_hashed = { + sha256( + f"{record.type}{record.name}{record.prio}".encode() + ).hexdigest(): record + for record in existing_dns_records + } + for record in exported_dns_records_dict.values(): + record_hash = sha256( + f"{record['type']}{record['name']}{record['prio']}".encode() + ).hexdigest() + existing_record = existing_dns_record_hashed.get(record_hash, None) + # check if the exported dns record is different to the existing record, + # so we can reduce unnecessary api calls + if existing_record is not None and ( + record["content"] != existing_record.content + or record["ttl"] != existing_record.ttl + or record["prio"] != existing_record.prio + ): + self.update_dns_record( + domain=domain, + record_id=existing_record.id, + record_type=record["type"], + content=record["content"], + name=record["name"].replace(f".{domain}", ""), + ttl=record["ttl"], + prio=record["prio"], + ) + except Exception as e: + logger.error("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + logger.error("import failed") + return False + elif restore_mode is DNSRestoreMode.keep: + logger.debug("restore mode: keep") + + existing_dns_record_hashed = { + sha256( + f"{record.type}{record.name}{record.prio}".encode() + ).hexdigest(): record + for record in existing_dns_records + } + + try: + for record in exported_dns_records_dict.values(): + record_hash = sha256( + f"{record['type']}{record['name']}{record['prio']}".encode() + ).hexdigest() + existing_record = existing_dns_record_hashed.get(record_hash, None) + if existing_record is None: + self.create_dns_record( + domain=domain, + record_type=record["type"], + content=record["content"], + name=record["name"].replace(f".{domain}", ""), + ttl=record["ttl"], + prio=record["prio"], + ) + except Exception as e: + logger.error("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + logger.error("import failed") + return False + else: + raise Exception("restore mode not supported") + + logger.info("import successfully completed") + + return True + + def import_bind_dns_records( + self, filepath: Union[Path, str], restore_mode: DNSRestoreMode + ) -> bool: + """ + Restore all DNS records from a BIND file. + This method does not represent a Porkbun API method. + + :param filepath: the bind filepath from which the DNS records are to be restored + :param restore_mode: The restore mode: + clear: remove all existing DNS records and restore all DNS records from the provided file + :return: True if everything went well + """ + + bind_file = BindFile.from_file(filepath) + + existing_dns_records = self.get_dns_records(bind_file.origin[:-1]) + + if restore_mode is DNSRestoreMode.clear: + logger.debug("restore mode: clear") + + try: + # delete all existing DNS records + for record in existing_dns_records: + self.delete_dns_record(bind_file.origin[:-1], record.id) + + # restore all records from BIND file by creating new DNS records + for record in bind_file.records: + # extract subdomain from record name + subdomain = record.name.replace(bind_file.origin, "") + # replace trailing dot + subdomain = subdomain[:-1] if subdomain.endswith(".") else subdomain + self.create_dns_record( + domain=bind_file.origin[:-1], + record_type=record.record_type, + content=record.data, + name=subdomain, + ttl=record.ttl, + prio=record.prio, + ) + + except Exception as e: + logger.error("something went wrong: {}".format(e.__str__())) + self.__handle_error_backup__(existing_dns_records) + logger.error("import failed") + return False + else: + raise Exception(f"restore mode '{restore_mode.value}' not supported") + + logger.info("import successfully completed") + + return True + + def update_dns_servers(self, domain: str, name_servers: List[str]) -> bool: + """ + Update the name servers of the specified domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Update%20Name%20Servers for more info. + + :return: True if everything went well + """ + + url = urljoin(self.api_endpoint, f"domain/updateNs/{domain}") + req_json = {**self._get_auth_request_json(), "ns": name_servers} + r = requests.post(url=url, json=req_json) + + if r.status_code == 200 and json.loads(r.text).get("status", None) == "SUCCESS": + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_dns_servers(self, domain: str) -> List[str]: + """ + Get the name servers for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Get%20Name%20Servers for more info. + + :return: list of name servers + """ + + url = urljoin(self.api_endpoint, f"domain/getNs/{domain}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return json.loads(r.text).get("ns", []) + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_domains(self, start: int = 0) -> List[DomainInfo]: + """ + Get all domains for the account in chunks of 1000. If you reach the end of all domains, the list will be empty. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20List%20All for more info. + + :param start: the index of the first domain to retrieve + + :return: list of DomainInfo objects + """ + + url = urljoin(self.api_endpoint, "domain/listAll") + + req_json = {**self._get_auth_request_json(), "start": start} + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + DomainInfo.from_dict(domain) + for domain in json.loads(r.text).get("domains", []) + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_url_forwards(self, domain: str) -> List[URLForwarding]: + """ + Get the url forwarding for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Get%20URL%20Forwarding for more info. + + :return: list of URLForwarding objects + """ + + url = urljoin(self.api_endpoint, f"domain/getUrlForwarding/{domain}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return [ + URLForwarding.from_dict(forwarding) + for forwarding in json.loads(r.text).get("forwards", []) + ] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def create_url_forward( + self, + domain: str, + subdomain: str, + location: str, + type: URLForwardingType, + include_path: bool, + wildcard: bool, + ) -> bool: + """ + Add a url forward for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Add%20URL%20Forward for more info. + + :param domain: the domain for which the url forwarding should be added + :param subdomain: the subdomain for which the url forwarding should be added, can be empty for root domain + :param location: the location to which the url forwarding should redirect + :param type: the type of the url forwarding + :param include_path: if the path should be included in the url forwarding + :param wildcard: if the url forwarding should also be applied to all subdomains + + :return: True if the forwarding was added successfully + """ + + url = urljoin(self.api_endpoint, f"domain/addUrlForward/{domain}") + req_json = { + **self._get_auth_request_json(), + "subdomain": subdomain, + "location": location, + "type": type, + "includePath": include_path, + "wildcard": wildcard, + } + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def delete_url_forward(self, domain: str, id: str) -> bool: + """ + Delete an url forward for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Delete%20URL%20Forward for more info. + + :param domain: the domain for which the url forwarding should be deleted + :param id: the id of the url forwarding which should be deleted + + :return: True if the deletion was successful + """ + + url = urljoin(self.api_endpoint, f"domain/deleteUrlForward/{domain}/{id}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + return True + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_domain_pricing(self) -> dict: + """ + Get the pricing for all Porkbun domains. + See https://api.porkbun.com/api/json/v3/documentation#Domain%20Pricing for more info. + + :return: dict with pricing + """ + + url = urljoin(self.api_endpoint, "pricing/get") + r = requests.post(url=url) + + if r.status_code == 200: + return json.loads(r.text)["pricing"] + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + def get_ssl_bundle(self, domain) -> SSLCertBundle: + """ + API SSL bundle retrieve method: retrieve an SSL bundle for the given domain. + See https://api.porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info. + + :param domain: the domain for which the SSL bundle should be retrieved + + :return: tuple of intermediate certificate, certificate chain, private key, public key + """ + + url = urljoin(self.api_endpoint, f"ssl/retrieve/{domain}") + req_json = self._get_auth_request_json() + r = requests.post(url=url, json=req_json) + + if r.status_code == 200: + ssl_bundle = json.loads(r.text) + + return SSLCertBundle( + certificate_chain=ssl_bundle["certificatechain"], + private_key=ssl_bundle["privatekey"], + public_key=ssl_bundle["publickey"], + ) + else: + response_json = json.loads(r.text) + raise PKBClientException( + response_json.get("status", "Unknown status"), + response_json.get("message", "Unknown message"), + ) + + @staticmethod + def __handle_error_backup__(dns_records): + # merge the single DNS records into one single dict with the record id as key + dns_records_dict = dict() + for record in dns_records: + dns_records_dict[record["id"]] = record + + # generate filename with incremental suffix + base_backup_filename = "pkb_client_dns_records_backup" + suffix = 0 + backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) + while backup_file_path.exists(): + suffix += 1 + backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix)) + + with open(backup_file_path, "w") as f: + json.dump(dns_records_dict, f) + + logger.warning( + "a backup of your existing dns records was saved to {}".format( + str(backup_file_path) + ) + ) diff --git a/pkb_client/client/dns.py b/pkb_client/client/dns.py new file mode 100644 index 0000000..85ae971 --- /dev/null +++ b/pkb_client/client/dns.py @@ -0,0 +1,63 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Optional + + +class DNSRecordType(str, Enum): + A = "A" + AAAA = "AAAA" + MX = "MX" + CNAME = "CNAME" + ALIAS = "ALIAS" + TXT = "TXT" + NS = "NS" + SRV = "SRV" + TLSA = "TLSA" + CAA = "CAA" + + def __str__(self): + return self.value + + +DNS_RECORDS_WITH_PRIORITY = {DNSRecordType.MX, DNSRecordType.SRV} + + +@dataclass +class DNSRecord: + id: str + name: str + type: DNSRecordType + content: str + ttl: int + prio: Optional[int] + notes: str + + @staticmethod + def from_dict(d): + # only use prio for supported record types since the API returns it for all records with default value 0 + prio = int(d["prio"]) if d["type"] in DNS_RECORDS_WITH_PRIORITY else None + return DNSRecord( + id=d["id"], + name=d["name"], + type=DNSRecordType[d["type"]], + content=d["content"], + ttl=int(d["ttl"]), + prio=prio, + notes=d["notes"], + ) + + +class DNSRestoreMode(Enum): + clear = 0 + replace = 1 + keep = 2 + + def __str__(self): + return self.name + + @staticmethod + def from_string(a): + try: + return DNSRestoreMode[a] + except KeyError: + return a diff --git a/pkb_client/client/domain.py b/pkb_client/client/domain.py new file mode 100644 index 0000000..a44a904 --- /dev/null +++ b/pkb_client/client/domain.py @@ -0,0 +1,29 @@ +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class DomainInfo: + domain: str + status: str + tld: str + create_date: datetime + expire_date: datetime + security_lock: bool + whois_privacy: bool + auto_renew: bool + not_local: bool + + @staticmethod + def from_dict(d): + return DomainInfo( + domain=d["domain"], + status=d["status"], + tld=d["tld"], + create_date=datetime.fromisoformat(d["createDate"]), + expire_date=datetime.fromisoformat(d["expireDate"]), + security_lock=bool(d["securityLock"]), + whois_privacy=bool(d["whoisPrivacy"]), + auto_renew=bool(d["autoRenew"]), + not_local=bool(d["notLocal"]), + ) diff --git a/pkb_client/client/forwarding.py b/pkb_client/client/forwarding.py new file mode 100644 index 0000000..64962ca --- /dev/null +++ b/pkb_client/client/forwarding.py @@ -0,0 +1,28 @@ +from dataclasses import dataclass +from enum import Enum + + +class URLForwardingType(str, Enum): + temporary = "temporary" + permanent = "permanent" + + +@dataclass +class URLForwarding: + id: str + subdomain: str + location: str + type: URLForwardingType + include_path: bool + wildcard: bool + + @staticmethod + def from_dict(d): + return URLForwarding( + id=d["id"], + subdomain=d["subdomain"], + location=d["location"], + type=URLForwardingType[d["type"]], + include_path=d["includePath"] == "yes", + wildcard=d["wildcard"] == "yes", + ) diff --git a/pkb_client/client/ssl_cert.py b/pkb_client/client/ssl_cert.py new file mode 100644 index 0000000..0662ce4 --- /dev/null +++ b/pkb_client/client/ssl_cert.py @@ -0,0 +1,13 @@ +from dataclasses import dataclass + + +@dataclass +class SSLCertBundle: + # The complete certificate chain. + certificate_chain: str + + # The private key. + private_key: str + + # The public key. + public_key: str diff --git a/pkb_client/helper.py b/pkb_client/helper.py deleted file mode 100644 index 0c08167..0000000 --- a/pkb_client/helper.py +++ /dev/null @@ -1,15 +0,0 @@ -def parse_dns_record(record: dict) -> dict: - """ - Parse the DNS record. - Replace the ttl and prio string values with the int values. - - :param record: the unparsed DNS record dict - - :return: the parsed dns record dict - """ - if record.get("ttl", None) is not None: - record["ttl"] = int(record["ttl"]) - if record.get("prio", None) is not None: - record["prio"] = int(record["prio"]) - - return record diff --git a/requirements.txt b/requirements.txt index 2c81c10..b6b0625 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,6 @@ setuptools>=39.0.1 requests>=2.20.0 +sphinx~=7.4 +dnspython~=2.7 +responses~=0.25.3 +ruff~=0.7 @@ -10,17 +10,17 @@ setup( version=pkb_client.__version__, author="infinityofspace", url="https://github.com/infinityofspace/pkb_client", - description="Unofficial client for the Porkbun API", + description="Python client for the Porkbun API", long_description=long_description, long_description_content_type="text/markdown", license="MIT", classifiers=[ - "Development Status :: 4 - Beta", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", "License :: OSI Approved :: MIT License", "Topic :: Internet :: WWW/HTTP", "Topic :: Internet :: Name Service (DNS)", @@ -28,14 +28,11 @@ setup( "Topic :: System :: Systems Administration", ], packages=find_packages(), - python_requires=">=3.6", - install_requires=[ - "setuptools>=39.0.1", - "requests>=2.20.0" - ], + python_requires=">=3.9", + install_requires=["setuptools>=39.0.1", "requests>=2.20.0", "dnspython~=2.6"], entry_points={ "console_scripts": [ - "pkb-client = pkb_client.cli:main", + "pkb-client = pkb_client.cli.cli:main", ] - } + }, ) diff --git a/tests/bind_file.py b/tests/bind_file.py new file mode 100644 index 0000000..4d1114f --- /dev/null +++ b/tests/bind_file.py @@ -0,0 +1,148 @@ +import tempfile +import unittest +from importlib import resources + +from pkb_client.client.bind_file import BindFile, BindRecord, RecordClass +from pkb_client.client.dns import DNSRecordType +from tests import data + + +class TestBindFileParsing(unittest.TestCase): + def test_reading_bind_file(self): + with self.subTest("With default TTL"): + with resources.open_text(data, "test.bind") as f: + bind_file = BindFile.from_file(f.name) + + self.assertEqual("test.com.", bind_file.origin) + self.assertEqual(1234, bind_file.ttl) + self.assertEqual(5, len(bind_file.records)) + self.assertEqual( + BindRecord( + "test.com.", 600, RecordClass.IN, DNSRecordType.A, "1.2.3.4" + ), + bind_file.records[0], + ) + self.assertEqual( + BindRecord( + "sub.test.com.", 600, RecordClass.IN, DNSRecordType.A, "4.3.2.1" + ), + bind_file.records[1], + ) + self.assertEqual( + BindRecord( + "test.com.", + 600, + RecordClass.IN, + DNSRecordType.AAAA, + "2001:db8::1", + comment="This is a comment", + ), + bind_file.records[2], + ) + self.assertEqual( + BindRecord( + "test.com.", 1234, RecordClass.IN, DNSRecordType.TXT, "pkb-client" + ), + bind_file.records[3], + ) + self.assertEqual( + BindRecord( + "test.com.", + 600, + RecordClass.IN, + DNSRecordType.MX, + "mail.test.com.", + prio=10, + ), + bind_file.records[4], + ) + + with self.subTest("Without default TTL"): + with resources.open_text(data, "test_no_ttl.bind") as f: + bind_file = BindFile.from_file(f.name) + + self.assertEqual("test.com.", bind_file.origin) + self.assertEqual(None, bind_file.ttl) + self.assertEqual(5, len(bind_file.records)) + self.assertEqual( + BindRecord( + "test.com.", 600, RecordClass.IN, DNSRecordType.A, "1.2.3.4" + ), + bind_file.records[0], + ) + self.assertEqual( + BindRecord( + "sub.test.com.", 600, RecordClass.IN, DNSRecordType.A, "4.3.2.1" + ), + bind_file.records[1], + ) + self.assertEqual( + BindRecord( + "test.com.", + 700, + RecordClass.IN, + DNSRecordType.AAAA, + "2001:db8::1", + comment="This is a comment", + ), + bind_file.records[2], + ) + self.assertEqual( + BindRecord( + "test.com.", 700, RecordClass.IN, DNSRecordType.TXT, "pkb-client" + ), + bind_file.records[3], + ) + self.assertEqual( + BindRecord( + "test.com.", + 600, + RecordClass.IN, + DNSRecordType.MX, + "mail.test.com.", + prio=10, + ), + bind_file.records[4], + ) + + def test_writing_bind_file(self): + records = [ + BindRecord("test.com.", 600, RecordClass.IN, DNSRecordType.A, "1.2.3.4"), + BindRecord( + "sub.test.com.", 700, RecordClass.IN, DNSRecordType.A, "4.3.2.1" + ), + BindRecord( + "test.com.", 600, RecordClass.IN, DNSRecordType.AAAA, "2001:db8::1" + ), + BindRecord( + "test.com.", 600, RecordClass.IN, DNSRecordType.TXT, "pkb-client" + ), + BindRecord( + "test.com.", + 600, + RecordClass.IN, + DNSRecordType.MX, + "mail.test.com.", + prio=10, + ), + ] + bind_file = BindFile("test.com.", 1234, records) + + file_content = ( + "$ORIGIN test.com.\n" + "$TTL 1234\n" + "test.com. 600 IN A 1.2.3.4\n" + "sub.test.com. 700 IN A 4.3.2.1\n" + "test.com. 600 IN AAAA 2001:db8::1\n" + "test.com. 600 IN TXT pkb-client\n" + "test.com. 600 IN MX 10 mail.test.com.\n" + ) + + with tempfile.NamedTemporaryFile() as f: + bind_file.to_file(f.name) + with open(f.name) as f2: + self.assertEqual(file_content.strip(), f2.read().strip()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/client.py b/tests/client.py new file mode 100644 index 0000000..9362be2 --- /dev/null +++ b/tests/client.py @@ -0,0 +1,1025 @@ +import json +import tempfile +import unittest +from pathlib import Path +from urllib.parse import urljoin + +import responses +from responses import matchers +from responses.registries import OrderedRegistry + +from pkb_client.client import ( + PKBClient, + PKBClientException, + API_ENDPOINT, + DNSRestoreMode, +) +from pkb_client.client import SSLCertBundle +from pkb_client.client.dns import DNSRecord, DNSRecordType +from pkb_client.client.forwarding import URLForwarding, URLForwardingType + + +class TestClientAuth(unittest.TestCase): + @responses.activate + def test_valid_auth(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "ping"), + json={"status": "SUCCESS", "yourIp": "127.0.0.1"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + ip_address = pkb_client.ping() + self.assertEqual("127.0.0.1", ip_address) + + @responses.activate + def test_invalid_auth(self): + pkb_client = PKBClient("key" + "s", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "ping"), + json={"status": "ERROR", "message": "Invalid credentials"}, + status=401, + ) + with self.assertRaises(PKBClientException): + pkb_client.ping() + + @responses.activate + def test_ping(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "ping"), + json={"status": "SUCCESS", "yourIp": "127.0.0.1"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + ip_address = pkb_client.ping() + self.assertEqual("127.0.0.1", ip_address) + + @responses.activate(registry=OrderedRegistry) + def test_create_dns_record(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "123456"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": 3600, + "prio": None, + } + ) + ], + ) + assert "123456" == pkb_client.create_dns_record( + "example.com", DNSRecordType.A, "127.0.0.1", "sub.example.com", 3600 + ) + + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "234561"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "sub.example.com", + "type": "MX", + "content": "127.0.0.1", + "ttl": 3600, + "prio": 2, + } + ) + ], + ) + assert "234561" == pkb_client.create_dns_record( + "example.com", DNSRecordType.MX, "127.0.0.1", "sub.example.com", 3600, 2 + ) + + def test_create_dns_record_invalid_prio_record_type(self): + pkb_client = PKBClient("key", "secret") + with self.assertRaises(ValueError): + pkb_client.create_dns_record( + "example.com", DNSRecordType.A, "127.0.0.1", "sub.example.com", 3600, 2 + ) + + @responses.activate(registry=OrderedRegistry) + def test_update_dns_record(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/edit/example.com/123456"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "type": "A", + "content": "127.0.0.1", + "name": "sub.example.com", + "ttl": 3600, + "prio": None, + } + ) + ], + ) + + success = pkb_client.update_dns_record( + "example.com", + "123456", + DNSRecordType.A, + "127.0.0.1", + "sub.example.com", + 3600, + ) + self.assertTrue(success) + + responses.post( + url=urljoin(API_ENDPOINT, "dns/edit/example.com/123456"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "type": "MX", + "content": "127.0.0.1", + "name": "sub.example.com", + "ttl": 3600, + "prio": 2, + } + ) + ], + ) + + success = pkb_client.update_dns_record( + "example.com", + "123456", + DNSRecordType.MX, + "127.0.0.1", + "sub.example.com", + 3600, + 2, + ) + self.assertTrue(success) + + def test_update_dns_records_invalid_prio_record_type(self): + pkb_client = PKBClient("key", "secret") + with self.assertRaises(ValueError): + pkb_client.update_dns_record( + "example.com", + "123456", + DNSRecordType.A, + "127.0.0.1", + "sub.example.com", + 3600, + 2, + ) + + @responses.activate + def test_update_all_dns_records(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/editByNameType/example.com/A/sub"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "type": "A", + "content": "127.0.0.1", + "ttl": 1234, + "prio": None, + } + ) + ], + ) + + success = pkb_client.update_all_dns_records( + "example.com", DNSRecordType.A, "sub", "127.0.0.1", 1234 + ) + + self.assertTrue(success) + + def test_update_all_dns_records_all_invalid_prio_record_type(self): + pkb_client = PKBClient("key", "secret") + + with self.assertRaises(ValueError): + pkb_client.update_all_dns_records( + "example.com", DNSRecordType.A, "sub", "127.0.0.1", 1234, 2 + ) + + @responses.activate + def test_delete_dns_record(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/delete/example.com/123456"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + success = pkb_client.delete_dns_record("example.com", "123456") + + self.assertTrue(success) + + @responses.activate + def test_delete_all_dns_records(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/deleteByNameType/example.com/A/sub"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + success = pkb_client.delete_all_dns_records( + "example.com", DNSRecordType.A, "sub" + ) + + self.assertTrue(success) + + @responses.activate + def test_get_dns_records(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": "600", + "prio": None, + "notes": "", + }, + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 600, + "prio": None, + "notes": "", + }, + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + records = pkb_client.get_dns_records("example.com") + + expected_records = [ + DNSRecord( + "123456", "example.com", DNSRecordType.A, "127.0.0.1", 600, None, "" + ), + DNSRecord( + "1234567", + "sub.example.com", + DNSRecordType.A, + "127.0.0.2", + 600, + None, + "", + ), + ] + self.assertEqual(expected_records, records) + + @responses.activate + def test_get_all_dns_records(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieveByNameType/example.com/A/sub"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 600, + "prio": None, + "notes": "", + } + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + records = pkb_client.get_all_dns_records("example.com", DNSRecordType.A, "sub") + + expected_records = [ + DNSRecord( + "1234567", + "sub.example.com", + DNSRecordType.A, + "127.0.0.2", + 600, + None, + "", + ) + ] + self.assertEqual(expected_records, records) + + @responses.activate + def test_update_dns_servers(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "domain/updateNs/example.com"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "ns": ["ns1.example.com", "ns2.example.com"], + } + ) + ], + ) + + success = pkb_client.update_dns_servers( + "example.com", ["ns1.example.com", "ns2.example.com"] + ) + + self.assertTrue(success) + + @responses.activate + def test_get_url_forwards(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "domain/getUrlForwarding/example.com"), + json={ + "status": "SUCCESS", + "forwards": [ + { + "id": "123456", + "subdomain": "", + "location": "https://example.com", + "type": "temporary", + "includePath": "no", + "wildcard": "yes", + }, + { + "id": "234567", + "subdomain": "sub1", + "location": "https://sub1.example.com", + "type": "permanent", + "includePath": "no", + "wildcard": "yes", + }, + ], + }, + ) + + forwards = pkb_client.get_url_forwards("example.com") + + expected_forwards = [ + URLForwarding( + "123456", + "", + "https://example.com", + URLForwardingType.temporary, + False, + True, + ), + URLForwarding( + "234567", + "sub1", + "https://sub1.example.com", + URLForwardingType.permanent, + False, + True, + ), + ] + + self.assertEqual(expected_forwards, forwards) + + @responses.activate + def test_create_url_forward(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "domain/addUrlForward/example.com"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "subdomain": "sub.example.com", + "location": "https://www.example.com", + "type": "permanent", + "includePath": False, + "wildcard": False, + } + ) + ], + ) + + success = pkb_client.create_url_forward( + "example.com", + "sub.example.com", + "https://www.example.com", + URLForwardingType.permanent, + False, + False, + ) + + self.assertTrue(success) + + @responses.activate + def test_delete_url_forward(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "domain/deleteUrlForward/example.com/123456"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + success = pkb_client.delete_url_forward("example.com", "123456") + + self.assertTrue(success) + + @responses.activate + def test_get_domain_pricing(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "pricing/get"), + json={ + "status": "SUCCESS", + "pricing": { + "com": { + "registration": "42.42", + "renewal": "4.2", + "transfer": "42.2", + "coupons": [], + }, + "test": { + "registration": "4.42", + "renewal": "44.2", + "transfer": "4.2", + "coupons": [], + }, + }, + }, + ) + + pricing = pkb_client.get_domain_pricing() + + expected_pricing = { + "com": { + "registration": "42.42", + "renewal": "4.2", + "transfer": "42.2", + "coupons": [], + }, + "test": { + "registration": "4.42", + "renewal": "44.2", + "transfer": "4.2", + "coupons": [], + }, + } + + self.assertEqual(expected_pricing, pricing) + + @responses.activate + def test_get_ssl_bundle(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "ssl/retrieve/example.com"), + json={ + "status": "SUCCESS", + "certificatechain": "----BEGIN CERTIFICATE-----\nabc1-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc2-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc3-----END CERTIFICATE-----\n", + "privatekey": "-----BEGIN PRIVATE KEY-----\nabc4-----END PRIVATE KEY-----\n", + "publickey": "-----BEGIN PUBLIC KEY-----\nabc5-----END PUBLIC KEY-----\n", + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + ssl_cert_bundle = pkb_client.get_ssl_bundle("example.com") + + expected_ssl_cert_bundle = SSLCertBundle( + certificate_chain="----BEGIN CERTIFICATE-----\nabc1-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc2-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc3-----END CERTIFICATE-----\n", + private_key="-----BEGIN PRIVATE KEY-----\nabc4-----END PRIVATE KEY-----\n", + public_key="-----BEGIN PUBLIC KEY-----\nabc5-----END PUBLIC KEY-----\n", + ) + + self.assertEqual(expected_ssl_cert_bundle, ssl_cert_bundle) + + @responses.activate + def test_export_dns_records(self): + pkb_client = PKBClient("key", "secret") + + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": "600", + "prio": None, + "notes": "", + }, + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": "1200", + "prio": None, + "notes": "This is a comment", + }, + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + with tempfile.NamedTemporaryFile() as f: + pkb_client.export_dns_records("example.com", f.name) + + with open(f.name, "r") as f: + exported_dns_file = json.load(f) + + expected_exported_dns_file = { + "123456": { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": 600, + "prio": None, + "notes": "", + }, + "1234567": { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 1200, + "prio": None, + "notes": "This is a comment", + }, + } + + self.assertEqual(expected_exported_dns_file, exported_dns_file) + + @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True) + def test_import_dns_records_clear(self): + pkb_client = PKBClient("key", "secret") + + # first all records should be retrieved + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": "600", + "prio": None, + "notes": "", + }, + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 600, + "prio": None, + "notes": "", + }, + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + # then all records should be deleted + responses.post( + url=urljoin(API_ENDPOINT, "dns/delete/example.com/123456"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + responses.post( + url=urljoin(API_ENDPOINT, "dns/delete/example.com/1234567"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + # then all records should be imported / created + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "123456"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "", + "type": "A", + "content": "127.0.0.3", + "ttl": 600, + "prio": None, + } + ) + ], + ) + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "1234567"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "sub", + "type": "A", + "content": "127.0.0.4", + "ttl": 600, + "prio": None, + } + ) + ], + ) + + with tempfile.TemporaryDirectory() as temp_dir: + filename = Path(temp_dir, "records.json") + with open(filename, "w") as f: + json.dump( + { + "123456": { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.3", + "ttl": 600, + "prio": None, + }, + "1234567": { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.4", + "ttl": 600, + "prio": None, + }, + }, + f, + ) + + pkb_client.import_dns_records( + "example.com", str(filename), DNSRestoreMode.clear + ) + + @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True) + def test_import_dns_records_replace(self): + pkb_client = PKBClient("key", "secret") + + # first all records should be retrieved + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": "600", + "prio": None, + "notes": "", + }, + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 600, + "prio": None, + "notes": "", + }, + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + # same record should be updated + responses.post( + url=urljoin(API_ENDPOINT, "dns/edit/example.com/1234567"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "type": "A", + "content": "127.0.0.3", + "name": "sub", + "ttl": 600, + "prio": None, + } + ) + ], + ) + + with tempfile.TemporaryDirectory() as temp_dir: + filename = Path(temp_dir, "records.json") + with open(filename, "w") as f: + json.dump( + { + "123451": { + "id": "123451", + "name": "test.example.com", + "type": "A", + "content": "127.0.0.4", + "ttl": 600, + "prio": None, + }, + "1234562": { + "id": "1234562", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.3", + "ttl": 600, + "prio": None, + }, + }, + f, + ) + + pkb_client.import_dns_records( + "example.com", str(filename), DNSRestoreMode.replace + ) + + @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True) + def test_import_dns_records_keep(self): + pkb_client = PKBClient("key", "secret") + + # first all records should be retrieved + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": "600", + "prio": None, + "notes": "", + }, + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 600, + "prio": None, + "notes": "", + }, + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + + # only new records should be created + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "1234562"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "test", + "type": "A", + "content": "127.0.0.4", + "ttl": 600, + "prio": None, + } + ) + ], + ) + + with tempfile.TemporaryDirectory() as temp_dir: + filename = Path(temp_dir, "records.json") + with open(filename, "w") as f: + json.dump( + { + "123451": { + "id": "123451", + "name": "test.example.com", + "type": "A", + "content": "127.0.0.4", + "ttl": 600, + "prio": None, + }, + "1234562": { + "id": "1234562", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.3", + "ttl": 600, + "prio": None, + }, + }, + f, + ) + + pkb_client.import_dns_records( + "example.com", str(filename), DNSRestoreMode.keep + ) + + @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True) + def test_import_bind_dns_records(self): + pkb_client = PKBClient("key", "secret") + + # first all records should be retrieved + responses.post( + url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"), + json={ + "status": "SUCCESS", + "records": [ + { + "id": "123456", + "name": "example.com", + "type": "A", + "content": "127.0.0.1", + "ttl": "600", + "prio": None, + "notes": "", + }, + { + "id": "1234567", + "name": "sub.example.com", + "type": "A", + "content": "127.0.0.2", + "ttl": 600, + "prio": None, + "notes": "", + }, + ], + }, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + # then all records should be deleted + responses.post( + url=urljoin(API_ENDPOINT, "dns/delete/example.com/123456"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + responses.post( + url=urljoin(API_ENDPOINT, "dns/delete/example.com/1234567"), + json={"status": "SUCCESS"}, + match=[ + matchers.json_params_matcher( + {"apikey": "key", "secretapikey": "secret"} + ) + ], + ) + # then all records should be imported / created + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "123456"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "", + "type": "A", + "content": "127.0.0.3", + "ttl": 600, + "prio": None, + } + ) + ], + ) + responses.post( + url=urljoin(API_ENDPOINT, "dns/create/example.com"), + json={"status": "SUCCESS", "id": "1234567"}, + match=[ + matchers.json_params_matcher( + { + "apikey": "key", + "secretapikey": "secret", + "name": "sub", + "type": "A", + "content": "127.0.0.4", + "ttl": 600, + "prio": None, + } + ) + ], + ) + + with tempfile.TemporaryDirectory() as temp_dir: + filename = Path(temp_dir, "records.bind") + with open(filename, "w") as f: + f.write( + ( + "$ORIGIN example.com.\n" + "$TTL 1234\n" + "@ IN SOA dns.example.com. dns2.example.com. (100 300 100 6000 600)\n" + "example.com. IN 600 A 127.0.0.3\n" + "sub.example.com. 600 IN A 127.0.0.4" + ) + ) + + pkb_client.import_bind_dns_records(filename, DNSRestoreMode.clear) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/data/__init__.py b/tests/data/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/data/__init__.py diff --git a/tests/data/test.bind b/tests/data/test.bind new file mode 100644 index 0000000..b99dcf4 --- /dev/null +++ b/tests/data/test.bind @@ -0,0 +1,10 @@ +$ORIGIN test.com. +$TTL 1234 +@ IN SOA dns.example.com. dns2.example.com. (100 300 100 6000 600) +test.com. IN 600 A 1.2.3.4 +test.com. HS 600 A 1.2.3.4 +sub.test.com. 600 IN A 4.3.2.1 +@ IN 600 AAAA 2001:db8::1 ; This is a comment + +test.com. IN TXT pkb-client +test.com. 600 IN MX 10 mail.test.com. diff --git a/tests/data/test_no_ttl.bind b/tests/data/test_no_ttl.bind new file mode 100644 index 0000000..7dac0ff --- /dev/null +++ b/tests/data/test_no_ttl.bind @@ -0,0 +1,9 @@ +$ORIGIN test.com. +@ IN SOA dns.example.com. dns2.example.com. (100 300 100 6000 600) +test.com. IN 600 A 1.2.3.4 +test.com. HS 600 A 1.2.3.4 +sub.test.com. 600 IN A 4.3.2.1 +@ IN 700 AAAA 2001:db8::1 ; This is a comment + +test.com. IN TXT pkb-client +test.com. 600 IN MX 10 mail.test.com. diff --git a/tests/pkb_client_tests.py b/tests/pkb_client_tests.py deleted file mode 100644 index 2668a3c..0000000 --- a/tests/pkb_client_tests.py +++ /dev/null @@ -1,569 +0,0 @@ -import json -import os -import unittest -from pathlib import Path - -import requests - -from pkb_client.client import PKBClient, DNSRestoreMode - -""" -WARNING: DO NOT RUN THIS TEST WITH A PRODUCTION DOMAIN OR IN A PRODUCTION ENVIRONMENT!! - This test sets, edits and deletes dns record entries and if the test fails, - unintended changes to dns entries may result. -""" - -TEST_DOMAIN = os.environ.get("TEST_DOMAIN") -PORKBUN_API_KEY = os.environ.get("PORKBUN_API_KEY") -PORKBUN_API_SECRET = os.environ.get("PORKBUN_API_SECRET") -DNS_RECORDS = os.environ.get("DNS_RECORDS") - -PUBLIC_IP_URL = "https://api64.ipify.org" - - -class DNSTestWithCleanup(unittest.TestCase): - - def tearDown(self): - if hasattr(self, "record_id") and self.record_id is not None: - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - pkb_client.dns_delete(TEST_DOMAIN, self.record_id) - - -class TestClientAuth(unittest.TestCase): - def test_valid_auth(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - ip_address = pkb_client.ping() - - self.assertEqual(ip_address, requests.get(PUBLIC_IP_URL).text) - - def test_invalid_api_key(self): - pkb_client = PKBClient("invalid-api-key", PORKBUN_API_SECRET) - with self.assertRaises(Exception): - pkb_client.ping() - - def test_invalid_api_secret(self): - pkb_client = PKBClient(PORKBUN_API_KEY, "invalid-api-secret") - with self.assertRaises(Exception): - pkb_client.ping() - - def test_invalid_api_key_and_secret(self): - pkb_client = PKBClient("invalid-api-key", "invalid-api-secret") - with self.assertRaises(Exception): - pkb_client.ping() - - -class TestPingMethod(unittest.TestCase): - def test_ping(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - ip_address = pkb_client.ping() - - self.assertEqual(ip_address, requests.get(PUBLIC_IP_URL).text) - - -class TestDNSCreateMethod(DNSTestWithCleanup): - - def test_valid_request(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - ttl = 342 - name = "test_pkb_client" - - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name, ttl=ttl) - records = pkb_client.dns_retrieve(TEST_DOMAIN) - - for record in records: - if record["id"] == self.record_id: - with self.subTest(): - self.assertEqual(txt_content, record["content"]) - with self.subTest(): - self.assertEqual(ttl, int(record["ttl"])) - with self.subTest(): - self.assertEqual("{}.{}".format(name, TEST_DOMAIN), record["name"]) - return - self.assertTrue(False) - - def test_invalid_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - with self.assertRaises(Exception): - self.record_id = pkb_client.dns_create("notvaliddomain", "TXT", "interesting-content", - name="test_pkb_client") - - def test_invalid_record_type(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - with self.assertRaises(AssertionError): - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "ABC", "interesting-content", name="test_pkb_client") - - def test_larger_than_allowed_content_length(self): - # the api call should not fail because the api creates multiple TXT entries which will be concatenated - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - txt_content = "interesting-content-interesting-content-interesting-content-interesting-content-" \ - "interesting-content-interesting-content-interesting-content-interesting-content-" \ - "interesting-content-interesting-content-interesting-content-interesting-content-" \ - "interesting-content" - assert len(txt_content) == 259 - - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client") - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - self.assertEqual(txt_content, record["content"]) - return - self.assertTrue(False) - - def test_largest_allowed_content_length(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - txt_content = "interesting-content-interesting-content-interesting-content-interesting-content-" \ - "interesting-content-interesting-content-interesting-content-interesting-content-" \ - "interesting-content-interesting-content-interesting-content-interesting-content-" \ - "interesting-con" - assert len(txt_content) == 255 - - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client") - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - self.assertEqual(txt_content, record["content"]) - return - self.assertTrue(False) - - def test_empty_content_str(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - txt_content = "" - assert len(txt_content) == 0 - - with self.assertRaises(AssertionError): - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client") - - def test_none_content(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", None, name="test_pkb_client") - - def test_smaller_than_allowed_ttl(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", ttl=299, - name="test_pkb_client") - - def test_negative_ttl(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", ttl=-1, - name="test_pkb_client") - - def test_larger_than_allowed_ttl(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", name="test_pkb_client", - ttl=2147483648) - - def test_largest_allowed_ttl(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - ttl = 2147483647 - - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", ttl=ttl) - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest(): - self.assertEqual(txt_content, record["content"]) - with self.subTest(): - self.assertEqual(ttl, int(record["ttl"])) - return - self.assertTrue(False) - - def test_valid_prio_with_txt(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - prio = 10 - - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", prio=prio) - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest(): - self.assertEqual(txt_content, record["content"]) - with self.subTest(): - self.assertEqual(prio, int(record["prio"])) - return - self.assertTrue(False) - - def test_negative_prio_with_txt(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - prio = -42 - - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", prio=prio) - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest(): - self.assertEqual(txt_content, record["content"]) - with self.subTest(): - self.assertEqual(prio, int(record["prio"])) - return - self.assertTrue(False) - - -class TestDNSEditMethod(DNSTestWithCleanup): - def test_valid_edit_request(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - name = "test_pkb_client" - tll = 342 - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name, ttl=tll) - - edited_txt_content = "more-interesting-content" - edited_name = "more_test_pkb_client" - edited_tll = 423 - pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content, name=edited_name, ttl=edited_tll) - - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest("txt record content is not edited"): - self.assertEqual(edited_txt_content, record["content"]) - with self.subTest("txt record name is not edited"): - self.assertEqual("{}.{}".format(edited_name, TEST_DOMAIN), record["name"]) - with self.subTest("txt record ttl is not edited"): - self.assertEqual(edited_tll, int(record["ttl"])) - return - self.assertTrue(False) - - def test_change_subdomain_to_root_txt_record(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - name = "test_pkb_client" - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) - - edited_txt_content = "more-interesting-content" - edited_name = "" - pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content, name=edited_name) - - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest("txt record content is not edited"): - self.assertEqual(edited_txt_content, record["content"]) - with self.subTest("txt record name is not edited"): - self.assertEqual(TEST_DOMAIN, record["name"]) - return - self.assertTrue(False) - - def test_no_name_change(self): - # the name is required for each edit, otherwise the record will apply for the root domain - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - name = "test_pkb_client" - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) - - edited_txt_content = "more-interesting-content" - pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content) - - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest("txt record content is not edited"): - self.assertEqual(edited_txt_content, record["content"]) - with self.subTest("txt record name is not edited"): - self.assertEqual(TEST_DOMAIN, record["name"]) - return - self.assertTrue(False) - - def test_record_type_change(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - name = "test_pkb_client" - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) - - edited_txt_content = "more-interesting-content" - name = "test_pkb_client" - edited_record_type = "MX" - pkb_client.dns_edit(TEST_DOMAIN, self.record_id, edited_record_type, edited_txt_content, name=name) - - records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in records: - if record["id"] == self.record_id: - with self.subTest("txt record content is not edited"): - self.assertEqual(edited_txt_content, record["content"]) - with self.subTest("record type is not edited"): - self.assertEqual(edited_record_type, record["type"]) - return - self.assertTrue(False) - - -class TestDNSDeleteMethod(DNSTestWithCleanup): - def test_valid_delete_request(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - txt_content = "interesting-content" - name = "test_pkb_client" - self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name) - - records = pkb_client.dns_retrieve(TEST_DOMAIN) - record_exists = False - for record in records: - if record["id"] == self.record_id: - record_exists = True - break - with self.subTest("test txt record setup failed"): - self.assertTrue(record_exists) - - pkb_client.dns_delete(TEST_DOMAIN, self.record_id) - - records = pkb_client.dns_retrieve(TEST_DOMAIN) - record_exists = False - for record in records: - if record["id"] == self.record_id: - record_exists = True - break - if not record_exists: - self.record_id = None - with self.subTest("txt record is not deleted"): - self.assertFalse(record_exists) - - -class TestDNSReceiveMethod(unittest.TestCase): - def test_valid_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - records = pkb_client.dns_retrieve(TEST_DOMAIN) - self.assertEqual(records, DNS_RECORDS) - - def test_invalid_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - with self.assertRaises(Exception): - pkb_client.dns_retrieve("invaliddomain") - - -class TestDNSExport(unittest.TestCase): - def test_valid_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - dns_records = pkb_client.dns_retrieve(domain=TEST_DOMAIN) - # reformat the dns records to a single dict - dns_records_dict = dict() - for record in dns_records: - dns_records_dict[record["id"]] = record - - filepath = Path("dns_backup.json") - if filepath.exists(): - filepath.unlink() - pkb_client.dns_export(domain=TEST_DOMAIN, filename=str(filepath)) - - with open(str("dns_backup.json"), "r") as f: - self.assertEqual(json.load(f), dns_records_dict) - - filepath.unlink() - - def test_invalid_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(Exception): - pkb_client.dns_export(domain="invaliddomain", filename="dns_backup.json") - - def test_empty_str_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_export(domain="", filename="dns_backup.json") - - def test_none_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_export(domain=None, filename="dns_backup.json") - - def test_filename_already_exists(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - filepath = Path("dns_backup.json") - filepath.touch() - - with self.assertRaises(Exception): - pkb_client.dns_export(domain=TEST_DOMAIN, filename=str(filepath)) - - filepath.unlink() - - def test_empty_str_filename(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_export(domain=TEST_DOMAIN, filename="") - - def test_none_filename(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_export(domain=TEST_DOMAIN, filename=None) - - -class TestDNSImport(unittest.TestCase): - def test_valid_clear_import(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - existing_dns_record_ids = set() - dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in dns_records: - existing_dns_record_ids.add(record["id"]) - - filename = "dns_backup_clear.json" - - with open(filename, "r") as f: - file_dns_records = json.load(f) - - pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.clear) - - new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) - for new_record in new_dns_records: - # test if the previous dns records still exists - if new_record["id"] in existing_dns_record_ids: - self.assertTrue(False) - # test if the new dns record was created - new_record_created = False - for _, file_dns_record in file_dns_records.items(): - if file_dns_record["name"] == new_record["name"] \ - and file_dns_record["type"] == new_record["type"] \ - and file_dns_record["content"] == new_record["content"] \ - and file_dns_record["ttl"] == new_record["ttl"] \ - and file_dns_record["prio"] == new_record["prio"]: - new_record_created = True - self.assertTrue(new_record_created) - - def test_valid_replace_import(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - existing_dns_record_ids = set() - dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in dns_records: - existing_dns_record_ids.add(record["id"]) - - filename = "dns_backup_replace.json" - - with open(filename, "r") as f: - file_dns_records = json.load(f) - - pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.replace) - - new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) - for new_record in new_dns_records: - # test if the previous dns records still exists - if new_record["id"] not in existing_dns_record_ids: - self.assertTrue(False) - # test if the dns record was edited - record_edited = False - for _, file_dns_record in file_dns_records.items(): - if file_dns_record["name"] == new_record["name"] \ - and file_dns_record["type"] == new_record["type"] \ - and file_dns_record["content"] == new_record["content"] \ - and file_dns_record["ttl"] == new_record["ttl"] \ - and file_dns_record["prio"] == new_record["prio"]: - record_edited = True - break - self.assertTrue(record_edited) - - def test_valid_keep_import(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - existing_dns_records = dict() - dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) - for record in dns_records: - existing_dns_records[record["id"]] = record - - filename = "dns_backup_keep.json" - - with open(filename, "r") as f: - file_dns_records = json.load(f) - - pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.keep) - - new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN) - - # test if the all old dns records are kept - for _, existing_record in existing_dns_records.items(): - record_kept = False - for new_record in new_dns_records: - if existing_record["id"] == new_record["id"] \ - and existing_record["name"] == new_record["name"] \ - and existing_record["type"] == new_record["type"] \ - and existing_record["content"] == new_record["content"] \ - and existing_record["ttl"] == new_record["ttl"] \ - and existing_record["prio"] == new_record["prio"]: - record_kept = True - break - with self.subTest(): - self.assertTrue(record_kept) - - # test if the new records are created - for new_record in new_dns_records: - if new_record["id"] not in existing_dns_records: - record_created = False - for _, file_dns_record in file_dns_records.items(): - if file_dns_record["name"] == new_record["name"] \ - and file_dns_record["type"] == new_record["type"] \ - and file_dns_record["content"] == new_record["content"] \ - and file_dns_record["ttl"] == new_record["ttl"] \ - and file_dns_record["prio"] == new_record["prio"]: - record_created = True - break - with self.subTest(): - self.assertTrue(record_created) - - def test_invalid_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(Exception): - pkb_client.dns_import(domain="invaliddomain", filename="dns_backup.json", restore_mode=DNSRestoreMode.clear) - - def test_empty_str_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain="", filename="dns_backup.json", restore_mode=DNSRestoreMode.clear) - - def test_none_domain(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain=None, filename="dns_backup.json", restore_mode=DNSRestoreMode.clear) - - def test_empty_str_filename(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain=TEST_DOMAIN, filename="", restore_mode=DNSRestoreMode.clear) - - def test_none_filename(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain=TEST_DOMAIN, filename=None, restore_mode=DNSRestoreMode.clear) - - def test_invalid_restore_mode(self): - pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET) - - with self.subTest("None as restore mode"): - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode=None) - with self.subTest("empty string as restore mode"): - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode="") - with self.subTest("number as restore mode"): - with self.assertRaises(AssertionError): - pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode=0) - - -if __name__ == '__main__': - unittest.main() diff --git a/third-party-notices b/third-party-notices deleted file mode 100644 index 85cdfbc..0000000 --- a/third-party-notices +++ /dev/null @@ -1,216 +0,0 @@ -This project uses other Python modules released under different license agreement than -pkb_client. Below are the used modules and their license and notice: - -########################################################################################### -## requests: ## - -License: - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - -Notice: - -Requests -Copyright 2019 Kenneth Reitz - -########################################################################################### - -########################################################################################### -## setuptools: ## - -License: - -Copyright Jason R. Coombs - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to -deal in the Software without restriction, including without limitation the -rights to use, copy, modify, merge, publish, distribute, sublicense, and/or -sell copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -IN THE SOFTWARE. - -###########################################################################################
\ No newline at end of file |
