diff --git a/.forgejo/workflows/ci.yaml b/.forgejo/workflows/ci.yaml index 8159102..5d335f3 100644 --- a/.forgejo/workflows/ci.yaml +++ b/.forgejo/workflows/ci.yaml @@ -14,4 +14,4 @@ jobs: - run: pip install -r requirements.txt - - run: pylint app \ No newline at end of file + - run: pylint --max-line-length=120 app \ No newline at end of file diff --git a/app/__main__.py b/app/__main__.py index b56a111..c7737ef 100644 --- a/app/__main__.py +++ b/app/__main__.py @@ -1,18 +1,25 @@ +# pylint: disable=wrong-import-order + +""" +pfsense-netbox-sync: Allows to synchronize NetBox IPAM DNS information to a pfSense instance +""" + import os +from typing import List import pynetbox import requests +import sys from requests.auth import HTTPBasicAuth -def main(): - # Instantiate connection to the Netbox API - nb_api = pynetbox.api( - url=os.environ['NB_API_URL'], - token=os.environ['NB_API_TOKEN'], - ) +def fetch_netbox_host_overrides(nb_api: pynetbox.api) -> dict: + """ + Fetch and build a list of host override from a NetBox instance + :param nb_api: the NetBox API client + :return: the list of host overrides mapped by their hostname + """ - # First, built the host overrides using Netbox as source nb_host_overrides = {} for nb_ip_address in nb_api.ipam.ip_addresses.all(): if nb_ip_address.dns_name is None or nb_ip_address.dns_name == '': @@ -30,16 +37,25 @@ def main(): 'aliases': None, } - # Then fetch the actual host overrides from pfSense API + return nb_host_overrides + + +def fetch_pfsense_host_overrides() -> dict: + """ + Fetch and build a list of host override from a pfSense instance + :return: the list of host overrides mapped by their hostname + """ + r = requests.get( f'{os.environ["PF_API_URL"]}/api/v2/services/dns_resolver/host_overrides', auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']), verify=False, + timeout=int(os.environ.get('HTTP_TIMEOUT', '5')), ) if r.status_code != 200: print(f'Error while requesting host overrides from pfSense ({r.status_code})') - exit(1) + sys.exit(1) pf_host_overrides = {} for pf_host_override in r.json()['data']: @@ -49,32 +65,44 @@ def main(): pf_host_overrides[pf_host_override['host'] + '.' + pf_host_override['domain']] = pf_host_override + return pf_host_overrides + + +def compute_host_overrides_changes( + netbox_host_overrides: dict, + pfsense_host_overrides: dict, +) -> (List[dict], List[dict], List[dict]): + """ + Compute the changes between the host overrides from NetBox (source of truth) and pfSense (real) + :param netbox_host_overrides: the source host overrides from NetBox + :param pfsense_host_overrides: the source host overrides from pfSense + :return: the changes + """ + new_host_overrides = [] changed_host_overrides = [] deleted_host_overrides = [] - for (host, nb_host_override) in nb_host_overrides.items(): - if host not in pf_host_overrides: + for (host, nb_host_override) in netbox_host_overrides.items(): + if host not in pfsense_host_overrides: new_host_overrides.append(nb_host_override) - elif nb_host_override['ip'] != pf_host_overrides[host]['ip']: + elif nb_host_override['ip'] != pfsense_host_overrides[host]['ip']: changed_host_overrides.append(nb_host_override) - for (host, pf_host_override) in pf_host_overrides.items(): - if host not in nb_host_overrides: + for (host, pf_host_override) in pfsense_host_overrides.items(): + if host not in netbox_host_overrides: deleted_host_overrides.append(pf_host_override) - print(f'{len(new_host_overrides)} new host overrides') - print(f'{len(changed_host_overrides)} changed host overrides') - print(f'{len(deleted_host_overrides)} deleted host overrides') + return new_host_overrides, changed_host_overrides, deleted_host_overrides - if len(new_host_overrides) == 0 and len(changed_host_overrides) == 0 and len(deleted_host_overrides) == 0: - print('no changes detected.') - exit(0) - print() +def process_new_host_overrides(host_overrides: List[dict]): + """ + Process the new host overrides. This will create them into the pfSense instance + :param host_overrides: the new host overrides + """ - # First process the new host overrides - for host_override in new_host_overrides: + for host_override in host_overrides: print(f'[+] {host_override["host"]}.{host_override["domain"]} {host_override["ip"]}') r = requests.post( @@ -82,14 +110,22 @@ def main(): auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']), verify=False, json=host_override, + timeout=int(os.environ.get('HTTP_TIMEOUT', '5')), ) if r.status_code != 200: print(f'Error while creating host override ({r.status_code})') - exit(1) + sys.exit(1) - # Then process the changed host overrides - for host_override in changed_host_overrides: + +def process_changed_host_overrides(pf_host_overrides: dict, host_overrides: List[dict]): + """ + Process the changed host overrides. This will update them into the pfSense instance + :param pf_host_overrides: the actual host overrides coming from the pfSense instance + :param host_overrides: the changed host overrides + """ + + for host_override in host_overrides: pf_host_override = pf_host_overrides[host_override['host'] + '.' + host_override['domain']] print( @@ -103,36 +139,88 @@ def main(): auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']), verify=False, json=host_override, + timeout=int(os.environ.get('HTTP_TIMEOUT', '5')), ) if r.status_code != 200: print(f'Error while updating host override ({r.status_code})') - exit(1) + sys.exit(1) - # Finally process the deleted host overrides - for host_override in deleted_host_overrides: + +def process_deleted_host_overrides(host_overrides: List[dict]): + """ + Process the deleted host overrides. This will delete them from the pfSense instance + :param host_overrides: the deleted host overrides + """ + + for host_override in host_overrides: print(f'[-] {host_override["host"]}.{host_override["domain"]} {host_override["ip"]}') r = requests.delete( f'{os.environ["PF_API_URL"]}/api/v2/services/dns_resolver/host_override?id={host_override["id"]}', auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']), verify=False, + timeout=int(os.environ.get('HTTP_TIMEOUT', '5')), ) if r.status_code != 200: print(f'Error while deleting host override ({r.status_code})') - exit(1) + sys.exit(1) + + +def main(): + """ + pfsense-netbox-sync main entrypoint + """ + + # Instantiate connection to the Netbox API + nb_api = pynetbox.api( + url=os.environ['NB_API_URL'], + token=os.environ['NB_API_TOKEN'], + ) + + # First, built the host overrides using Netbox as source + nb_host_overrides = fetch_netbox_host_overrides(nb_api) + + # Then fetch the actual host overrides from pfSense API + pf_host_overrides = fetch_pfsense_host_overrides() + + # Compute the changes + (new_host_overrides, changed_host_overrides, deleted_host_overrides) = compute_host_overrides_changes( + nb_host_overrides, + pf_host_overrides, + ) + + print(f'{len(new_host_overrides)} new host overrides') + print(f'{len(changed_host_overrides)} changed host overrides') + print(f'{len(deleted_host_overrides)} deleted host overrides') + + if len(new_host_overrides) == 0 and len(changed_host_overrides) == 0 and len(deleted_host_overrides) == 0: + print('no changes detected.') + sys.exit(0) + + print() + + # First process the new host overrides + process_new_host_overrides(new_host_overrides) + + # Then process the changed host overrides + process_changed_host_overrides(pf_host_overrides, changed_host_overrides) + + # Finally process the deleted host overrides + process_deleted_host_overrides(deleted_host_overrides) # Finally restart the DNS resolver r = requests.post( f'{os.environ["PF_API_URL"]}/api/v2/services/dns_resolver/apply', auth=HTTPBasicAuth(os.environ['PF_API_USER'], os.environ['PF_API_PASS']), verify=False, + timeout=int(os.environ.get('HTTP_TIMEOUT', '5')), ) if r.status_code != 200: print(f'Error while restarting DNS resolver ({r.status_code})') - exit(1) + sys.exit(1) if __name__ == '__main__':