Add CI (#4)
All checks were successful
CI / ci (push) Successful in 1m3s

Reviewed-on: #4
Co-authored-by: Aloïs Micard <a.micard@vold.lu>
Co-committed-by: Aloïs Micard <a.micard@vold.lu>
This commit is contained in:
Aloïs Micard 2025-02-19 09:06:33 +01:00 committed by Aloïs Micard
parent 393d8c6f67
commit 5cba5f88da
3 changed files with 138 additions and 32 deletions

View file

@ -0,0 +1,17 @@
name: CI
on: [ push ]
jobs:
ci:
runs-on: docker
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.13'
cache: 'pip'
- run: pip install -r requirements.txt
- run: pylint --max-line-length=120 app

View file

@ -1,18 +1,25 @@
# pylint: disable=wrong-import-order
"""
pfsense-netbox-sync: Allows to synchronize NetBox IPAM DNS information to a pfSense instance
"""
import os import os
from typing import List
import pynetbox import pynetbox
import requests import requests
import sys
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
def main(): def fetch_netbox_host_overrides(nb_api: pynetbox.api) -> dict:
# Instantiate connection to the Netbox API """
nb_api = pynetbox.api( Fetch and build a list of host override from a NetBox instance
url=os.environ['NB_API_URL'], :param nb_api: the NetBox API client
token=os.environ['NB_API_TOKEN'], :return: the list of host overrides mapped by their hostname
) """
# First, built the host overrides using Netbox as source
nb_host_overrides = {} nb_host_overrides = {}
for nb_ip_address in nb_api.ipam.ip_addresses.all(): for nb_ip_address in nb_api.ipam.ip_addresses.all():
if nb_ip_address.dns_name is None or nb_ip_address.dns_name == '': if nb_ip_address.dns_name is None or nb_ip_address.dns_name == '':
@ -30,16 +37,25 @@ def main():
'aliases': None, 'aliases': None,
} }
# Then fetch the actual host overrides from pfSense API return nb_host_overrides
def fetch_pfsense_host_overrides() -> dict:
"""
Fetch and build a list of host override from a pfSense instance
:return: the list of host overrides mapped by their hostname
"""
r = requests.get( r = requests.get(
f'{os.environ["PF_API_URL"]}/api/v2/services/dns_resolver/host_overrides', f'{os.environ["PF_API_URL"]}/api/v2/services/dns_resolver/host_overrides',
auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']), auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']),
verify=False, verify=False,
timeout=int(os.environ.get('HTTP_TIMEOUT', '5')),
) )
if r.status_code != 200: if r.status_code != 200:
print(f'Error while requesting host overrides from pfSense ({r.status_code})') print(f'Error while requesting host overrides from pfSense ({r.status_code})')
exit(1) sys.exit(1)
pf_host_overrides = {} pf_host_overrides = {}
for pf_host_override in r.json()['data']: for pf_host_override in r.json()['data']:
@ -49,32 +65,44 @@ def main():
pf_host_overrides[pf_host_override['host'] + '.' + pf_host_override['domain']] = pf_host_override pf_host_overrides[pf_host_override['host'] + '.' + pf_host_override['domain']] = pf_host_override
return pf_host_overrides
def compute_host_overrides_changes(
netbox_host_overrides: dict,
pfsense_host_overrides: dict,
) -> (List[dict], List[dict], List[dict]):
"""
Compute the changes between the host overrides from NetBox (source of truth) and pfSense (real)
:param netbox_host_overrides: the source host overrides from NetBox
:param pfsense_host_overrides: the source host overrides from pfSense
:return: the changes
"""
new_host_overrides = [] new_host_overrides = []
changed_host_overrides = [] changed_host_overrides = []
deleted_host_overrides = [] deleted_host_overrides = []
for (host, nb_host_override) in nb_host_overrides.items(): for (host, nb_host_override) in netbox_host_overrides.items():
if host not in pf_host_overrides: if host not in pfsense_host_overrides:
new_host_overrides.append(nb_host_override) new_host_overrides.append(nb_host_override)
elif nb_host_override['ip'] != pf_host_overrides[host]['ip']: elif nb_host_override['ip'] != pfsense_host_overrides[host]['ip']:
changed_host_overrides.append(nb_host_override) changed_host_overrides.append(nb_host_override)
for (host, pf_host_override) in pf_host_overrides.items(): for (host, pf_host_override) in pfsense_host_overrides.items():
if host not in nb_host_overrides: if host not in netbox_host_overrides:
deleted_host_overrides.append(pf_host_override) deleted_host_overrides.append(pf_host_override)
print(f'{len(new_host_overrides)} new host overrides') return new_host_overrides, changed_host_overrides, deleted_host_overrides
print(f'{len(changed_host_overrides)} changed host overrides')
print(f'{len(deleted_host_overrides)} deleted host overrides')
if len(new_host_overrides) == 0 and len(changed_host_overrides) == 0 and len(deleted_host_overrides) == 0:
print('no changes detected.')
exit(0)
print() def process_new_host_overrides(host_overrides: List[dict]):
"""
Process the new host overrides. This will create them into the pfSense instance
:param host_overrides: the new host overrides
"""
# First process the new host overrides for host_override in host_overrides:
for host_override in new_host_overrides:
print(f'[+] {host_override["host"]}.{host_override["domain"]} {host_override["ip"]}') print(f'[+] {host_override["host"]}.{host_override["domain"]} {host_override["ip"]}')
r = requests.post( r = requests.post(
@ -82,14 +110,22 @@ def main():
auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']), auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']),
verify=False, verify=False,
json=host_override, json=host_override,
timeout=int(os.environ.get('HTTP_TIMEOUT', '5')),
) )
if r.status_code != 200: if r.status_code != 200:
print(f'Error while creating host override ({r.status_code})') print(f'Error while creating host override ({r.status_code})')
exit(1) sys.exit(1)
# Then process the changed host overrides
for host_override in changed_host_overrides: def process_changed_host_overrides(pf_host_overrides: dict, host_overrides: List[dict]):
"""
Process the changed host overrides. This will update them into the pfSense instance
:param pf_host_overrides: the actual host overrides coming from the pfSense instance
:param host_overrides: the changed host overrides
"""
for host_override in host_overrides:
pf_host_override = pf_host_overrides[host_override['host'] + '.' + host_override['domain']] pf_host_override = pf_host_overrides[host_override['host'] + '.' + host_override['domain']]
print( print(
@ -103,36 +139,88 @@ def main():
auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']), auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']),
verify=False, verify=False,
json=host_override, json=host_override,
timeout=int(os.environ.get('HTTP_TIMEOUT', '5')),
) )
if r.status_code != 200: if r.status_code != 200:
print(f'Error while updating host override ({r.status_code})') print(f'Error while updating host override ({r.status_code})')
exit(1) sys.exit(1)
# Finally process the deleted host overrides
for host_override in deleted_host_overrides: def process_deleted_host_overrides(host_overrides: List[dict]):
"""
Process the deleted host overrides. This will delete them from the pfSense instance
:param host_overrides: the deleted host overrides
"""
for host_override in host_overrides:
print(f'[-] {host_override["host"]}.{host_override["domain"]} {host_override["ip"]}') print(f'[-] {host_override["host"]}.{host_override["domain"]} {host_override["ip"]}')
r = requests.delete( r = requests.delete(
f'{os.environ["PF_API_URL"]}/api/v2/services/dns_resolver/host_override?id={host_override["id"]}', f'{os.environ["PF_API_URL"]}/api/v2/services/dns_resolver/host_override?id={host_override["id"]}',
auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']), auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']),
verify=False, verify=False,
timeout=int(os.environ.get('HTTP_TIMEOUT', '5')),
) )
if r.status_code != 200: if r.status_code != 200:
print(f'Error while deleting host override ({r.status_code})') print(f'Error while deleting host override ({r.status_code})')
exit(1) sys.exit(1)
def main():
"""
pfsense-netbox-sync main entrypoint
"""
# Instantiate connection to the Netbox API
nb_api = pynetbox.api(
url=os.environ['NB_API_URL'],
token=os.environ['NB_API_TOKEN'],
)
# First, built the host overrides using Netbox as source
nb_host_overrides = fetch_netbox_host_overrides(nb_api)
# Then fetch the actual host overrides from pfSense API
pf_host_overrides = fetch_pfsense_host_overrides()
# Compute the changes
(new_host_overrides, changed_host_overrides, deleted_host_overrides) = compute_host_overrides_changes(
nb_host_overrides,
pf_host_overrides,
)
print(f'{len(new_host_overrides)} new host overrides')
print(f'{len(changed_host_overrides)} changed host overrides')
print(f'{len(deleted_host_overrides)} deleted host overrides')
if len(new_host_overrides) == 0 and len(changed_host_overrides) == 0 and len(deleted_host_overrides) == 0:
print('no changes detected.')
sys.exit(0)
print()
# First process the new host overrides
process_new_host_overrides(new_host_overrides)
# Then process the changed host overrides
process_changed_host_overrides(pf_host_overrides, changed_host_overrides)
# Finally process the deleted host overrides
process_deleted_host_overrides(deleted_host_overrides)
# Finally restart the DNS resolver # Finally restart the DNS resolver
r = requests.post( r = requests.post(
f'{os.environ["PF_API_URL"]}/api/v2/services/dns_resolver/apply', f'{os.environ["PF_API_URL"]}/api/v2/services/dns_resolver/apply',
auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']), auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']),
verify=False, verify=False,
timeout=int(os.environ.get('HTTP_TIMEOUT', '5')),
) )
if r.status_code != 200: if r.status_code != 200:
print(f'Error while restarting DNS resolver ({r.status_code})') print(f'Error while restarting DNS resolver ({r.status_code})')
exit(1) sys.exit(1)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -1,2 +1,3 @@
pynetbox==7.4.1 pynetbox==7.4.1
requests==2.32.3 requests==2.32.3
pylint==3.3.4