summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatarUnit 193 <unit193@unit193.net>2024-11-20 01:17:40 -0500
committerLibravatarUnit 193 <unit193@unit193.net>2024-11-20 01:17:40 -0500
commit3e3ebe586385a83b10c8f1d0b9ba9b67c8b56d2f (patch)
tree5682f748fc9867166043734aad44e1734d16abeb
parentfa197fe27b8a03bbf4504476f842956ece2c76c9 (diff)
New upstream version 2.0.0.upstream/2.0.0
-rw-r--r--.github/ISSUE_TEMPLATE/bug_report.md29
-rw-r--r--.github/ISSUE_TEMPLATE/bug_report.yml83
-rw-r--r--.github/ISSUE_TEMPLATE/feature_request.yml36
-rw-r--r--.github/dependabot.yml11
-rw-r--r--.github/workflows/docs_publish.yml32
-rw-r--r--.github/workflows/docs_source_update.yml37
-rw-r--r--.github/workflows/formatting_check.yml26
-rw-r--r--.github/workflows/linting_check.yml26
-rw-r--r--.github/workflows/pypi-publish-release.yml12
-rw-r--r--.github/workflows/unit_tests.yml25
-rw-r--r--Makefile90
-rw-r--r--Readme.md104
-rw-r--r--docs/.gitignore1
-rw-r--r--docs/Makefile20
-rw-r--r--docs/make.bat35
-rw-r--r--docs/source/conf.py30
-rw-r--r--docs/source/index.rst29
-rw-r--r--docs/source/installation.rst17
-rw-r--r--docs/source/migration_guide.rst54
-rw-r--r--docs/source/modules.rst7
-rw-r--r--docs/source/pkb_client.cli.rst21
-rw-r--r--docs/source/pkb_client.client.rst61
-rw-r--r--docs/source/pkb_client.rst19
-rw-r--r--docs/source/usage.rst89
-rw-r--r--pkb_client/__init__.py2
-rw-r--r--pkb_client/cli.py134
-rw-r--r--pkb_client/cli/__init__.py3
-rw-r--r--pkb_client/cli/cli.py347
-rw-r--r--pkb_client/client.py455
-rw-r--r--pkb_client/client/__init__.py22
-rw-r--r--pkb_client/client/bind_file.py169
-rw-r--r--pkb_client/client/client.py867
-rw-r--r--pkb_client/client/dns.py63
-rw-r--r--pkb_client/client/domain.py29
-rw-r--r--pkb_client/client/forwarding.py28
-rw-r--r--pkb_client/client/ssl_cert.py13
-rw-r--r--pkb_client/helper.py15
-rw-r--r--requirements.txt4
-rw-r--r--setup.py21
-rw-r--r--tests/bind_file.py148
-rw-r--r--tests/client.py1025
-rw-r--r--tests/data/__init__.py0
-rw-r--r--tests/data/test.bind10
-rw-r--r--tests/data/test_no_ttl.bind9
-rw-r--r--tests/pkb_client_tests.py569
-rw-r--r--third-party-notices216
46 files changed, 3587 insertions, 1456 deletions
diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md
deleted file mode 100644
index 8a0d833..0000000
--- a/.github/ISSUE_TEMPLATE/bug_report.md
+++ /dev/null
@@ -1,29 +0,0 @@
----
-name: Bug report
-about: Create a report to help us improve pkb_client
-title: ''
-labels: bug
-assignees: ''
-
----
-
-**Describe the bug**
-A clear and concise description of what the bug is.
-
-**To Reproduce**
-Steps to reproduce the behavior.
-
-**Expected behavior**
-A clear and concise description of what you expected to happen.
-
-**pkb-client Command**
-Specify the exact command of pkb-client. **Make sure to anonymize your Porkbun API key and secret and your Porkbun domain/subdomain.**
-
-**Versions (please complete the following version information):**
- - pkb_client: [you can use `pip show pkb_client` to get the version]
-
-**Error message**
-The returned error message.
-
-**Additional context**
-Add any other context about the problem here.
diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml
new file mode 100644
index 0000000..5b12aa8
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/bug_report.yml
@@ -0,0 +1,83 @@
+name: Bug Report
+description: File a bug report
+title: "[Bug]: "
+labels: ["bug", "triage"]
+
+body:
+ - type: markdown
+ attributes:
+ value: "Thanks for taking the time to fill out this bug report!"
+
+ - type: textarea
+ id: system-os-info
+ attributes:
+ label: OS info
+ description: The operating system and version you are using.
+ placeholder: eg. Debian 12 , Ubuntu 23.10, Arch 6.6.3, Windows 10, MacOS 12
+ validations:
+ required: true
+
+ - type: textarea
+ id: version-info
+ attributes:
+ label: pkb_client version
+ description: The pkb_client version you are using.
+ placeholder: eg. v1.2
+ validations:
+ required: true
+
+ - type: textarea
+ id: bug-description
+ attributes:
+ label: Bug description
+ description: A clear and precise description of what the bug is.
+ placeholder: What happend?
+ validations:
+ required: true
+
+ - type: textarea
+ id: expected-description
+ attributes:
+ label: Expected behaviour description
+ description: A simple and precise description of the expected behavior.
+ placeholder: What should happend?
+ validations:
+ required: true
+
+ - type: textarea
+ id: logs
+ attributes:
+ label: Relevant log output
+ description: Please copy and paste any relevant log output.
+ render: shell
+ validations:
+ required: false
+
+ - type: textarea
+ id: reproduce-steps
+ attributes:
+ label: Steps to reproduce
+ description: Steps to reproduce the behavior.
+ placeholder: How can the error be reproduced?
+ validations:
+ required: true
+
+ - type: textarea
+ id: additional-context
+ attributes:
+ label: Additional context
+ description: All further information on the context of the bug that does not belong to the other sections, such as a workaround or already tested approaches to a solution.
+ placeholder: Is there any further context?
+ validations:
+ required: false
+
+ - type: checkboxes
+ id: checklist
+ attributes:
+ label: Checklist
+ description: Please check off the following checklist after you have performed the corresponding actions
+ options:
+ - label: I have checked for [existing Github issues](https://github.com/infinityofspace/pkb_client/issues) for the same bug.
+ required: true
+ - label: I have checked to see if there is newer current version that already fixes this error.
+ required: true
diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml
new file mode 100644
index 0000000..2b82445
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/feature_request.yml
@@ -0,0 +1,36 @@
+name: Feature Request
+description: File a feature request
+title: "[Enhancement] "
+labels: ["enhancement"]
+
+body:
+ - type: markdown
+ attributes:
+ value: "Thanks for taking the time to fill out this feature request!"
+
+ - type: textarea
+ id: problem-description
+ attributes:
+ label: Problem description
+ description: Description of the problem that needs to be solved.
+ placeholder: What problem should be solved?
+ validations:
+ required: true
+
+ - type: textarea
+ id: solution-description
+ attributes:
+ label: Solution description
+ description: Description of a possible problem solution.
+ placeholder: Ideas for a solution?
+ validations:
+ required: false
+
+ - type: checkboxes
+ id: checklist
+ attributes:
+ label: Checklist
+ description: Please check off the following checklist after you have performed the corresponding actions
+ options:
+ - label: I have checked for [existing Github issues](https://github.com/infinityofspace/pkb_client/issues) for the same feature request.
+ required: true
diff --git a/.github/dependabot.yml b/.github/dependabot.yml
new file mode 100644
index 0000000..670670d
--- /dev/null
+++ b/.github/dependabot.yml
@@ -0,0 +1,11 @@
+version: 2
+updates:
+ - package-ecosystem: "pip"
+ directory: "/"
+ schedule:
+ interval: "daily"
+
+ - package-ecosystem: "github-actions"
+ directory: "/"
+ schedule:
+ interval: "daily"
diff --git a/.github/workflows/docs_publish.yml b/.github/workflows/docs_publish.yml
new file mode 100644
index 0000000..af146b2
--- /dev/null
+++ b/.github/workflows/docs_publish.yml
@@ -0,0 +1,32 @@
+name: build and publish docs
+
+on:
+ push:
+ tags:
+ - "v*"
+ branches: # for testing
+ - main
+
+jobs:
+ build:
+ name: build and publish docs
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python 3.12
+ uses: actions/setup-python@v5.3.0
+ with:
+ python-version: 3.12
+
+ - name: Install sphinx
+ run: pip install sphinx
+
+ - name: Build docs
+ run: cd docs && make html
+
+ - name: Publish docs to GitHub Pages
+ uses: peaceiris/actions-gh-pages@v4
+ with:
+ github_token: ${{ secrets.GITHUB_TOKEN }}
+ publish_dir: docs/build/html
diff --git a/.github/workflows/docs_source_update.yml b/.github/workflows/docs_source_update.yml
new file mode 100644
index 0000000..c80ee0b
--- /dev/null
+++ b/.github/workflows/docs_source_update.yml
@@ -0,0 +1,37 @@
+name: update docs source
+
+on:
+ push:
+ branches:
+ - main
+ paths:
+ - pkb_client/**
+
+jobs:
+ update:
+ name: update docs source
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python 3.12
+ uses: actions/setup-python@v5.3.0
+ with:
+ python-version: 3.12
+
+ - name: Install sphinx
+ run: pip install sphinx
+
+ - name: Build docs source
+ run: sphinx-apidoc -f -o docs/source pkb_client
+
+ - name: Open PR with changes
+ uses: peter-evans/create-pull-request@v7
+ with:
+ token: ${{ secrets.GITHUB_TOKEN }}
+ title: "[Docs]: update docs source"
+ commit-message: "update docs source"
+ branch: "docs_source_update"
+ delete-branch: true
+ label: "docs-update"
+ add-paths: docs/source
diff --git a/.github/workflows/formatting_check.yml b/.github/workflows/formatting_check.yml
new file mode 100644
index 0000000..fadbadd
--- /dev/null
+++ b/.github/workflows/formatting_check.yml
@@ -0,0 +1,26 @@
+name: formatting check
+
+on:
+ push:
+ pull_request:
+
+jobs:
+ formatting-check:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python-version: [ "3.13" ]
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v5.3.0
+ with:
+ python-version: ${{ matrix.python-version }}
+
+ - name: Install requirements
+ run: pip install -r requirements.txt
+
+ - name: Check formatting
+ run: ruff format --check
diff --git a/.github/workflows/linting_check.yml b/.github/workflows/linting_check.yml
new file mode 100644
index 0000000..e223b36
--- /dev/null
+++ b/.github/workflows/linting_check.yml
@@ -0,0 +1,26 @@
+name: linting check
+
+on:
+ push:
+ pull_request:
+
+jobs:
+ linting-check:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python-version: [ "3.13" ]
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v5.3.0
+ with:
+ python-version: ${{ matrix.python-version }}
+
+ - name: Install requirements
+ run: pip install -r requirements.txt
+
+ - name: Check formatting
+ run: ruff check
diff --git a/.github/workflows/pypi-publish-release.yml b/.github/workflows/pypi-publish-release.yml
index 2071faa..5150277 100644
--- a/.github/workflows/pypi-publish-release.yml
+++ b/.github/workflows/pypi-publish-release.yml
@@ -10,12 +10,12 @@ jobs:
name: Build distribution
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@master
+ - uses: actions/checkout@v4
- - name: Set up Python 3.6
- uses: actions/setup-python@v2
+ - name: Set up Python 3.9
+ uses: actions/setup-python@v5.3.0
with:
- python-version: 3.6
+ python-version: 3.9
- name: Install pep517
run: >-
@@ -34,7 +34,7 @@ jobs:
.
- name: Upload distribution artifact for other jobs
- uses: actions/upload-artifact@v2
+ uses: actions/upload-artifact@v4
with:
name: pkb_client_dist
path: dist/
@@ -45,7 +45,7 @@ jobs:
needs: build
steps:
- name: Download distribution from build job
- uses: actions/download-artifact@v2
+ uses: actions/download-artifact@v4
with:
name: pkb_client_dist
path: dist/
diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml
new file mode 100644
index 0000000..53e60e1
--- /dev/null
+++ b/.github/workflows/unit_tests.yml
@@ -0,0 +1,25 @@
+name: unit tests
+
+on:
+ push:
+
+jobs:
+ tests:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13" ]
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v5.3.0
+ with:
+ python-version: ${{ matrix.python-version }}
+
+ - name: Install requirements
+ run: pip install -r requirements.txt
+
+ - name: Run unit tests
+ run: python -m unittest tests/client.py
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..6d2ca3d
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,90 @@
+VERSION = 2.0.0
+RELEASE = 1
+
+# system paths
+RESULT_PATH = target
+RPMBUILD_PATH = ~/rpmbuild
+
+all: help
+
+help:
+ @printf '\nusuage make ...\n'
+ @printf ' clean -> remove results\n'
+ @printf ' package -> package archive for deploy .tar.xz\n'
+ @printf ' build-spec -> build python3-pkb-client.spec\n'
+ @printf ' build-srpm -> build python3-pkb-client-'${VERSION}-${RELEASE}'.src.rpm\n'
+ @printf ' build-rpm -> build python3-pkb-client-'${VERSION}-${RELEASE}'.noarch.rpm\n'
+
+# helper commands
+
+clean:
+ @printf '[INFO] removing '${RESULT_PATH}/'\n'
+ @rm -rf python3-pkb-client.spec ${RESULT_PATH}/
+
+package: ${RESULT_PATH}/python3-pkb-client-${VERSION}.tar.xz
+
+build-spec: python3-pkb-client.spec
+
+build-srpm: ${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.src.rpm
+
+build-rpm: ${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.noarch.rpm
+
+# file generators
+
+python3-pkb-client.spec:
+ @mkdir -p ${RESULT_PATH}/
+ @printf '[INFO] generating python3-pkb-client.spec\n' | tee -a ${RESULT_PATH}/build.log
+ @printf '%%global modname pkb_client\n\n' > python3-pkb-client.spec
+ @printf 'Name: python3-pkb-client\n' >> python3-pkb-client.spec
+ @printf 'Version: '${VERSION}'\n' >> python3-pkb-client.spec
+ @printf 'Release: '${RELEASE}'\n' >> python3-pkb-client.spec
+ @printf 'Obsoletes: %%{name} <= %%{version}\n' >> python3-pkb-client.spec
+ @printf 'Summary: Python client for the Porkbun API\n\n' >> python3-pkb-client.spec
+ @printf 'License: MIT License\n' >> python3-pkb-client.spec
+ @printf 'URL: https://github.com/infinityofspace/pkb_client/\n' >> python3-pkb-client.spec
+ @printf 'Source0: %%{name}-%%{version}.tar.xz\n\n' >> python3-pkb-client.spec
+ @printf 'BuildArch: noarch\n' >> python3-pkb-client.spec
+ @printf 'BuildRequires: python3-setuptools\n' >> python3-pkb-client.spec
+ @printf 'BuildRequires: python3-rpm-macros\n' >> python3-pkb-client.spec
+ @printf 'BuildRequires: python3-py\n\n' >> python3-pkb-client.spec
+ @printf '%%?python_enable_dependency_generator\n\n' >> python3-pkb-client.spec
+ @printf '%%description\n' >> python3-pkb-client.spec
+ @printf 'Python client for the Porkbun API\n\n' >> python3-pkb-client.spec
+ @printf '%%prep\n' >> python3-pkb-client.spec
+ @printf '%%autosetup -n %%{modname}_v%%{version}\n\n' >> python3-pkb-client.spec
+ @printf '%%build\n' >> python3-pkb-client.spec
+ @printf '%%py3_build\n\n' >> python3-pkb-client.spec
+ @printf '%%install\n' >> python3-pkb-client.spec
+ @printf '%%py3_install\n\n' >> python3-pkb-client.spec
+ @printf '%%files\n' >> python3-pkb-client.spec
+ @printf '%%doc Readme.md\n' >> python3-pkb-client.spec
+ @printf '%%license License\n' >> python3-pkb-client.spec
+ @printf '%%{_bindir}/pkb-client\n' >> python3-pkb-client.spec
+ @printf '%%{python3_sitelib}/%%{modname}/\n' >> python3-pkb-client.spec
+ @printf '%%{python3_sitelib}/%%{modname}-%%{version}*\n\n' >> python3-pkb-client.spec
+ @printf '%%changelog\n' >> python3-pkb-client.spec
+ @printf '...\n' >> python3-pkb-client.spec
+ @printf '\n' >> python3-pkb-client.spec
+
+${RESULT_PATH}/python3-pkb-client-${VERSION}.tar.xz:
+ @mkdir -p ${RESULT_PATH}/
+ @printf '[INFO] packing python3-pkb-client-'${VERSION}'.tar.xz\n' | tee -a ${RESULT_PATH}/build.log
+ @mkdir -p ${RESULT_PATH}/pkb_client_v${VERSION}
+ @cp -r pkb_client requirements.txt setup.py License Readme.md \
+ ${RESULT_PATH}/pkb_client_v${VERSION}/
+ @cd ${RESULT_PATH}; tar -I "pxz -9" -cf python3-pkb-client-${VERSION}.tar.xz pkb_client_v${VERSION}
+
+${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.src.rpm: ${RESULT_PATH}/python3-pkb-client-${VERSION}.tar.xz python3-pkb-client.spec
+ @printf '[INFO] building python3-pkb-client-'${VERSION}-${RELEASE}'.src.rpm\n' | tee -a ${RESULT_PATH}/build.log
+ @mkdir -p ${RPMBUILD_PATH}/SOURCES/
+ @cp ${RESULT_PATH}/python3-pkb-client-${VERSION}.tar.xz ${RPMBUILD_PATH}/SOURCES/
+ @rpmbuild -bs python3-pkb-client.spec &>> ${RESULT_PATH}/build.log
+ @mv ${RPMBUILD_PATH}/SRPMS/python3-pkb-client-${VERSION}-${RELEASE}.src.rpm ${RESULT_PATH}/
+
+${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.noarch.rpm: ${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.src.rpm
+ @printf '[INFO] building python3-pkb-client-'${VERSION}-${RELEASE}'.noarch.rpm\n' | tee -a ${RESULT_PATH}/build.log
+ @mkdir -p ${RPMBUILD_PATH}/SRPMS/
+ @cp ${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.src.rpm ${RPMBUILD_PATH}/SRPMS/
+ @rpmbuild --rebuild ${RESULT_PATH}/python3-pkb-client-${VERSION}-${RELEASE}.src.rpm &>> ${RESULT_PATH}/build.log
+ @mv ${RPMBUILD_PATH}/RPMS/noarch/python3-pkb-client-${VERSION}-${RELEASE}.noarch.rpm ${RESULT_PATH}/
+
diff --git a/Readme.md b/Readme.md
index c0058df..adf0808 100644
--- a/Readme.md
+++ b/Readme.md
@@ -1,9 +1,9 @@
# pkb_client
-Unofficial client for the Porkbun API
+Python client for the Porkbun API
---
-[![PyPI](https://img.shields.io/pypi/v/pkb_client)](https://pypi.org/project/pkb-client/) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/pkb_client) [![Downloads](https://static.pepy.tech/personalized-badge/pkb-client?period=total&units=international_system&left_color=grey&right_color=orange&left_text=Total%20Downloads)](https://pepy.tech/project/pkb-client) ![GitHub](https://img.shields.io/github/license/infinityofspace/pkb_client) ![GitHub Workflow Status](https://img.shields.io/github/workflow/status/infinityofspace/pkb_client/Publish%20release%20distribution%20to%20PyPI)
+[![PyPI](https://img.shields.io/pypi/v/pkb_client)](https://pypi.org/project/pkb-client/) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/pkb_client) [![Downloads](https://static.pepy.tech/personalized-badge/pkb-client?period=total&units=international_system&left_color=grey&right_color=orange&left_text=Total%20Downloads)](https://pepy.tech/project/pkb-client) ![GitHub](https://img.shields.io/github/license/infinityofspace/pkb_client) ![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/infinityofspace/pkb_client/pypi-publish-release.yml)
---
### Table of Contents
@@ -13,19 +13,24 @@ Unofficial client for the Porkbun API
1. [With pip (recommend)](#with-pip-recommend)
2. [From source](#from-source)
3. [Usage](#usage)
-4. [Third party notices](#third-party-notices)
-5. [License](#license)
+4. [Notes](#notes)
+5. [Third party notices](#third-party-notices)
+6. [Development](#development)
+ 1. [Setup environment](#setup-environment)
+ 2. [Tests](#tests)
+ 3. [Documentation](#documentation)
+7. [License](#license)
---
### About
-*pkb_client* is an unofficial client for the [Porkbun](https://porkbun.com) API. It supports the v3 of the API. You can
-find the official documentation of the Porkbun API [here](https://porkbun.com/api/json/v3/documentation).
+*pkb_client* is a python client for the [Porkbun](https://porkbun.com) API. It supports the v3 of the API. You can
+find the official documentation of the Porkbun API [here](https://api.porkbun.com/api/json/v3/documentation).
### Installation
-This project only works with Python 3, make sure you have at least Python 3.6 installed.
+This project only works with Python 3, make sure you have at least Python 3.9 installed.
#### With pip (recommend)
@@ -46,14 +51,16 @@ pip3 install pkb_client -U
```commandline
git clone https://github.com/infinityofspace/pkb_client.git
cd pkb_client
-pip install .
+pip3 install .
```
### Usage
Each request must be made with the API key and secret. You can easily create them at Porkbun. Just follow
-the [official instructions](https://porkbun.com/api/json/v3/documentation#Authentication). Make sure that you explicitly
-activate the API usage for your domain at the end.
+the [official instructions](https://api.porkbun.com/api/json/v3/documentation#Authentication). Make sure that you explicitly
+activate the API usage for your domain at the end. There are two ways to use `pkb_client`. The first way is to use it as
+a Python module. See the [module documentation](https://infinityofspace.github.io/pkb_client) for more information. The
+second way is to use the module from the command line, see below for more information.
After installation *pkb_client* is available under the command `pkb-client`.
@@ -64,10 +71,11 @@ pkb-client -k <YOUR-API-KEY> -s <YOUR-API-SECRET> ping
```
If you don't want to specify the key and secret in the program call, because for example the command line calls are
-logged and you don't want to log the API access, then you can also omit both arguments and *pkb-client* asks for a user
-input.
+logged, and you don't want to log the API access, then you can also set the environment variables `PKB_API_KEY` and
+`PKB_API_SECRET`. If you not specify API key and secret in any way, *pkb-client* asks for a user input. The command line
+arguments of the API key and secret have the highest priority.
-You can see an overview of all usable API methods via the help:
+You can see an overview of all usable cli methods via the help:
```commandline
pkb-client -h
@@ -121,19 +129,77 @@ Remove all existing DNS records of the domain `example.com` and restore the DNS
pkb-client -k <YOUR-API-KEY> -s <YOUR-API-SECRET> dns-import example.com dns_recods.json clear
```
-*Note:* The import function uses the record ID to distinguish DNS records.
+*Note:* The `dns-import` function uses the record ID to distinguish DNS records.
+
+### Notes
+
+Currently, TTL smaller than `600` are ignored by the Porkbun API and the minimum value is `600`, although a minimum
+value of `300` is [supported](https://api.porkbun.com/api/json/v3/documentation) and allowed by the RFC standard. However,
+you can do TTL smaller than `600` via the web dashboard.
### Third party notices
All modules used by this project are listed below:
-| Name | License|
-|:---:|:---:|
-| [requests](https://github.com/psf/requests) | [Apache 2.0](https://raw.githubusercontent.com/psf/requests/master/LICENSE) |
-| [setuptools](https://github.com/pypa/setuptools) | [MIT](https://raw.githubusercontent.com/pypa/setuptools/main/LICENSE) |
+| Name | License |
+|:-----------------------------------------------------:|:-------------------------------------------------------------------------------------------------:|
+| [requests](https://github.com/psf/requests) | [Apache 2.0](https://raw.githubusercontent.com/psf/requests/master/LICENSE) |
+| [setuptools](https://github.com/pypa/setuptools) | [MIT](https://raw.githubusercontent.com/pypa/setuptools/main/LICENSE) |
+| [sphinx](https://github.com/sphinx-doc/sphinx) | [BSD 2 Clause](https://raw.githubusercontent.com/sphinx-doc/sphinx/refs/heads/master/LICENSE.rst) |
+| [dnspython](https://github.com/rthalley/dnspython) | [ISC](https://raw.githubusercontent.com/rthalley/dnspython/refs/heads/main/LICENSEc) |
+| [responses](https://github.com/getsentry/responses) | [Apache 2.0](https://raw.githubusercontent.com/getsentry/responses/refs/heads/master/LICENSE) |
+| [ruff](https://github.com/astral-sh/ruff) | [MIT](https://raw.githubusercontent.com/astral-sh/ruff/refs/heads/main/LICENSE) |
Furthermore, this readme file contains embeddings of [Shields.io](https://github.com/badges/shields)
-and [PePy](https://github.com/psincraian/pepy). The tests use [ipify](https://github.com/rdegges/ipify-api).
+and [PePy](https://github.com/psincraian/pepy) images.
+
+_This project is not associated with Porkbun LLC._
+
+### Development
+
+#### Setup environment
+
+First get the source code:
+
+```commandline
+git clone https://github.com/infinityofspace/pkb_client.git
+cd pkb_client
+```
+
+Now create a virtual environment, activate it and install all dependencies with the following commands:
+
+```commandline
+python3 -m venv venv
+source venv/bin/activate
+pip3 install -r requirements.txt
+```
+
+Now you can start developing.
+
+Feel free to contribute to this project by creating a pull request.
+Before you create a pull request, make sure that you code meets the following requirements (you can use the specified
+commands to check/fulfill the requirements):
+
+- check unit tests: `python -m unittest tests/*.py`
+- format the code: `ruff format`
+- check linting errors: `ruff check`
+
+#### Tests
+
+You can run the tests with the following command:
+
+```commandline
+python -m unittest tests/*.py
+```
+
+#### Documentation
+
+To build the documentation you can use the following commands:
+
+```commandline
+sphinx-apidoc -f -o docs/source pkb_client
+cd docs && make html
+```
### License
diff --git a/docs/.gitignore b/docs/.gitignore
new file mode 100644
index 0000000..378eac2
--- /dev/null
+++ b/docs/.gitignore
@@ -0,0 +1 @@
+build
diff --git a/docs/Makefile b/docs/Makefile
new file mode 100644
index 0000000..d0c3cbf
--- /dev/null
+++ b/docs/Makefile
@@ -0,0 +1,20 @@
+# Minimal makefile for Sphinx documentation
+#
+
+# You can set these variables from the command line, and also
+# from the environment for the first two.
+SPHINXOPTS ?=
+SPHINXBUILD ?= sphinx-build
+SOURCEDIR = source
+BUILDDIR = build
+
+# Put it first so that "make" without argument is like "make help".
+help:
+ @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+
+.PHONY: help Makefile
+
+# Catch-all target: route all unknown targets to Sphinx using the new
+# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
+%: Makefile
+ @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
diff --git a/docs/make.bat b/docs/make.bat
new file mode 100644
index 0000000..dc1312a
--- /dev/null
+++ b/docs/make.bat
@@ -0,0 +1,35 @@
+@ECHO OFF
+
+pushd %~dp0
+
+REM Command file for Sphinx documentation
+
+if "%SPHINXBUILD%" == "" (
+ set SPHINXBUILD=sphinx-build
+)
+set SOURCEDIR=source
+set BUILDDIR=build
+
+%SPHINXBUILD% >NUL 2>NUL
+if errorlevel 9009 (
+ echo.
+ echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
+ echo.installed, then set the SPHINXBUILD environment variable to point
+ echo.to the full path of the 'sphinx-build' executable. Alternatively you
+ echo.may add the Sphinx directory to PATH.
+ echo.
+ echo.If you don't have Sphinx installed, grab it from
+ echo.https://www.sphinx-doc.org/
+ exit /b 1
+)
+
+if "%1" == "" goto help
+
+%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
+goto end
+
+:help
+%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
+
+:end
+popd
diff --git a/docs/source/conf.py b/docs/source/conf.py
new file mode 100644
index 0000000..7a07b51
--- /dev/null
+++ b/docs/source/conf.py
@@ -0,0 +1,30 @@
+# Configuration file for the Sphinx documentation builder.
+#
+# For the full list of built-in configuration values, see the documentation:
+# https://www.sphinx-doc.org/en/master/usage/configuration.html
+
+# -- Project information -----------------------------------------------------
+# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information
+
+import sys
+
+sys.path.append("..")
+
+project = "pkb_client"
+copyright = "2023-2024, infinityofspace"
+author = "infinityofspace"
+release = "v2.0.0"
+
+# -- General configuration ---------------------------------------------------
+# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
+
+extensions = ["sphinx.ext.autodoc", "sphinx.ext.viewcode", "sphinx.ext.githubpages"]
+
+templates_path = ["_templates"]
+exclude_patterns = []
+
+# -- Options for HTML output -------------------------------------------------
+# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
+
+html_theme = "alabaster"
+html_static_path = ["_static"]
diff --git a/docs/source/index.rst b/docs/source/index.rst
new file mode 100644
index 0000000..46f8eb1
--- /dev/null
+++ b/docs/source/index.rst
@@ -0,0 +1,29 @@
+Welcome to pkb_client's documentation!
+======================================
+
+.. toctree::
+ :maxdepth: 2
+ :caption: Contents:
+
+ installation
+ usage
+ modules
+ migration_guide
+
+About
++++++
+
+*pkb_client* is a python client for the `Porkbun <https://porkbun.com>`_ API. It supports the v3 of the API. You can
+find the official documentation of the Porkbun API `here <https://api.porkbun.com/api/json/v3/documentation>`_.
+
+Link to the source code: `Github <https://github.com/infinityofspace/pkb_client>`_
+
+*Note:* This project is not associated with Porkbun LLC.
+
+
+Indices and tables
+++++++++++++++++++
+
+* :ref:`genindex`
+* :ref:`modindex`
+* :ref:`search`
diff --git a/docs/source/installation.rst b/docs/source/installation.rst
new file mode 100644
index 0000000..6e52bbf
--- /dev/null
+++ b/docs/source/installation.rst
@@ -0,0 +1,17 @@
+Installation
+============
+
+You can either install the package from the Python Package Index (PyPI) using pip:
+
+.. code-block:: bash
+
+ pip3 install pkb_client
+
+
+or you can install the package from the source code:
+
+.. code-block:: bash
+
+ git clone https://github.com/infinityofspace/pkb_client.git
+ cd pkb_client
+ pip3 install .
diff --git a/docs/source/migration_guide.rst b/docs/source/migration_guide.rst
new file mode 100644
index 0000000..0fea446
--- /dev/null
+++ b/docs/source/migration_guide.rst
@@ -0,0 +1,54 @@
+Migration Guide
+===============
+
+From v1 to v2
++++++++++++++
+
+The version 2 of the package is a major release that introduces a lot of changes. The main changes are:
+
+- support for new API methods
+- package is now a proper Python package with focus on usage as a library (in general more object oriented):
+ - return types are now objects instead of tuples or dictionaries (except domain pricing method)
+ - improved and more consistent error handling
+ - fixed method signatures/no more additional keyworded arguments
+
+These changes are not backward compatible with the version 1 of the package. If you are using the version 1 of the
+package, you will need to update your code to work with the version 2.
+
+To migrate your code from the version 1 to the version 2, follow these steps:
+
+1. Update the package to the version 2 or higher:
+ - if you are using the package from PyPI, run the following command:
+ .. code-block:: bash
+
+ pip3 install --upgrade pkb_client
+ - if you are using the package from the source code, run the following commands:
+ .. code-block:: bash
+
+ git fetch
+ git checkout v2.0.0 # or any later tag
+ pip3 install .
+2. Remove any additional keyworded arguments from all `PKBClient` methods. The methods now have fixed signatures.
+3. Refactor the usage of the following methods:
+ - `PKBClient.dns_create`:
+ - renamed to `PKBClient.create_dns_record`
+ - the method argument `record_type` needs to be enum of :class:`DNSRecordType <pkb_client.client.dns.DNSRecordType>`
+ - `PKBClient.dns_edit`:
+ - renamed to `PKBClient.update_dns_record`
+ - the method argument `record_type` needs to be enum of :class:`DNSRecordType <pkb_client.client.dns.DNSRecordType>`
+ - `PKBClient.dns_delete`:
+ - renamed to `PKBClient.delete_dns_records`
+ - `PKBClient.dns_retrieve`:
+ - renamed to `PKBClient.get_dns_records`
+ - return type is now a list of :class:`DNSRecord <pkb_client.client.dns.DNSRecord>`
+ - `PKBClient.dns_export`:
+ - renamed to `PKBClient.export_dns_records`
+ - the methods argument `filename` is renamed to `filepath`
+ - `PKBClient.dns_import`:
+ - renamed to `PKBClient.import_dns_records`
+ - the methods argument `filename` is renamed to `filepath`
+ - `PKBClient.get_domain_pricing`:
+ - method is not static anymore, you need to create an instance of `PKBClient` to use it
+ - `PKBClient.ssl_retrieve`:
+ - renamed to `PKBClient.get_ssl_bundle`
+ - return type is now :class:`SSLCertificate <pkb_client.client.ssl_cert.SSLCertBundle>`
diff --git a/docs/source/modules.rst b/docs/source/modules.rst
new file mode 100644
index 0000000..c1ae0f3
--- /dev/null
+++ b/docs/source/modules.rst
@@ -0,0 +1,7 @@
+pkb_client
+==========
+
+.. toctree::
+ :maxdepth: 4
+
+ pkb_client
diff --git a/docs/source/pkb_client.cli.rst b/docs/source/pkb_client.cli.rst
new file mode 100644
index 0000000..29fc20f
--- /dev/null
+++ b/docs/source/pkb_client.cli.rst
@@ -0,0 +1,21 @@
+pkb\_client.cli package
+=======================
+
+Submodules
+----------
+
+pkb\_client.cli.cli module
+--------------------------
+
+.. automodule:: pkb_client.cli.cli
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+Module contents
+---------------
+
+.. automodule:: pkb_client.cli
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/docs/source/pkb_client.client.rst b/docs/source/pkb_client.client.rst
new file mode 100644
index 0000000..fd677b1
--- /dev/null
+++ b/docs/source/pkb_client.client.rst
@@ -0,0 +1,61 @@
+pkb\_client.client package
+==========================
+
+Submodules
+----------
+
+pkb\_client.client.bind\_file module
+------------------------------------
+
+.. automodule:: pkb_client.client.bind_file
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+pkb\_client.client.client module
+--------------------------------
+
+.. automodule:: pkb_client.client.client
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+pkb\_client.client.dns module
+-----------------------------
+
+.. automodule:: pkb_client.client.dns
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+pkb\_client.client.domain module
+--------------------------------
+
+.. automodule:: pkb_client.client.domain
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+pkb\_client.client.forwarding module
+------------------------------------
+
+.. automodule:: pkb_client.client.forwarding
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+pkb\_client.client.ssl\_cert module
+-----------------------------------
+
+.. automodule:: pkb_client.client.ssl_cert
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+Module contents
+---------------
+
+.. automodule:: pkb_client.client
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/docs/source/pkb_client.rst b/docs/source/pkb_client.rst
new file mode 100644
index 0000000..161588e
--- /dev/null
+++ b/docs/source/pkb_client.rst
@@ -0,0 +1,19 @@
+pkb\_client package
+===================
+
+Subpackages
+-----------
+
+.. toctree::
+ :maxdepth: 4
+
+ pkb_client.cli
+ pkb_client.client
+
+Module contents
+---------------
+
+.. automodule:: pkb_client
+ :members:
+ :undoc-members:
+ :show-inheritance:
diff --git a/docs/source/usage.rst b/docs/source/usage.rst
new file mode 100644
index 0000000..89abc1e
--- /dev/null
+++ b/docs/source/usage.rst
@@ -0,0 +1,89 @@
+Usage
+=====
+
+Module
+++++++
+
+The module provides the :class:`PKBClient <pkb_client.client.client.PKBClient>` class, which is used to interact with the PKB API.
+To use the PKB client, you need to create an instance of the :class:`PKBClient <pkb_client.client.client.PKBClient>` class:
+
+.. code-block:: python
+
+ from pkb_client.client import PKBClient
+
+ pkb = PKBClient(
+ api_key="<your-api-key>",
+ secret_api_key="<your-secret-api-key>",
+ api_endpoint="https://api.porkbun.com/api/json/v3",
+ )
+
+Whereby the `api_key` and `secret_api_key` are optional and only required if you want to use the PKB API with API endpoints
+that require authentication (e.g. to manage dns records of your domains). Moreover the api_endpoint is also optional and
+defaults to the latest version of the official PKB API endpoint.
+
+For example to get the domain pricing, which does not require authentication, you can use the
+:func:`get_domain_pricing <pkb_client.client.client.PKBClient.get_domain_pricing>` method:
+
+.. code-block:: python
+
+ from pkb_client.client import PKBClient
+
+ pkb = PKBClient()
+ domain_pricing = pkb.get_domain_pricing()
+ print(domain_pricing)
+
+You can find all available methods in the :class:`PKBClient <pkb_client.client.client.PKBClient>` class documentation.
+
+CLI
++++
+
+The module also provides a CLI to interact with the PKB API. For example to get the domain pricing, you can use the `get-domain-pricing` command:
+
+.. code-block:: bash
+
+ pkb-client domain-pricing
+
+All available commands can be listed with the `--help` option:
+
+.. code-block:: bash
+
+ pkb-client --help
+
+.. code-block:: bash
+
+ usage: pkb-client [-h] [-k KEY] [-s SECRET] [--debug] [--endpoint ENDPOINT]
+ {ping,create-dns-record,update-dns-record,delete-dns-records,get-dns-records,export-dns-records,export-bind-dns-records,import-dns-records,import-bind-dns-records,get-domain-pricing,get-ssl-bundle,update-dns-servers,get-dns-servers,get-domains,get-url-forwards,create-url-forward,delete-url-forward}
+ ...
+
+ Python client for the Porkbun API
+
+ positional arguments:
+ {ping,create-dns-record,update-dns-record,delete-dns-records,get-dns-records,export-dns-records,export-bind-dns-records,import-dns-records,import-bind-dns-records,get-domain-pricing,get-ssl-bundle,update-dns-servers,get-dns-servers,get-domains,get-url-forwards,create-url-forward,delete-url-forward}
+ Supported API methods
+ ping Ping the API Endpoint
+ create-dns-record Create a new DNS record.
+ update-dns-record Edit an existing DNS record.
+ delete-dns-records Delete an existing DNS record.
+ get-dns-records Get all DNS records.
+ export-dns-records Save all DNS records to a local json file.
+ export-bind-dns-records
+ Save all DNS records to a local BIND file.
+ import-dns-records Restore all DNS records from a local json file.
+ import-bind-dns-records
+ Restore all DNS records from a local BIND file.
+ get-domain-pricing Get the pricing for Porkbun domains.
+ get-ssl-bundle Retrieve an SSL bundle for given domain.
+ update-dns-servers Update the DNS servers for a domain.
+ get-dns-servers Retrieve the DNS servers for a domain.
+ get-domains List all domains in this account in chunks of 1000.
+ get-url-forwards Retrieve all URL forwards.
+ create-url-forward Create a new URL forward.
+ delete-url-forward Delete an existing URL forward.
+
+ options:
+ -h, --help show this help message and exit
+ -k KEY, --key KEY The API key used for Porkbun API calls (usually starts with "pk").
+ -s SECRET, --secret SECRET
+ The API secret used for Porkbun API calls (usually starts with "sk").
+ --debug Enable debug mode.
+ --endpoint ENDPOINT The API endpoint to use.
diff --git a/pkb_client/__init__.py b/pkb_client/__init__.py
index 17a5bda..d5096ef 100644
--- a/pkb_client/__init__.py
+++ b/pkb_client/__init__.py
@@ -1 +1 @@
-__version__ = "v1.2"
+__version__ = "v2.0.0"
diff --git a/pkb_client/cli.py b/pkb_client/cli.py
deleted file mode 100644
index 4e87373..0000000
--- a/pkb_client/cli.py
+++ /dev/null
@@ -1,134 +0,0 @@
-import argparse
-import pprint
-import textwrap
-
-from pkb_client.client import PKBClient, SUPPORTED_DNS_RECORD_TYPES, DNSRestoreMode
-
-
-def main():
- parser = argparse.ArgumentParser(
- description="Unofficial client for the Porkbun API",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog=textwrap.dedent("""
- License:
- MIT - Copyright (c) Marvin Heptner
-
- Copyright notices:
- requests:
- Project: https://github.com/psf/requests
- License: Apache-2.0 https://github.com/psf/requests/blob/master/LICENSE
- setuptools:
- Project: https://github.com/pypa/setuptools
- License: MIT https://raw.githubusercontent.com/pypa/setuptools/main/LICENSE
- """)
- )
-
- parser.add_argument("-k", "--key", help="The API key used for Porkbun API calls (usually starts with \"pk\").")
- parser.add_argument("-s", "--secret",
- help="The API secret used for Porkbun API calls (usually starts with \"sk\").")
-
- subparsers = parser.add_subparsers(help="Supported API methods")
-
- parser_ping = subparsers.add_parser("ping", help="Ping the API Endpoint")
- parser_ping.set_defaults(func=PKBClient.ping)
-
- parser_dns_create = subparsers.add_parser("dns-create", help="Create a new DNS record.")
- parser_dns_create.set_defaults(func=PKBClient.dns_create)
- parser_dns_create.add_argument("domain", help="The domain for which the new DNS record should be created.")
- parser_dns_create.add_argument("record_type", help="The type of the new DNS record.",
- choices=SUPPORTED_DNS_RECORD_TYPES)
- parser_dns_create.add_argument("content", help="The content of the new DNS record.")
- parser_dns_create.add_argument("--name",
- help="The subdomain for which the new DNS record should be created."
- "The * can be used for a wildcard DNS record."
- "If not used, then a DNS record for the root domain will be created",
- required=False)
- parser_dns_create.add_argument("--ttl", type=int, help="The ttl of the new DNS record.", required=False)
- parser_dns_create.add_argument("--prio", type=int, help="The priority of the new DNS record.", required=False)
-
- parser_dns_edit = subparsers.add_parser("dns-edit", help="Edit an existing DNS record.")
- parser_dns_edit.set_defaults(func=PKBClient.dns_edit)
- parser_dns_edit.add_argument("domain", help="The domain for which the DNS record should be edited.")
- parser_dns_edit.add_argument("record_id", help="The id of the DNS record which should be edited.")
- parser_dns_edit.add_argument("record_type", help="The new type of the DNS record.",
- choices=SUPPORTED_DNS_RECORD_TYPES)
- parser_dns_edit.add_argument("content", help="The new content of the DNS record.")
- parser_dns_edit.add_argument("--name",
- help="The new value of the subdomain for which the DNS record should apply. "
- "The * can be used for a wildcard DNS record. If not set, the record will "
- "be set for the root domain.",
- required=False)
- parser_dns_edit.add_argument("--ttl", type=int, help="The new ttl of the DNS record.", required=False)
- parser_dns_edit.add_argument("--prio", type=int, help="The new priority of the DNS record.", required=False)
-
- parser_dns_delete = subparsers.add_parser("dns-delete", help="Delete an existing DNS record.")
- parser_dns_delete.set_defaults(func=PKBClient.dns_delete)
- parser_dns_delete.add_argument("domain", help="The domain for which the DNS record should be deleted.")
- parser_dns_delete.add_argument("record_id", help="The id of the DNS record which should be deleted.")
-
- parser_dns_receive = subparsers.add_parser("dns-retrieve", help="Get all DNS records.")
- parser_dns_receive.set_defaults(func=PKBClient.dns_retrieve)
- parser_dns_receive.add_argument("domain", help="The domain for which the DNS record should be retrieved.")
-
- parser_dns_export = subparsers.add_parser("dns-export", help="Save all DNS records to a local file as json.")
- parser_dns_export.set_defaults(func=PKBClient.dns_export)
- parser_dns_export.add_argument("domain",
- help="The domain for which the DNS record should be retrieved and saved.")
- parser_dns_export.add_argument("filename", help="The filename where to save the exported DNS records.")
-
- parser_dns_import = subparsers.add_parser("dns-import", help="Restore all DNS records from a local file.",
- formatter_class=argparse.RawTextHelpFormatter)
- parser_dns_import.set_defaults(func=PKBClient.dns_import)
- parser_dns_import.add_argument("domain", help="The domain for which the DNS record should be restored.")
- parser_dns_import.add_argument("filename", help="The filename from which the DNS records are to be restored.")
- parser_dns_import.add_argument("restore_mode", help="""The restore mode (DNS records are identified by the record id):
- clean: remove all existing DNS records and restore all DNS records from the provided file
- replace: replace only existing DNS records with the DNS records from the provided file, but do not create any new DNS records
- keep: keep the existing DNS records and only create new ones for all DNS records from the specified file if they do not exist
- """, type=DNSRestoreMode.from_string, choices=list(DNSRestoreMode))
-
- parser_domain_pricing = subparsers.add_parser("domain-pricing", help="Get the pricing for porkbun domains.")
- parser_domain_pricing.set_defaults(func=PKBClient.get_domain_pricing)
-
- parser_ssl_retrieve = subparsers.add_parser("ssl-retrieve", help="Retrieve an SSL bundle for given domain.")
- parser_ssl_retrieve.set_defaults(func=PKBClient.ssl_retrieve)
- parser_ssl_retrieve.add_argument("domain", help="The domain for which the SSL bundle should be retrieve.")
-
- args = parser.parse_args()
-
- if not hasattr(args, "func"):
- raise argparse.ArgumentError(None, "No method specified. Please provide a method and try again.")
-
- pp = pprint.PrettyPrinter(indent=4)
-
- # call the static methods
- if args.func == PKBClient.get_domain_pricing:
- pp.pprint(args.func(**vars(args)))
- exit(0)
-
- if args.key is None:
- while True:
- api_key = input("Please enter your API key you got from Porkbun (usually starts with \"pk\"): ")
- if len(api_key) == 0:
- print("The api key can not be empty.")
- else:
- break
- else:
- api_key = args.key
-
- if args.secret is None:
- while True:
- api_secret = input("Please enter your API key secret you got from Porkbun (usually starts with \"sk\"): ")
- if len(api_secret) == 0:
- print("The api key secret can not be empty.")
- else:
- break
- else:
- api_secret = args.secret
-
- pkb_client = PKBClient(api_key, api_secret)
- pp.pprint(args.func(pkb_client, **vars(args)))
-
-
-if __name__ == "__main__":
- main()
diff --git a/pkb_client/cli/__init__.py b/pkb_client/cli/__init__.py
new file mode 100644
index 0000000..ed32c05
--- /dev/null
+++ b/pkb_client/cli/__init__.py
@@ -0,0 +1,3 @@
+from .cli import main
+
+__all__ = ["main"]
diff --git a/pkb_client/cli/cli.py b/pkb_client/cli/cli.py
new file mode 100644
index 0000000..4bf53cd
--- /dev/null
+++ b/pkb_client/cli/cli.py
@@ -0,0 +1,347 @@
+import argparse
+import dataclasses
+import json
+import os
+import textwrap
+from datetime import datetime
+
+from pkb_client.client import PKBClient, API_ENDPOINT
+from pkb_client.client.dns import DNSRecordType, DNSRestoreMode
+from pkb_client.client.forwarding import URLForwardingType
+
+
+class CustomJSONEncoder(json.JSONEncoder):
+ def default(self, o):
+ if isinstance(o, datetime):
+ return o.isoformat()
+ if dataclasses.is_dataclass(o):
+ return dataclasses.asdict(o)
+ return super().default(o)
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="Python client for the Porkbun API",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog=textwrap.dedent("""
+ License:
+ MIT - Copyright (c) Marvin Heptner
+ """),
+ )
+
+ parser.add_argument(
+ "-k",
+ "--key",
+ help='The API key used for Porkbun API calls (usually starts with "pk").',
+ )
+ parser.add_argument(
+ "-s",
+ "--secret",
+ help='The API secret used for Porkbun API calls (usually starts with "sk").',
+ )
+ parser.add_argument("--debug", help="Enable debug mode.", action="store_true")
+ parser.add_argument(
+ "--endpoint", help="The API endpoint to use.", default=API_ENDPOINT
+ )
+
+ subparsers = parser.add_subparsers(help="Supported API methods")
+
+ parser_ping = subparsers.add_parser("ping", help="Ping the API Endpoint")
+ parser_ping.set_defaults(func=PKBClient.ping)
+
+ parser_dns_create = subparsers.add_parser(
+ "create-dns-record", help="Create a new DNS record."
+ )
+ parser_dns_create.set_defaults(func=PKBClient.create_dns_record)
+ parser_dns_create.add_argument(
+ "domain", help="The domain for which the new DNS record should be created."
+ )
+ parser_dns_create.add_argument(
+ "record_type",
+ help="The type of the new DNS record.",
+ choices=list(DNSRecordType),
+ )
+ parser_dns_create.add_argument("content", help="The content of the new DNS record.")
+ parser_dns_create.add_argument(
+ "--name",
+ help="The subdomain for which the new DNS record should be created."
+ "The * can be used for a wildcard DNS record."
+ "If not used, then a DNS record for the root domain will be created",
+ required=False,
+ )
+ parser_dns_create.add_argument(
+ "--ttl", type=int, help="The ttl of the new DNS record.", required=False
+ )
+ parser_dns_create.add_argument(
+ "--prio", type=int, help="The priority of the new DNS record.", required=False
+ )
+
+ parser_dns_edit = subparsers.add_parser(
+ "update-dns-record", help="Edit an existing DNS record."
+ )
+ parser_dns_edit.set_defaults(func=PKBClient.update_dns_record)
+ parser_dns_edit.add_argument(
+ "domain", help="The domain for which the DNS record should be edited."
+ )
+ parser_dns_edit.add_argument(
+ "record_id", help="The id of the DNS record which should be edited."
+ )
+ parser_dns_edit.add_argument(
+ "record_type",
+ help="The new type of the DNS record.",
+ choices=list(DNSRecordType),
+ )
+ parser_dns_edit.add_argument("content", help="The new content of the DNS record.")
+ parser_dns_edit.add_argument(
+ "--name",
+ help="The new value of the subdomain for which the DNS record should apply. "
+ "The * can be used for a wildcard DNS record. If not set, the record will "
+ "be set for the root domain.",
+ required=False,
+ )
+ parser_dns_edit.add_argument(
+ "--ttl", type=int, help="The new ttl of the DNS record.", required=False
+ )
+ parser_dns_edit.add_argument(
+ "--prio", type=int, help="The new priority of the DNS record.", required=False
+ )
+
+ parser_dns_delete = subparsers.add_parser(
+ "delete-dns-records", help="Delete an existing DNS record."
+ )
+ parser_dns_delete.set_defaults(func=PKBClient.delete_dns_record)
+ parser_dns_delete.add_argument(
+ "domain", help="The domain for which the DNS record should be deleted."
+ )
+ parser_dns_delete.add_argument(
+ "record_id", help="The id of the DNS record which should be deleted."
+ )
+
+ parser_dns_receive = subparsers.add_parser(
+ "get-dns-records", help="Get all DNS records."
+ )
+ parser_dns_receive.set_defaults(func=PKBClient.get_dns_records)
+ parser_dns_receive.add_argument(
+ "domain", help="The domain for which the DNS record should be retrieved."
+ )
+
+ parser_dns_export = subparsers.add_parser(
+ "export-dns-records", help="Save all DNS records to a local json file."
+ )
+ parser_dns_export.set_defaults(func=PKBClient.export_dns_records)
+ parser_dns_export.add_argument(
+ "domain",
+ help="The domain for which the DNS record should be retrieved and saved.",
+ )
+ parser_dns_export.add_argument(
+ "filepath", help="The filepath where to save the exported DNS records."
+ )
+
+ parser_dns_export_bind = subparsers.add_parser(
+ "export-bind-dns-records", help="Save all DNS records to a local BIND file."
+ )
+ parser_dns_export_bind.set_defaults(func=PKBClient.export_bind_dns_records)
+ parser_dns_export_bind.add_argument(
+ "domain",
+ help="The domain for which the DNS record should be retrieved and saved.",
+ )
+ parser_dns_export_bind.add_argument(
+ "filepath", help="The filepath where to save the exported DNS records."
+ )
+
+ parser_dns_import = subparsers.add_parser(
+ "import-dns-records",
+ help="Restore all DNS records from a local json file.",
+ formatter_class=argparse.RawTextHelpFormatter,
+ )
+ parser_dns_import.set_defaults(func=PKBClient.import_dns_records)
+ parser_dns_import.add_argument(
+ "domain", help="The domain for which the DNS record should be restored."
+ )
+ parser_dns_import.add_argument(
+ "filepath", help="The filepath from which the DNS records are to be restored."
+ )
+ parser_dns_import.add_argument(
+ "restore_mode",
+ help="""The restore mode (DNS records are identified by the record type, name and prio if supported):
+ clear: remove all existing DNS records and restore all DNS records from the provided file
+ replace: replace only existing DNS records with the DNS records from the provided file, but do not create any new DNS records
+ keep: keep the existing DNS records and only create new ones for all DNS records from the specified file if they do not exist
+ """,
+ type=DNSRestoreMode.from_string,
+ choices=list(DNSRestoreMode),
+ )
+
+ parser_dns_import_bind = subparsers.add_parser(
+ "import-bind-dns-records",
+ help="Restore all DNS records from a local BIND file.",
+ formatter_class=argparse.RawTextHelpFormatter,
+ )
+ parser_dns_import_bind.set_defaults(func=PKBClient.import_bind_dns_records)
+ parser_dns_import_bind.add_argument(
+ "filepath", help="The filepath from which the DNS records are to be restored."
+ )
+ parser_dns_import_bind.add_argument(
+ "restore_mode",
+ help="""The restore mode (DNS records are identified by the record id):
+ clear: remove all existing DNS records and restore all DNS records from the provided file
+ """,
+ type=DNSRestoreMode.from_string,
+ choices=[DNSRestoreMode.clear],
+ )
+
+ parser_domain_pricing = subparsers.add_parser(
+ "get-domain-pricing", help="Get the pricing for Porkbun domains."
+ )
+ parser_domain_pricing.set_defaults(func=PKBClient.get_domain_pricing)
+
+ parser_ssl_retrieve = subparsers.add_parser(
+ "get-ssl-bundle", help="Retrieve an SSL bundle for given domain."
+ )
+ parser_ssl_retrieve.set_defaults(func=PKBClient.get_ssl_bundle)
+ parser_ssl_retrieve.add_argument(
+ "domain", help="The domain for which the SSL bundle should be retrieve."
+ )
+
+ parser_update_dns_server = subparsers.add_parser(
+ "update-dns-servers", help="Update the DNS servers for a domain."
+ )
+ parser_update_dns_server.set_defaults(func=PKBClient.update_dns_servers)
+ parser_update_dns_server.add_argument(
+ "domain", help="The domain for which the DNS servers should be set."
+ )
+ parser_update_dns_server.add_argument(
+ "dns_servers", nargs="+", help="The DNS servers to be set."
+ )
+
+ parser_get_dns_server = subparsers.add_parser(
+ "get-dns-servers", help="Retrieve the DNS servers for a domain."
+ )
+ parser_get_dns_server.set_defaults(func=PKBClient.get_dns_servers)
+ parser_get_dns_server.add_argument(
+ "domain", help="The domain for which the DNS servers should be retrieved."
+ )
+
+ parser_list_domains = subparsers.add_parser(
+ "get-domains", help="List all domains in this account in chunks of 1000."
+ )
+ parser_list_domains.set_defaults(func=PKBClient.get_domains)
+ parser_list_domains.add_argument(
+ "--start",
+ type=int,
+ help="The start index of the list.",
+ default=0,
+ required=False,
+ )
+
+ parser_get_url_forward = subparsers.add_parser(
+ "get-url-forwards", help="Retrieve all URL forwards."
+ )
+ parser_get_url_forward.set_defaults(func=PKBClient.get_url_forwards)
+ parser_get_url_forward.add_argument(
+ "domain", help="The domain for which the URL forwards should be retrieved."
+ )
+
+ parser_add_url_forward = subparsers.add_parser(
+ "create-url-forward", help="Create a new URL forward."
+ )
+ parser_add_url_forward.set_defaults(func=PKBClient.create_url_forward)
+ parser_add_url_forward.add_argument(
+ "domain", help="The domain for which the new URL forward should be created."
+ )
+ parser_add_url_forward.add_argument(
+ "location",
+ help="The location to which the url forwarding should redirect.",
+ )
+ parser_add_url_forward.add_argument(
+ "type", help="The type of the url forwarding.", choices=list(URLForwardingType)
+ )
+ parser_add_url_forward.add_argument(
+ "--subdomain",
+ help="The subdomain for which the url forwarding should be added.",
+ required=False,
+ default="",
+ )
+ parser_add_url_forward.add_argument(
+ "--include-path",
+ help="Whether the path should be included in the url forwarding.",
+ action="store_true",
+ default=False,
+ )
+ parser_add_url_forward.add_argument(
+ "--wildcard",
+ help="Whether the url forwarding should be also applied to subdomains.",
+ action="store_true",
+ default=False,
+ )
+
+ parser_delete_url_forward = subparsers.add_parser(
+ "delete-url-forward", help="Delete an existing URL forward."
+ )
+ parser_delete_url_forward.set_defaults(func=PKBClient.delete_url_forward)
+ parser_delete_url_forward.add_argument(
+ "domain", help="The domain for which the URL forward should be deleted."
+ )
+ parser_delete_url_forward.add_argument(
+ "id", help="The id of the URL forward which should be deleted."
+ )
+
+ args = vars(parser.parse_args())
+
+ debug = args.pop("debug", False)
+
+ func = args.pop("func", None)
+ if not func:
+ raise argparse.ArgumentError(
+ None, "No method specified. Please provide a method and try again."
+ )
+
+ endpoint = args.pop("endpoint")
+ api_key = args.pop("key")
+ api_secret = args.pop("secret")
+
+ # call the api methods which do not require authentication
+ if func == PKBClient.get_domain_pricing:
+ pkb_client = PKBClient(api_endpoint=endpoint, debug=debug)
+ ret = func(pkb_client, **args)
+
+ print(json.dumps(ret, cls=CustomJSONEncoder, indent=4))
+ exit(0)
+
+ if api_key is None:
+ # try to get the api key from the environment variable or fallback to user input
+ api_key = os.environ.get("PKB_API_KEY", "")
+ if len(api_key.strip()) == 0:
+ while True:
+ api_key = input(
+ 'Please enter your API key you got from Porkbun (usually starts with "pk"): '
+ )
+ if len(api_key.strip()) == 0:
+ print("The api key can not be empty.")
+ else:
+ break
+
+ if api_secret is None:
+ # try to get the api secret from the environment variable or fallback to user input
+ api_secret = os.environ.get("PKB_API_SECRET", "")
+ if len(api_secret.strip()) == 0:
+ while True:
+ api_secret = input(
+ 'Please enter your API key secret you got from Porkbun (usually starts with "sk"): '
+ )
+ if len(api_secret.strip()) == 0:
+ print("The api key secret can not be empty.")
+ else:
+ break
+
+ pkb_client = PKBClient(
+ api_key=api_key, secret_api_key=api_secret, api_endpoint=endpoint, debug=debug
+ )
+
+ ret = func(pkb_client, **args)
+
+ print(json.dumps(ret, cls=CustomJSONEncoder, indent=4))
+
+
+if __name__ == "__main__":
+ main()
diff --git a/pkb_client/client.py b/pkb_client/client.py
deleted file mode 100644
index 48c0d82..0000000
--- a/pkb_client/client.py
+++ /dev/null
@@ -1,455 +0,0 @@
-import json
-import logging
-from enum import Enum
-from pathlib import Path
-from typing import Optional, Tuple
-from urllib.parse import urljoin
-
-import requests
-
-from pkb_client.helper import parse_dns_record
-
-API_ENDPOINT = "https://porkbun.com/api/json/v3/"
-SUPPORTED_DNS_RECORD_TYPES = ["A", "AAAA", "MX", "CNAME", "ALIAS", "TXT", "NS", "SRV", "TLSA", "CAA"]
-
-# prevent urllib3 to log request with the api key and secret
-logging.getLogger("urllib3").setLevel(logging.WARNING)
-
-
-class DNSRestoreMode(Enum):
- clear = 0
- replace = 1
- keep = 2
-
- def __str__(self):
- return self.name
-
- @staticmethod
- def from_string(a):
- try:
- return DNSRestoreMode[a]
- except KeyError:
- return a
-
-
-class PKBClient:
- """
- API client for Porkbun.
- """
-
- def __init__(self, api_key: str, secret_api_key: str) -> None:
- """
- Creates a new PKBClient object.
-
- :param api_key: the API key used for Porkbun API calls
- :param secret_api_key: the API secret used for Porkbun API calls
- """
-
- assert api_key is not None and len(api_key) > 0
- assert secret_api_key is not None and len(secret_api_key) > 0
-
- self.api_key = api_key
- self.secret_api_key = secret_api_key
-
- def ping(self, **kwargs) -> str:
- """
- API ping method: get the current public ip address of the requesting system; can also be used for auth checking
- see https://porkbun.com/api/json/v3/documentation#Authentication for more info
-
- :return: the current public ip address of the requesting system
- """
-
- url = urljoin(API_ENDPOINT, "ping")
- req_json = {
- "apikey": self.api_key,
- "secretapikey": self.secret_api_key
- }
- r = requests.post(url=url, json=req_json)
-
- if r.status_code == 200:
- return json.loads(r.text).get("yourIp", None)
- else:
- raise Exception("ERROR: ping api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- def dns_create(self,
- domain: str,
- record_type: str,
- content: str,
- name: Optional[str] = None,
- ttl: Optional[int] = 300,
- prio: Optional[int] = None, **kwargs) -> str:
- """
- API DNS create method: create a new DNS record for given domain
- see https://porkbun.com/api/json/v3/documentation#DNS%20Create%20Record for more info
-
- :param domain: the domain for which the DNS record should be created
- :param record_type: the type of the new DNS record;
- supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA
- :param content: the content of the new DNS record
- :param name: the subdomain for which the new DNS record entry should apply; the * can be used for a
- wildcard DNS record; if not used, then a DNS record for the root domain will be created
- :param ttl: the time to live in seconds of the new DNS record; have to be between 0 and 2147483647
- :param prio: the priority of the new DNS record
-
- :return: the id of the new created DNS record
- """
-
- assert domain is not None and len(domain) > 0
- assert record_type in SUPPORTED_DNS_RECORD_TYPES
- assert content is not None and len(content) > 0
- assert ttl is None or 300 <= ttl <= 2147483647
-
- url = urljoin(API_ENDPOINT, "dns/create/{}".format(domain))
- req_json = {
- "apikey": self.api_key,
- "secretapikey": self.secret_api_key,
- "name": name,
- "type": record_type,
- "content": content,
- "ttl": ttl,
- "prio": prio
- }
- r = requests.post(url=url, json=req_json)
-
- if r.status_code == 200:
- return str(json.loads(r.text).get("id", None))
- else:
- raise Exception("ERROR: DNS create api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- def dns_edit(self,
- domain: str,
- record_id: str,
- record_type: str,
- content: str,
- name: str = None,
- ttl: int = 300,
- prio: int = None,
- **kwargs) -> bool:
- """
- API DNS edit method: edit an existing DNS record specified by the id for a given domain
- see https://porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record for more info
-
- :param domain: the domain for which the DNS record should be edited
- :param record_id: the id of the DNS record which should be edited
- :param record_type: the new type of the DNS record;
- supported DNS record types: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA
- :param content: the new content of the DNS record
- :param name: the new value of the subdomain for which the DNS record should apply; the * can be used for a
- wildcard DNS record; if not set, the record will be set for the record domain
- :param ttl: the new time to live in seconds of the DNS record, have to be between 0 and 2147483647
- :param prio: the new priority of the DNS record
-
- :return: True if the editing was successful
- """
-
- assert domain is not None and len(domain) > 0
- assert record_id is not None and len(record_id) > 0
- assert record_type in SUPPORTED_DNS_RECORD_TYPES
- assert content is not None and len(content) > 0
- assert ttl is None or 300 <= ttl <= 2147483647
-
- url = urljoin(API_ENDPOINT, "dns/edit/{}/{}".format(domain, record_id))
- req_json = {
- "apikey": self.api_key,
- "secretapikey": self.secret_api_key,
- "name": name,
- "type": record_type,
- "content": content,
- "ttl": ttl,
- "prio": prio
- }
- r = requests.post(url=url, json=req_json)
-
- if r.status_code == 200:
- return True
- else:
- raise Exception("ERROR: DNS edit api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- def dns_delete(self,
- domain: str,
- record_id: str,
- **kwargs) -> bool:
- """
- API DNS delete method: delete an existing DNS record specified by the id for a given domain
- see https://porkbun.com/api/json/v3/documentation#DNS%20Delete%20Record for more info
-
- :param domain: the domain for which the DNS record should be deleted
- :param record_id: the id of the DNS record which should be deleted
-
- :return: True if the deletion was successful
- """
-
- assert domain is not None and len(domain) > 0
- assert record_id is not None and len(record_id) > 0
-
- url = urljoin(API_ENDPOINT, "dns/delete/{}/{}".format(domain, record_id))
- req_json = {
- "apikey": self.api_key,
- "secretapikey": self.secret_api_key
- }
- r = requests.post(url=url, json=req_json)
-
- if r.status_code == 200:
- return True
- else:
- raise Exception("ERROR: DNS delete api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- def dns_retrieve(self, domain, **kwargs) -> list:
- """
- API DNS retrieve method: retrieve all DNS records for given domain
- see https://porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records for more info
-
- :param domain: the domain for which the DNS records should be retrieved
-
- :return: list of DNS records as dicts
-
- The list structure will be:
- [
- {
- "id": "123456789",
- "name": "example.com",
- "type": "TXT",
- "content": "this is a nice text",
- "ttl": "300",
- "prio": None,
- "notes": ""
- },
- {
- "id": "234567890",
- "name": "example.com",
- "type": "A",
- "content": "0.0.0.0",
- "ttl": "300",
- "prio": 0,
- "notes": ""
- }
- ]
- """
-
- assert domain is not None and len(domain) > 0
-
- url = urljoin(API_ENDPOINT, "dns/retrieve/{}".format(domain))
- req_json = {
- "apikey": self.api_key,
- "secretapikey": self.secret_api_key
- }
- r = requests.post(url=url, json=req_json)
-
- if r.status_code == 200:
- return [parse_dns_record(record) for record in json.loads(r.text).get("records", [])]
- else:
- raise Exception("ERROR: DNS retrieve api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- def dns_export(self, domain: str, filename: str, **kwargs) -> bool:
- """
- Export all DNS record from the given domain as json to a file.
- This method does not not represent a Porkbun API method.
-
- :param domain: the domain for which the DNS record should be retrieved and saved
- :param filename: the filename where to save the exported DNS records
-
- :return: True if everything went well
- """
-
- assert domain is not None and len(domain) > 0
- assert filename is not None and len(filename) > 0
-
- print("retrieve current DNS records...")
- dns_records = self.dns_retrieve(domain)
-
- print("save DNS records to {} ...".format(filename))
- # merge the single DNS records into one single dict with the record id as key
- dns_records_dict = dict()
- for record in dns_records:
- dns_records_dict[record["id"]] = record
-
- filepath = Path(filename)
- if filepath.exists():
- raise Exception("File already exists. Please try another filename")
- with open(filepath, "w") as f:
- json.dump(dns_records_dict, f)
- print("export finished")
-
- return True
-
- def dns_import(self, domain: str, filename: str, restore_mode: DNSRestoreMode, **kwargs) -> bool:
- """
- Restore
- This method does not not represent a Porkbun API method.
-
- :param domain: the domain for which the DNS record should be restored
- :param filename: the filename from which the DNS records are to be restored
- :param restore_mode: The restore mode (DNS records are identified by the record id)
- clean: remove all existing DNS records and restore all DNS records from the provided file
- replace: replace only existing DNS records with the DNS records from the provided file,
- but do not create any new DNS records
- keep: keep the existing DNS records and only create new ones for all DNS records from
- the specified file if they do not exist
-
- :return: True if everything went well
- """
-
- assert domain is not None and len(domain) > 0
- assert filename is not None and len(filename) > 0
- assert isinstance(restore_mode, DNSRestoreMode)
-
- existing_dns_records = self.dns_retrieve(domain)
-
- with open(filename, "r") as f:
- exported_dns_records_dict = json.load(f)
-
- if restore_mode is DNSRestoreMode.clear:
- print("restore mode: clear")
-
- try:
- # delete all existing DNS records
- for record in existing_dns_records:
- self.dns_delete(domain, record["id"])
-
- # restore all exported records by creating new DNS records
- for _, exported_record in exported_dns_records_dict.items():
- name = ".".join(exported_record["name"].split(".")[:-2])
- self.dns_create(domain=domain,
- record_type=exported_record["type"],
- content=exported_record["content"],
- name=name,
- ttl=exported_record["ttl"],
- prio=exported_record["prio"])
- except Exception as e:
- print("something went wrong: {}".format(e.__str__()))
- self.__handle_error_backup__(existing_dns_records)
- print("import failed")
- return False
- elif restore_mode is DNSRestoreMode.replace:
- print("restore mode: replace")
-
- try:
- for existing_record in existing_dns_records:
- record_id = existing_record["id"]
- exported_record = exported_dns_records_dict.get(record_id, None)
- # also check if the exported dns record is different to the existing record,
- # so we can reduce unnecessary api calls
- if exported_record is not None and exported_record != existing_record:
- name = ".".join(exported_record["name"].split(".")[:-2])
- self.dns_edit(domain=domain,
- record_id=record_id,
- record_type=exported_record["type"],
- content=exported_record["content"],
- name=name,
- ttl=exported_record["ttl"],
- prio=exported_record["prio"])
- except Exception as e:
- print("something went wrong: {}".format(e.__str__()))
- self.__handle_error_backup__(existing_dns_records)
- print("import failed")
- return False
- elif restore_mode is DNSRestoreMode.keep:
- print("restore mode: keep")
-
- existing_dns_records_dict = dict()
- for record in existing_dns_records:
- existing_dns_records_dict[record["id"]] = record
-
- try:
- for _, exported_record in exported_dns_records_dict.items():
- if exported_record["id"] not in existing_dns_records_dict:
- name = ".".join(exported_record["name"].split(".")[:-2])
- self.dns_create(domain=domain,
- record_type=exported_record["type"],
- content=exported_record["content"],
- name=name,
- ttl=exported_record["ttl"],
- prio=exported_record["prio"])
- except Exception as e:
- print("something went wrong: {}".format(e.__str__()))
- self.__handle_error_backup__(existing_dns_records)
- print("import failed")
- return False
- else:
- raise Exception("restore mode not supported")
-
- print("import successfully completed")
-
- return True
-
- @staticmethod
- def get_domain_pricing(**kwargs) -> dict:
- """
- Get the pricing for porkbun domains
- see https://porkbun.com/api/json/v3/documentation#Domain%20Pricing for more info
-
- :return: dict with pricing
- """
-
- url = urljoin(API_ENDPOINT, "pricing/get")
- r = requests.post(url=url)
-
- if r.status_code == 200:
- return json.loads(r.text)
- else:
- raise Exception("ERROR: Domain pricing retrieve api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- def ssl_retrieve(self, domain, **kwargs) -> Tuple[str, str, str, str]:
- """
- API SSL bundle retrieve method: retrieve an SSL bundle for given domain
- see https://porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info
-
- :param domain: the domain for which the SSL bundle should be retrieved
-
- :return: tuple of intermediate certificate, certificate chain, private key, public key
- """
-
- assert domain is not None and len(domain) > 0
-
- url = urljoin(API_ENDPOINT, "ssl/retrieve/{}".format(domain))
- req_json = {
- "apikey": self.api_key,
- "secretapikey": self.secret_api_key
- }
- r = requests.post(url=url, json=req_json)
-
- if r.status_code == 200:
- ssl_bundle = json.loads(r.text)
-
- intermediate_certificate = ssl_bundle["intermediate_certificate"]
- certificate_chain = ssl_bundle["certificate_chain"]
- private_key = ssl_bundle["private_key"]
- public_key = ssl_bundle["public_key"]
-
- return intermediate_certificate, certificate_chain, private_key, public_key
- else:
- raise Exception("ERROR: SSL bundle retrieve api call was not successfully\n"
- "Status code: {}\n"
- "Message: {}".format(r.status_code, json.loads(r.text).get("message", "no message found")))
-
- @staticmethod
- def __handle_error_backup__(dns_records):
- # merge the single DNS records into one single dict with the record id as key
- dns_records_dict = dict()
- for record in dns_records:
- dns_records_dict[record["id"]] = record
-
- # generate filename with incremental suffix
- base_backup_filename = "pkb_client_dns_records_backup"
- suffix = 0
- backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix))
- while backup_file_path.exists():
- suffix += 1
- backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix))
-
- with open(backup_file_path, "w") as f:
- json.dump(dns_records_dict, f)
-
- print("a backup of your existing dns records was saved to {}".format(str(backup_file_path)))
diff --git a/pkb_client/client/__init__.py b/pkb_client/client/__init__.py
new file mode 100644
index 0000000..da8a42b
--- /dev/null
+++ b/pkb_client/client/__init__.py
@@ -0,0 +1,22 @@
+from .bind_file import BindFile, BindRecord, RecordClass
+from .client import PKBClient, PKBClientException, API_ENDPOINT
+from .dns import DNSRecord, DNSRestoreMode, DNSRecordType
+from .domain import DomainInfo
+from .forwarding import URLForwarding, URLForwardingType
+from .ssl_cert import SSLCertBundle
+
+__all__ = [
+ "PKBClient",
+ "PKBClientException",
+ "API_ENDPOINT",
+ "BindFile",
+ "BindRecord",
+ "RecordClass",
+ "DNSRecord",
+ "DNSRestoreMode",
+ "DNSRecordType",
+ "DomainInfo",
+ "URLForwarding",
+ "URLForwardingType",
+ "SSLCertBundle",
+]
diff --git a/pkb_client/client/bind_file.py b/pkb_client/client/bind_file.py
new file mode 100644
index 0000000..af9abe0
--- /dev/null
+++ b/pkb_client/client/bind_file.py
@@ -0,0 +1,169 @@
+import logging
+from dataclasses import dataclass
+from enum import Enum
+from typing import Optional, List
+
+from pkb_client.client.dns import DNSRecordType, DNS_RECORDS_WITH_PRIORITY
+
+
+class RecordClass(str, Enum):
+ IN = "IN"
+
+ def __str__(self):
+ return self.value
+
+
+@dataclass
+class BindRecord:
+ name: str
+ ttl: int
+ record_class: RecordClass
+ record_type: DNSRecordType
+ data: str
+ prio: Optional[int] = None
+ comment: Optional[str] = None
+
+ def __str__(self):
+ record_string = f"{self.name} {self.ttl} {self.record_class} {self.record_type}"
+ if self.prio is not None:
+ record_string += f" {self.prio}"
+ record_string += f" {self.data}"
+ if self.comment:
+ record_string += f" ; {self.comment}"
+ return record_string
+
+
+class BindFile:
+ origin: str
+ ttl: Optional[int] = None
+ records: List[BindRecord]
+
+ def __init__(
+ self,
+ origin: str,
+ ttl: Optional[int] = None,
+ records: Optional[List[BindRecord]] = None,
+ ) -> None:
+ self.origin = origin
+ self.ttl = ttl
+ self.records = records or []
+
+ @staticmethod
+ def from_file(file_path: str) -> "BindFile":
+ with open(file_path, "r") as f:
+ file_data = f.readlines()
+
+ # parse the file line by line
+ origin = None
+ ttl = None
+ records = []
+ for line in file_data:
+ if line.startswith("$ORIGIN"):
+ origin = line.split()[1]
+ elif line.startswith("$TTL"):
+ ttl = int(line.split()[1])
+ else:
+ # parse the records with the two possible formats:
+ # 1: name ttl record-class record-type record-data
+ # 2: name record-class ttl record-type record-data
+ # whereby the ttl is optional
+
+ # drop any right trailing comments
+ line_parts = line.split(";", 1)
+ line = line_parts[0].strip()
+ comment = line_parts[1].strip() if len(line_parts) > 1 else None
+ prio = None
+
+ # skip empty lines
+ if not line:
+ continue
+
+ # find which format the line is
+ record_parts = line.split()
+ if record_parts[1].isdigit():
+ # scheme 1
+ if record_parts[3] not in DNSRecordType.__members__:
+ logging.warning(f"Ignoring unsupported record type: {line}")
+ continue
+ if record_parts[2] not in RecordClass.__members__:
+ logging.warning(f"Ignoring unsupported record class: {line}")
+ continue
+ record_name = record_parts[0]
+ record_ttl = int(record_parts[1])
+ record_class = RecordClass[record_parts[2]]
+ record_type = DNSRecordType[record_parts[3]]
+ if record_type in DNS_RECORDS_WITH_PRIORITY:
+ prio = int(record_parts[4])
+ record_data = " ".join(record_parts[5:])
+ else:
+ record_data = " ".join(record_parts[4:])
+ elif record_parts[2].isdigit():
+ # scheme 2
+ if record_parts[3] not in DNSRecordType.__members__:
+ logging.warning(f"Ignoring unsupported record type: {line}")
+ continue
+ if record_parts[1] not in RecordClass.__members__:
+ logging.warning(f"Ignoring unsupported record class: {line}")
+ continue
+ record_name = record_parts[0]
+ record_ttl = int(record_parts[2])
+ record_class = RecordClass[record_parts[1]]
+ record_type = DNSRecordType[record_parts[3]]
+ if record_type in DNS_RECORDS_WITH_PRIORITY:
+ prio = int(record_parts[4])
+ record_data = " ".join(record_parts[5:])
+ else:
+ record_data = " ".join(record_parts[4:])
+ else:
+ # no ttl, use default or previous
+ if record_parts[2] not in DNSRecordType.__members__:
+ logging.warning(f"Ignoring unsupported record type: {line}")
+ continue
+ if record_parts[1] not in RecordClass.__members__:
+ logging.warning(f"Ignoring unsupported record class: {line}")
+ continue
+ record_name = record_parts[0]
+ if ttl is None and not records:
+ raise ValueError("No TTL found in file")
+ record_ttl = ttl or records[-1].ttl
+ record_class = RecordClass[record_parts[1]]
+ record_type = DNSRecordType[record_parts[2]]
+ if record_type in DNS_RECORDS_WITH_PRIORITY:
+ prio = int(record_parts[3])
+ record_data = " ".join(record_parts[4:])
+ else:
+ record_data = " ".join(record_parts[3:])
+
+ # replace @ in record name with origin
+ record_name = record_name.replace("@", origin)
+
+ records.append(
+ BindRecord(
+ record_name,
+ record_ttl,
+ record_class,
+ record_type,
+ record_data,
+ prio=prio,
+ comment=comment,
+ )
+ )
+
+ if origin is None:
+ raise ValueError("No origin found in file")
+
+ return BindFile(origin, ttl, records)
+
+ def to_file(self, file_path: str) -> None:
+ with open(file_path, "w") as f:
+ f.write(str(self))
+
+ def __str__(self) -> str:
+ bind = f"$ORIGIN {self.origin}\n"
+
+ if self.ttl is not None:
+ bind += f"$TTL {self.ttl}\n"
+
+ for record in self.records:
+ bind += f"{record}\n"
+ return bind
diff --git a/pkb_client/client/client.py b/pkb_client/client/client.py
new file mode 100644
index 0000000..86956a5
--- /dev/null
+++ b/pkb_client/client/client.py
@@ -0,0 +1,867 @@
+import json
+import logging
+from hashlib import sha256
+from pathlib import Path
+from typing import Optional, List, Union
+from urllib.parse import urljoin
+
+import dns.resolver
+import requests
+
+from pkb_client.client import BindFile
+from pkb_client.client.dns import (
+ DNSRecord,
+ DNSRestoreMode,
+ DNSRecordType,
+ DNS_RECORDS_WITH_PRIORITY,
+)
+from pkb_client.client.domain import DomainInfo
+from pkb_client.client.forwarding import URLForwarding, URLForwardingType
+from pkb_client.client.ssl_cert import SSLCertBundle
+
+API_ENDPOINT = "https://api.porkbun.com/api/json/v3/"
+
+logger = logging.getLogger("pkb_client")
+logging.basicConfig(level=logging.INFO)
+
+
+class PKBClientException(Exception):
+ def __init__(self, status, message):
+ super().__init__(f"{status}: {message}")
+
+
+class PKBClient:
+ """
+ API client for Porkbun.
+ """
+
+ default_ttl: int = 300
+
+ def __init__(
+ self,
+ api_key: Optional[str] = None,
+ secret_api_key: Optional[str] = None,
+ api_endpoint: str = API_ENDPOINT,
+ debug: bool = False,
+ ) -> None:
+ """
+ Creates a new PKBClient object.
+
+ :param api_key: the API key used for Porkbun API calls
+ :param secret_api_key: the API secret used for Porkbun API calls
+ :param api_endpoint: the endpoint of the Porkbun API.
+ :param debug: boolean to enable debug logging
+ """
+ self.api_key = api_key
+ self.secret_api_key = secret_api_key
+ self.api_endpoint = api_endpoint
+ self.debug = debug
+ if self.debug:
+ logger.setLevel(logging.DEBUG)
+
+ def _get_auth_request_json(self) -> dict:
+ """
+ Get the request json for the authentication of the Porkbun API calls.
+
+ :return: the request json for the authentication of the Porkbun API calls
+ """
+
+ if self.api_key is None or self.secret_api_key is None:
+ raise ValueError("api_key and secret_api_key must be set")
+
+ return {"apikey": self.api_key, "secretapikey": self.secret_api_key}
+
+ def ping(self) -> str:
+ """
+ API ping method: get the current public ip address of the requesting system; can also be used for auth checking.
+ See https://api.porkbun.com/api/json/v3/documentation#Authentication for more info.
+
+ :return: the current public ip address of the requesting system
+ """
+
+ url = urljoin(self.api_endpoint, "ping")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return json.loads(r.text).get("yourIp", None)
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def create_dns_record(
+ self,
+ domain: str,
+ record_type: DNSRecordType,
+ content: str,
+ name: Optional[str] = None,
+ ttl: int = default_ttl,
+ prio: Optional[int] = None,
+ ) -> str:
+ """
+ API DNS create method: create a new DNS record for given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Create%20Record for more info.
+
+ :param domain: the domain for which the DNS record should be created
+ :param record_type: the type of the new DNS record
+ :param content: the content of the new DNS record
+ :param name: the subdomain for which the new DNS record entry should apply; the * can be used for a
+ wildcard DNS record; if not used, then a DNS record for the root domain will be created
+ :param ttl: the time to live in seconds of the new DNS record; have to be between 300 and 86400
+ :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None
+ :return: the id of the new created DNS record
+ """
+
+ if ttl > 86400 or ttl < self.default_ttl:
+ raise ValueError(f"ttl must be between {self.default_ttl} and 86400")
+
+ if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY:
+ raise ValueError(
+ f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}"
+ )
+
+ url = urljoin(self.api_endpoint, f"dns/create/{domain}")
+ req_json = {
+ **self._get_auth_request_json(),
+ "name": name,
+ "type": record_type,
+ "content": content,
+ "ttl": ttl,
+ "prio": prio,
+ }
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return str(json.loads(r.text).get("id", None))
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def update_dns_record(
+ self,
+ domain: str,
+ record_id: str,
+ record_type: DNSRecordType,
+ content: str,
+ name: Optional[str] = None,
+ ttl: int = default_ttl,
+ prio: Optional[int] = None,
+ ) -> bool:
+ """
+ API DNS edit method: edit an existing DNS record specified by the id for a given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record for more info.
+
+ :param domain: the domain for which the DNS record should be edited
+ :param record_id: the id of the DNS record which should be edited
+ :param record_type: the new type of the DNS record
+ :param content: the new content of the DNS record
+ :param name: the new value of the subdomain for which the DNS record should apply; the * can be used for a
+ wildcard DNS record; if not set, the record will be set for the record domain
+ :param ttl: the new time to live in seconds of the DNS record, have to be between 300 and 86400
+ :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None
+
+ :return: True if the editing was successful
+ """
+
+ if ttl > 86400 or ttl < self.default_ttl:
+ raise ValueError(f"ttl must be between {self.default_ttl} and 86400")
+
+ if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY:
+ raise ValueError(
+ f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}"
+ )
+
+ url = urljoin(self.api_endpoint, f"dns/edit/{domain}/{record_id}")
+ req_json = {
+ **self._get_auth_request_json(),
+ "name": name,
+ "type": record_type,
+ "content": content,
+ "ttl": ttl,
+ "prio": prio,
+ }
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def update_all_dns_records(
+ self,
+ domain: str,
+ record_type: DNSRecordType,
+ subdomain: str,
+ content: str,
+ ttl: int = default_ttl,
+ prio: Optional[int] = None,
+ ) -> bool:
+ """
+ API DNS edit method: edit all existing DNS record matching the domain, record type and subdomain.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Edit%20Record%20by%20Domain,%20Subdomain%20and%20Type for more info.
+
+ :param domain: the domain for which the DNS record should be edited
+ :param record_type: the type of the DNS record
+ :param subdomain: the subdomain of the DNS record can be empty string for root domain
+ :param content: the new content of the DNS record
+ :param ttl: the new time to live in seconds of the DNS record, have to be between 300 and 86400
+ :param prio: the priority of the new DNS record (only records of type MX and SRV) otherwise None
+
+ :return: True if the editing was successful
+ """
+
+ if ttl > 86400 or ttl < self.default_ttl:
+ raise ValueError(f"ttl must be between {self.default_ttl} and 86400")
+
+ if prio is not None and record_type not in DNS_RECORDS_WITH_PRIORITY:
+ raise ValueError(
+ f"Priority can only be set for {DNS_RECORDS_WITH_PRIORITY}"
+ )
+
+ url = urljoin(
+ self.api_endpoint, f"dns/editByNameType/{domain}/{record_type}/{subdomain}"
+ )
+ req_json = {
+ **self._get_auth_request_json(),
+ "type": record_type,
+ "content": content,
+ "ttl": ttl,
+ "prio": prio,
+ }
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def delete_dns_record(self, domain: str, record_id: str) -> bool:
+ """
+ API DNS delete method: delete an existing DNS record specified by the id for a given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Delete%20Record for more info.
+
+ :param domain: the domain for which the DNS record should be deleted
+ :param record_id: the id of the DNS record which should be deleted
+
+ :return: True if the deletion was successful
+ """
+
+ url = urljoin(self.api_endpoint, f"dns/delete/{domain}/{record_id}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def delete_all_dns_records(
+ self, domain: str, record_type: DNSRecordType, subdomain: str
+ ) -> bool:
+ """
+ API DNS delete method: delete all existing DNS record matching the domain, record type and subdomain.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Delete%20Records%20by%20Domain,%20Subdomain%20and%20Type for more info.
+
+ :param domain: the domain for which the DNS record should be deleted
+ :param record_type: the type of the DNS record
+ :param subdomain: the subdomain of the DNS record can be empty string for root domain
+
+ :return: True if the deletion was successful
+ """
+
+ url = urljoin(
+ self.api_endpoint,
+ f"dns/deleteByNameType/{domain}/{record_type}/{subdomain}",
+ )
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_dns_records(
+ self, domain, record_id: Optional[str] = None
+ ) -> List[DNSRecord]:
+ """
+ API DNS retrieve method: retrieve all DNS records for given domain if no record id is specified.
+ Otherwise, retrieve the DNS record of the specified domain with the given record id.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records for more info.
+
+ :param domain: the domain for which the DNS records should be retrieved
+ :param record_id: the id of the DNS record which should be retrieved
+
+ :return: list of DNSRecords objects
+ """
+
+ if record_id is None:
+ url = urljoin(self.api_endpoint, f"dns/retrieve/{domain}")
+ else:
+ url = urljoin(self.api_endpoint, f"dns/retrieve/{domain}/{record_id}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return [
+ DNSRecord.from_dict(record)
+ for record in json.loads(r.text).get("records", [])
+ ]
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_all_dns_records(
+ self, domain: str, record_type: DNSRecordType, subdomain: str
+ ) -> List[DNSRecord]:
+ """
+ API DNS retrieve method: retrieve all DNS records matching the domain, record type and subdomain.
+ See https://api.porkbun.com/api/json/v3/documentation#DNS%20Retrieve%20Records%20by%20Domain,%20Subdomain%20and%20Type for more info.
+
+ :param domain: the domain for which the DNS records should be retrieved
+ :param record_type: the type of the DNS records
+ :param subdomain: the subdomain of the DNS records can be empty string for root domain
+
+ :return: list of DNSRecords objects
+ """
+
+ url = urljoin(
+ self.api_endpoint,
+ f"dns/retrieveByNameType/{domain}/{record_type}/{subdomain}",
+ )
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return [
+ DNSRecord.from_dict(record)
+ for record in json.loads(r.text).get("records", [])
+ ]
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def export_dns_records(self, domain: str, filepath: Union[Path, str]) -> bool:
+ """
+ Export all DNS record from the given domain to a json file.
+ This method does not represent a Porkbun API method.
+ DNS records with all custom fields like notes are exported.
+
+ :param domain: the domain for which the DNS record should be retrieved and saved
+ :param filepath: the filepath where to save the exported DNS records
+
+ :return: True if everything went well
+ """
+
+ filepath = Path(filepath)
+
+ logger.debug("retrieve current DNS records...")
+ dns_records = self.get_dns_records(domain)
+
+ logger.debug("save DNS records to {} ...".format(filepath))
+ # merge the single DNS records into one single dict with the record id as key
+ dns_records_dict = dict()
+ for record in dns_records:
+ dns_records_dict[record.id] = record
+
+ if filepath.exists():
+ logger.warning("file already exists, overwriting...")
+
+ with open(filepath, "w") as f:
+ json.dump(dns_records_dict, f, default=lambda o: o.__dict__, indent=4)
+
+ logger.info("export finished")
+
+ return True
+
+ def export_bind_dns_records(self, domain: str, filepath: Union[Path, str]) -> bool:
+ """
+ Export all DNS record from the given domain to a BIND file.
+ This method does not represent a Porkbun API method.
+ Porkbun DNS record notes are exported as comments.
+
+ :param domain: the domain for which the DNS record should be retrieved and saved
+ :param filepath: the filepath where to save the exported DNS records
+
+ :return: True if everything went well
+ """
+
+ filepath = Path(filepath)
+
+ logger.debug("retrieve current DNS records...")
+ dns_records = self.get_dns_records(domain)
+
+ logger.debug("save DNS records to {} ...".format(filepath))
+ # merge the single DNS records into one single dict with the record id as key
+ dns_records_dict = dict()
+ for record in dns_records:
+ dns_records_dict[record.id] = record
+
+ if filepath.exists():
+ logger.warning("file already exists, overwriting...")
+
+ # domain header
+ bind_file_content = f"$ORIGIN {domain}"
+
+ # SOA record
+ soa_records = dns.resolver.resolve(domain, "SOA")
+ if soa_records:
+ soa_record = soa_records[0]
+ bind_file_content += f"\n@ IN SOA {soa_record.mname} {soa_record.rname} ({soa_record.serial} {soa_record.refresh} {soa_record.retry} {soa_record.expire} {soa_record.minimum})"
+
+ # records
+ for record in dns_records:
+ # name record class ttl record type record data
+ if record.prio:
+ record_content = f"{record.prio} {record.content}"
+ else:
+ record_content = record.content
+ bind_file_content += (
+ f"\n{record.name} IN {record.ttl} {record.type} {record_content}"
+ )
+
+ if record.notes:
+ bind_file_content += f" ; {record.notes}"
+
+ with open(filepath, "w") as f:
+ f.write(bind_file_content)
+
+ logger.info("export finished")
+
+ return True
+
+ def import_dns_records(
+ self, domain: str, filepath: Union[Path, str], restore_mode: DNSRestoreMode
+ ) -> bool:
+ """
+ Restore all DNS records from a json file to the given domain.
+ This method does not represent a Porkbun API method.
+
+ :param domain: the domain for which the DNS record should be restored
+ :param filepath: the filepath from which the DNS records are to be restored
+ :param restore_mode: The restore mode (DNS records are identified by the record type, name and prio if supported):
+ clear: remove all existing DNS records and restore all DNS records from the provided file
+ replace: replace only existing DNS records with the DNS records from the provided file,
+ but do not create any new DNS records
+ keep: keep the existing DNS records and only create new ones for all DNS records from
+ the specified file if they do not exist
+
+ :return: True if everything went well
+ """
+
+ filepath = Path(filepath)
+
+ existing_dns_records = self.get_dns_records(domain)
+
+ with open(filepath, "r") as f:
+ exported_dns_records_dict = json.load(f)
+
+ if restore_mode is DNSRestoreMode.clear:
+ logger.debug("restore mode: clear")
+
+ try:
+ # delete all existing DNS records
+ for record in existing_dns_records:
+ self.delete_dns_record(domain, record.id)
+
+ # restore all exported records by creating new DNS records
+ for _, exported_record in exported_dns_records_dict.items():
+ name = ".".join(exported_record["name"].split(".")[:-2])
+ self.create_dns_record(
+ domain=domain,
+ record_type=exported_record["type"],
+ content=exported_record["content"],
+ name=name,
+ ttl=exported_record["ttl"],
+ prio=exported_record["prio"],
+ )
+ except Exception as e:
+ logger.error("something went wrong: {}".format(e.__str__()))
+ self.__handle_error_backup__(existing_dns_records)
+ logger.error("import failed")
+ return False
+ elif restore_mode is DNSRestoreMode.replace:
+ logger.debug("restore mode: replace")
+
+ try:
+ existing_dns_record_hashed = {
+ sha256(
+ f"{record.type}{record.name}{record.prio}".encode()
+ ).hexdigest(): record
+ for record in existing_dns_records
+ }
+ for record in exported_dns_records_dict.values():
+ record_hash = sha256(
+ f"{record['type']}{record['name']}{record['prio']}".encode()
+ ).hexdigest()
+ existing_record = existing_dns_record_hashed.get(record_hash, None)
+ # check if the exported dns record is different to the existing record,
+ # so we can reduce unnecessary api calls
+ if existing_record is not None and (
+ record["content"] != existing_record.content
+ or record["ttl"] != existing_record.ttl
+ or record["prio"] != existing_record.prio
+ ):
+ self.update_dns_record(
+ domain=domain,
+ record_id=existing_record.id,
+ record_type=record["type"],
+ content=record["content"],
+ name=record["name"].replace(f".{domain}", ""),
+ ttl=record["ttl"],
+ prio=record["prio"],
+ )
+ except Exception as e:
+ logger.error("something went wrong: {}".format(e.__str__()))
+ self.__handle_error_backup__(existing_dns_records)
+ logger.error("import failed")
+ return False
+ elif restore_mode is DNSRestoreMode.keep:
+ logger.debug("restore mode: keep")
+
+ existing_dns_record_hashed = {
+ sha256(
+ f"{record.type}{record.name}{record.prio}".encode()
+ ).hexdigest(): record
+ for record in existing_dns_records
+ }
+
+ try:
+ for record in exported_dns_records_dict.values():
+ record_hash = sha256(
+ f"{record['type']}{record['name']}{record['prio']}".encode()
+ ).hexdigest()
+ existing_record = existing_dns_record_hashed.get(record_hash, None)
+ if existing_record is None:
+ self.create_dns_record(
+ domain=domain,
+ record_type=record["type"],
+ content=record["content"],
+ name=record["name"].replace(f".{domain}", ""),
+ ttl=record["ttl"],
+ prio=record["prio"],
+ )
+ except Exception as e:
+ logger.error("something went wrong: {}".format(e.__str__()))
+ self.__handle_error_backup__(existing_dns_records)
+ logger.error("import failed")
+ return False
+ else:
+ raise Exception("restore mode not supported")
+
+ logger.info("import successfully completed")
+
+ return True
+
+ def import_bind_dns_records(
+ self, filepath: Union[Path, str], restore_mode: DNSRestoreMode
+ ) -> bool:
+ """
+ Restore all DNS records from a BIND file.
+ This method does not represent a Porkbun API method.
+
+ :param filepath: the bind filepath from which the DNS records are to be restored
+ :param restore_mode: The restore mode:
+ clear: remove all existing DNS records and restore all DNS records from the provided file
+ :return: True if everything went well
+ """
+
+ bind_file = BindFile.from_file(filepath)
+
+ existing_dns_records = self.get_dns_records(bind_file.origin[:-1])
+
+ if restore_mode is DNSRestoreMode.clear:
+ logger.debug("restore mode: clear")
+
+ try:
+ # delete all existing DNS records
+ for record in existing_dns_records:
+ self.delete_dns_record(bind_file.origin[:-1], record.id)
+
+ # restore all records from BIND file by creating new DNS records
+ for record in bind_file.records:
+ # extract subdomain from record name
+ subdomain = record.name.replace(bind_file.origin, "")
+ # replace trailing dot
+ subdomain = subdomain[:-1] if subdomain.endswith(".") else subdomain
+ self.create_dns_record(
+ domain=bind_file.origin[:-1],
+ record_type=record.record_type,
+ content=record.data,
+ name=subdomain,
+ ttl=record.ttl,
+ prio=record.prio,
+ )
+
+ except Exception as e:
+ logger.error("something went wrong: {}".format(e.__str__()))
+ self.__handle_error_backup__(existing_dns_records)
+ logger.error("import failed")
+ return False
+ else:
+ raise Exception(f"restore mode '{restore_mode.value}' not supported")
+
+ logger.info("import successfully completed")
+
+ return True
+
+ def update_dns_servers(self, domain: str, name_servers: List[str]) -> bool:
+ """
+ Update the name servers of the specified domain.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20Update%20Name%20Servers for more info.
+
+ :return: True if everything went well
+ """
+
+ url = urljoin(self.api_endpoint, f"domain/updateNs/{domain}")
+ req_json = {**self._get_auth_request_json(), "ns": name_servers}
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200 and json.loads(r.text).get("status", None) == "SUCCESS":
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_dns_servers(self, domain: str) -> List[str]:
+ """
+ Get the name servers for the given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20Get%20Name%20Servers for more info.
+
+ :return: list of name servers
+ """
+
+ url = urljoin(self.api_endpoint, f"domain/getNs/{domain}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return json.loads(r.text).get("ns", [])
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_domains(self, start: int = 0) -> List[DomainInfo]:
+ """
+ Get all domains for the account in chunks of 1000. If you reach the end of all domains, the list will be empty.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20List%20All for more info.
+
+ :param start: the index of the first domain to retrieve
+
+ :return: list of DomainInfo objects
+ """
+
+ url = urljoin(self.api_endpoint, "domain/listAll")
+
+ req_json = {**self._get_auth_request_json(), "start": start}
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return [
+ DomainInfo.from_dict(domain)
+ for domain in json.loads(r.text).get("domains", [])
+ ]
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_url_forwards(self, domain: str) -> List[URLForwarding]:
+ """
+ Get the url forwarding for the given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20Get%20URL%20Forwarding for more info.
+
+ :return: list of URLForwarding objects
+ """
+
+ url = urljoin(self.api_endpoint, f"domain/getUrlForwarding/{domain}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return [
+ URLForwarding.from_dict(forwarding)
+ for forwarding in json.loads(r.text).get("forwards", [])
+ ]
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def create_url_forward(
+ self,
+ domain: str,
+ subdomain: str,
+ location: str,
+ type: URLForwardingType,
+ include_path: bool,
+ wildcard: bool,
+ ) -> bool:
+ """
+ Add a url forward for the given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20Add%20URL%20Forward for more info.
+
+ :param domain: the domain for which the url forwarding should be added
+ :param subdomain: the subdomain for which the url forwarding should be added, can be empty for root domain
+ :param location: the location to which the url forwarding should redirect
+ :param type: the type of the url forwarding
+ :param include_path: if the path should be included in the url forwarding
+ :param wildcard: if the url forwarding should also be applied to all subdomains
+
+ :return: True if the forwarding was added successfully
+ """
+
+ url = urljoin(self.api_endpoint, f"domain/addUrlForward/{domain}")
+ req_json = {
+ **self._get_auth_request_json(),
+ "subdomain": subdomain,
+ "location": location,
+ "type": type,
+ "includePath": include_path,
+ "wildcard": wildcard,
+ }
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def delete_url_forward(self, domain: str, id: str) -> bool:
+ """
+ Delete an url forward for the given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20Delete%20URL%20Forward for more info.
+
+ :param domain: the domain for which the url forwarding should be deleted
+ :param id: the id of the url forwarding which should be deleted
+
+ :return: True if the deletion was successful
+ """
+
+ url = urljoin(self.api_endpoint, f"domain/deleteUrlForward/{domain}/{id}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ return True
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_domain_pricing(self) -> dict:
+ """
+ Get the pricing for all Porkbun domains.
+ See https://api.porkbun.com/api/json/v3/documentation#Domain%20Pricing for more info.
+
+ :return: dict with pricing
+ """
+
+ url = urljoin(self.api_endpoint, "pricing/get")
+ r = requests.post(url=url)
+
+ if r.status_code == 200:
+ return json.loads(r.text)["pricing"]
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ def get_ssl_bundle(self, domain) -> SSLCertBundle:
+ """
+ API SSL bundle retrieve method: retrieve an SSL bundle for the given domain.
+ See https://api.porkbun.com/api/json/v3/documentation#SSL%20Retrieve%20Bundle%20by%20Domain for more info.
+
+ :param domain: the domain for which the SSL bundle should be retrieved
+
+ :return: tuple of intermediate certificate, certificate chain, private key, public key
+ """
+
+ url = urljoin(self.api_endpoint, f"ssl/retrieve/{domain}")
+ req_json = self._get_auth_request_json()
+ r = requests.post(url=url, json=req_json)
+
+ if r.status_code == 200:
+ ssl_bundle = json.loads(r.text)
+
+ return SSLCertBundle(
+ certificate_chain=ssl_bundle["certificatechain"],
+ private_key=ssl_bundle["privatekey"],
+ public_key=ssl_bundle["publickey"],
+ )
+ else:
+ response_json = json.loads(r.text)
+ raise PKBClientException(
+ response_json.get("status", "Unknown status"),
+ response_json.get("message", "Unknown message"),
+ )
+
+ @staticmethod
+ def __handle_error_backup__(dns_records):
+ # merge the single DNS records into one single dict with the record id as key
+ dns_records_dict = dict()
+ for record in dns_records:
+ dns_records_dict[record["id"]] = record
+
+ # generate filename with incremental suffix
+ base_backup_filename = "pkb_client_dns_records_backup"
+ suffix = 0
+ backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix))
+ while backup_file_path.exists():
+ suffix += 1
+ backup_file_path = Path("{}_{}.json".format(base_backup_filename, suffix))
+
+ with open(backup_file_path, "w") as f:
+ json.dump(dns_records_dict, f)
+
+ logger.warning(
+ "a backup of your existing dns records was saved to {}".format(
+ str(backup_file_path)
+ )
+ )
diff --git a/pkb_client/client/dns.py b/pkb_client/client/dns.py
new file mode 100644
index 0000000..85ae971
--- /dev/null
+++ b/pkb_client/client/dns.py
@@ -0,0 +1,63 @@
+from dataclasses import dataclass
+from enum import Enum
+from typing import Optional
+
+
+class DNSRecordType(str, Enum):
+ A = "A"
+ AAAA = "AAAA"
+ MX = "MX"
+ CNAME = "CNAME"
+ ALIAS = "ALIAS"
+ TXT = "TXT"
+ NS = "NS"
+ SRV = "SRV"
+ TLSA = "TLSA"
+ CAA = "CAA"
+
+ def __str__(self):
+ return self.value
+
+
+DNS_RECORDS_WITH_PRIORITY = {DNSRecordType.MX, DNSRecordType.SRV}
+
+
+@dataclass
+class DNSRecord:
+ id: str
+ name: str
+ type: DNSRecordType
+ content: str
+ ttl: int
+ prio: Optional[int]
+ notes: str
+
+ @staticmethod
+ def from_dict(d):
+ # only use prio for supported record types since the API returns it for all records with default value 0
+ prio = int(d["prio"]) if d["type"] in DNS_RECORDS_WITH_PRIORITY else None
+ return DNSRecord(
+ id=d["id"],
+ name=d["name"],
+ type=DNSRecordType[d["type"]],
+ content=d["content"],
+ ttl=int(d["ttl"]),
+ prio=prio,
+ notes=d["notes"],
+ )
+
+
+class DNSRestoreMode(Enum):
+ clear = 0
+ replace = 1
+ keep = 2
+
+ def __str__(self):
+ return self.name
+
+ @staticmethod
+ def from_string(a):
+ try:
+ return DNSRestoreMode[a]
+ except KeyError:
+ return a
diff --git a/pkb_client/client/domain.py b/pkb_client/client/domain.py
new file mode 100644
index 0000000..a44a904
--- /dev/null
+++ b/pkb_client/client/domain.py
@@ -0,0 +1,29 @@
+from dataclasses import dataclass
+from datetime import datetime
+
+
+@dataclass
+class DomainInfo:
+ domain: str
+ status: str
+ tld: str
+ create_date: datetime
+ expire_date: datetime
+ security_lock: bool
+ whois_privacy: bool
+ auto_renew: bool
+ not_local: bool
+
+ @staticmethod
+ def from_dict(d):
+ return DomainInfo(
+ domain=d["domain"],
+ status=d["status"],
+ tld=d["tld"],
+ create_date=datetime.fromisoformat(d["createDate"]),
+ expire_date=datetime.fromisoformat(d["expireDate"]),
+ security_lock=bool(d["securityLock"]),
+ whois_privacy=bool(d["whoisPrivacy"]),
+ auto_renew=bool(d["autoRenew"]),
+ not_local=bool(d["notLocal"]),
+ )
diff --git a/pkb_client/client/forwarding.py b/pkb_client/client/forwarding.py
new file mode 100644
index 0000000..64962ca
--- /dev/null
+++ b/pkb_client/client/forwarding.py
@@ -0,0 +1,28 @@
+from dataclasses import dataclass
+from enum import Enum
+
+
+class URLForwardingType(str, Enum):
+ temporary = "temporary"
+ permanent = "permanent"
+
+
+@dataclass
+class URLForwarding:
+ id: str
+ subdomain: str
+ location: str
+ type: URLForwardingType
+ include_path: bool
+ wildcard: bool
+
+ @staticmethod
+ def from_dict(d):
+ return URLForwarding(
+ id=d["id"],
+ subdomain=d["subdomain"],
+ location=d["location"],
+ type=URLForwardingType[d["type"]],
+ include_path=d["includePath"] == "yes",
+ wildcard=d["wildcard"] == "yes",
+ )
diff --git a/pkb_client/client/ssl_cert.py b/pkb_client/client/ssl_cert.py
new file mode 100644
index 0000000..0662ce4
--- /dev/null
+++ b/pkb_client/client/ssl_cert.py
@@ -0,0 +1,13 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class SSLCertBundle:
+ # The complete certificate chain.
+ certificate_chain: str
+
+ # The private key.
+ private_key: str
+
+ # The public key.
+ public_key: str
diff --git a/pkb_client/helper.py b/pkb_client/helper.py
deleted file mode 100644
index 0c08167..0000000
--- a/pkb_client/helper.py
+++ /dev/null
@@ -1,15 +0,0 @@
-def parse_dns_record(record: dict) -> dict:
- """
- Parse the DNS record.
- Replace the ttl and prio string values with the int values.
-
- :param record: the unparsed DNS record dict
-
- :return: the parsed dns record dict
- """
- if record.get("ttl", None) is not None:
- record["ttl"] = int(record["ttl"])
- if record.get("prio", None) is not None:
- record["prio"] = int(record["prio"])
-
- return record
diff --git a/requirements.txt b/requirements.txt
index 2c81c10..b6b0625 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,6 @@
setuptools>=39.0.1
requests>=2.20.0
+sphinx~=7.4
+dnspython~=2.7
+responses~=0.25.3
+ruff~=0.7
diff --git a/setup.py b/setup.py
index fcc4a15..d1c8f04 100644
--- a/setup.py
+++ b/setup.py
@@ -10,17 +10,17 @@ setup(
version=pkb_client.__version__,
author="infinityofspace",
url="https://github.com/infinityofspace/pkb_client",
- description="Unofficial client for the Porkbun API",
+ description="Python client for the Porkbun API",
long_description=long_description,
long_description_content_type="text/markdown",
license="MIT",
classifiers=[
- "Development Status :: 4 - Beta",
"Programming Language :: Python :: 3",
- "Programming Language :: Python :: 3.6",
- "Programming Language :: Python :: 3.7",
- "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
+ "Programming Language :: Python :: 3.10",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
+ "Programming Language :: Python :: 3.13",
"License :: OSI Approved :: MIT License",
"Topic :: Internet :: WWW/HTTP",
"Topic :: Internet :: Name Service (DNS)",
@@ -28,14 +28,11 @@ setup(
"Topic :: System :: Systems Administration",
],
packages=find_packages(),
- python_requires=">=3.6",
- install_requires=[
- "setuptools>=39.0.1",
- "requests>=2.20.0"
- ],
+ python_requires=">=3.9",
+ install_requires=["setuptools>=39.0.1", "requests>=2.20.0", "dnspython~=2.6"],
entry_points={
"console_scripts": [
- "pkb-client = pkb_client.cli:main",
+ "pkb-client = pkb_client.cli.cli:main",
]
- }
+ },
)
diff --git a/tests/bind_file.py b/tests/bind_file.py
new file mode 100644
index 0000000..4d1114f
--- /dev/null
+++ b/tests/bind_file.py
@@ -0,0 +1,148 @@
+import tempfile
+import unittest
+from importlib import resources
+
+from pkb_client.client.bind_file import BindFile, BindRecord, RecordClass
+from pkb_client.client.dns import DNSRecordType
+from tests import data
+
+
+class TestBindFileParsing(unittest.TestCase):
+ def test_reading_bind_file(self):
+ with self.subTest("With default TTL"):
+ with resources.open_text(data, "test.bind") as f:
+ bind_file = BindFile.from_file(f.name)
+
+ self.assertEqual("test.com.", bind_file.origin)
+ self.assertEqual(1234, bind_file.ttl)
+ self.assertEqual(5, len(bind_file.records))
+ self.assertEqual(
+ BindRecord(
+ "test.com.", 600, RecordClass.IN, DNSRecordType.A, "1.2.3.4"
+ ),
+ bind_file.records[0],
+ )
+ self.assertEqual(
+ BindRecord(
+ "sub.test.com.", 600, RecordClass.IN, DNSRecordType.A, "4.3.2.1"
+ ),
+ bind_file.records[1],
+ )
+ self.assertEqual(
+ BindRecord(
+ "test.com.",
+ 600,
+ RecordClass.IN,
+ DNSRecordType.AAAA,
+ "2001:db8::1",
+ comment="This is a comment",
+ ),
+ bind_file.records[2],
+ )
+ self.assertEqual(
+ BindRecord(
+ "test.com.", 1234, RecordClass.IN, DNSRecordType.TXT, "pkb-client"
+ ),
+ bind_file.records[3],
+ )
+ self.assertEqual(
+ BindRecord(
+ "test.com.",
+ 600,
+ RecordClass.IN,
+ DNSRecordType.MX,
+ "mail.test.com.",
+ prio=10,
+ ),
+ bind_file.records[4],
+ )
+
+ with self.subTest("Without default TTL"):
+ with resources.open_text(data, "test_no_ttl.bind") as f:
+ bind_file = BindFile.from_file(f.name)
+
+ self.assertEqual("test.com.", bind_file.origin)
+ self.assertEqual(None, bind_file.ttl)
+ self.assertEqual(5, len(bind_file.records))
+ self.assertEqual(
+ BindRecord(
+ "test.com.", 600, RecordClass.IN, DNSRecordType.A, "1.2.3.4"
+ ),
+ bind_file.records[0],
+ )
+ self.assertEqual(
+ BindRecord(
+ "sub.test.com.", 600, RecordClass.IN, DNSRecordType.A, "4.3.2.1"
+ ),
+ bind_file.records[1],
+ )
+ self.assertEqual(
+ BindRecord(
+ "test.com.",
+ 700,
+ RecordClass.IN,
+ DNSRecordType.AAAA,
+ "2001:db8::1",
+ comment="This is a comment",
+ ),
+ bind_file.records[2],
+ )
+ self.assertEqual(
+ BindRecord(
+ "test.com.", 700, RecordClass.IN, DNSRecordType.TXT, "pkb-client"
+ ),
+ bind_file.records[3],
+ )
+ self.assertEqual(
+ BindRecord(
+ "test.com.",
+ 600,
+ RecordClass.IN,
+ DNSRecordType.MX,
+ "mail.test.com.",
+ prio=10,
+ ),
+ bind_file.records[4],
+ )
+
+ def test_writing_bind_file(self):
+ records = [
+ BindRecord("test.com.", 600, RecordClass.IN, DNSRecordType.A, "1.2.3.4"),
+ BindRecord(
+ "sub.test.com.", 700, RecordClass.IN, DNSRecordType.A, "4.3.2.1"
+ ),
+ BindRecord(
+ "test.com.", 600, RecordClass.IN, DNSRecordType.AAAA, "2001:db8::1"
+ ),
+ BindRecord(
+ "test.com.", 600, RecordClass.IN, DNSRecordType.TXT, "pkb-client"
+ ),
+ BindRecord(
+ "test.com.",
+ 600,
+ RecordClass.IN,
+ DNSRecordType.MX,
+ "mail.test.com.",
+ prio=10,
+ ),
+ ]
+ bind_file = BindFile("test.com.", 1234, records)
+
+ file_content = (
+ "$ORIGIN test.com.\n"
+ "$TTL 1234\n"
+ "test.com. 600 IN A 1.2.3.4\n"
+ "sub.test.com. 700 IN A 4.3.2.1\n"
+ "test.com. 600 IN AAAA 2001:db8::1\n"
+ "test.com. 600 IN TXT pkb-client\n"
+ "test.com. 600 IN MX 10 mail.test.com.\n"
+ )
+
+ with tempfile.NamedTemporaryFile() as f:
+ bind_file.to_file(f.name)
+ with open(f.name) as f2:
+ self.assertEqual(file_content.strip(), f2.read().strip())
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/client.py b/tests/client.py
new file mode 100644
index 0000000..9362be2
--- /dev/null
+++ b/tests/client.py
@@ -0,0 +1,1025 @@
+import json
+import tempfile
+import unittest
+from pathlib import Path
+from urllib.parse import urljoin
+
+import responses
+from responses import matchers
+from responses.registries import OrderedRegistry
+
+from pkb_client.client import (
+ PKBClient,
+ PKBClientException,
+ API_ENDPOINT,
+ DNSRestoreMode,
+)
+from pkb_client.client import SSLCertBundle
+from pkb_client.client.dns import DNSRecord, DNSRecordType
+from pkb_client.client.forwarding import URLForwarding, URLForwardingType
+
+
+class TestClientAuth(unittest.TestCase):
+ @responses.activate
+ def test_valid_auth(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "ping"),
+ json={"status": "SUCCESS", "yourIp": "127.0.0.1"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ ip_address = pkb_client.ping()
+ self.assertEqual("127.0.0.1", ip_address)
+
+ @responses.activate
+ def test_invalid_auth(self):
+ pkb_client = PKBClient("key" + "s", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "ping"),
+ json={"status": "ERROR", "message": "Invalid credentials"},
+ status=401,
+ )
+ with self.assertRaises(PKBClientException):
+ pkb_client.ping()
+
+ @responses.activate
+ def test_ping(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "ping"),
+ json={"status": "SUCCESS", "yourIp": "127.0.0.1"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ ip_address = pkb_client.ping()
+ self.assertEqual("127.0.0.1", ip_address)
+
+ @responses.activate(registry=OrderedRegistry)
+ def test_create_dns_record(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "123456"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": 3600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+ assert "123456" == pkb_client.create_dns_record(
+ "example.com", DNSRecordType.A, "127.0.0.1", "sub.example.com", 3600
+ )
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "234561"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "sub.example.com",
+ "type": "MX",
+ "content": "127.0.0.1",
+ "ttl": 3600,
+ "prio": 2,
+ }
+ )
+ ],
+ )
+ assert "234561" == pkb_client.create_dns_record(
+ "example.com", DNSRecordType.MX, "127.0.0.1", "sub.example.com", 3600, 2
+ )
+
+ def test_create_dns_record_invalid_prio_record_type(self):
+ pkb_client = PKBClient("key", "secret")
+ with self.assertRaises(ValueError):
+ pkb_client.create_dns_record(
+ "example.com", DNSRecordType.A, "127.0.0.1", "sub.example.com", 3600, 2
+ )
+
+ @responses.activate(registry=OrderedRegistry)
+ def test_update_dns_record(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/edit/example.com/123456"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "type": "A",
+ "content": "127.0.0.1",
+ "name": "sub.example.com",
+ "ttl": 3600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+
+ success = pkb_client.update_dns_record(
+ "example.com",
+ "123456",
+ DNSRecordType.A,
+ "127.0.0.1",
+ "sub.example.com",
+ 3600,
+ )
+ self.assertTrue(success)
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/edit/example.com/123456"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "type": "MX",
+ "content": "127.0.0.1",
+ "name": "sub.example.com",
+ "ttl": 3600,
+ "prio": 2,
+ }
+ )
+ ],
+ )
+
+ success = pkb_client.update_dns_record(
+ "example.com",
+ "123456",
+ DNSRecordType.MX,
+ "127.0.0.1",
+ "sub.example.com",
+ 3600,
+ 2,
+ )
+ self.assertTrue(success)
+
+ def test_update_dns_records_invalid_prio_record_type(self):
+ pkb_client = PKBClient("key", "secret")
+ with self.assertRaises(ValueError):
+ pkb_client.update_dns_record(
+ "example.com",
+ "123456",
+ DNSRecordType.A,
+ "127.0.0.1",
+ "sub.example.com",
+ 3600,
+ 2,
+ )
+
+ @responses.activate
+ def test_update_all_dns_records(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/editByNameType/example.com/A/sub"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": 1234,
+ "prio": None,
+ }
+ )
+ ],
+ )
+
+ success = pkb_client.update_all_dns_records(
+ "example.com", DNSRecordType.A, "sub", "127.0.0.1", 1234
+ )
+
+ self.assertTrue(success)
+
+ def test_update_all_dns_records_all_invalid_prio_record_type(self):
+ pkb_client = PKBClient("key", "secret")
+
+ with self.assertRaises(ValueError):
+ pkb_client.update_all_dns_records(
+ "example.com", DNSRecordType.A, "sub", "127.0.0.1", 1234, 2
+ )
+
+ @responses.activate
+ def test_delete_dns_record(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/delete/example.com/123456"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ success = pkb_client.delete_dns_record("example.com", "123456")
+
+ self.assertTrue(success)
+
+ @responses.activate
+ def test_delete_all_dns_records(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/deleteByNameType/example.com/A/sub"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ success = pkb_client.delete_all_dns_records(
+ "example.com", DNSRecordType.A, "sub"
+ )
+
+ self.assertTrue(success)
+
+ @responses.activate
+ def test_get_dns_records(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": "600",
+ "prio": None,
+ "notes": "",
+ },
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ },
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ records = pkb_client.get_dns_records("example.com")
+
+ expected_records = [
+ DNSRecord(
+ "123456", "example.com", DNSRecordType.A, "127.0.0.1", 600, None, ""
+ ),
+ DNSRecord(
+ "1234567",
+ "sub.example.com",
+ DNSRecordType.A,
+ "127.0.0.2",
+ 600,
+ None,
+ "",
+ ),
+ ]
+ self.assertEqual(expected_records, records)
+
+ @responses.activate
+ def test_get_all_dns_records(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieveByNameType/example.com/A/sub"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ }
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ records = pkb_client.get_all_dns_records("example.com", DNSRecordType.A, "sub")
+
+ expected_records = [
+ DNSRecord(
+ "1234567",
+ "sub.example.com",
+ DNSRecordType.A,
+ "127.0.0.2",
+ 600,
+ None,
+ "",
+ )
+ ]
+ self.assertEqual(expected_records, records)
+
+ @responses.activate
+ def test_update_dns_servers(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "domain/updateNs/example.com"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "ns": ["ns1.example.com", "ns2.example.com"],
+ }
+ )
+ ],
+ )
+
+ success = pkb_client.update_dns_servers(
+ "example.com", ["ns1.example.com", "ns2.example.com"]
+ )
+
+ self.assertTrue(success)
+
+ @responses.activate
+ def test_get_url_forwards(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "domain/getUrlForwarding/example.com"),
+ json={
+ "status": "SUCCESS",
+ "forwards": [
+ {
+ "id": "123456",
+ "subdomain": "",
+ "location": "https://example.com",
+ "type": "temporary",
+ "includePath": "no",
+ "wildcard": "yes",
+ },
+ {
+ "id": "234567",
+ "subdomain": "sub1",
+ "location": "https://sub1.example.com",
+ "type": "permanent",
+ "includePath": "no",
+ "wildcard": "yes",
+ },
+ ],
+ },
+ )
+
+ forwards = pkb_client.get_url_forwards("example.com")
+
+ expected_forwards = [
+ URLForwarding(
+ "123456",
+ "",
+ "https://example.com",
+ URLForwardingType.temporary,
+ False,
+ True,
+ ),
+ URLForwarding(
+ "234567",
+ "sub1",
+ "https://sub1.example.com",
+ URLForwardingType.permanent,
+ False,
+ True,
+ ),
+ ]
+
+ self.assertEqual(expected_forwards, forwards)
+
+ @responses.activate
+ def test_create_url_forward(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "domain/addUrlForward/example.com"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "subdomain": "sub.example.com",
+ "location": "https://www.example.com",
+ "type": "permanent",
+ "includePath": False,
+ "wildcard": False,
+ }
+ )
+ ],
+ )
+
+ success = pkb_client.create_url_forward(
+ "example.com",
+ "sub.example.com",
+ "https://www.example.com",
+ URLForwardingType.permanent,
+ False,
+ False,
+ )
+
+ self.assertTrue(success)
+
+ @responses.activate
+ def test_delete_url_forward(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "domain/deleteUrlForward/example.com/123456"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ success = pkb_client.delete_url_forward("example.com", "123456")
+
+ self.assertTrue(success)
+
+ @responses.activate
+ def test_get_domain_pricing(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "pricing/get"),
+ json={
+ "status": "SUCCESS",
+ "pricing": {
+ "com": {
+ "registration": "42.42",
+ "renewal": "4.2",
+ "transfer": "42.2",
+ "coupons": [],
+ },
+ "test": {
+ "registration": "4.42",
+ "renewal": "44.2",
+ "transfer": "4.2",
+ "coupons": [],
+ },
+ },
+ },
+ )
+
+ pricing = pkb_client.get_domain_pricing()
+
+ expected_pricing = {
+ "com": {
+ "registration": "42.42",
+ "renewal": "4.2",
+ "transfer": "42.2",
+ "coupons": [],
+ },
+ "test": {
+ "registration": "4.42",
+ "renewal": "44.2",
+ "transfer": "4.2",
+ "coupons": [],
+ },
+ }
+
+ self.assertEqual(expected_pricing, pricing)
+
+ @responses.activate
+ def test_get_ssl_bundle(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "ssl/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "certificatechain": "----BEGIN CERTIFICATE-----\nabc1-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc2-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc3-----END CERTIFICATE-----\n",
+ "privatekey": "-----BEGIN PRIVATE KEY-----\nabc4-----END PRIVATE KEY-----\n",
+ "publickey": "-----BEGIN PUBLIC KEY-----\nabc5-----END PUBLIC KEY-----\n",
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ ssl_cert_bundle = pkb_client.get_ssl_bundle("example.com")
+
+ expected_ssl_cert_bundle = SSLCertBundle(
+ certificate_chain="----BEGIN CERTIFICATE-----\nabc1-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc2-----END CERTIFICATE-----\n\n----BEGIN CERTIFICATE-----\nabc3-----END CERTIFICATE-----\n",
+ private_key="-----BEGIN PRIVATE KEY-----\nabc4-----END PRIVATE KEY-----\n",
+ public_key="-----BEGIN PUBLIC KEY-----\nabc5-----END PUBLIC KEY-----\n",
+ )
+
+ self.assertEqual(expected_ssl_cert_bundle, ssl_cert_bundle)
+
+ @responses.activate
+ def test_export_dns_records(self):
+ pkb_client = PKBClient("key", "secret")
+
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": "600",
+ "prio": None,
+ "notes": "",
+ },
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": "1200",
+ "prio": None,
+ "notes": "This is a comment",
+ },
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ with tempfile.NamedTemporaryFile() as f:
+ pkb_client.export_dns_records("example.com", f.name)
+
+ with open(f.name, "r") as f:
+ exported_dns_file = json.load(f)
+
+ expected_exported_dns_file = {
+ "123456": {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ },
+ "1234567": {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 1200,
+ "prio": None,
+ "notes": "This is a comment",
+ },
+ }
+
+ self.assertEqual(expected_exported_dns_file, exported_dns_file)
+
+ @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True)
+ def test_import_dns_records_clear(self):
+ pkb_client = PKBClient("key", "secret")
+
+ # first all records should be retrieved
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": "600",
+ "prio": None,
+ "notes": "",
+ },
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ },
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ # then all records should be deleted
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/delete/example.com/123456"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/delete/example.com/1234567"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ # then all records should be imported / created
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "123456"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "",
+ "type": "A",
+ "content": "127.0.0.3",
+ "ttl": 600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "1234567"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "sub",
+ "type": "A",
+ "content": "127.0.0.4",
+ "ttl": 600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+
+ with tempfile.TemporaryDirectory() as temp_dir:
+ filename = Path(temp_dir, "records.json")
+ with open(filename, "w") as f:
+ json.dump(
+ {
+ "123456": {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.3",
+ "ttl": 600,
+ "prio": None,
+ },
+ "1234567": {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.4",
+ "ttl": 600,
+ "prio": None,
+ },
+ },
+ f,
+ )
+
+ pkb_client.import_dns_records(
+ "example.com", str(filename), DNSRestoreMode.clear
+ )
+
+ @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True)
+ def test_import_dns_records_replace(self):
+ pkb_client = PKBClient("key", "secret")
+
+ # first all records should be retrieved
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": "600",
+ "prio": None,
+ "notes": "",
+ },
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ },
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ # same record should be updated
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/edit/example.com/1234567"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "type": "A",
+ "content": "127.0.0.3",
+ "name": "sub",
+ "ttl": 600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+
+ with tempfile.TemporaryDirectory() as temp_dir:
+ filename = Path(temp_dir, "records.json")
+ with open(filename, "w") as f:
+ json.dump(
+ {
+ "123451": {
+ "id": "123451",
+ "name": "test.example.com",
+ "type": "A",
+ "content": "127.0.0.4",
+ "ttl": 600,
+ "prio": None,
+ },
+ "1234562": {
+ "id": "1234562",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.3",
+ "ttl": 600,
+ "prio": None,
+ },
+ },
+ f,
+ )
+
+ pkb_client.import_dns_records(
+ "example.com", str(filename), DNSRestoreMode.replace
+ )
+
+ @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True)
+ def test_import_dns_records_keep(self):
+ pkb_client = PKBClient("key", "secret")
+
+ # first all records should be retrieved
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": "600",
+ "prio": None,
+ "notes": "",
+ },
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ },
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+
+ # only new records should be created
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "1234562"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "test",
+ "type": "A",
+ "content": "127.0.0.4",
+ "ttl": 600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+
+ with tempfile.TemporaryDirectory() as temp_dir:
+ filename = Path(temp_dir, "records.json")
+ with open(filename, "w") as f:
+ json.dump(
+ {
+ "123451": {
+ "id": "123451",
+ "name": "test.example.com",
+ "type": "A",
+ "content": "127.0.0.4",
+ "ttl": 600,
+ "prio": None,
+ },
+ "1234562": {
+ "id": "1234562",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.3",
+ "ttl": 600,
+ "prio": None,
+ },
+ },
+ f,
+ )
+
+ pkb_client.import_dns_records(
+ "example.com", str(filename), DNSRestoreMode.keep
+ )
+
+ @responses.activate(registry=OrderedRegistry, assert_all_requests_are_fired=True)
+ def test_import_bind_dns_records(self):
+ pkb_client = PKBClient("key", "secret")
+
+ # first all records should be retrieved
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/retrieve/example.com"),
+ json={
+ "status": "SUCCESS",
+ "records": [
+ {
+ "id": "123456",
+ "name": "example.com",
+ "type": "A",
+ "content": "127.0.0.1",
+ "ttl": "600",
+ "prio": None,
+ "notes": "",
+ },
+ {
+ "id": "1234567",
+ "name": "sub.example.com",
+ "type": "A",
+ "content": "127.0.0.2",
+ "ttl": 600,
+ "prio": None,
+ "notes": "",
+ },
+ ],
+ },
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ # then all records should be deleted
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/delete/example.com/123456"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/delete/example.com/1234567"),
+ json={"status": "SUCCESS"},
+ match=[
+ matchers.json_params_matcher(
+ {"apikey": "key", "secretapikey": "secret"}
+ )
+ ],
+ )
+ # then all records should be imported / created
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "123456"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "",
+ "type": "A",
+ "content": "127.0.0.3",
+ "ttl": 600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+ responses.post(
+ url=urljoin(API_ENDPOINT, "dns/create/example.com"),
+ json={"status": "SUCCESS", "id": "1234567"},
+ match=[
+ matchers.json_params_matcher(
+ {
+ "apikey": "key",
+ "secretapikey": "secret",
+ "name": "sub",
+ "type": "A",
+ "content": "127.0.0.4",
+ "ttl": 600,
+ "prio": None,
+ }
+ )
+ ],
+ )
+
+ with tempfile.TemporaryDirectory() as temp_dir:
+ filename = Path(temp_dir, "records.bind")
+ with open(filename, "w") as f:
+ f.write(
+ (
+ "$ORIGIN example.com.\n"
+ "$TTL 1234\n"
+ "@ IN SOA dns.example.com. dns2.example.com. (100 300 100 6000 600)\n"
+ "example.com. IN 600 A 127.0.0.3\n"
+ "sub.example.com. 600 IN A 127.0.0.4"
+ )
+ )
+
+ pkb_client.import_bind_dns_records(filename, DNSRestoreMode.clear)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/data/__init__.py b/tests/data/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/data/__init__.py
diff --git a/tests/data/test.bind b/tests/data/test.bind
new file mode 100644
index 0000000..b99dcf4
--- /dev/null
+++ b/tests/data/test.bind
@@ -0,0 +1,10 @@
+$ORIGIN test.com.
+$TTL 1234
+@ IN SOA dns.example.com. dns2.example.com. (100 300 100 6000 600)
+test.com. IN 600 A 1.2.3.4
+test.com. HS 600 A 1.2.3.4
+sub.test.com. 600 IN A 4.3.2.1
+@ IN 600 AAAA 2001:db8::1 ; This is a comment
+
+test.com. IN TXT pkb-client
+test.com. 600 IN MX 10 mail.test.com.
diff --git a/tests/data/test_no_ttl.bind b/tests/data/test_no_ttl.bind
new file mode 100644
index 0000000..7dac0ff
--- /dev/null
+++ b/tests/data/test_no_ttl.bind
@@ -0,0 +1,9 @@
+$ORIGIN test.com.
+@ IN SOA dns.example.com. dns2.example.com. (100 300 100 6000 600)
+test.com. IN 600 A 1.2.3.4
+test.com. HS 600 A 1.2.3.4
+sub.test.com. 600 IN A 4.3.2.1
+@ IN 700 AAAA 2001:db8::1 ; This is a comment
+
+test.com. IN TXT pkb-client
+test.com. 600 IN MX 10 mail.test.com.
diff --git a/tests/pkb_client_tests.py b/tests/pkb_client_tests.py
deleted file mode 100644
index 2668a3c..0000000
--- a/tests/pkb_client_tests.py
+++ /dev/null
@@ -1,569 +0,0 @@
-import json
-import os
-import unittest
-from pathlib import Path
-
-import requests
-
-from pkb_client.client import PKBClient, DNSRestoreMode
-
-"""
-WARNING: DO NOT RUN THIS TEST WITH A PRODUCTION DOMAIN OR IN A PRODUCTION ENVIRONMENT!!
- This test sets, edits and deletes dns record entries and if the test fails,
- unintended changes to dns entries may result.
-"""
-
-TEST_DOMAIN = os.environ.get("TEST_DOMAIN")
-PORKBUN_API_KEY = os.environ.get("PORKBUN_API_KEY")
-PORKBUN_API_SECRET = os.environ.get("PORKBUN_API_SECRET")
-DNS_RECORDS = os.environ.get("DNS_RECORDS")
-
-PUBLIC_IP_URL = "https://api64.ipify.org"
-
-
-class DNSTestWithCleanup(unittest.TestCase):
-
- def tearDown(self):
- if hasattr(self, "record_id") and self.record_id is not None:
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- pkb_client.dns_delete(TEST_DOMAIN, self.record_id)
-
-
-class TestClientAuth(unittest.TestCase):
- def test_valid_auth(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- ip_address = pkb_client.ping()
-
- self.assertEqual(ip_address, requests.get(PUBLIC_IP_URL).text)
-
- def test_invalid_api_key(self):
- pkb_client = PKBClient("invalid-api-key", PORKBUN_API_SECRET)
- with self.assertRaises(Exception):
- pkb_client.ping()
-
- def test_invalid_api_secret(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, "invalid-api-secret")
- with self.assertRaises(Exception):
- pkb_client.ping()
-
- def test_invalid_api_key_and_secret(self):
- pkb_client = PKBClient("invalid-api-key", "invalid-api-secret")
- with self.assertRaises(Exception):
- pkb_client.ping()
-
-
-class TestPingMethod(unittest.TestCase):
- def test_ping(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- ip_address = pkb_client.ping()
-
- self.assertEqual(ip_address, requests.get(PUBLIC_IP_URL).text)
-
-
-class TestDNSCreateMethod(DNSTestWithCleanup):
-
- def test_valid_request(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- ttl = 342
- name = "test_pkb_client"
-
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name, ttl=ttl)
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
-
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest():
- self.assertEqual(txt_content, record["content"])
- with self.subTest():
- self.assertEqual(ttl, int(record["ttl"]))
- with self.subTest():
- self.assertEqual("{}.{}".format(name, TEST_DOMAIN), record["name"])
- return
- self.assertTrue(False)
-
- def test_invalid_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- with self.assertRaises(Exception):
- self.record_id = pkb_client.dns_create("notvaliddomain", "TXT", "interesting-content",
- name="test_pkb_client")
-
- def test_invalid_record_type(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- with self.assertRaises(AssertionError):
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "ABC", "interesting-content", name="test_pkb_client")
-
- def test_larger_than_allowed_content_length(self):
- # the api call should not fail because the api creates multiple TXT entries which will be concatenated
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- txt_content = "interesting-content-interesting-content-interesting-content-interesting-content-" \
- "interesting-content-interesting-content-interesting-content-interesting-content-" \
- "interesting-content-interesting-content-interesting-content-interesting-content-" \
- "interesting-content"
- assert len(txt_content) == 259
-
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client")
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- self.assertEqual(txt_content, record["content"])
- return
- self.assertTrue(False)
-
- def test_largest_allowed_content_length(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- txt_content = "interesting-content-interesting-content-interesting-content-interesting-content-" \
- "interesting-content-interesting-content-interesting-content-interesting-content-" \
- "interesting-content-interesting-content-interesting-content-interesting-content-" \
- "interesting-con"
- assert len(txt_content) == 255
-
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client")
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- self.assertEqual(txt_content, record["content"])
- return
- self.assertTrue(False)
-
- def test_empty_content_str(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- txt_content = ""
- assert len(txt_content) == 0
-
- with self.assertRaises(AssertionError):
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client")
-
- def test_none_content(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", None, name="test_pkb_client")
-
- def test_smaller_than_allowed_ttl(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", ttl=299,
- name="test_pkb_client")
-
- def test_negative_ttl(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", ttl=-1,
- name="test_pkb_client")
-
- def test_larger_than_allowed_ttl(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", "interesting-content", name="test_pkb_client",
- ttl=2147483648)
-
- def test_largest_allowed_ttl(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- ttl = 2147483647
-
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", ttl=ttl)
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest():
- self.assertEqual(txt_content, record["content"])
- with self.subTest():
- self.assertEqual(ttl, int(record["ttl"]))
- return
- self.assertTrue(False)
-
- def test_valid_prio_with_txt(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- prio = 10
-
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", prio=prio)
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest():
- self.assertEqual(txt_content, record["content"])
- with self.subTest():
- self.assertEqual(prio, int(record["prio"]))
- return
- self.assertTrue(False)
-
- def test_negative_prio_with_txt(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- prio = -42
-
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name="test_pkb_client", prio=prio)
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest():
- self.assertEqual(txt_content, record["content"])
- with self.subTest():
- self.assertEqual(prio, int(record["prio"]))
- return
- self.assertTrue(False)
-
-
-class TestDNSEditMethod(DNSTestWithCleanup):
- def test_valid_edit_request(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- name = "test_pkb_client"
- tll = 342
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name, ttl=tll)
-
- edited_txt_content = "more-interesting-content"
- edited_name = "more_test_pkb_client"
- edited_tll = 423
- pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content, name=edited_name, ttl=edited_tll)
-
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest("txt record content is not edited"):
- self.assertEqual(edited_txt_content, record["content"])
- with self.subTest("txt record name is not edited"):
- self.assertEqual("{}.{}".format(edited_name, TEST_DOMAIN), record["name"])
- with self.subTest("txt record ttl is not edited"):
- self.assertEqual(edited_tll, int(record["ttl"]))
- return
- self.assertTrue(False)
-
- def test_change_subdomain_to_root_txt_record(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- name = "test_pkb_client"
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name)
-
- edited_txt_content = "more-interesting-content"
- edited_name = ""
- pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content, name=edited_name)
-
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest("txt record content is not edited"):
- self.assertEqual(edited_txt_content, record["content"])
- with self.subTest("txt record name is not edited"):
- self.assertEqual(TEST_DOMAIN, record["name"])
- return
- self.assertTrue(False)
-
- def test_no_name_change(self):
- # the name is required for each edit, otherwise the record will apply for the root domain
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- name = "test_pkb_client"
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name)
-
- edited_txt_content = "more-interesting-content"
- pkb_client.dns_edit(TEST_DOMAIN, self.record_id, "TXT", edited_txt_content)
-
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest("txt record content is not edited"):
- self.assertEqual(edited_txt_content, record["content"])
- with self.subTest("txt record name is not edited"):
- self.assertEqual(TEST_DOMAIN, record["name"])
- return
- self.assertTrue(False)
-
- def test_record_type_change(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- name = "test_pkb_client"
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name)
-
- edited_txt_content = "more-interesting-content"
- name = "test_pkb_client"
- edited_record_type = "MX"
- pkb_client.dns_edit(TEST_DOMAIN, self.record_id, edited_record_type, edited_txt_content, name=name)
-
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in records:
- if record["id"] == self.record_id:
- with self.subTest("txt record content is not edited"):
- self.assertEqual(edited_txt_content, record["content"])
- with self.subTest("record type is not edited"):
- self.assertEqual(edited_record_type, record["type"])
- return
- self.assertTrue(False)
-
-
-class TestDNSDeleteMethod(DNSTestWithCleanup):
- def test_valid_delete_request(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- txt_content = "interesting-content"
- name = "test_pkb_client"
- self.record_id = pkb_client.dns_create(TEST_DOMAIN, "TXT", txt_content, name=name)
-
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- record_exists = False
- for record in records:
- if record["id"] == self.record_id:
- record_exists = True
- break
- with self.subTest("test txt record setup failed"):
- self.assertTrue(record_exists)
-
- pkb_client.dns_delete(TEST_DOMAIN, self.record_id)
-
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- record_exists = False
- for record in records:
- if record["id"] == self.record_id:
- record_exists = True
- break
- if not record_exists:
- self.record_id = None
- with self.subTest("txt record is not deleted"):
- self.assertFalse(record_exists)
-
-
-class TestDNSReceiveMethod(unittest.TestCase):
- def test_valid_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- records = pkb_client.dns_retrieve(TEST_DOMAIN)
- self.assertEqual(records, DNS_RECORDS)
-
- def test_invalid_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
- with self.assertRaises(Exception):
- pkb_client.dns_retrieve("invaliddomain")
-
-
-class TestDNSExport(unittest.TestCase):
- def test_valid_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- dns_records = pkb_client.dns_retrieve(domain=TEST_DOMAIN)
- # reformat the dns records to a single dict
- dns_records_dict = dict()
- for record in dns_records:
- dns_records_dict[record["id"]] = record
-
- filepath = Path("dns_backup.json")
- if filepath.exists():
- filepath.unlink()
- pkb_client.dns_export(domain=TEST_DOMAIN, filename=str(filepath))
-
- with open(str("dns_backup.json"), "r") as f:
- self.assertEqual(json.load(f), dns_records_dict)
-
- filepath.unlink()
-
- def test_invalid_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(Exception):
- pkb_client.dns_export(domain="invaliddomain", filename="dns_backup.json")
-
- def test_empty_str_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_export(domain="", filename="dns_backup.json")
-
- def test_none_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_export(domain=None, filename="dns_backup.json")
-
- def test_filename_already_exists(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- filepath = Path("dns_backup.json")
- filepath.touch()
-
- with self.assertRaises(Exception):
- pkb_client.dns_export(domain=TEST_DOMAIN, filename=str(filepath))
-
- filepath.unlink()
-
- def test_empty_str_filename(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_export(domain=TEST_DOMAIN, filename="")
-
- def test_none_filename(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_export(domain=TEST_DOMAIN, filename=None)
-
-
-class TestDNSImport(unittest.TestCase):
- def test_valid_clear_import(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- existing_dns_record_ids = set()
- dns_records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in dns_records:
- existing_dns_record_ids.add(record["id"])
-
- filename = "dns_backup_clear.json"
-
- with open(filename, "r") as f:
- file_dns_records = json.load(f)
-
- pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.clear)
-
- new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for new_record in new_dns_records:
- # test if the previous dns records still exists
- if new_record["id"] in existing_dns_record_ids:
- self.assertTrue(False)
- # test if the new dns record was created
- new_record_created = False
- for _, file_dns_record in file_dns_records.items():
- if file_dns_record["name"] == new_record["name"] \
- and file_dns_record["type"] == new_record["type"] \
- and file_dns_record["content"] == new_record["content"] \
- and file_dns_record["ttl"] == new_record["ttl"] \
- and file_dns_record["prio"] == new_record["prio"]:
- new_record_created = True
- self.assertTrue(new_record_created)
-
- def test_valid_replace_import(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- existing_dns_record_ids = set()
- dns_records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in dns_records:
- existing_dns_record_ids.add(record["id"])
-
- filename = "dns_backup_replace.json"
-
- with open(filename, "r") as f:
- file_dns_records = json.load(f)
-
- pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.replace)
-
- new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for new_record in new_dns_records:
- # test if the previous dns records still exists
- if new_record["id"] not in existing_dns_record_ids:
- self.assertTrue(False)
- # test if the dns record was edited
- record_edited = False
- for _, file_dns_record in file_dns_records.items():
- if file_dns_record["name"] == new_record["name"] \
- and file_dns_record["type"] == new_record["type"] \
- and file_dns_record["content"] == new_record["content"] \
- and file_dns_record["ttl"] == new_record["ttl"] \
- and file_dns_record["prio"] == new_record["prio"]:
- record_edited = True
- break
- self.assertTrue(record_edited)
-
- def test_valid_keep_import(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- existing_dns_records = dict()
- dns_records = pkb_client.dns_retrieve(TEST_DOMAIN)
- for record in dns_records:
- existing_dns_records[record["id"]] = record
-
- filename = "dns_backup_keep.json"
-
- with open(filename, "r") as f:
- file_dns_records = json.load(f)
-
- pkb_client.dns_import(domain=TEST_DOMAIN, filename=filename, restore_mode=DNSRestoreMode.keep)
-
- new_dns_records = pkb_client.dns_retrieve(TEST_DOMAIN)
-
- # test if the all old dns records are kept
- for _, existing_record in existing_dns_records.items():
- record_kept = False
- for new_record in new_dns_records:
- if existing_record["id"] == new_record["id"] \
- and existing_record["name"] == new_record["name"] \
- and existing_record["type"] == new_record["type"] \
- and existing_record["content"] == new_record["content"] \
- and existing_record["ttl"] == new_record["ttl"] \
- and existing_record["prio"] == new_record["prio"]:
- record_kept = True
- break
- with self.subTest():
- self.assertTrue(record_kept)
-
- # test if the new records are created
- for new_record in new_dns_records:
- if new_record["id"] not in existing_dns_records:
- record_created = False
- for _, file_dns_record in file_dns_records.items():
- if file_dns_record["name"] == new_record["name"] \
- and file_dns_record["type"] == new_record["type"] \
- and file_dns_record["content"] == new_record["content"] \
- and file_dns_record["ttl"] == new_record["ttl"] \
- and file_dns_record["prio"] == new_record["prio"]:
- record_created = True
- break
- with self.subTest():
- self.assertTrue(record_created)
-
- def test_invalid_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(Exception):
- pkb_client.dns_import(domain="invaliddomain", filename="dns_backup.json", restore_mode=DNSRestoreMode.clear)
-
- def test_empty_str_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain="", filename="dns_backup.json", restore_mode=DNSRestoreMode.clear)
-
- def test_none_domain(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain=None, filename="dns_backup.json", restore_mode=DNSRestoreMode.clear)
-
- def test_empty_str_filename(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain=TEST_DOMAIN, filename="", restore_mode=DNSRestoreMode.clear)
-
- def test_none_filename(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain=TEST_DOMAIN, filename=None, restore_mode=DNSRestoreMode.clear)
-
- def test_invalid_restore_mode(self):
- pkb_client = PKBClient(PORKBUN_API_KEY, PORKBUN_API_SECRET)
-
- with self.subTest("None as restore mode"):
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode=None)
- with self.subTest("empty string as restore mode"):
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode="")
- with self.subTest("number as restore mode"):
- with self.assertRaises(AssertionError):
- pkb_client.dns_import(domain=TEST_DOMAIN, filename="dns_backup.json", restore_mode=0)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/third-party-notices b/third-party-notices
deleted file mode 100644
index 85cdfbc..0000000
--- a/third-party-notices
+++ /dev/null
@@ -1,216 +0,0 @@
-This project uses other Python modules released under different license agreement than
-pkb_client. Below are the used modules and their license and notice:
-
-###########################################################################################
-## requests: ##
-
-License:
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
-Notice:
-
-Requests
-Copyright 2019 Kenneth Reitz
-
-###########################################################################################
-
-###########################################################################################
-## setuptools: ##
-
-License:
-
-Copyright Jason R. Coombs
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to
-deal in the Software without restriction, including without limitation the
-rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
-sell copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-IN THE SOFTWARE.
-
-########################################################################################### \ No newline at end of file