From 3c742a827b6cc175cdfe6a22a4ba7f8240f336da Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Mon, 13 Feb 2023 03:06:06 -0500 Subject: [PATCH] Initial implementation of monitoring plugin system --- daemon-common/migrations/versions/9.json | 1 + daemon-common/zkhandler.py | 20 +- debian/pvc-daemon-node.install | 1 + node-daemon/plugins/dpkg | 158 ++++++++ node-daemon/plugins/load | 105 ++++++ node-daemon/pvcnoded.sample.yaml | 2 + node-daemon/pvcnoded/Daemon.py | 18 +- .../pvcnoded/objects/MonitoringInstance.py | 357 ++++++++++++++++++ node-daemon/pvcnoded/util/config.py | 3 + node-daemon/pvcnoded/util/keepalive.py | 8 +- 10 files changed, 664 insertions(+), 9 deletions(-) create mode 100644 daemon-common/migrations/versions/9.json create mode 100644 node-daemon/plugins/dpkg create mode 100644 node-daemon/plugins/load create mode 100644 node-daemon/pvcnoded/objects/MonitoringInstance.py diff --git a/daemon-common/migrations/versions/9.json b/daemon-common/migrations/versions/9.json new file mode 100644 index 00000000..84ea8ac1 --- /dev/null +++ b/daemon-common/migrations/versions/9.json @@ -0,0 +1 @@ +{"version": "9", "root": "", "base": {"root": "", "schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.primary_node.sync_lock": "/config/primary_node/sync_lock", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "cmd": "/cmd", "cmd.node": "/cmd/nodes", "cmd.domain": "/cmd/domains", "cmd.ceph": "/cmd/ceph", "logs": "/logs", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", "snapshot": "/ceph/snapshots"}, "logs": {"node": "", "messages": "/messages"}, "node": {"name": "", "keepalive": "/keepalive", "mode": "/daemonmode", 
"data.active_schema": "/activeschema", "data.latest_schema": "/latestschema", "data.static": "/staticdata", "data.pvc_version": "/pvcversion", "running_domains": "/runningdomains", "count.provisioned_domains": "/domainscount", "count.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "cpu.load": "/cpuload", "vcpu.allocated": "/vcpualloc", "memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword", "sriov": "/sriov", "sriov.pf": "/sriov/pf", "sriov.vf": "/sriov/vf", "monitoring.plugins": "/monitoring_plugins", "monitoring.data": "/monitoring_data"}, "monitoring_plugin": {"name": "", "last_run": "/last_run", "health_delta": "/health_delta", "message": "/message", "data": "/data", "runtime": "/runtime"}, "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, "sriov_vf": {"phy": "", "pf": "/pf", "mtu": "/mtu", "mac": "/mac", "phy_mac": "/phy_mac", "config": "/config", "config.vlan_id": "/config/vlan_id", "config.vlan_qos": "/config/vlan_qos", "config.tx_rate_min": "/config/tx_rate_min", "config.tx_rate_max": "/config/tx_rate_max", "config.spoof_check": "/config/spoof_check", "config.link_state": "/config/link_state", "config.trust": "/config/trust", "config.query_rss": "/config/query_rss", "pci": "/pci", "pci.domain": "/pci/domain", "pci.bus": "/pci/bus", "pci.slot": "/pci/slot", "pci.function": "/pci/function", "used": "/used", "used_by": "/used_by"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "storage.volumes": "/rbdlist", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", 
"meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit", "meta.tags": "/tags", "migrate.sync_lock": "/migrate_sync_lock"}, "tag": {"name": "", "type": "/type", "protected": "/protected"}, "network": {"vni": "", "type": "/nettype", "mtu": "/mtu", "rule": "/firewall_rules", "rule.in": "/firewall_rules/in", "rule.out": "/firewall_rules/out", "nameservers": "/name_servers", "domain": "/domain", "reservation": "/dhcp4_reservations", "lease": "/dhcp4_leases", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", "ip4.dhcp": "/dhcp4_flag", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "reservation": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname"}, "lease": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname", "expiry": "/expiry", "client_id": "/clientid"}, "rule": {"description": "", "rule": "/rule", "order": "/order"}, "osd": {"id": "", "node": "/node", "device": "/device", "db_device": "/db_device", "fsid": "/fsid", "ofsid": "/fsid/osd", "cfsid": "/fsid/cluster", "lvm": "/lvm", "vg": "/lvm/vg", "lv": "/lvm/lv", "stats": "/stats"}, "pool": {"name": "", "pgs": "/pgs", "tier": "/tier", "stats": "/stats"}, "volume": {"name": "", "stats": "/stats"}, "snapshot": {"name": "", "stats": "/stats"}} \ No newline at end of file diff --git a/daemon-common/zkhandler.py b/daemon-common/zkhandler.py index 8e42d01e..5ec58b83 100644 --- a/daemon-common/zkhandler.py +++ b/daemon-common/zkhandler.py @@ -540,7 +540,7 @@ class ZKHandler(object): # class ZKSchema(object): # Current version - _version = 8 + _version = 9 # Root for doing nested keys _schema_root = "" @@ -608,6 +608,17 @@ class ZKSchema(object): "sriov": "/sriov", "sriov.pf": "/sriov/pf", "sriov.vf": "/sriov/vf", + "monitoring.plugins": "/monitoring_plugins", + "monitoring.data": "/monitoring_data", + }, + # The schema of an individual monitoring plugin data entry 
(/nodes/{node_name}/monitoring_data/{plugin}) + "monitoring_plugin": { + "name": "", # The root key + "last_run": "/last_run", + "health_delta": "/health_delta", + "message": "/message", + "data": "/data", + "runtime": "/runtime", }, # The schema of an individual SR-IOV PF entry (/nodes/{node_name}/sriov/pf/{pf}) "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, # The root key @@ -874,9 +885,10 @@ class ZKSchema(object): if not zkhandler.zk_conn.exists(nkipath): result = False - # One might expect child keys under node (specifically, sriov.pf and sriov.vf) to be - # managed here as well, but those are created automatically every time pvcnoded starts - # and thus never need to be validated or applied. + # One might expect child keys under node (specifically, sriov.pf, sriov.vf, + # monitoring.data) to be managed here as well, but those are created + # automatically every time pvcnoded started and thus never need to be validated + # or applied. # These two have several children layers that must be parsed through for elem in ["volume"]: diff --git a/debian/pvc-daemon-node.install b/debian/pvc-daemon-node.install index 4b85c0e1..f428e6c1 100644 --- a/debian/pvc-daemon-node.install +++ b/debian/pvc-daemon-node.install @@ -5,3 +5,4 @@ node-daemon/pvcnoded.service lib/systemd/system node-daemon/pvc.target lib/systemd/system node-daemon/pvcautoready.service lib/systemd/system node-daemon/monitoring usr/share/pvc +node-daemon/plugins usr/share/pvc diff --git a/node-daemon/plugins/dpkg b/node-daemon/plugins/dpkg new file mode 100644 index 00000000..74d00789 --- /dev/null +++ b/node-daemon/plugins/dpkg @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 + +# dpkg.py - PVC Monitoring example plugin for dpkg status +# Part of the Parallel Virtual Cluster (PVC) system +# +# Copyright (C) 2018-2022 Joshua M. 
Boniface +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +############################################################################### + +# This script provides an example of a PVC monitoring plugin script. It will create +# a simple plugin to check the system dpkg status is as expected, with no invalid +# packages or obsolete configuration files, and will return a 1 health delta for each +# flaw in invalid packages, upgradable packages, and obsolete config files. + +# This script can thus be used as an example or reference implementation of a +# PVC monitoring pluginscript and expanded upon as required. + +# A monitoring plugin script must implement the class "MonitoringPluginScript" which +# extends "MonitoringPlugin", providing the 3 functions indicated. Detailed explanation +# of the role of each function is provided in context of the example; see the other +# examples for more potential uses. + +# WARNING: +# +# This script will run in the context of the node daemon keepalives as root. +# DO NOT install untrusted, unvetted plugins under any circumstances. + + +# This import is always required here, as MonitoringPlugin is used by the +# MonitoringPluginScript class +from pvcnoded.objects.MonitoringInstance import MonitoringPlugin + + +# A monitoring plugin script must always expose its nice name, which must be identical to +# the file name +PLUGIN_NAME = "dpkg" + + +# The MonitoringPluginScript class must be named as such, and extend MonitoringPlugin. 
+class MonitoringPluginScript(MonitoringPlugin): + def setup(self): + """ + setup(): Perform special setup steps during node daemon startup + + This step is optional and should be used sparingly. + """ + + pass + + def run(self): + """ + run(): Perform the check actions and return a PluginResult object + """ + + # Run any imports first + from re import match + from json import dumps + import daemon_lib.common as pvc_common + + # Get Debian version + with open('/etc/debian_version', 'r') as fh: + debian_version = fh.read().strip() + + # Get a list of dpkg packages for analysis + retcode, stdout, stderr = pvc_common.run_os_command("/usr/bin/dpkg --list") + + # Get a list of installed packages and states + packages = list() + for dpkg_line in stdout.split('\n'): + if match('^[a-z][a-z] ', dpkg_line): + line_split = dpkg_line.split() + package_state = line_split[0] + package_name = line_split[1] + packages.append((package_name, package_state)) + + count_ok = 0 + count_inconsistent = 0 + list_inconsistent = list() + + for package in packages: + if package[1] == "ii": + count_ok += 1 + else: + count_inconsistent += 1 + list_inconsistent.append(package[0]) + + # Get upgradable packages + retcode, stdout, stderr = pvc_common.run_os_command("/usr/bin/apt list --upgradable") + + list_upgradable = list() + for apt_line in stdout.split('\n'): + if match('^[a-z][a-z] ', apt_line): + line_split = apt_line.split('/') + package_name = line_split[0] + list_upgradable.append(package_name) + + count_upgradable = len(list_upgradable) + + # Get obsolete config files (dpkg-* or ucf-* under /etc) + retcode, stdout, stderr = pvc_common.run_os_command("/usr/bin/find /etc -type f -a \( -name '*.dpkg-*' -o -name '*.ucf-*' \)") + + obsolete_conffiles = list() + for conffile_line in stdout.split('\n'): + if conffile_line: + obsolete_conffiles.append(conffile_line) + + count_obsolete_conffiles = len(obsolete_conffiles) + + # Set health_delta based on the results + health_delta = 0 + if 
count_inconsistent > 0: + health_delta += 1 + if count_upgradable > 0: + health_delta += 1 + if count_obsolete_conffiles > 0: + health_delta += 1 + + # Set the health delta in our local PluginResult object + self.plugin_result.set_health_delta(health_delta) + + # Craft the message + message = f"Debian {debian_version}; Obsolete conffiles: {count_obsolete_conffiles}; Packages valid: {count_ok}, inconsistent: {count_inconsistent}, upgradable: {count_upgradable}" + + # Set the message in our local PluginResult object + self.plugin_result.set_message(message) + + # Set the detailed data in our local PluginResult object + detailed_data = { + "debian_version": debian_version, + "obsolete_conffiles": obsolete_conffiles, + "inconsistent_packages": list_inconsistent, + "upgradable_packages": list_upgradable, + } + self.plugin_result.set_data(dumps(detailed_data)) + + # Return our local PluginResult object + return self.plugin_result + + def cleanup(self): + """ + cleanup(): Perform special cleanup steps during node daemon termination + + This step is optional and should be used sparingly. + """ + + pass diff --git a/node-daemon/plugins/load b/node-daemon/plugins/load new file mode 100644 index 00000000..f3e4fb39 --- /dev/null +++ b/node-daemon/plugins/load @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 + +# load.py - PVC Monitoring example plugin for load +# Part of the Parallel Virtual Cluster (PVC) system +# +# Copyright (C) 2018-2022 Joshua M. Boniface +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +############################################################################### + +# This script provides an example of a PVC monitoring plugin script. It will create +# a simple plugin to check the system load against the total number of CPU cores, +# and return a 10 health delta (100 -> 90) if the load average is greater than that number. + +# This script can thus be used as an example or reference implementation of a +# PVC monitoring plugin script and expanded upon as required. + +# A monitoring plugin script must implement the class "MonitoringPluginScript" which +# extends "MonitoringPlugin", providing the 3 functions indicated. Detailed explanation +# of the role of each function is provided in context of the example; see the other +# examples for more potential uses. + +# WARNING: +# +# This script will run in the context of the node daemon keepalives as root. +# DO NOT install untrusted, unvetted plugins under any circumstances. + + +# This import is always required here, as MonitoringPlugin is used by the +# MonitoringPluginScript class +from pvcnoded.objects.MonitoringInstance import MonitoringPlugin + + +# A monitoring plugin script must always expose its nice name, which must be identical to +# the file name +PLUGIN_NAME = "load" + + +# The MonitoringPluginScript class must be named as such, and extend MonitoringPlugin. +class MonitoringPluginScript(MonitoringPlugin): + def setup(self): + """ + setup(): Perform special setup steps during node daemon startup + + This step is optional and should be used sparingly.
+ """ + + pass + + def run(self): + """ + run(): Perform the check actions and return a PluginResult object + """ + + # Run any imports first + from os import getloadavg + from psutil import cpu_count + + # Get the current 1-minute system load average + load_average = getloadavg()[0] + + # Get the number of CPU cores + cpu_cores = cpu_count() + + # Check that the load average is greater or equal to the cpu count + if load_average > float(cpu_cores): + # Set the health delta to 10 (subtract 10 from the total of 100) + health_delta = 10 + # Craft a message that can be used by the clients + message = f"Current load is {load_average} out of {cpu_cores} CPU cores" + + else: + # Set the health delta to 0 (no change) + health_delta = 0 + # Craft a message that can be used by the clients + message = f"Current load is {load_average} out of {cpu_cores} CPU cores" + + # Set the health delta in our local PluginResult object + self.plugin_result.set_health_delta(health_delta) + + # Set the message in our local PluginResult object + self.plugin_result.set_message(message) + + # Return our local PluginResult object + return self.plugin_result + + def cleanup(self): + """ + cleanup(): Perform special cleanup steps during node daemon termination + + This step is optional and should be used sparingly. 
+ """ + + pass diff --git a/node-daemon/pvcnoded.sample.yaml b/node-daemon/pvcnoded.sample.yaml index de22a69d..ee36ee50 100644 --- a/node-daemon/pvcnoded.sample.yaml +++ b/node-daemon/pvcnoded.sample.yaml @@ -128,6 +128,8 @@ pvc: configuration: # directories: PVC system directories directories: + # plugin_directory: Directory containing node monitoring plugins + plugin_directory: "/usr/share/pvc/plugins" # dynamic_directory: Temporary in-memory directory for active configurations dynamic_directory: "/run/pvc" # log_directory: Logging directory diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py index 46b89afa..a7974237 100644 --- a/node-daemon/pvcnoded/Daemon.py +++ b/node-daemon/pvcnoded/Daemon.py @@ -27,6 +27,7 @@ import pvcnoded.util.services import pvcnoded.util.libvirt import pvcnoded.util.zookeeper +import pvcnoded.objects.MonitoringInstance as MonitoringInstance import pvcnoded.objects.DNSAggregatorInstance as DNSAggregatorInstance import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance import pvcnoded.objects.VMInstance as VMInstance @@ -58,6 +59,7 @@ version = "0.9.61" def entrypoint(): keepalive_timer = None + monitoring_instance = None # Get our configuration config = pvcnoded.util.config.get_configuration() @@ -204,7 +206,7 @@ def entrypoint(): # Define a cleanup function def cleanup(failure=False): - nonlocal logger, zkhandler, keepalive_timer, d_domain + nonlocal logger, zkhandler, keepalive_timer, d_domain, monitoring_instance logger.out("Terminating pvcnoded and cleaning up", state="s") @@ -253,6 +255,13 @@ def entrypoint(): except Exception: pass + # Clean up any monitoring plugins that have cleanup + try: + logger.out("Performing monitoring plugin cleanup", state="s") + monitoring_instance.run_cleanups() + except Exception: + pass + # Set stop state in Zookeeper zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")]) @@ -1015,9 +1024,14 @@ def entrypoint(): state="i", ) + # Set up the node 
monitoring instance + monitoring_instance = MonitoringInstance.MonitoringInstance( + zkhandler, config, logger, this_node + ) + # Start keepalived thread keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer( - logger, config, zkhandler, this_node + logger, config, zkhandler, this_node, monitoring_instance ) # Tick loop; does nothing since everything is async diff --git a/node-daemon/pvcnoded/objects/MonitoringInstance.py b/node-daemon/pvcnoded/objects/MonitoringInstance.py new file mode 100644 index 00000000..7b79e1fc --- /dev/null +++ b/node-daemon/pvcnoded/objects/MonitoringInstance.py @@ -0,0 +1,357 @@ +#!/usr/bin/env python3 + +# MonitoringInstance.py - Class implementing a PVC monitoring instance +# Part of the Parallel Virtual Cluster (PVC) system +# +# Copyright (C) 2018-2022 Joshua M. Boniface +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details.
+# +############################################################################### + +import concurrent.futures +import time +import importlib.util + +from os import walk +from datetime import datetime + + +class PluginResult(object): + def __init__(self, zkhandler, config, logger, this_node, plugin_name): + self.zkhandler = zkhandler + self.config = config + self.logger = logger + self.this_node = this_node + self.plugin_name = plugin_name + self.current_time = int(time.time()) + self.health_delta = 0 + self.message = None + self.data = None + self.runtime = "0.00" + + def set_health_delta(self, new_delta): + self.health_delta = new_delta + + def set_message(self, new_message): + self.message = new_message + + def set_data(self, new_data): + self.data = new_data + + def set_runtime(self, new_runtime): + self.runtime = new_runtime + + def to_zookeeper(self): + self.zkhandler.write( + [ + ( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin.name", + self.plugin_name, + ), + self.plugin_name, + ), + ( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin.last_run", + self.plugin_name, + ), + self.current_time, + ), + ( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin.health_delta", + self.plugin_name, + ), + self.health_delta, + ), + ( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin.message", + self.plugin_name, + ), + self.message, + ), + ( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin.data", + self.plugin_name, + ), + self.data, + ), + ( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin.runtime", + self.plugin_name, + ), + self.runtime, + ), + ] + ) + + +class MonitoringPlugin(object): + def __init__(self, zkhandler, config, logger, this_node, plugin_name): + self.zkhandler = zkhandler + self.config = config + self.logger = logger + self.this_node = this_node + self.plugin_name = plugin_name + + self.plugin_result = 
PluginResult( + self.zkhandler, + self.config, + self.logger, + self.this_node, + self.plugin_name, + ) + + # + # Helper functions; exposed to child MonitoringPluginScript instances + # + def log(self, message, state="d"): + """ + Log a message to the PVC logger instance using the plugin name as a prefix + Takes "state" values as defined by the PVC logger instance, defaulting to debug: + "d": debug + "i": informational + "t": tick/keepalive + "w": warning + "e": error + """ + if state == "d" and not self.config["debug"]: + return + + self.logger.out(message, state=state, prefix=self.plugin_name) + + # + # Primary class functions; implemented by the individual plugins + # + def setup(self): + """ + setup(): Perform setup of the plugin; run once during daemon startup + OPTIONAL + """ + pass + + def run(self): + """ + run(): Run the plugin, returning a PluginResult object + """ + return self.plugin_result + + def cleanup(self): + """ + cleanup(): Clean up after the plugin; run once during daemon shutdown + OPTIONAL + """ + pass + + +class MonitoringInstance(object): + def __init__(self, zkhandler, config, logger, this_node): + self.zkhandler = zkhandler + self.config = config + self.logger = logger + self.this_node = this_node + + # Get a list of plugins from the plugin_directory + plugin_files = next(walk(self.config["plugin_directory"]), (None, None, []))[ + 2 + ] # [] if no file + + self.all_plugins = list() + self.all_plugin_names = list() + + # Load each plugin file into the all_plugins list + for plugin_file in sorted(plugin_files): + try: + self.logger.out( + f"Loading monitoring plugin from {self.config['plugin_directory']}/{plugin_file}", + state="i", + ) + loader = importlib.machinery.SourceFileLoader( + "plugin_script", f"{self.config['plugin_directory']}/{plugin_file}" + ) + spec = importlib.util.spec_from_loader(loader.name, loader) + plugin_script = importlib.util.module_from_spec(spec) + spec.loader.exec_module(plugin_script) + + plugin = 
plugin_script.MonitoringPluginScript( + self.zkhandler, + self.config, + self.logger, + self.this_node, + plugin_script.PLUGIN_NAME, + ) + self.all_plugins.append(plugin) + self.all_plugin_names.append(plugin.plugin_name) + + # Create plugin key + self.zkhandler.write( + [ + ( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin.name", + plugin.plugin_name, + ), + plugin.plugin_name, + ), + ( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin.last_run", + plugin.plugin_name, + ), + "0", + ), + ( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin.health_delta", + plugin.plugin_name, + ), + "0", + ), + ( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin.message", + plugin.plugin_name, + ), + "Initializing", + ), + ( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin.data", + plugin.plugin_name, + ), + None, + ), + ( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin.runtime", + plugin.plugin_name, + ), + "0.00", + ), + ] + ) + self.logger.out( + f"Successfully loaded monitoring plugin '{plugin.plugin_name}'", + state="o", + ) + except Exception as e: + self.logger.out( + f"Failed to load monitoring plugin: {e}", + state="w", + ) + + self.zkhandler.write( + [ + ( + ("node.monitoring.plugins", self.this_node.name), + self.all_plugin_names, + ), + ] + ) + + # Clean up any old plugin data for which a plugin file no longer exists + for plugin_key in self.zkhandler.children( + ("node.monitoring.data", self.this_node.name) + ): + if plugin_key not in self.all_plugin_names: + self.zkhandler.delete( + ( + "node.monitoring.data", + self.this_node.name, + "monitoring_plugin", + plugin_key, + ) + ) + + def run_plugin(self, plugin): + time_start = datetime.now() + result = plugin.run() + time_end = datetime.now() + time_delta = time_end - time_start + runtime = "{:0.02f}".format(time_delta.total_seconds()) + result.set_runtime(runtime) + 
self.logger.out( + result.message, state="t", prefix=f"{plugin.plugin_name} ({runtime}s)" + ) + result.to_zookeeper() + return result + + def run_plugins(self): + total_health = 100 + self.logger.out("Running monitoring plugins:", state="t") + plugin_results = list() + with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor: + to_future_plugin_results = { + executor.submit(self.run_plugin, plugin): plugin + for plugin in self.all_plugins + } + for future in concurrent.futures.as_completed(to_future_plugin_results): + plugin_results.append(future.result()) + + for result in plugin_results: + if result is not None: + total_health -= result.health_delta + + if total_health > 90: + health_colour = self.logger.fmt_green + elif total_health > 50: + health_colour = self.logger.fmt_yellow + else: + health_colour = self.logger.fmt_red + + self.logger.out( + f"System health: {health_colour}{total_health}/100{self.logger.fmt_end}", + state="t", + ) + + def run_cleanup(self, plugin): + return plugin.cleanup() + + def run_cleanups(self): + with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor: + to_future_plugin_results = { + executor.submit(self.run_cleanup, plugin): plugin + for plugin in self.all_plugins + } + for future in concurrent.futures.as_completed(to_future_plugin_results): + # This doesn't do anything, just lets us wait for them all to complete + pass diff --git a/node-daemon/pvcnoded/util/config.py b/node-daemon/pvcnoded/util/config.py index d4dd24c7..29543407 100644 --- a/node-daemon/pvcnoded/util/config.py +++ b/node-daemon/pvcnoded/util/config.py @@ -180,6 +180,9 @@ def get_configuration(): raise MalformedConfigurationError(e) config_directories = { + "plugin_directory": o_directories.get( + "plugin_directory", "/usr/share/pvc/plugins" + ), "dynamic_directory": o_directories.get("dynamic_directory", None), "log_directory": o_directories.get("log_directory", None), "console_log_directory": o_directories.get("console_log_directory", 
None), diff --git a/node-daemon/pvcnoded/util/keepalive.py b/node-daemon/pvcnoded/util/keepalive.py index 2fb07284..22f85dc1 100644 --- a/node-daemon/pvcnoded/util/keepalive.py +++ b/node-daemon/pvcnoded/util/keepalive.py @@ -51,7 +51,7 @@ libvirt_vm_states = { } -def start_keepalive_timer(logger, config, zkhandler, this_node): +def start_keepalive_timer(logger, config, zkhandler, this_node, monitoring_instance): keepalive_interval = config["keepalive_interval"] logger.out( f"Starting keepalive timer ({keepalive_interval} second interval)", state="s" @@ -59,7 +59,7 @@ def start_keepalive_timer(logger, config, zkhandler, this_node): keepalive_timer = BackgroundScheduler() keepalive_timer.add_job( node_keepalive, - args=(logger, config, zkhandler, this_node), + args=(logger, config, zkhandler, this_node, monitoring_instance), trigger="interval", seconds=keepalive_interval, ) @@ -648,7 +648,7 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue): # Keepalive update function -def node_keepalive(logger, config, zkhandler, this_node): +def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance): debug = config["debug"] if debug: logger.out("Keepalive starting", state="d", prefix="main-thread") @@ -918,5 +918,7 @@ def node_keepalive(logger, config, zkhandler, this_node): [(("node.state.daemon", node_name), "dead")] ) + monitoring_instance.run_plugins() + if debug: logger.out("Keepalive finished", state="d", prefix="main-thread")