Add node health to fault states
Adjusts the check run ordering, and ensures that node health states are included in the fault list when a node's health is at or below 50%. Also adjusts fault ID generation, and runs fault checks only on coordinator nodes to avoid excessive runs.
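To illustrate the new fault ID scheme introduced below (a minimal standalone sketch; make_fault_id is a hypothetical helper name, not part of the commit): IDs are now deterministic MD5 digests of the fault name, delta, and message, so re-raising the same fault maps to the same ID and updates the existing entry rather than duplicating it.

from hashlib import md5

def make_fault_id(fault_name, fault_delta, fault_message):
    # Identical inputs always hash to the same ID, so a recurring fault
    # updates its timestamps instead of creating a duplicate entry
    fault_str = f"{fault_name} {fault_delta} {fault_message}"
    return str(md5(fault_str.encode("utf-8")).hexdigest())

print(make_fault_id("ceph_err", 50, "HEALTH_ERR example reported by Ceph"))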
@@ -328,19 +328,19 @@ class ZKHandler(object):
         return True
 
-    def children(self, key, retval=None):
+    def children(self, key):
         """
         Lists all children of a key
         """
         try:
             path = self.get_schema_path(key)
             if path is None:
-                # This path is invalid; this is likely due to missing schema entries, so return None
-                return retval
+                raise NoNodeError
 
             return self.zk_conn.get_children(path)
         except NoNodeError:
-            return retval
+            # This path is invalid; this is likely due to missing schema entries, so return None
+            return None
 
     def rename(self, kkpairs):
         """
@@ -25,7 +25,7 @@ import importlib.util
 
 from os import walk
 from datetime import datetime
-from hashlib import sha1
+from hashlib import md5
 from json import dumps, loads
 from apscheduler.schedulers.background import BackgroundScheduler
 
@@ -199,6 +199,33 @@ class MonitoringInstance(object):
         self.this_node = this_node
 
         # Create functions for each fault type
+        def get_node_health_states():
+            node_entries = list()
+            for node in self.zkhandler.children("base.node"):
+                node_health = self.zkhandler.read(("node.monitoring.health", node))
+                node_faulty_plugins = list()
+                all_plugins = self.zkhandler.children(("node.monitoring.data", node))
+                for plugin in all_plugins:
+                    plugin_delta = self.zkhandler.read(
+                        (
+                            "node.monitoring.data",
+                            node,
+                            "monitoring_plugin.health_delta",
+                            plugin,
+                        )
+                    )
+                    if int(plugin_delta) > 0:
+                        node_faulty_plugins.append(f"{plugin}@-{plugin_delta}%")
+
+                node_entries.append(
+                    (
+                        f"{node} was at {node_health}% ({', '.join(node_faulty_plugins)})",
+                        node_health,
+                    )
+                )
+
+            return node_entries
+
         def get_node_daemon_states():
             return [
                 (node, self.zkhandler.read(("node.state.daemon", node)))
@@ -256,30 +283,36 @@ class MonitoringInstance(object):
             ]
 
         # This is a list of all possible faults (cluster error messages) and their corresponding details
-        self.faults_map = {
+        self.cluster_faults_map = {
+            "unhealthy_node": {
+                "entries": get_node_health_states,
+                "conditions": range(50, 0, -1),
+                "delta": 0,
+                "message": "Node {entry} <= 50% health",
+            },
             "dead_or_fenced_node": {
                 "entries": get_node_daemon_states,
                 "conditions": ["dead", "fenced"],
                 "delta": 50,
-                "message": "Node {entry} was dead and/or fenced.",
+                "message": "Node {entry} was dead and/or fenced",
             },
             "ceph_osd_out": {
                 "entries": get_osd_out_states,
                 "conditions": ["1"],
                 "delta": 25,
-                "message": "OSD {entry} was out.",
+                "message": "OSD {entry} was marked out",
             },
             "ceph_err": {
                 "entries": get_ceph_health_entries,
                 "conditions": ["HEALTH_ERR"],
                 "delta": 50,
-                "message": "Ceph cluster reported ERR: {entry}",
+                "message": "HEALTH_ERR {entry} reported by Ceph",
             },
             "vm_failed": {
                 "entries": get_vm_states,
                 "conditions": ["fail"],
                 "delta": 10,
-                "message": "VM {entry} was failed.",
+                "message": "VM {entry} was failed",
             },
             "memory_overprovisioned": {
                 "entries": get_overprovisioned_memory,
@@ -462,8 +495,7 @@ class MonitoringInstance(object):
         )
         self.timer.start()
 
-        self.run_faults()
-        self.run_plugins()
+        self.run_checks()
 
     def stop_timer(self):
         try:
@@ -472,23 +504,25 @@ class MonitoringInstance(object):
         except Exception:
             self.logger.out("Failed to stop monitoring check timer", state="w")
 
-    def generate_fault(self, fault_time, fault_delta, fault_message):
+    def generate_fault(self, fault_name, fault_time, fault_delta, fault_message):
         # Generate a fault ID from the fault_message and fault_delta
-        fault_str = f"{fault_delta} {fault_message}"
-        fault_id = int(sha1(fault_str.encode("utf-8")).hexdigest(), 16) % (10**8)
-
-        self.logger.out(
-            f"Generating fault {fault_id}: {fault_message} @ {fault_time}", state="i"
-        )
+        fault_str = f"{fault_name} {fault_delta} {fault_message}"
+        fault_id = str(md5(fault_str.encode("utf-8")).hexdigest())
 
         # If a fault already exists with this ID, just update the time
         if not self.zkhandler.exists("base.faults"):
             self.logger.out(
-                "Skipping fault reporting due to missing Zookeeper schemas", state="w"
+                "Skipping fault reporting for {fault_id} due to missing Zookeeper schemas",
+                state="w",
             )
             return
 
-        if fault_id in self.zkhandler.children("base.faults", retval=[]):
+        existing_faults = self.zkhandler.children("base.faults")
+        if fault_id in existing_faults:
+            self.logger.out(
+                f"Updating fault {fault_id}: {fault_message} @ {fault_time}", state="i"
+            )
+
             self.zkhandler.write(
                 [
                     (("faults.last_time", fault_id), str(fault_time)),
@@ -496,6 +530,10 @@ class MonitoringInstance(object):
             )
         # Otherwise, generate a new fault event
         else:
+            self.logger.out(
+                f"Generating fault {fault_id}: {fault_message} @ {fault_time}",
+                state="i",
+            )
             self.zkhandler.write(
                 [
                     (("faults.id", fault_id), ""),
@@ -509,14 +547,17 @@ class MonitoringInstance(object):
             )
 
     def run_faults(self):
-        if self.this_node.coordinator_state == "primary":
+        coordinator_state = self.this_node.coordinator_state
+
+        if coordinator_state == "primary":
             cst_colour = self.logger.fmt_green
-        elif self.this_node.coordinator_state == "secondary":
+        elif coordinator_state == "secondary":
             cst_colour = self.logger.fmt_blue
         else:
             cst_colour = self.logger.fmt_cyan
 
-        active_coordinator_state = self.this_node.coordinator_state
+        if coordinator_state not in ["primary", "secondary", "takeover", "relinquish"]:
+            return
 
         runtime_start = datetime.now()
         self.logger.out(
@@ -525,20 +566,22 @@ class MonitoringInstance(object):
         )
 
         fault_count = 0
-        for fault_type in self.faults_map.keys():
-            fault_details = self.faults_map[fault_type]
+        for fault_type in self.cluster_faults_map.keys():
+            fault_details = self.cluster_faults_map[fault_type]
 
             entries = fault_details["entries"]()
             for _entry in entries:
                 entry = _entry[0]
-                detail = _entry[1]
+                check = _entry[1]
                 for condition in fault_details["conditions"]:
-                    if str(condition) in str(detail):
+                    if str(condition) == str(check):
                         fault_time = datetime.now()
                         fault_delta = fault_details["delta"]
                         fault_message = fault_details["message"].format(entry=entry)
                         fault_count += 1
-                        self.generate_fault(fault_time, fault_delta, fault_message)
+                        self.generate_fault(
+                            fault_type, fault_time, fault_delta, fault_message
+                        )
 
         runtime_end = datetime.now()
         runtime_delta = runtime_end - runtime_start
@@ -556,7 +599,7 @@ class MonitoringInstance(object):
                 nofmt=self.logger.fmt_end,
                 hostname=self.config["node_hostname"],
                 starttime=runtime_start,
-                costate=active_coordinator_state,
+                costate=coordinator_state,
                 fault_count=fault_count,
                 runtime=runtime,
             ),
@@ -582,15 +625,15 @@ class MonitoringInstance(object):
         return result
 
     def run_plugins(self):
-        if self.this_node.coordinator_state == "primary":
+        coordinator_state = self.this_node.coordinator_state
+
+        if coordinator_state == "primary":
             cst_colour = self.logger.fmt_green
-        elif self.this_node.coordinator_state == "secondary":
+        elif coordinator_state == "secondary":
             cst_colour = self.logger.fmt_blue
         else:
             cst_colour = self.logger.fmt_cyan
 
-        active_coordinator_state = self.this_node.coordinator_state
-
         runtime_start = datetime.now()
         self.logger.out(
             "Starting monitoring plugin check run",
@@ -614,6 +657,15 @@ class MonitoringInstance(object):
                     state="t",
                     prefix=f"{result.plugin_name} ({result.runtime}s)",
                 )
+            # Leaving this code if we ever want plugins to directly generate faults
+            # if result.health_delta >= 25:
+            #    fault_type = f"plugin.{self.this_node.name}.{result.plugin_name}"
+            #    fault_time = datetime.now()
+            #    fault_delta = result.health_delta
+            #    fault_message = (
+            #        f"{self.this_node.name} {result.plugin_name} {result.message}"
+            #    )
+            #    self.generate_fault(fault_type, fault_time, fault_delta, fault_message)
             total_health -= result.health_delta
 
         if total_health < 0:
@@ -653,7 +705,7 @@ class MonitoringInstance(object):
                 nofmt=self.logger.fmt_end,
                 hostname=self.config["node_hostname"],
                 starttime=runtime_start,
-                costate=active_coordinator_state,
+                costate=coordinator_state,
                 health=health_text,
                 runtime=runtime,
             ),
@@ -683,5 +735,5 @@ class MonitoringInstance(object):
         )
 
     def run_checks(self):
-        self.run_faults()
         self.run_plugins()
+        self.run_faults()
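As a closing illustration of the reworked condition matching (a hypothetical standalone sketch with hardcoded data; the real entries functions query Zookeeper): conditions are now compared by exact string equality against the check value instead of the old substring test, which matters once numeric health values are in play.

faults_map = {
    "unhealthy_node": {
        "entries": lambda: [("node1 was at 40% (dstate@-10%)", 40)],
        "conditions": range(50, 0, -1),  # any health value from 50 down to 1
        "delta": 0,
        "message": "Node {entry} <= 50% health",
    },
}

for fault_type, details in faults_map.items():
    for entry, check in details["entries"]():
        for condition in details["conditions"]:
            # exact equality: a condition of "5" no longer matches a check
            # value of "50" as the old substring test would have
            if str(condition) == str(check):
                print(details["message"].format(entry=entry))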
 