267 lines
8.2 KiB
Python
267 lines
8.2 KiB
Python
import enum
|
|
import functools
|
|
import logging
|
|
from typing import cast, Any, Iterator, Optional
|
|
from xml.etree import ElementTree as etree
|
|
|
|
import libvirt
|
|
|
|
from ansible.plugins.inventory import BaseInventoryPlugin, Constructable
|
|
|
|
|
|
DOCUMENTATION = r'''
|
|
name: pyrocufflink
|
|
extends_documentation_fragment:
|
|
- constructed
|
|
short_description: Pyrocufflink inventory source
|
|
version_added: ''
|
|
description:
|
|
- Dynamic inventory for Pyrocufflink machines
|
|
author:
|
|
- Dustin C. Hatch <dustin@hatch.name>
|
|
options:
|
|
plugin:
|
|
description:
|
|
Token that ensures this is a source file for the 'pyrocufflink' plugin.
|
|
required: True
|
|
choices: ['pyrocufflink']
|
|
uri:
|
|
description: libvirt connection URI
|
|
type: list
|
|
required: true
|
|
read_only:
|
|
description: Open a read-only connection to libvirt
|
|
type: boolean
|
|
default: false
|
|
dns_domain:
|
|
description: DNS doman to append to single-label VM names
|
|
type: string
|
|
exclude_off:
|
|
description: Exclude libvirt domains that are powered off
|
|
type: boolean
|
|
default: false
|
|
exclude_os:
|
|
description: Exclude domains running specified operating systems
|
|
type: list
|
|
default: []
|
|
exclude_test:
|
|
description: Exclude domains whose name implies they are testing machines
|
|
type: boolean
|
|
default: true
|
|
env:
|
|
- name: PYROCUFFLINK_EXCLUDE_TEST
|
|
log_excluded:
|
|
description:
|
|
Print a log message about excluded domains, useful for troubleshooting
|
|
type: boolean
|
|
default: false
|
|
'''
|
|
|
|
|
|
log = logging.getLogger('ansible.plugins.inventory.pyrocufflink')
|
|
|
|
|
|
# Remove the default error handler, which prints to stderr
|
|
libvirt.registerErrorHandler(lambda *_: None, None)
|
|
|
|
|
|
class XMLNS:
|
|
dch = 'http://du5t1n.me/xmlns/libvirt/metadata/'
|
|
libosinfo = 'http://libosinfo.org/xmlns/libvirt/domain/1.0'
|
|
|
|
|
|
class DomainState(enum.IntEnum):
|
|
nostate = 0
|
|
running = 1
|
|
blockde = 2
|
|
paused = 3
|
|
shutdown = 4
|
|
shutoff = 5
|
|
crashed = 6
|
|
pmsuspend = 7
|
|
|
|
|
|
class InventoryModule(BaseInventoryPlugin, Constructable):
|
|
NAME = 'pyrocufflink'
|
|
|
|
def parse(self, inventory, loader, path, cache=True):
|
|
super().parse(inventory, loader, path, cache=cache)
|
|
config_data = self._read_config_data(path)
|
|
self._consume_options(config_data)
|
|
|
|
uri_list = self.get_option('uri')
|
|
if not isinstance(uri_list, list):
|
|
uri_list = [uri_list]
|
|
read_only = cast(bool, self.get_option('read_only'))
|
|
dns_domain = self.get_option('dns_domain')
|
|
exclude_off = cast(bool, self.get_option('exclude_off'))
|
|
exclude_os = cast(list[str], self.get_option('exclude_os'))
|
|
exclude_test = cast(list[str], self.get_option('exclude_test'))
|
|
log_excluded = cast(bool, self.get_option('log_excluded'))
|
|
|
|
assert self.inventory
|
|
for uri in uri_list:
|
|
if read_only:
|
|
conn = libvirt.openReadOnly(uri)
|
|
else:
|
|
conn = libvirt.open(uri)
|
|
for dom in conn.listAllDomains():
|
|
host = Host(dom)
|
|
state = host.get_state()[0]
|
|
if state == DomainState.shutoff and exclude_off:
|
|
if log_excluded:
|
|
log.warning(
|
|
'Excluding libvirt domain %s in state %r',
|
|
host.name,
|
|
DomainState.shutoff.name,
|
|
)
|
|
continue
|
|
if host.os_id in exclude_os:
|
|
if log_excluded:
|
|
log.warning(
|
|
'Excluding libvirt domain %s with OS %r',
|
|
host.name,
|
|
host.os_id,
|
|
)
|
|
continue
|
|
if exclude_test and host.is_test_host:
|
|
if log_excluded:
|
|
log.warning(
|
|
'Excluding libvirt domain %s (test machine)',
|
|
host.name,
|
|
)
|
|
continue
|
|
if host.title:
|
|
inventory_hostname = host.title
|
|
else:
|
|
inventory_hostname = host.name
|
|
if dns_domain and '.' not in inventory_hostname:
|
|
inventory_hostname = f'{inventory_hostname}.{dns_domain}'
|
|
self.inventory.add_host(inventory_hostname)
|
|
for group in host.groups:
|
|
self.inventory.add_group(group)
|
|
self.inventory.add_host(inventory_hostname, group)
|
|
self.inventory.set_variable(
|
|
inventory_hostname, 'libvirt_uri', uri
|
|
)
|
|
self.inventory.set_variable(
|
|
inventory_hostname, 'libvirt', host.variables(read_only)
|
|
)
|
|
|
|
|
|
class Host:
|
|
|
|
def __init__(self, domain: libvirt.virDomain) -> None:
|
|
self.domain = domain
|
|
|
|
@functools.cached_property
|
|
def description(self) -> Optional[str]:
|
|
try:
|
|
return self.domain.metadata(0, None)
|
|
except libvirt.libvirtError as e:
|
|
log.debug(
|
|
'Could not get description for domain %s: %s', self.name, e
|
|
)
|
|
return None
|
|
|
|
@functools.cached_property
|
|
def groups(self) -> list[str]:
|
|
return list(self._groups())
|
|
|
|
@functools.cached_property
|
|
def is_test_host(self) -> bool:
|
|
return self.name.startswith('test-')
|
|
|
|
@functools.cached_property
|
|
def libosinfo(self) -> Optional[etree.Element]:
|
|
try:
|
|
metadata = self.domain.metadata(2, XMLNS.libosinfo)
|
|
except libvirt.libvirtError as e:
|
|
log.debug(
|
|
'Could not get extended domain metadata for %s: %s',
|
|
self.name,
|
|
e,
|
|
)
|
|
return None
|
|
return etree.fromstring(metadata)
|
|
|
|
@functools.cached_property
|
|
def metadata(self) -> Optional[etree.Element]:
|
|
try:
|
|
metadata = self.domain.metadata(2, XMLNS.dch)
|
|
except libvirt.libvirtError as e:
|
|
log.debug(
|
|
'Could not get extended domain metadata for %s: %s',
|
|
self.name,
|
|
e,
|
|
)
|
|
return None
|
|
return etree.fromstring(metadata)
|
|
|
|
@functools.cached_property
|
|
def name(self) -> str:
|
|
return self.domain.name()
|
|
|
|
@functools.cached_property
|
|
def os_id(self) -> Optional[str]:
|
|
if self.libosinfo is None:
|
|
return
|
|
if (os := self.libosinfo.find('os')) is not None:
|
|
return os.get('id')
|
|
|
|
@functools.cached_property
|
|
def title(self) -> Optional[str]:
|
|
try:
|
|
return self.domain.metadata(1, None)
|
|
except libvirt.libvirtError as e:
|
|
log.debug('Could not get title for domain %s: %s', self.name, e)
|
|
return None
|
|
|
|
def get_guest_info(self) -> Optional[dict[str, str]]:
|
|
try:
|
|
return self.domain.guestInfo()
|
|
except libvirt.libvirtError as e:
|
|
log.error('Could not get guest info for %s: %s', self.name, e)
|
|
|
|
def get_state(self) -> tuple[int, int]:
|
|
state, reason = self.domain.state()
|
|
return state, reason
|
|
|
|
def variables(self, read_only: bool = False) -> dict[str, Any]:
|
|
values = {}
|
|
|
|
if title := self.title:
|
|
values['title'] = title
|
|
if description := self.description:
|
|
values['description'] = description
|
|
|
|
if self.os_id:
|
|
values['os_id'] = self.os_id
|
|
|
|
state_code, reason_code = self.get_state()
|
|
values['state_code'] = state_code
|
|
values['reason_code'] = reason_code
|
|
try:
|
|
state = DomainState(state_code)
|
|
except KeyError:
|
|
log.warning(
|
|
'Unknown state for domain %s: %s:', self.name, state_code
|
|
)
|
|
state = None
|
|
else:
|
|
values['state'] = state.name
|
|
|
|
if not read_only and state is DomainState.running:
|
|
if guest_info := self.get_guest_info():
|
|
values['guest_info'] = guest_info
|
|
|
|
return values
|
|
|
|
def _groups(self) -> Iterator[str]:
|
|
if self.metadata is None:
|
|
return
|
|
if groups := self.metadata.find('groups'):
|
|
for elem in groups.iter('group'):
|
|
if group_name := elem.get('name'):
|
|
yield group_name
|