diff --git a/daemon-common/zkhandler.py b/daemon-common/zkhandler.py
index fc4975d1..51b250df 100644
--- a/daemon-common/zkhandler.py
+++ b/daemon-common/zkhandler.py
@@ -328,19 +328,19 @@ class ZKHandler(object):
 
         return True
 
-    def children(self, key, retval=None):
+    def children(self, key):
         """
         Lists all children of a key
         """
         try:
             path = self.get_schema_path(key)
             if path is None:
-                # This path is invalid; this is likely due to missing schema entries, so return None
-                return retval
+                raise NoNodeError
 
             return self.zk_conn.get_children(path)
         except NoNodeError:
-            return retval
+            # This path is invalid; this is likely due to missing schema entries, so return None
+            return None
 
     def rename(self, kkpairs):
         """
diff --git a/health-daemon/pvchealthd/objects/MonitoringInstance.py b/health-daemon/pvchealthd/objects/MonitoringInstance.py
index 76b3d2b1..e14c52a9 100644
--- a/health-daemon/pvchealthd/objects/MonitoringInstance.py
+++ b/health-daemon/pvchealthd/objects/MonitoringInstance.py
@@ -25,7 +25,7 @@ import importlib.util
 
 from os import walk
 from datetime import datetime
-from hashlib import sha1
+from hashlib import md5
 from json import dumps, loads
 
 from apscheduler.schedulers.background import BackgroundScheduler
@@ -199,6 +199,33 @@ class MonitoringInstance(object):
         self.this_node = this_node
 
         # Create functions for each fault type
+        def get_node_health_states():
+            node_entries = list()
+            for node in self.zkhandler.children("base.node"):
+                node_health = self.zkhandler.read(("node.monitoring.health", node))
+                node_faulty_plugins = list()
+                all_plugins = self.zkhandler.children(("node.monitoring.data", node))
+                for plugin in all_plugins:
+                    plugin_delta = self.zkhandler.read(
+                        (
+                            "node.monitoring.data",
+                            node,
+                            "monitoring_plugin.health_delta",
+                            plugin,
+                        )
+                    )
+                    if int(plugin_delta) > 0:
+                        node_faulty_plugins.append(f"{plugin}@-{plugin_delta}%")
+
+                node_entries.append(
+                    (
+                        f"{node} was at {node_health}% ({', '.join(node_faulty_plugins)})",
+                        node_health,
+                    )
+                )
+
+            return node_entries
+
         def get_node_daemon_states():
             return [
                 (node, self.zkhandler.read(("node.state.daemon", node)))
@@ -256,30 +283,36 @@ class MonitoringInstance(object):
         ]
 
         # This is a list of all possible faults (cluster error messages) and their corresponding details
-        self.faults_map = {
+        self.cluster_faults_map = {
+            "unhealthy_node": {
+                "entries": get_node_health_states,
+                "conditions": range(50, 0, -1),
+                "delta": 0,
+                "message": "Node {entry} <= 50% health",
+            },
             "dead_or_fenced_node": {
                 "entries": get_node_daemon_states,
                 "conditions": ["dead", "fenced"],
                 "delta": 50,
-                "message": "Node {entry} was dead and/or fenced.",
+                "message": "Node {entry} was dead and/or fenced",
             },
             "ceph_osd_out": {
                 "entries": get_osd_out_states,
                 "conditions": ["1"],
                 "delta": 25,
-                "message": "OSD {entry} was out.",
+                "message": "OSD {entry} was marked out",
             },
             "ceph_err": {
                 "entries": get_ceph_health_entries,
                 "conditions": ["HEALTH_ERR"],
                 "delta": 50,
-                "message": "Ceph cluster reported ERR: {entry}",
+                "message": "HEALTH_ERR {entry} reported by Ceph",
             },
             "vm_failed": {
                 "entries": get_vm_states,
                 "conditions": ["fail"],
                 "delta": 10,
-                "message": "VM {entry} was failed.",
+                "message": "VM {entry} was failed",
             },
             "memory_overprovisioned": {
                 "entries": get_overprovisioned_memory,
@@ -462,8 +495,7 @@ class MonitoringInstance(object):
         )
         self.timer.start()
 
-        self.run_faults()
-        self.run_plugins()
+        self.run_checks()
 
     def stop_timer(self):
         try:
@@ -472,23 +504,25 @@ class MonitoringInstance(object):
         except Exception:
             self.logger.out("Failed to stop monitoring check timer", state="w")
 
-    def generate_fault(self, fault_time, fault_delta, fault_message):
+    def generate_fault(self, fault_name, fault_time, fault_delta, fault_message):
         # Generate a fault ID from the fault_message and fault_delta
-        fault_str = f"{fault_delta} {fault_message}"
-        fault_id = int(sha1(fault_str.encode("utf-8")).hexdigest(), 16) % (10**8)
-
-        self.logger.out(
-            f"Generating fault {fault_id}: {fault_message} @ {fault_time}", state="i"
-        )
+        fault_str = f"{fault_name} {fault_delta} {fault_message}"
+        fault_id = str(md5(fault_str.encode("utf-8")).hexdigest())
 
         # If a fault already exists with this ID, just update the time
         if not self.zkhandler.exists("base.faults"):
             self.logger.out(
-                "Skipping fault reporting due to missing Zookeeper schemas", state="w"
+                f"Skipping fault reporting for {fault_id} due to missing Zookeeper schemas",
+                state="w",
             )
             return
 
-        if fault_id in self.zkhandler.children("base.faults", retval=[]):
+        existing_faults = self.zkhandler.children("base.faults")
+        if fault_id in existing_faults:
+            self.logger.out(
+                f"Updating fault {fault_id}: {fault_message} @ {fault_time}", state="i"
+            )
+
             self.zkhandler.write(
                 [
                     (("faults.last_time", fault_id), str(fault_time)),
@@ -496,6 +530,10 @@ class MonitoringInstance(object):
             )
         # Otherwise, generate a new fault event
        else:
+            self.logger.out(
+                f"Generating fault {fault_id}: {fault_message} @ {fault_time}",
+                state="i",
+            )
             self.zkhandler.write(
                 [
                     (("faults.id", fault_id), ""),
@@ -509,14 +547,17 @@ class MonitoringInstance(object):
             )
 
     def run_faults(self):
-        if self.this_node.coordinator_state == "primary":
+        coordinator_state = self.this_node.coordinator_state
+
+        if coordinator_state == "primary":
             cst_colour = self.logger.fmt_green
-        elif self.this_node.coordinator_state == "secondary":
+        elif coordinator_state == "secondary":
             cst_colour = self.logger.fmt_blue
         else:
             cst_colour = self.logger.fmt_cyan
 
-        active_coordinator_state = self.this_node.coordinator_state
+        if coordinator_state not in ["primary", "secondary", "takeover", "relinquish"]:
+            return
 
         runtime_start = datetime.now()
         self.logger.out(
@@ -525,20 +566,22 @@ class MonitoringInstance(object):
         )
 
         fault_count = 0
-        for fault_type in self.faults_map.keys():
-            fault_details = self.faults_map[fault_type]
+        for fault_type in self.cluster_faults_map.keys():
+            fault_details = self.cluster_faults_map[fault_type]
 
             entries = fault_details["entries"]()
             for _entry in entries:
                 entry = _entry[0]
-                detail = _entry[1]
+                check = _entry[1]
                 for condition in fault_details["conditions"]:
-                    if str(condition) in str(detail):
+                    if str(condition) == str(check):
                         fault_time = datetime.now()
                         fault_delta = fault_details["delta"]
                         fault_message = fault_details["message"].format(entry=entry)
                         fault_count += 1
-                        self.generate_fault(fault_time, fault_delta, fault_message)
+                        self.generate_fault(
+                            fault_type, fault_time, fault_delta, fault_message
+                        )
 
         runtime_end = datetime.now()
         runtime_delta = runtime_end - runtime_start
@@ -556,7 +599,7 @@ class MonitoringInstance(object):
                 nofmt=self.logger.fmt_end,
                 hostname=self.config["node_hostname"],
                 starttime=runtime_start,
-                costate=active_coordinator_state,
+                costate=coordinator_state,
                 fault_count=fault_count,
                 runtime=runtime,
             ),
@@ -582,15 +625,15 @@ class MonitoringInstance(object):
         return result
 
     def run_plugins(self):
-        if self.this_node.coordinator_state == "primary":
+        coordinator_state = self.this_node.coordinator_state
+
+        if coordinator_state == "primary":
             cst_colour = self.logger.fmt_green
-        elif self.this_node.coordinator_state == "secondary":
+        elif coordinator_state == "secondary":
             cst_colour = self.logger.fmt_blue
         else:
             cst_colour = self.logger.fmt_cyan
 
-        active_coordinator_state = self.this_node.coordinator_state
-
         runtime_start = datetime.now()
         self.logger.out(
             "Starting monitoring plugin check run",
@@ -614,6 +657,15 @@ class MonitoringInstance(object):
                 state="t",
                 prefix=f"{result.plugin_name} ({result.runtime}s)",
             )
+            # Leaving this code if we ever want plugins to directly generate faults
+            # if result.health_delta >= 25:
+            #     fault_type = f"plugin.{self.this_node.name}.{result.plugin_name}"
+            #     fault_time = datetime.now()
+            #     fault_delta = result.health_delta
+            #     fault_message = (
+            #         f"{self.this_node.name} {result.plugin_name} {result.message}"
+            #     )
+            #     self.generate_fault(fault_type, fault_time, fault_delta, fault_message)
             total_health -= result.health_delta
 
         if total_health < 0:
@@ -653,7 +705,7 @@ class MonitoringInstance(object):
                 nofmt=self.logger.fmt_end,
                 hostname=self.config["node_hostname"],
                 starttime=runtime_start,
-                costate=active_coordinator_state,
+                costate=coordinator_state,
                 health=health_text,
                 runtime=runtime,
             ),
@@ -683,5 +735,5 @@ class MonitoringInstance(object):
         )
 
     def run_checks(self):
-        self.run_faults()
         self.run_plugins()
+        self.run_faults()