pvc/health-daemon/pvchealthd/objects/MonitoringInstance.py

791 lines
28 KiB
Python

#!/usr/bin/env python3
# MonitoringInstance.py - Class implementing a PVC monitor in pvchealthd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import concurrent.futures
import time
import importlib.util
from os import walk
from datetime import datetime
from hashlib import md5
from json import dumps, loads
from apscheduler.schedulers.background import BackgroundScheduler
class PluginError(Exception):
"""
An exception that results from a plugin failing setup
"""
pass
class PluginResult(object):
def __init__(self, zkhandler, config, logger, this_node, plugin_name):
self.zkhandler = zkhandler
self.config = config
self.logger = logger
self.this_node = this_node
self.plugin_name = plugin_name
self.current_time = int(time.time())
self.health_delta = 0
self.message = "N/A"
self.data = {}
self.runtime = "0.00"
def set_health_delta(self, new_delta):
self.health_delta = new_delta
def set_message(self, new_message):
self.message = new_message
def set_data(self, new_data):
self.data = new_data
def set_runtime(self, new_runtime):
self.runtime = new_runtime
def to_zookeeper(self):
self.zkhandler.write(
[
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.name",
self.plugin_name,
),
self.plugin_name,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.last_run",
self.plugin_name,
),
self.current_time,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.health_delta",
self.plugin_name,
),
self.health_delta,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.message",
self.plugin_name,
),
self.message,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.data",
self.plugin_name,
),
dumps(self.data),
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.runtime",
self.plugin_name,
),
self.runtime,
),
]
)
class MonitoringPlugin(object):
def __init__(self, zkhandler, config, logger, this_node, plugin_name):
self.zkhandler = zkhandler
self.config = config
self.logger = logger
self.this_node = this_node
self.plugin_name = plugin_name
self.plugin_result = PluginResult(
self.zkhandler,
self.config,
self.logger,
self.this_node,
self.plugin_name,
)
def __str__(self):
return self.plugin_name
#
# Helper functions; exposed to child MonitoringPluginScript instances
#
def log(self, message, state="d"):
"""
Log a message to the PVC logger instance using the plugin name as a prefix
Takes "state" values as defined by the PVC logger instance, defaulting to debug:
"d": debug
"i": informational
"t": tick/keepalive
"w": warning
"e": error
"""
if state == "d" and not self.config["debug"]:
return
self.logger.out(message, state=state, prefix=self.plugin_name)
#
# Primary class functions; implemented by the individual plugins
#
def setup(self):
"""
setup(): Perform setup of the plugin; run once during daemon startup
This step is optional and should be used sparingly.
If you wish for the plugin to not load in certain conditions, do any checks here
and return a non-None failure message to indicate the error.
"""
pass
def run(self, coordinator_state=None):
"""
run(): Run the plugin, returning a PluginResult object
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
"""
return self.plugin_result
def cleanup(self):
"""
cleanup(): Clean up after the plugin; run once during daemon shutdown
OPTIONAL
"""
pass
class MonitoringInstance(object):
def __init__(self, zkhandler, config, logger, this_node):
self.zkhandler = zkhandler
self.config = config
self.logger = logger
self.this_node = this_node
# Create functions for each fault type
def get_node_health_states():
node_health_states = list()
for node in self.zkhandler.children("base.node"):
node_health = self.zkhandler.read(("node.monitoring.health", node))
node_faulty_plugins = list()
all_plugins = self.zkhandler.children(("node.monitoring.data", node))
for plugin in all_plugins:
plugin_delta = self.zkhandler.read(
(
"node.monitoring.data",
node,
"monitoring_plugin.health_delta",
plugin,
)
)
if int(plugin_delta) > 0:
node_faulty_plugins.append(f"{plugin}@-{plugin_delta}%")
node_health_states.append(
{
"entry": f"{node} was at {node_health}% ({', '.join(node_faulty_plugins)})",
"check": node_health,
"details": "",
}
)
return node_health_states
def get_node_daemon_states():
node_daemon_states = [
{
"entry": node,
"check": self.zkhandler.read(("node.state.daemon", node)),
"details": "",
}
for node in self.zkhandler.children("base.node")
]
return node_daemon_states
def get_osd_in_states():
osd_in_states = [
{
"entry": osd,
"check": loads(self.zkhandler.read(("osd.stats", osd))).get(
"in", 0
),
"details": "",
}
for osd in self.zkhandler.children("base.osd")
]
return osd_in_states
def get_ceph_health_entries():
ceph_health_entries = [
{
"entry": f"{value['severity']} {key}",
"check": value["severity"],
"details": value["summary"]["message"],
}
for key, value in loads(zkhandler.read("base.storage.health"))[
"checks"
].items()
]
return ceph_health_entries
def get_vm_states():
vm_states = [
{
"entry": self.zkhandler.read(("domain.name", domain)),
"check": self.zkhandler.read(("domain.state", domain)),
"details": self.zkhandler.read(("domain.failed_reason", domain)),
}
for domain in self.zkhandler.children("base.domain")
]
return vm_states
def get_overprovisioned_memory():
all_nodes = self.zkhandler.children("base.node")
current_memory_provisioned = sum(
[
int(self.zkhandler.read(("node.memory.allocated", node)))
for node in all_nodes
]
)
node_memory_totals = [
int(self.zkhandler.read(("node.memory.total", node)))
for node in all_nodes
]
total_node_memory = sum(node_memory_totals)
most_node_memory = sorted(node_memory_totals)[-1]
available_node_memory = total_node_memory - most_node_memory
if current_memory_provisioned >= available_node_memory:
op_str = "overprovisioned"
else:
op_str = "ok"
overprovisioned_memory = [
{
"entry": f"{current_memory_provisioned}MB > {available_node_memory}MB (N-1)",
"check": op_str,
"details": "",
}
]
return overprovisioned_memory
# This is a list of all possible faults (cluster error messages) and their corresponding details
self.cluster_faults_map = {
"unhealthy_node": {
"entries": get_node_health_states,
"conditions": range(50, 0, -1),
"delta": 0,
"message": "Node {entry} <= 50% health",
},
"dead_or_fenced_node": {
"entries": get_node_daemon_states,
"conditions": ["dead", "fenced"],
"delta": 50,
"message": "Node {entry} was dead and/or fenced",
},
"ceph_osd_out": {
"entries": get_osd_in_states,
"conditions": ["0"],
"delta": 25,
"message": "OSD {entry} was marked out",
},
"ceph_err": {
"entries": get_ceph_health_entries,
"conditions": ["HEALTH_ERR", "HEALTH_WARN"],
"delta": 50,
"message": "{entry} reported by Ceph ({details})",
},
"vm_failed": {
"entries": get_vm_states,
"conditions": ["fail"],
"delta": 10,
"message": "VM {entry} was failed ({details})",
},
"memory_overprovisioned": {
"entries": get_overprovisioned_memory,
"conditions": ["overprovisioned"],
"delta": 25,
"message": "Cluster memory was overprovisioned {entry}",
},
}
# Get a list of plugins from the plugin_directory
plugin_files = next(walk(self.config["plugin_directory"]), (None, None, []))[
2
] # [] if no file
self.all_plugins = list()
self.all_plugin_names = list()
successful_plugins = 0
# Load each plugin file into the all_plugins list
for plugin_file in sorted(plugin_files):
try:
self.logger.out(
f"Loading monitoring plugin from {self.config['plugin_directory']}/{plugin_file}",
state="i",
)
loader = importlib.machinery.SourceFileLoader(
"plugin_script", f"{self.config['plugin_directory']}/{plugin_file}"
)
spec = importlib.util.spec_from_loader(loader.name, loader)
plugin_script = importlib.util.module_from_spec(spec)
spec.loader.exec_module(plugin_script)
plugin = plugin_script.MonitoringPluginScript(
self.zkhandler,
self.config,
self.logger,
self.this_node,
plugin_script.PLUGIN_NAME,
)
failed_setup = plugin.setup()
if failed_setup is not None:
raise PluginError(f"{failed_setup}")
# Create plugin key
self.zkhandler.write(
[
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.name",
plugin.plugin_name,
),
plugin.plugin_name,
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.last_run",
plugin.plugin_name,
),
"0",
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.health_delta",
plugin.plugin_name,
),
"0",
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.message",
plugin.plugin_name,
),
"Initializing",
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.data",
plugin.plugin_name,
),
dumps({}),
),
(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin.runtime",
plugin.plugin_name,
),
"0.00",
),
]
)
self.all_plugins.append(plugin)
self.all_plugin_names.append(plugin.plugin_name)
successful_plugins += 1
self.logger.out(
f"Successfully loaded monitoring plugin '{plugin.plugin_name}'",
state="o",
)
except Exception as e:
self.logger.out(
f"Failed to load monitoring plugin: {e}",
state="w",
)
self.zkhandler.write(
[
(
("node.monitoring.plugins", self.this_node.name),
" ".join(self.all_plugin_names),
),
]
)
if successful_plugins < 1:
self.logger.out(
"No plugins loaded; pvchealthd going into noop loop. Incorrect plugin directory? Fix and restart pvchealthd.",
state="e",
)
return
self.logger.out(
f'{self.logger.fmt_cyan}Plugin list:{self.logger.fmt_end} {" ".join(self.all_plugin_names)}',
state="s",
)
# Clean up any old plugin data for which a plugin file no longer exists
plugins_data = self.zkhandler.children(
("node.monitoring.data", self.this_node.name)
)
if plugins_data is not None:
for plugin_key in plugins_data:
if plugin_key not in self.all_plugin_names:
self.zkhandler.delete(
(
"node.monitoring.data",
self.this_node.name,
"monitoring_plugin",
plugin_key,
)
)
self.start_timer()
def __del__(self):
self.shutdown()
def shutdown(self):
self.stop_timer()
self.run_cleanups()
return
def start_timer(self):
check_interval = int(self.config["monitoring_interval"])
self.timer = BackgroundScheduler()
self.timer.add_job(
self.run_checks,
trigger="interval",
seconds=check_interval,
)
self.logger.out(
f"Starting monitoring check timer ({check_interval} second interval)",
state="s",
)
self.timer.start()
self.run_checks()
def stop_timer(self):
try:
self.logger.out("Stopping monitoring check timer", state="s")
self.timer.shutdown()
except Exception:
self.logger.out("Failed to stop monitoring check timer", state="w")
def generate_fault(self, fault_name, fault_time, fault_delta, fault_message):
# Generate a fault ID from the fault_message and fault_delta
fault_str = f"{fault_name} {fault_delta} {fault_message}"
fault_id = str(md5(fault_str.encode("utf-8")).hexdigest())[:8]
# If a fault already exists with this ID, just update the time
if not self.zkhandler.exists("base.faults"):
self.logger.out(
f"Skipping fault reporting for {fault_id} due to missing Zookeeper schemas",
state="w",
)
return
existing_faults = self.zkhandler.children("base.faults")
if fault_id in existing_faults:
self.logger.out(
f"Updating fault {fault_id}: {fault_message} @ {fault_time}", state="i"
)
else:
self.logger.out(
f"Generating fault {fault_id}: {fault_message} @ {fault_time}",
state="i",
)
if self.zkhandler.read("base.config.maintenance") == "true":
self.logger.out(
f"Skipping fault reporting for {fault_id} due to maintenance mode",
state="w",
)
return
if fault_id in existing_faults:
self.zkhandler.write(
[
(("faults.last_time", fault_id), str(fault_time)),
]
)
# Otherwise, generate a new fault event
else:
self.zkhandler.write(
[
(("faults.id", fault_id), ""),
(("faults.first_time", fault_id), str(fault_time)),
(("faults.last_time", fault_id), str(fault_time)),
(("faults.ack_time", fault_id), ""),
(("faults.status", fault_id), "new"),
(("faults.delta", fault_id), fault_delta),
(("faults.message", fault_id), fault_message),
]
)
def run_faults(self):
coordinator_state = self.this_node.coordinator_state
if coordinator_state == "primary":
cst_colour = self.logger.fmt_green
elif coordinator_state == "secondary":
cst_colour = self.logger.fmt_blue
else:
cst_colour = self.logger.fmt_cyan
if coordinator_state not in ["primary", "secondary", "takeover", "relinquish"]:
return
runtime_start = datetime.now()
self.logger.out(
"Starting monitoring fault check run",
state="t",
)
fault_count = 0
for fault_type in self.cluster_faults_map.keys():
fault_details = self.cluster_faults_map[fault_type]
if self.config["log_monitoring_details"] or self.config["debug"]:
self.logger.out(
f"Running fault check {fault_type}",
state="t",
)
entries = fault_details["entries"]()
if self.config["debug"]:
self.logger.out(
f"Entries for fault check {fault_type}:",
state="d",
)
for line in dumps(entries, indent=2).split("\n"):
self.logger.out(
line,
state="d",
)
for _entry in entries:
entry = _entry["entry"]
check = _entry["check"]
details = _entry["details"]
for condition in fault_details["conditions"]:
if str(condition) == str(check):
fault_time = datetime.now()
fault_delta = fault_details["delta"]
fault_message = fault_details["message"].format(
entry=entry, details=details
)
fault_count += 1
self.generate_fault(
fault_type, fault_time, fault_delta, fault_message
)
runtime_end = datetime.now()
runtime_delta = runtime_end - runtime_start
runtime = "{:0.02f}".format(runtime_delta.total_seconds())
if fault_count > 0:
fault_colour = self.logger.fmt_red
else:
fault_colour = self.logger.fmt_green
self.logger.out(
"{start_colour}{hostname} fault check @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {fault_colour}{fault_count} faults{nofmt} in {runtime} seconds".format(
start_colour=self.logger.fmt_purple,
cst_colour=self.logger.fmt_bold + cst_colour,
fault_colour=fault_colour,
nofmt=self.logger.fmt_end,
hostname=self.config["node_hostname"],
starttime=runtime_start,
costate=coordinator_state,
fault_count=fault_count,
runtime=runtime,
),
state="t",
)
def run_plugin(self, plugin):
time_start = datetime.now()
try:
result = plugin.run(coordinator_state=self.this_node.coordinator_state)
except Exception as e:
self.logger.out(
f"Monitoring plugin {plugin.plugin_name} failed: {type(e).__name__}: {e}",
state="e",
)
# Whatever it had, we try to return
return plugin.plugin_result
time_end = datetime.now()
time_delta = time_end - time_start
runtime = "{:0.02f}".format(time_delta.total_seconds())
result.set_runtime(runtime)
result.to_zookeeper()
return result
def run_plugins(self):
coordinator_state = self.this_node.coordinator_state
if coordinator_state == "primary":
cst_colour = self.logger.fmt_green
elif coordinator_state == "secondary":
cst_colour = self.logger.fmt_blue
else:
cst_colour = self.logger.fmt_cyan
runtime_start = datetime.now()
self.logger.out(
"Starting monitoring plugin check run",
state="t",
)
total_health = 100
plugin_results = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
to_future_plugin_results = {
executor.submit(self.run_plugin, plugin): plugin
for plugin in self.all_plugins
}
for future in concurrent.futures.as_completed(to_future_plugin_results):
plugin_results.append(future.result())
for result in sorted(plugin_results, key=lambda x: x.plugin_name):
if self.config["log_monitoring_details"]:
self.logger.out(
result.message + f" [-{result.health_delta}]",
state="t",
prefix=f"{result.plugin_name} ({result.runtime}s)",
)
# Leaving this code if we ever want plugins to directly generate faults
# if result.health_delta >= 25:
# fault_type = f"plugin.{self.this_node.name}.{result.plugin_name}"
# fault_time = datetime.now()
# fault_delta = result.health_delta
# fault_message = (
# f"{self.this_node.name} {result.plugin_name} {result.message}"
# )
# self.generate_fault(fault_type, fault_time, fault_delta, fault_message)
total_health -= result.health_delta
if total_health < 0:
total_health = 0
self.zkhandler.write(
[
(
("node.monitoring.health", self.this_node.name),
total_health,
),
]
)
runtime_end = datetime.now()
runtime_delta = runtime_end - runtime_start
runtime = "{:0.02f}".format(runtime_delta.total_seconds())
time.sleep(0.2)
if isinstance(self.this_node.health, int):
if self.this_node.health > 90:
health_colour = self.logger.fmt_green
elif self.this_node.health > 50:
health_colour = self.logger.fmt_yellow
else:
health_colour = self.logger.fmt_red
health_text = str(self.this_node.health) + "%"
else:
health_colour = self.logger.fmt_blue
health_text = "N/A"
self.logger.out(
"{start_colour}{hostname} plugin check @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {health_colour}{health}{nofmt} in {runtime} seconds".format(
start_colour=self.logger.fmt_purple,
cst_colour=self.logger.fmt_bold + cst_colour,
health_colour=health_colour,
nofmt=self.logger.fmt_end,
hostname=self.config["node_hostname"],
starttime=runtime_start,
costate=coordinator_state,
health=health_text,
runtime=runtime,
),
state="t",
)
def run_cleanup(self, plugin):
return plugin.cleanup()
def run_cleanups(self):
with concurrent.futures.ThreadPoolExecutor(max_workers=99) as executor:
to_future_plugin_results = {
executor.submit(self.run_cleanup, plugin): plugin
for plugin in self.all_plugins
}
for future in concurrent.futures.as_completed(to_future_plugin_results):
# This doesn't do anything, just lets us wait for them all to complete
pass
# Set the node health to None as no previous checks are now valid
self.zkhandler.write(
[
(
("node.monitoring.health", self.this_node.name),
None,
),
]
)
def run_checks(self):
self.run_plugins()
self.run_faults()