netbox-pve-sync/app/__main__.py

448 lines
16 KiB
Python

import os
import sys
from typing import Optional
import pynetbox
from proxmoxer import ProxmoxAPI
def _load_nb_objects(_nb_api: pynetbox.api) -> dict:
_nb_objects = {
'devices': {},
'virtual_machines': {},
'virtual_machines_interfaces': {},
'mac_addresses': {},
'prefixes': {},
'ip_addresses': {},
'vlans': {},
'disks': {},
'tags': {},
}
# Load NetBox devices
for _nb_device in _nb_api.dcim.devices.all():
_nb_objects['devices'][_nb_device.name.lower()] = _nb_device
# Load NetBox virtual machines
for _nb_virtual_machine in _nb_api.virtualization.virtual_machines.all():
_nb_objects['virtual_machines'][_nb_virtual_machine.serial] = _nb_virtual_machine
# Load NetBox interfaces
for _nb_interface in _nb_api.virtualization.interfaces.all():
if _nb_interface.virtual_machine.id not in _nb_objects['virtual_machines_interfaces']:
_nb_objects['virtual_machines_interfaces'][_nb_interface.virtual_machine.id] = {}
_nb_objects['virtual_machines_interfaces'][_nb_interface.virtual_machine.id][_nb_interface.name] = _nb_interface
# Load NetBox mac addresses
for _nb_mac_address in _nb_api.dcim.mac_addresses.all():
_nb_objects['mac_addresses'][_nb_mac_address.mac_address] = _nb_mac_address
# Load NetBox IP ranges
for _nb_prefix in _nb_api.ipam.prefixes.all():
_nb_objects['prefixes'][_nb_prefix.prefix] = _nb_prefix
# Load NetBox IP addresses
for _nb_ip_address in _nb_api.ipam.ip_addresses.all():
_nb_objects['ip_addresses'][_nb_ip_address['address']] = _nb_ip_address
# Load NetBox vLANs
for _nb_vlan in _nb_api.ipam.vlans.all():
_nb_objects['vlans'][str(_nb_vlan.vid)] = _nb_vlan
# Load NetBox disks
for _nb_disk in _nb_api.virtualization.virtual_disks.all():
if _nb_disk.virtual_machine.id not in _nb_objects['disks']:
_nb_objects['disks'][_nb_disk.virtual_machine.id] = {}
_nb_objects['disks'][_nb_disk.virtual_machine.id][_nb_disk.name] = _nb_disk
# Load NetBox tags
for _nb_tag in _nb_api.extras.tags.all():
_nb_objects['tags'][_nb_tag.name] = _nb_tag
return _nb_objects
def _process_pve_tags(
_pve_api: ProxmoxAPI,
_nb_api: pynetbox.api,
_nb_objects: dict,
) -> dict:
# TODO: First tags
# Then pool (we treat them as tags)
for _pve_pool in _pve_api.pools.get():
_tag_name = f'Pool/{_pve_pool["poolid"]}'
_nb_tag = _nb_objects['tags'].get(_tag_name)
if _nb_tag is None:
_nb_tag = _nb_api.extras.tags.create(
name=_tag_name,
slug=f'pool-{_pve_pool["poolid"]}'.lower(),
description=f'Proxmox pool {_pve_pool["poolid"]}',
)
return _nb_objects
def _process_pve_virtual_machine(
_pve_api: ProxmoxAPI,
_nb_api: pynetbox.api,
_nb_objects: dict,
_nb_device: any,
_pve_tags: [str],
_pve_virtual_machine: dict
) -> dict:
_pve_node_name = _nb_device.name.lower()
pve_virtual_machine_config = _pve_api.nodes(_pve_node_name).qemu(_pve_virtual_machine['vmid']).config.get()
try:
pve_virtual_machine_agent_interfaces = _pve_api \
.nodes(_pve_node_name) \
.qemu(_pve_virtual_machine['vmid']) \
.agent('network-get-interfaces') \
.get()
except Exception:
pve_virtual_machine_agent_interfaces = {'result': []}
# Extract IP addresses from QEMU
pve_virtual_machine_ip_addresses = {}
for result in pve_virtual_machine_agent_interfaces['result']:
pve_virtual_machine_ip_addresses[result['name']] = result['ip-addresses']
# Create the virtual machine if it exists, update it otherwise
nb_virtual_machine = _nb_objects['virtual_machines'].get(str(_pve_virtual_machine['vmid']))
if nb_virtual_machine is None:
nb_virtual_machine = _nb_api.virtualization.virtual_machines.create(
serial=_pve_virtual_machine['vmid'],
name=_pve_virtual_machine['name'],
site=_nb_device.site.id,
cluster=1, # TODO
device=_nb_device.id,
vcpus=pve_virtual_machine_config['cores'],
memory=int(pve_virtual_machine_config['memory']),
status='active' if _pve_virtual_machine['status'] == 'running' else 'offline',
tags=list(map(lambda _pve_tag_name: _nb_objects['tags'][_pve_tag_name], _pve_tags)),
custom_fields={
'autostart': pve_virtual_machine_config.get('onboot') == 1,
}
)
else:
nb_virtual_machine.name = _pve_virtual_machine['name']
nb_virtual_machine.site = _nb_device.site.id
nb_virtual_machine.cluster = 1
nb_virtual_machine.device = _nb_device.id
nb_virtual_machine.vcpus = pve_virtual_machine_config['cores']
nb_virtual_machine.memory = int(pve_virtual_machine_config['memory'])
nb_virtual_machine.status = 'active' if _pve_virtual_machine['status'] == 'running' else 'offline'
nb_virtual_machine.tags = list(map(lambda _pve_tag_name: _nb_objects['tags'][_pve_tag_name], _pve_tags))
nb_virtual_machine.custom_fields['autostart'] = pve_virtual_machine_config.get('onboot') == 1
nb_virtual_machine.save()
# Handle the VM network interfaces
_process_pve_virtual_machine_network_interfaces(
_nb_api,
_nb_objects,
pve_virtual_machine_config,
nb_virtual_machine,
pve_virtual_machine_ip_addresses,
)
# Handle the VM disks
_process_pve_virtual_machine_disks(
_nb_api,
_nb_objects,
pve_virtual_machine_config,
nb_virtual_machine,
)
return _nb_objects
def _process_pve_virtual_machine_network_interfaces(
_nb_api: pynetbox.api,
_nb_objects: dict,
_pve_virtual_machine_config: dict,
_nb_virtual_machine: any,
_pve_virtual_machine_ip_addresses: dict,
) -> dict:
# Handle the VM network interfaces
for (_config_key, _config_value) in _pve_virtual_machine_config.items():
if not _config_key.startswith('net'):
continue
_network_definition = _parse_pve_network_definition(_config_value)
# Determinate MAC address
network_mac_address = None
for _model in ['virtio', 'e1000']:
if _model in _network_definition:
network_mac_address = _network_definition[_model]
break
if network_mac_address is None:
continue
_process_pve_virtual_machine_network_interface(
_nb_api,
_nb_objects,
_nb_virtual_machine,
_config_key,
network_mac_address,
_network_definition.get('tag'),
_pve_virtual_machine_ip_addresses,
)
return _nb_objects
def _process_pve_virtual_machine_network_interface(
_nb_api: pynetbox.api,
_nb_objects: dict,
_nb_virtual_machine: any,
_interface_name: str,
_interface_mac_address: str,
_interface_vlan_id: Optional[int],
_pve_virtual_machine_ip_addresses: dict,
) -> dict:
nb_virtual_machines_interface = _nb_objects['virtual_machines_interfaces'] \
.get(_nb_virtual_machine.id, {}) \
.get(_interface_name)
if nb_virtual_machines_interface is None:
nb_virtual_machines_interface = _nb_api.virtualization.interfaces.create(
virtual_machine=_nb_virtual_machine.id,
name=_interface_name,
description=_interface_mac_address,
)
if _nb_virtual_machine.id not in _nb_objects['virtual_machines_interfaces']:
_nb_objects['virtual_machines_interfaces'][_nb_virtual_machine.id] = {}
_nb_objects['virtual_machines_interfaces'][_nb_virtual_machine.id][
_interface_name] = nb_virtual_machines_interface
# Create the MAC address and link it to the VM
nb_mac_address = _nb_objects['mac_addresses'].get(_interface_mac_address)
if nb_mac_address is None:
nb_mac_address = _nb_api.dcim.mac_addresses.create(
mac_address=_interface_mac_address,
assigned_object_type='virtualization.vminterface',
assigned_object_id=nb_virtual_machines_interface.id,
)
_nb_objects['mac_addresses'][_interface_mac_address] = nb_mac_address
nb_virtual_machines_interface.primary_mac_address = nb_mac_address.id
nb_virtual_machines_interface.save()
# TODO: Improve Multiple IP address handling
_pve_virtual_machine_ip_address = None
for raw_interface_name in ['ens18', 'ens19']:
if raw_interface_name in _pve_virtual_machine_ip_addresses:
_pve_virtual_machine_ip_address = _pve_virtual_machine_ip_addresses[raw_interface_name][0]
break
if _pve_virtual_machine_ip_address is not None:
_virtual_machine_address = _pve_virtual_machine_ip_address['ip-address']
_virtual_machine_address_mask = _pve_virtual_machine_ip_address['prefix']
_virtual_machine_full_address = f'{_virtual_machine_address}/{_virtual_machine_address_mask}'
# First, determinate if the prefix exists
_prefix_network_address = '.'.join(_virtual_machine_address.split('.')[:-1]) + '.0'
_prefix_network_full_address = f'{_prefix_network_address}/{_virtual_machine_address_mask}'
nb_prefix = _nb_objects['prefixes'].get(_prefix_network_full_address)
if nb_prefix is None:
nb_prefix = _nb_api.ipam.prefixes.create(prefix=_prefix_network_full_address)
_nb_objects['prefixes'][nb_prefix.prefix] = nb_prefix
if 'dns_name' in nb_prefix.custom_fields and nb_prefix.custom_fields['dns_name'] is not None:
ip_address_dns_name = f'{_nb_virtual_machine.name}.{nb_prefix.custom_fields["dns_name"]}'
else:
ip_address_dns_name = ''
nb_ip_address = _nb_objects['ip_addresses'].get(_virtual_machine_full_address)
if nb_ip_address is None:
nb_ip_address = _nb_api.ipam.ip_addresses.create(
address=_virtual_machine_full_address,
assigned_object_type='virtualization.vminterface',
assigned_object_id=nb_virtual_machines_interface.id,
dns_name=ip_address_dns_name
)
_nb_objects['ip_addresses'][nb_ip_address.address] = nb_ip_address
else:
nb_ip_address.assigned_object_type = 'virtualization.vminterface'
nb_ip_address.assigned_object_id = nb_virtual_machines_interface.id
nb_ip_address.dns_name = ip_address_dns_name
nb_ip_address.save()
_nb_virtual_machine.primary_ip4 = nb_ip_address.id
_nb_virtual_machine.save()
# Handle VLAN
if _interface_vlan_id is not None:
nb_vlan = _nb_objects['vlans'].get(str(_interface_vlan_id))
if nb_vlan is None:
nb_vlan = _nb_api.ipam.vlans.create(
vid=_interface_vlan_id,
name=f'VLAN {_interface_vlan_id}',
)
_nb_objects['vlans'][_interface_vlan_id] = nb_vlan
nb_prefix.vlan = nb_vlan.id
nb_prefix.save()
return _nb_objects
def _process_pve_virtual_machine_disks(
_nb_api: pynetbox.api,
_nb_objects: dict,
_pve_virtual_machine_config: dict,
_nb_virtual_machine: any,
) -> dict:
# Handle the VM disks
for (_config_key, _config_value) in _pve_virtual_machine_config.items():
if not _config_key.startswith('scsi'):
continue
if _config_key == 'scsihw':
continue
_disk_definition = _parse_pve_disk_definition(_config_value)
_process_pve_virtual_machine_disk(
_nb_api,
_nb_objects,
_nb_virtual_machine,
_disk_definition['name'],
_process_pve_disk_size(_disk_definition['size']),
_disk_definition.get('backup', '1') == '1',
)
return _nb_objects
def _process_pve_virtual_machine_disk(
_nb_api: pynetbox.api,
_nb_objects: dict,
_nb_virtual_machine: any,
_disk_name: str,
_disk_size: int,
_has_backup: bool,
) -> dict:
nb_disk = _nb_objects['disks'].get(_nb_virtual_machine.id, {}).get(_disk_name)
if nb_disk is None:
_nb_api.virtualization.virtual_disks.create(
name=_disk_name,
size=_disk_size,
virtual_machine=_nb_virtual_machine.id,
custom_fields={
'backup': _has_backup,
}
)
else:
nb_disk.size = _disk_size
nb_disk.custom_fields['backup'] = _has_backup
nb_disk.save()
return _nb_objects
def _parse_pve_network_definition(_raw_network_definition: str) -> dict:
_network_definition = {}
for _component in _raw_network_definition.split(','):
_component_parts = _component.split('=')
_network_definition[_component_parts[0]] = _component_parts[1]
return _network_definition
def _parse_pve_disk_definition(_raw_disk_definition: str) -> dict:
_disk_definition = {}
for _component in _raw_disk_definition.split(','):
_component_parts = _component.split('=')
if len(_component_parts) == 1:
_disk_definition['name'] = _component_parts[0]
else:
_disk_definition[_component_parts[0]] = _component_parts[1]
return _disk_definition
def _process_pve_disk_size(_raw_disk_size: str) -> int:
size = _raw_disk_size[:-1]
size_unit = _raw_disk_size[-1]
if size_unit == 'G':
return int(size) * 1_000
if size_unit == 'T':
return int(size) * 1_000_000
return -1
def main():
# Instantiate connection to the Proxmox VE API
pve_api = ProxmoxAPI(
host=os.environ['PVE_API_HOST'],
user=os.environ['PVE_API_USER'],
token_name=os.environ['PVE_API_TOKEN'],
token_value=os.environ['PVE_API_SECRET'],
verify_ssl=os.getenv('PVE_API_VERIFY_SSL', 'false').lower() == 'true',
)
# Instantiate connection to the Netbox API
nb_api = pynetbox.api(
url=os.environ['NB_API_URL'],
token=os.environ['NB_API_TOKEN'],
)
# Load NetBox objects
nb_objects = _load_nb_objects(nb_api)
# Process Proxmox tags
_process_pve_tags(
pve_api,
nb_api,
nb_objects,
)
# Fetch VM tags from Proxmox
pve_vm_tags = {}
for pve_vm_resource in pve_api.cluster.resources.get(type='vm'):
pve_vm_tags[pve_vm_resource['vmid']] = []
pve_vm_tags[pve_vm_resource['vmid']].append(f'Pool/{pve_vm_resource["pool"]}')
if 'tags' in pve_vm_resource:
pass # TODO: pve_vm_tags[pve_vm_resource['vmid']].append(pve_vm_resource['tags'])
# Process Proxmox nodes
for pve_node in pve_api.nodes.get():
# This script does not create the hardware devices.
nb_device = nb_objects['devices'].get(pve_node['node'].lower())
if nb_device is None:
print(f'The device {pve_node["node"]} is not created on NetBox. Exiting.')
sys.exit(1)
else:
nb_device.status = 'active' if pve_node['status'] == 'online' else 'offline'
nb_device.save()
# Process Proxmox virtual machines per node
for pve_virtual_machine in pve_api.nodes(pve_node['node']).qemu.get():
_process_pve_virtual_machine(
pve_api,
nb_api,
nb_objects,
nb_device,
pve_vm_tags.get(pve_virtual_machine['vmid'], []),
pve_virtual_machine,
)
if __name__ == '__main__':
main()