import enum
import functools
import logging
from typing import cast, Any, Iterator, Optional
from xml.etree import ElementTree as etree

import libvirt

from ansible.plugins.inventory import BaseInventoryPlugin, Constructable


DOCUMENTATION = r'''
name: pyrocufflink
extends_documentation_fragment:
- constructed
short_description: Pyrocufflink inventory source
version_added: ''
description:
- Dynamic inventory for Pyrocufflink machines
author:
- Dustin C. Hatch
options:
  plugin:
    description:
      Token that ensures this is a source file for the 'pyrocufflink' plugin.
    required: True
    choices: ['pyrocufflink']
  uri:
    description: libvirt connection URI
    type: list
    required: true
  read_only:
    description: Open a read-only connection to libvirt
    type: boolean
    default: false
  dns_domain:
    description: DNS domain to append to single-label VM names
    type: string
  exclude_off:
    description: Exclude libvirt domains that are powered off
    type: boolean
    default: false
  exclude_os:
    description: Exclude domains running specified operating systems
    type: list
    default: []
  exclude_test:
    description: Exclude domains whose name implies they are testing machines
    type: boolean
    default: true
    env:
    - name: PYROCUFFLINK_EXCLUDE_TEST
  log_excluded:
    description:
      Print a log message about excluded domains, useful for troubleshooting
    type: boolean
    default: false
'''


log = logging.getLogger('ansible.plugins.inventory.pyrocufflink')


# Remove the default error handler, which prints to stderr
libvirt.registerErrorHandler(lambda *_: None, None)


class XMLNS:
    """XML namespace URIs used to look up custom libvirt domain metadata."""

    dch = 'http://du5t1n.me/xmlns/libvirt/metadata/'
    libosinfo = 'http://libosinfo.org/xmlns/libvirt/domain/1.0'


class DomainState(enum.IntEnum):
    """Domain state codes as returned by ``virDomain.state()``."""

    nostate = 0
    running = 1
    blocked = 2
    # Misspelled original name, kept as an alias so existing references
    # continue to resolve; ``DomainState(2).name`` is now ``'blocked'``.
    blockde = 2
    paused = 3
    shutdown = 4
    shutoff = 5
    crashed = 6
    pmsuspend = 7


class InventoryModule(BaseInventoryPlugin, Constructable):
    """Ansible inventory plugin that lists libvirt domains as hosts."""

    NAME = 'pyrocufflink'

    def parse(self, inventory, loader, path, cache=True):
        """Populate the inventory from every configured libvirt URI.

        Each domain that is not filtered out by the ``exclude_*`` options
        is added as a host, placed in the groups named by its metadata,
        and given the ``libvirt_uri`` and ``libvirt`` variables.  A URI
        that cannot be opened is logged and skipped.
        """
        super().parse(inventory, loader, path, cache=cache)
        config_data = self._read_config_data(path)
        self._consume_options(config_data)

        uri_list = self.get_option('uri')
        if not isinstance(uri_list, list):
            uri_list = [uri_list]
        read_only = cast(bool, self.get_option('read_only'))
        dns_domain = self.get_option('dns_domain')
        exclude_off = cast(bool, self.get_option('exclude_off'))
        exclude_os = cast(list[str], self.get_option('exclude_os'))
        exclude_test = cast(bool, self.get_option('exclude_test'))
        log_excluded = cast(bool, self.get_option('log_excluded'))

        assert self.inventory
        for uri in uri_list:
            try:
                if read_only:
                    conn = libvirt.openReadOnly(uri)
                else:
                    conn = libvirt.open(uri)
            except libvirt.libvirtError as e:
                log.error('Unable to open to libvirt URI: %s', e)
                continue
            try:
                for dom in conn.listAllDomains():
                    host = Host(dom)
                    if self._is_excluded(
                        host,
                        exclude_off,
                        exclude_os,
                        exclude_test,
                        log_excluded,
                    ):
                        continue
                    self._add_host(host, uri, dns_domain, read_only)
            finally:
                # Everything needed from the domains has been copied into
                # the inventory by now, so release the connection instead
                # of leaking one per URI.
                try:
                    conn.close()
                except libvirt.libvirtError as e:
                    log.debug(
                        'Error closing libvirt connection %s: %s', uri, e
                    )

    def _is_excluded(
        self,
        host: 'Host',
        exclude_off: bool,
        exclude_os: list[str],
        exclude_test: bool,
        log_excluded: bool,
    ) -> bool:
        """Return True if *host* is filtered out by the exclude options."""
        state = host.get_state()[0]
        if state == DomainState.shutoff and exclude_off:
            if log_excluded:
                log.warning(
                    'Excluding libvirt domain %s in state %r',
                    host.name,
                    DomainState.shutoff.name,
                )
            return True
        if host.os_id in exclude_os:
            if log_excluded:
                log.warning(
                    'Excluding libvirt domain %s with OS %r',
                    host.name,
                    host.os_id,
                )
            return True
        if exclude_test and host.is_test_host:
            if log_excluded:
                log.warning(
                    'Excluding libvirt domain %s (test machine)',
                    host.name,
                )
            return True
        return False

    def _add_host(
        self,
        host: 'Host',
        uri: str,
        dns_domain: Optional[str],
        read_only: bool,
    ) -> None:
        """Add *host* to the inventory with its groups and variables."""
        # Prefer the domain title as the inventory name, if one is set.
        if host.title:
            inventory_hostname = host.title
        else:
            inventory_hostname = host.name
        if dns_domain and '.' not in inventory_hostname:
            inventory_hostname = f'{inventory_hostname}.{dns_domain}'

        self.inventory.add_host(inventory_hostname)
        for group in host.groups:
            self.inventory.add_group(group)
            self.inventory.add_host(inventory_hostname, group)
        self.inventory.set_variable(
            inventory_hostname, 'libvirt_uri', uri
        )
        self.inventory.set_variable(
            inventory_hostname, 'libvirt', host.variables(read_only)
        )


class Host:
    """Wrapper around a libvirt domain exposing inventory-relevant data.

    Metadata lookups are cached per instance, so each is fetched from
    libvirt at most once.
    """

    def __init__(self, domain: libvirt.virDomain) -> None:
        self.domain = domain

    @functools.cached_property
    def description(self) -> Optional[str]:
        """The domain description, or None if it cannot be retrieved."""
        try:
            return self.domain.metadata(
                libvirt.VIR_DOMAIN_METADATA_DESCRIPTION, None
            )
        except libvirt.libvirtError as e:
            log.debug(
                'Could not get description for domain %s: %s', self.name, e
            )
            return None

    @functools.cached_property
    def groups(self) -> list[str]:
        """Inventory group names listed in the domain's custom metadata."""
        return list(self._groups())

    @functools.cached_property
    def is_test_host(self) -> bool:
        """Whether the domain name starts with ``test-``."""
        return self.name.startswith('test-')

    @functools.cached_property
    def libosinfo(self) -> Optional[etree.Element]:
        """Parsed libosinfo metadata element, or None if unavailable."""
        try:
            metadata = self.domain.metadata(
                libvirt.VIR_DOMAIN_METADATA_ELEMENT, XMLNS.libosinfo
            )
        except libvirt.libvirtError as e:
            log.debug(
                'Could not get extended domain metadata for %s: %s',
                self.name,
                e,
            )
            return None
        return etree.fromstring(metadata)

    @functools.cached_property
    def metadata(self) -> Optional[etree.Element]:
        """Parsed custom (``XMLNS.dch``) metadata element, or None."""
        try:
            metadata = self.domain.metadata(
                libvirt.VIR_DOMAIN_METADATA_ELEMENT, XMLNS.dch
            )
        except libvirt.libvirtError as e:
            log.debug(
                'Could not get extended domain metadata for %s: %s',
                self.name,
                e,
            )
            return None
        return etree.fromstring(metadata)

    @functools.cached_property
    def name(self) -> str:
        """The libvirt domain name."""
        return self.domain.name()

    @functools.cached_property
    def os_id(self) -> Optional[str]:
        """The ``id`` attribute of the libosinfo ``os`` element, if any."""
        if self.libosinfo is None:
            return None
        # ElementTree matches tags by their fully-qualified name, so a
        # plain 'os' never matches a namespace-prefixed child.  The {*}
        # wildcard matches the element in any namespace or none.
        os_elem = self.libosinfo.find('{*}os')
        if os_elem is None:
            return None
        return os_elem.get('id')

    @functools.cached_property
    def title(self) -> Optional[str]:
        """The domain title, or None if it cannot be retrieved."""
        try:
            return self.domain.metadata(
                libvirt.VIR_DOMAIN_METADATA_TITLE, None
            )
        except libvirt.libvirtError as e:
            log.debug('Could not get title for domain %s: %s', self.name, e)
            return None

    def get_guest_info(self) -> Optional[dict[str, str]]:
        """Return the domain's guest info, or None if the query fails."""
        try:
            return self.domain.guestInfo()
        except libvirt.libvirtError as e:
            log.error('Could not get guest info for %s: %s', self.name, e)
            return None

    def get_state(self) -> tuple[int, int]:
        """Return the domain's current ``(state, reason)`` codes."""
        state, reason = self.domain.state()
        return state, reason

    def variables(self, read_only: bool = False) -> dict[str, Any]:
        """Build the dictionary stored as the host's ``libvirt`` variable.

        Guest info is only queried when *read_only* is false and the
        domain is running.
        """
        values: dict[str, Any] = {}
        if title := self.title:
            values['title'] = title
        if description := self.description:
            values['description'] = description
        if self.os_id:
            values['os_id'] = self.os_id

        state_code, reason_code = self.get_state()
        values['state_code'] = state_code
        values['reason_code'] = reason_code
        try:
            state = DomainState(state_code)
        except ValueError:
            # Enum lookup by value raises ValueError (not KeyError) for a
            # code that has no member.
            log.warning(
                'Unknown state for domain %s: %s:', self.name, state_code
            )
            state = None
        else:
            values['state'] = state.name

        if not read_only and state is DomainState.running:
            if guest_info := self.get_guest_info():
                values['guest_info'] = guest_info
        return values

    def _groups(self) -> Iterator[str]:
        """Yield each non-empty ``name`` of ``group`` elements in metadata."""
        if self.metadata is None:
            return
        # Compare against None explicitly: the truth value of an Element
        # depends on whether it has children, and testing it is deprecated.
        groups = self.metadata.find('groups')
        if groups is None:
            return
        for elem in groups.iter('group'):
            if group_name := elem.get('name'):
                yield group_name