pvc/health-daemon/pvchealthd/objects/MonitoringInstance.py

749 lines
26 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# MonitoringInstance.py - Class implementing a PVC monitor in pvchealthd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import concurrent.futures
import importlib.machinery
import importlib.util
import time
from datetime import datetime
from hashlib import md5
from json import dumps, loads
from os import walk

from apscheduler.schedulers.background import BackgroundScheduler
2023-02-15 10:11:38 -05:00
class PluginError(Exception):
    """
    Raised when a monitoring plugin fails its setup step and must not be loaded
    """
class PluginResult(object):
    """
    Holds the outcome of a monitoring plugin run and writes it to Zookeeper

    A plugin fills in the result through the set_*() methods and the daemon then calls
    to_zookeeper() to persist it under this node's "node.monitoring.data" keys.
    """

    def __init__(self, zkhandler, config, logger, this_node, plugin_name):
        self.zkhandler = zkhandler
        self.config = config
        self.logger = logger
        self.this_node = this_node
        self.plugin_name = plugin_name
        # Timestamp (Unix seconds) stored as "last_run"; refreshed by to_zookeeper()
        self.current_time = int(time.time())
        # Defaults describing a run that has not reported anything yet
        self.health_delta = 0
        self.message = "N/A"
        self.data = {}
        self.runtime = "0.00"

    def set_health_delta(self, new_delta):
        """Set the amount of health this plugin's result subtracts from the node"""
        self.health_delta = new_delta

    def set_message(self, new_message):
        """Set the human-readable message of this result"""
        self.message = new_message

    def set_data(self, new_data):
        """Set the extra data of this result; it is JSON-encoded when written"""
        self.data = new_data

    def set_runtime(self, new_runtime):
        """Set the runtime of the plugin run that produced this result"""
        self.runtime = new_runtime

    def to_zookeeper(self):
        """
        Write the name, last run time, health delta, message, data and runtime of this
        result to Zookeeper in a single write call
        """
        # MonitoringPlugin creates one PluginResult and returns that same object from
        # every run(), so the timestamp must be refreshed here; otherwise "last_run"
        # would stay frozen at the moment the object was constructed.
        self.current_time = int(time.time())

        def plugin_key(field):
            return (
                "node.monitoring.data",
                self.this_node.name,
                f"monitoring_plugin.{field}",
                self.plugin_name,
            )

        self.zkhandler.write(
            [
                (plugin_key("name"), self.plugin_name),
                (plugin_key("last_run"), self.current_time),
                (plugin_key("health_delta"), self.health_delta),
                (plugin_key("message"), self.message),
                (plugin_key("data"), dumps(self.data)),
                (plugin_key("runtime"), self.runtime),
            ]
        )
class MonitoringPlugin(object):
    """
    Base class of a monitoring plugin; holds the shared handles and one PluginResult
    """

    def __init__(self, zkhandler, config, logger, this_node, plugin_name):
        self.zkhandler = zkhandler
        self.config = config
        self.logger = logger
        self.this_node = this_node
        self.plugin_name = plugin_name
        # The single result object handed back by the default run() implementation
        self.plugin_result = PluginResult(
            zkhandler, config, logger, this_node, plugin_name
        )

    def __str__(self):
        return self.plugin_name

    #
    # Helper functions; exposed to child MonitoringPluginScript instances
    #
    def log(self, message, state="d"):
        """
        Send a message to the PVC logger instance, prefixed with the plugin name

        The "state" value is passed through to the PVC logger instance and defaults to
        debug:
            "d": debug
            "i": informational
            "t": tick/keepalive
            "w": warning
            "e": error

        Debug messages are dropped unless the "debug" configuration option is set.
        """
        suppressed = state == "d" and not self.config["debug"]
        if not suppressed:
            self.logger.out(message, state=state, prefix=self.plugin_name)

    #
    # Primary class functions; implemented by the individual plugins
    #
    def setup(self):
        """
        setup(): Perform setup of the plugin; run once during daemon startup

        This step is optional and should be used sparingly.

        If the plugin should not load under certain conditions, check for them here and
        return a non-None failure message to indicate the error.
        """
        return None

    def run(self, coordinator_state=None):
        """
        run(): Run the plugin, returning a PluginResult object

        The {coordinator_state} can be used to check if this is a "primary" coordinator,
        "secondary" coordinator, or "client" (non-coordinator)
        """
        return self.plugin_result

    def cleanup(self):
        """
        cleanup(): Clean up after the plugin; run once during daemon shutdown

        OPTIONAL
        """
        return None
class MonitoringInstance(object):
def __init__(self, zkhandler, config, logger, this_node):
self.zkhandler = zkhandler
self.config = config
self.logger = logger
self.this_node = this_node
# Create functions for each fault type
def get_node_health_states():
node_entries = list()
for node in self.zkhandler.children("base.node"):
node_health = self.zkhandler.read(("node.monitoring.health", node))
node_faulty_plugins = list()
all_plugins = self.zkhandler.children(("node.monitoring.data", node))
for plugin in all_plugins:
plugin_delta = self.zkhandler.read(
(
"node.monitoring.data",
node,
"monitoring_plugin.health_delta",
plugin,
)
)
if int(plugin_delta) > 0:
node_faulty_plugins.append(f"{plugin}@-{plugin_delta}%")
node_entries.append(
(
f"{node} was at {node_health}% ({', '.join(node_faulty_plugins)})",
node_health,
)
)
return node_entries
def get_node_daemon_states():
return [
(node, self.zkhandler.read(("node.state.daemon", node)))
for node in self.zkhandler.children("base.node")
]
def get_osd_out_states():
return [
(osd, loads(self.zkhandler.read(("osd.stats", osd))).get("out", 0))
for osd in self.zkhandler.children("base.osd")
]
def get_ceph_health_entries():
return [
(value, key)
for key, value in loads(zkhandler.read("base.storage.health"))[
"checks"
].items()
]
def get_vm_states():
return [
(
self.zkhandler.read(("domain.name", domain)),
self.zkhandler.read(("domain.state", domain)),
)
for domain in self.zkhandler.children("base.domain")
]
def get_overprovisioned_memory():
all_nodes = self.zkhandler.children("base.node")
current_memory_provisioned = sum(
[
int(self.zkhandler.read(("node.memory.allocated", node)))
for node in all_nodes
]
)
node_memory_totals = [
int(self.zkhandler.read(("node.memory.total", node)))
for node in all_nodes
]
total_node_memory = sum(node_memory_totals)
most_node_memory = sorted(node_memory_totals)[-1]
available_node_memory = total_node_memory - most_node_memory
if current_memory_provisioned >= available_node_memory:
op_str = "overprovisioned"
else:
op_str = "ok"
return [
(
f"{current_memory_provisioned}MB > {available_node_memory}MB (N-1)",
op_str,
)
]
# This is a list of all possible faults (cluster error messages) and their corresponding details
self.cluster_faults_map = {
"unhealthy_node": {
"entries": get_node_health_states,
"conditions": range(50, 0, -1),
"delta": 0,
"message": "Node {entry} <= 50% health",
},
"dead_or_fenced_node": {
"entries": get_node_daemon_states,
"conditions": ["dead", "fenced"],
"delta": 50,
"message": "Node {entry} was dead and/or fenced",
},
"ceph_osd_out": {
"entries": get_osd_out_states,
"conditions": ["1"],
"delta": 25,
"message": "OSD {entry} was marked out",
},
"ceph_err": {
"entries": get_ceph_health_entries,
"conditions": ["HEALTH_ERR"],
"delta": 50,
"message": "HEALTH_ERR {entry} reported by Ceph",
},
"vm_failed": {
"entries": get_vm_states,
"conditions": ["fail"],
"delta": 10,
"message": "VM {entry} was failed",
},
"memory_overprovisioned": {
"entries": get_overprovisioned_memory,
"conditions": ["overprovisioned"],
"delta": 25,
"message": "Cluster memory was overprovisioned {entry}",
},
}
# Get a list of plugins from the plugin_directory
plugin_files = next(walk(self.config["plugin_directory"]), (None, None, []))[
2
] # [] if no file
self.all_plugins = list()
self.all_plugin_names = list()
2023-02-13 21:45:33 -05:00
successful_plugins = 0
# Load each plugin file into the all_plugins list
for plugin_file in sorted(plugin_files):
try:
self.logger.out(
f"Loading monitoring plugin from {self.config['plugin_directory']}/{plugin_file}",
state="i",
)
loader = importlib.machinery.SourceFileLoader(
"plugin_script", f"{self.config['plugin_directory']}/{plugin_file}"
)
spec = importlib.util.spec_from_loader(loader.name, loader)
plugin_script = importlib.util.module_from_spec(spec)
spec.loader.exec_module(plugin_script)
plugin = plugin_script.MonitoringPluginScript(
self.zkhandler,
self.config,
self.logger,
self.this_node,
plugin_script.PLUGIN_NAME,
)
2023-02-15 10:11:38 -05:00
failed_setup = plugin.setup()
if failed_setup is not None:
raise PluginError(f"{failed_setup}")
# Create plugin key
self.zkhandler.write(
[
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.name",
plugin.plugin_name,
),
plugin.plugin_name,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.last_run",
plugin.plugin_name,
),
"0",
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.health_delta",
plugin.plugin_name,
),
"0",
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.message",
plugin.plugin_name,
),
"Initializing",
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.data",
plugin.plugin_name,
),
2023-02-13 14:37:44 -05:00
dumps({}),
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.runtime",
plugin.plugin_name,
),
"0.00",
),
]
)
2023-02-13 21:45:33 -05:00
self.all_plugins.append(plugin)
self.all_plugin_names.append(plugin.plugin_name)
successful_plugins += 1
self.logger.out(
f"Successfully loaded monitoring plugin '{plugin.plugin_name}'",
state="o",
)
except Exception as e:
self.logger.out(
f"Failed to load monitoring plugin: {e}",
state="w",
)
self.zkhandler.write(
[
(
("node.monitoring.plugins", self.this_node.name),
2023-02-13 14:37:44 -05:00
" ".join(self.all_plugin_names),
),
]
)
2023-02-13 21:45:33 -05:00
if successful_plugins < 1:
2023-11-29 16:20:04 -05:00
self.logger.out(
"No plugins loaded; pvchealthd going into noop loop. Incorrect plugin directory? Fix and restart pvchealthd.",
state="e",
)
2023-02-13 21:45:33 -05:00
return
2023-11-29 16:20:04 -05:00
self.logger.out(
2023-11-29 16:22:31 -05:00
f'{self.logger.fmt_cyan}Plugin list:{self.logger.fmt_end} {" ".join(self.all_plugin_names)}',
state="s",
2023-11-29 16:20:04 -05:00
)
# Clean up any old plugin data for which a plugin file no longer exists
2023-09-02 01:36:17 -04:00
plugins_data = self.zkhandler.children(
("node.monitoring.data", self.this_node.name)
2023-09-02 01:36:17 -04:00
)
if plugins_data is not None:
for plugin_key in plugins_data:
if plugin_key not in self.all_plugin_names:
self.zkhandler.delete(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin",
plugin_key,
)
)
self.start_timer()
def __del__(self):
self.shutdown()
def shutdown(self):
    """
    Stop the check timer first, then run the cleanup step of every loaded plugin
    """
    for step in (self.stop_timer, self.run_cleanups):
        step()
def start_timer(self):
    """
    Schedule run_checks() every config["monitoring_interval"] seconds on a background
    scheduler, then run the checks once right away
    """
    interval_seconds = int(self.config["monitoring_interval"])

    self.timer = scheduler = BackgroundScheduler()
    scheduler.add_job(self.run_checks, trigger="interval", seconds=interval_seconds)

    self.logger.out(
        f"Starting monitoring check timer ({interval_seconds} second interval)",
        state="s",
    )
    scheduler.start()

    # Do not wait a full interval for the first set of results
    self.run_checks()
def stop_timer(self):
    """
    Shut down the check timer on a best-effort basis

    Any failure (including the timer never having been created) is only logged as a
    warning so that the remaining shutdown steps can still proceed.
    """
    try:
        self.logger.out("Stopping monitoring check timer", state="s")
        scheduler = self.timer
        scheduler.shutdown()
    except Exception:
        self.logger.out("Failed to stop monitoring check timer", state="w")
def generate_fault(self, fault_name, fault_time, fault_delta, fault_message):
    """
    Record a cluster fault in Zookeeper, or refresh it if it is already recorded

    The fault ID is the MD5 hex digest of the fault name, delta and message. A fault
    whose ID already exists under "base.faults" only gets its last-seen time updated;
    otherwise a complete new fault entry with status "new" is written. Nothing is
    written when the "base.faults" key is missing or the cluster is in maintenance
    mode.
    """
    # Generate a fault ID from the fault_name, fault_delta and fault_message
    digest_input = f"{fault_name} {fault_delta} {fault_message}".encode("utf-8")
    fault_id = md5(digest_input).hexdigest()

    if not self.zkhandler.exists("base.faults"):
        self.logger.out(
            f"Skipping fault reporting for {fault_id} due to missing Zookeeper schemas",
            state="w",
        )
        return

    already_recorded = fault_id in self.zkhandler.children("base.faults")
    if already_recorded:
        announcement = f"Updating fault {fault_id}: {fault_message} @ {fault_time}"
    else:
        announcement = f"Generating fault {fault_id}: {fault_message} @ {fault_time}"
    self.logger.out(announcement, state="i")

    if self.zkhandler.read("base.config.maintenance") == "true":
        self.logger.out(
            f"Skipping fault reporting for {fault_id} due to maintenance mode",
            state="w",
        )
        return

    timestamp = str(fault_time)
    if already_recorded:
        # If a fault already exists with this ID, just update the time
        fault_fields = [
            (("faults.last_time", fault_id), timestamp),
        ]
    else:
        # Otherwise, generate a new fault event
        fault_fields = [
            (("faults.id", fault_id), ""),
            (("faults.first_time", fault_id), timestamp),
            (("faults.last_time", fault_id), timestamp),
            (("faults.ack_time", fault_id), ""),
            (("faults.status", fault_id), "new"),
            (("faults.delta", fault_id), fault_delta),
            (("faults.message", fault_id), fault_message),
        ]
    self.zkhandler.write(fault_fields)
def run_faults(self):
    """
    Evaluate every entry of self.cluster_faults_map and report the matching faults

    Does nothing unless this node's coordinator state is "primary", "secondary",
    "takeover" or "relinquish". For each fault type, the "entries" function is
    called to obtain (entry, check) tuples; whenever the string form of "check"
    equals the string form of one of the fault's "conditions", a fault is passed to
    generate_fault() with the entry substituted into the fault's message. A summary
    line with the fault count and runtime is logged at the end.
    """
    coordinator_state = self.this_node.coordinator_state
    if coordinator_state not in ["primary", "secondary", "takeover", "relinquish"]:
        return

    if coordinator_state == "primary":
        cst_colour = self.logger.fmt_green
    elif coordinator_state == "secondary":
        cst_colour = self.logger.fmt_blue
    else:
        cst_colour = self.logger.fmt_cyan

    runtime_start = datetime.now()
    self.logger.out(
        "Starting monitoring fault check run",
        state="t",
    )

    fault_count = 0
    for fault_type, fault_details in self.cluster_faults_map.items():
        # A single failing probe (e.g. a missing or unparsable Zookeeper value)
        # must not keep the remaining fault types from being evaluated
        try:
            entries = fault_details["entries"]()
        except Exception as e:
            self.logger.out(
                f"Failed to gather entries for fault type {fault_type}: {type(e).__name__}: {e}",
                state="w",
            )
            continue

        # Conditions and checks are compared by their string form; converting the
        # conditions once allows a plain membership test per entry
        matching_checks = {str(condition) for condition in fault_details["conditions"]}

        for entry, check, *_ in entries:
            if str(check) not in matching_checks:
                continue

            fault_time = datetime.now()
            fault_delta = fault_details["delta"]
            fault_message = fault_details["message"].format(entry=entry)
            fault_count += 1
            self.generate_fault(fault_type, fault_time, fault_delta, fault_message)

    runtime_end = datetime.now()
    runtime_delta = runtime_end - runtime_start
    runtime = "{:0.02f}".format(runtime_delta.total_seconds())

    if fault_count > 0:
        fault_colour = self.logger.fmt_red
    else:
        fault_colour = self.logger.fmt_green

    self.logger.out(
        "{start_colour}{hostname} fault check @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {fault_colour}{fault_count} faults{nofmt} in {runtime} seconds".format(
            start_colour=self.logger.fmt_purple,
            cst_colour=self.logger.fmt_bold + cst_colour,
            fault_colour=fault_colour,
            nofmt=self.logger.fmt_end,
            hostname=self.config["node_hostname"],
            starttime=runtime_start,
            costate=coordinator_state,
            fault_count=fault_count,
            runtime=runtime,
        ),
        state="t",
    )
def run_plugin(self, plugin):
    """
    Run one plugin, time it, and store its result in Zookeeper

    Returns the result object returned by the plugin's run(). If run() raises, the
    error is logged and the plugin's existing plugin_result is returned as-is, with
    neither its runtime updated nor anything written to Zookeeper.
    """
    started_at = datetime.now()
    try:
        result = plugin.run(coordinator_state=self.this_node.coordinator_state)
    except Exception as e:
        self.logger.out(
            f"Monitoring plugin {plugin.plugin_name} failed: {type(e).__name__}: {e}",
            state="e",
        )
        # Whatever it had, we try to return
        return plugin.plugin_result

    elapsed_seconds = (datetime.now() - started_at).total_seconds()
    result.set_runtime(f"{elapsed_seconds:0.02f}")
    result.to_zookeeper()
    return result
def run_plugins(self):
    """
    Run all loaded plugins in parallel and derive this node's health from the results

    Every plugin is run through run_plugin() on a thread pool. Starting from 100, the
    health_delta of each result is subtracted (never going below 0) and the outcome is
    written to this node's "node.monitoring.health" key. Per-plugin detail lines are
    logged when config["log_monitoring_details"] is set, followed by a summary line
    based on self.this_node.health.
    """
    coordinator_state = self.this_node.coordinator_state
    cst_colour = {
        "primary": self.logger.fmt_green,
        "secondary": self.logger.fmt_blue,
    }.get(coordinator_state, self.logger.fmt_cyan)

    runtime_start = datetime.now()
    self.logger.out(
        "Starting monitoring plugin check run",
        state="t",
    )

    with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
        pending_runs = [
            executor.submit(self.run_plugin, plugin) for plugin in self.all_plugins
        ]
        plugin_results = [
            finished.result()
            for finished in concurrent.futures.as_completed(pending_runs)
        ]

    # Results arrive in completion order; report them ordered by plugin name
    plugin_results.sort(key=lambda res: res.plugin_name)

    total_health = 100
    for result in plugin_results:
        if self.config["log_monitoring_details"]:
            self.logger.out(
                result.message + f" [-{result.health_delta}]",
                state="t",
                prefix=f"{result.plugin_name} ({result.runtime}s)",
            )

        # Leaving this code if we ever want plugins to directly generate faults
        # if result.health_delta >= 25:
        #     fault_type = f"plugin.{self.this_node.name}.{result.plugin_name}"
        #     fault_time = datetime.now()
        #     fault_delta = result.health_delta
        #     fault_message = (
        #         f"{self.this_node.name} {result.plugin_name} {result.message}"
        #     )
        #     self.generate_fault(fault_type, fault_time, fault_delta, fault_message)

        total_health -= result.health_delta

    # Health never drops below zero, however large the combined deltas are
    total_health = max(total_health, 0)

    self.zkhandler.write(
        [
            (
                ("node.monitoring.health", self.this_node.name),
                total_health,
            ),
        ]
    )

    runtime = "{:0.02f}".format((datetime.now() - runtime_start).total_seconds())

    # NOTE(review): presumably this pause lets self.this_node.health catch up with
    # the value written above before it is read for the summary - confirm
    time.sleep(0.2)

    node_health = self.this_node.health
    if not isinstance(node_health, int):
        health_colour = self.logger.fmt_blue
        health_text = "N/A"
    else:
        if node_health > 90:
            health_colour = self.logger.fmt_green
        elif node_health > 50:
            health_colour = self.logger.fmt_yellow
        else:
            health_colour = self.logger.fmt_red
        health_text = str(node_health) + "%"

    self.logger.out(
        "{start_colour}{hostname} plugin check @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {health_colour}{health}{nofmt} in {runtime} seconds".format(
            start_colour=self.logger.fmt_purple,
            cst_colour=self.logger.fmt_bold + cst_colour,
            health_colour=health_colour,
            nofmt=self.logger.fmt_end,
            hostname=self.config["node_hostname"],
            starttime=runtime_start,
            costate=coordinator_state,
            health=health_text,
            runtime=runtime,
        ),
        state="t",
    )
def run_cleanup(self, plugin):
    """
    Call the cleanup() step of a single plugin and pass its return value through
    """
    cleanup_result = plugin.cleanup()
    return cleanup_result
def run_cleanups(self):
    """
    Run the cleanup step of every loaded plugin in parallel, then reset node health

    The return values of the cleanup calls are not inspected.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
        cleanup_runs = [
            executor.submit(self.run_cleanup, plugin) for plugin in self.all_plugins
        ]
        # Nothing is done with the outcomes; this only waits for all of them
        concurrent.futures.wait(cleanup_runs)

    # Set the node health to None as no previous checks are now valid
    health_key = ("node.monitoring.health", self.this_node.name)
    self.zkhandler.write([(health_key, None)])
def run_checks(self):
    """
    Perform one full check cycle: the plugin checks first, then the fault checks
    """
    for check_step in (self.run_plugins, self.run_faults):
        check_step()