diff --git a/daemon-common/migrations/versions/11.json b/daemon-common/migrations/versions/11.json new file mode 100644 index 00000000..42ff801d --- /dev/null +++ b/daemon-common/migrations/versions/11.json @@ -0,0 +1 @@ +{"version": "11", "root": "", "base": {"root": "", "schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.primary_node.sync_lock": "/config/primary_node/sync_lock", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "logs": "/logs", "faults": "/faults", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.health": "/ceph/health", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", "snapshot": "/ceph/snapshots"}, "logs": {"node": "", "messages": "/messages"}, "faults": {"id": "", "last_time": "/last_time", "first_time": "/first_time", "ack_time": "/ack_time", "status": "/status", "delta": "/delta", "message": "/message"}, "node": {"name": "", "keepalive": "/keepalive", "mode": "/daemonmode", "data.active_schema": "/activeschema", "data.latest_schema": "/latestschema", "data.static": "/staticdata", "data.pvc_version": "/pvcversion", "running_domains": "/runningdomains", "count.provisioned_domains": "/domainscount", "count.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "cpu.load": "/cpuload", "vcpu.allocated": "/vcpualloc", "memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword", "sriov": "/sriov", "sriov.pf": "/sriov/pf", "sriov.vf": "/sriov/vf", "monitoring.plugins": "/monitoring_plugins", "monitoring.data": "/monitoring_data", "monitoring.health": "/monitoring_health"}, "monitoring_plugin": {"name": "", "last_run": "/last_run", "health_delta": "/health_delta", "message": "/message", "data": "/data", "runtime": "/runtime"}, "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, "sriov_vf": {"phy": "", "pf": "/pf", "mtu": "/mtu", "mac": "/mac", "phy_mac": "/phy_mac", "config": "/config", "config.vlan_id": "/config/vlan_id", "config.vlan_qos": "/config/vlan_qos", "config.tx_rate_min": "/config/tx_rate_min", "config.tx_rate_max": "/config/tx_rate_max", "config.spoof_check": "/config/spoof_check", "config.link_state": "/config/link_state", "config.trust": "/config/trust", "config.query_rss": "/config/query_rss", "pci": "/pci", "pci.domain": "/pci/domain", "pci.bus": "/pci/bus", "pci.slot": "/pci/slot", "pci.function": "/pci/function", "used": "/used", "used_by": "/used_by"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "storage.volumes": "/rbdlist", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", "meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit", "meta.tags": "/tags", "migrate.sync_lock": "/migrate_sync_lock"}, "tag": {"name": "", "type": "/type", "protected": "/protected"}, "network": {"vni": "", "type": "/nettype", "mtu": "/mtu", "rule": "/firewall_rules", "rule.in": "/firewall_rules/in", "rule.out": "/firewall_rules/out", "nameservers": "/name_servers", "domain": "/domain", "reservation": "/dhcp4_reservations", "lease": "/dhcp4_leases", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", "ip4.dhcp": "/dhcp4_flag", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "reservation": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname"}, "lease": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname", "expiry": "/expiry", "client_id": "/clientid"}, "rule": {"description": "", "rule": "/rule", "order": "/order"}, "osd": {"id": "", "node": "/node", "device": "/device", "db_device": "/db_device", "fsid": "/fsid", "ofsid": "/fsid/osd", "cfsid": "/fsid/cluster", "lvm": "/lvm", "vg": "/lvm/vg", "lv": "/lvm/lv", "is_split": "/is_split", "stats": "/stats"}, "pool": {"name": "", "pgs": "/pgs", "tier": "/tier", "stats": "/stats"}, "volume": {"name": "", "stats": "/stats"}, "snapshot": {"name": "", "stats": "/stats"}} \ No newline at end of file diff --git a/daemon-common/zkhandler.py b/daemon-common/zkhandler.py index 76ff5557..fc4975d1 100644 --- a/daemon-common/zkhandler.py +++ b/daemon-common/zkhandler.py @@ -328,7 +328,7 @@ class ZKHandler(object): return True - def children(self, key): + def children(self, key, retval=None): """ Lists all children of a key """ @@ -336,11 +336,11 @@ class ZKHandler(object): path = self.get_schema_path(key) if path is None: # This path is invalid; this is likely due to missing schema entries, so return None - return None + return retval return self.zk_conn.get_children(path) except NoNodeError: - return None + return retval def rename(self, kkpairs): """ @@ -540,7 +540,7 @@ class ZKHandler(object): # class ZKSchema(object): # Current version - _version = 10 + _version = 11 # Root for doing nested keys _schema_root = "" @@ -560,7 +560,8 @@ class ZKSchema(object): "config.primary_node.sync_lock": f"{_schema_root}/config/primary_node/sync_lock", "config.upstream_ip": f"{_schema_root}/config/upstream_ip", "config.migration_target_selector": f"{_schema_root}/config/migration_target_selector", - "logs": "/logs", + "logs": f"{_schema_root}/logs", + "faults": f"{_schema_root}/faults", "node": f"{_schema_root}/nodes", "domain": f"{_schema_root}/domains", "network": f"{_schema_root}/networks", @@ -577,6 +578,16 @@ class ZKSchema(object): "node": "", # The root key "messages": "/messages", }, + # The schema of an individual logs entry (/logs/{id}) + "faults": { + "id": "", # The root key + "last_time": "/last_time", + "first_time": "/first_time", + "ack_time": "/ack_time", + "status": "/status", + "delta": "/delta", + "message": "/message", + }, # The schema of an individual node entry (/nodes/{node_name}) "node": { "name": "", # The root key @@ -619,7 +630,11 @@ class ZKSchema(object): "runtime": "/runtime", }, # The schema of an individual SR-IOV PF entry (/nodes/{node_name}/sriov/pf/{pf}) - "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, # The root key + "sriov_pf": { + "phy": "", + "mtu": "/mtu", + "vfcount": "/vfcount", + }, # The root key # The schema of an individual SR-IOV VF entry (/nodes/{node_name}/sriov/vf/{vf}) "sriov_vf": { "phy": "", # The root key @@ -665,7 +680,11 @@ class ZKSchema(object): "migrate.sync_lock": "/migrate_sync_lock", }, # The schema of an individual domain tag entry (/domains/{domain}/tags/{tag}) - "tag": {"name": "", "type": "/type", "protected": "/protected"}, # The root key + "tag": { + "name": "", + "type": "/type", + "protected": "/protected", + }, # The root key # The schema of an individual network entry (/networks/{vni}) "network": { "vni": "", # The root key @@ -702,7 +721,11 @@ class ZKSchema(object): "client_id": "/clientid", }, # The schema for an individual network ACL entry (/networks/{vni}/firewall_rules/(in|out)/{acl} - "rule": {"description": "", "rule": "/rule", "order": "/order"}, # The root key + "rule": { + "description": "", + "rule": "/rule", + "order": "/order", + }, # The root key # The schema of an individual OSD entry (/ceph/osds/{osd_id}) "osd": { "id": "", # The root key @@ -726,9 +749,15 @@ class ZKSchema(object): "stats": "/stats", }, # The root key # The schema of an individual volume entry (/ceph/volumes/{pool_name}/{volume_name}) - "volume": {"name": "", "stats": "/stats"}, # The root key + "volume": { + "name": "", + "stats": "/stats", + }, # The root key # The schema of an individual snapshot entry (/ceph/volumes/{pool_name}/{volume_name}/{snapshot_name}) - "snapshot": {"name": "", "stats": "/stats"}, # The root key + "snapshot": { + "name": "", + "stats": "/stats", + }, # The root key } # Properties diff --git a/health-daemon/pvchealthd/objects/MonitoringInstance.py b/health-daemon/pvchealthd/objects/MonitoringInstance.py index ead8fd03..76b3d2b1 100644 --- a/health-daemon/pvchealthd/objects/MonitoringInstance.py +++ b/health-daemon/pvchealthd/objects/MonitoringInstance.py @@ -25,7 +25,8 @@ import importlib.util from os import walk from datetime import datetime -from json import dumps +from hashlib import sha1 +from json import dumps, loads from apscheduler.schedulers.background import BackgroundScheduler @@ -197,6 +198,97 @@ class MonitoringInstance(object): self.logger = logger self.this_node = this_node + # Create functions for each fault type + def get_node_daemon_states(): + return [ + (node, self.zkhandler.read(("node.state.daemon", node))) + for node in self.zkhandler.children("base.node") + ] + + def get_osd_out_states(): + return [ + (osd, loads(self.zkhandler.read(("osd.stats", osd))).get("out", 0)) + for osd in self.zkhandler.children("base.osd") + ] + + def get_ceph_health_entries(): + return [ + (value, key) + for key, value in loads(zkhandler.read("base.storage.health"))[ + "checks" + ].items() + ] + + def get_vm_states(): + return [ + ( + self.zkhandler.read(("domain.name", domain)), + self.zkhandler.read(("domain.state", domain)), + ) + for domain in self.zkhandler.children("base.domain") + ] + + def get_overprovisioned_memory(): + all_nodes = self.zkhandler.children("base.node") + current_memory_provisioned = sum( + [ + int(self.zkhandler.read(("node.memory.allocated", node))) + for node in all_nodes + ] + ) + node_memory_totals = [ + int(self.zkhandler.read(("node.memory.total", node))) + for node in all_nodes + ] + total_node_memory = sum(node_memory_totals) + most_node_memory = sorted(node_memory_totals)[-1] + available_node_memory = total_node_memory - most_node_memory + + if current_memory_provisioned >= available_node_memory: + op_str = "overprovisioned" + else: + op_str = "ok" + return [ + ( + f"{current_memory_provisioned}MB > {available_node_memory}MB (N-1)", + op_str, + ) + ] + + # This is a list of all possible faults (cluster error messages) and their corresponding details + self.faults_map = { + "dead_or_fenced_node": { + "entries": get_node_daemon_states, + "conditions": ["dead", "fenced"], + "delta": 50, + "message": "Node {entry} was dead and/or fenced.", + }, + "ceph_osd_out": { + "entries": get_osd_out_states, + "conditions": ["1"], + "delta": 25, + "message": "OSD {entry} was out.", + }, + "ceph_err": { + "entries": get_ceph_health_entries, + "conditions": ["HEALTH_ERR"], + "delta": 50, + "message": "Ceph cluster reported ERR: {entry}", + }, + "vm_failed": { + "entries": get_vm_states, + "conditions": ["fail"], + "delta": 10, + "message": "VM {entry} was failed.", + }, + "memory_overprovisioned": { + "entries": get_overprovisioned_memory, + "conditions": ["overprovisioned"], + "delta": 25, + "message": "Cluster memory was overprovisioned {entry}", + }, + } + # Get a list of plugins from the plugin_directory plugin_files = next(walk(self.config["plugin_directory"]), (None, None, []))[ 2 @@ -344,38 +436,133 @@ class MonitoringInstance(object): ) ) - self.run_plugins() - self.start_check_timer() + self.start_timer() def __del__(self): self.shutdown() def shutdown(self): - self.stop_check_timer() + self.stop_timer() self.run_cleanups() return - def start_check_timer(self): - check_interval = self.config["monitoring_interval"] + def start_timer(self): + check_interval = int(self.config["monitoring_interval"]) + + self.timer = BackgroundScheduler() + self.timer.add_job( + self.run_checks, + trigger="interval", + seconds=check_interval, + ) + self.logger.out( f"Starting monitoring check timer ({check_interval} second interval)", state="s", ) - self.check_timer = BackgroundScheduler() - self.check_timer.add_job( - self.run_plugins, - trigger="interval", - seconds=check_interval, - ) - self.check_timer.start() + self.timer.start() - def stop_check_timer(self): + self.run_faults() + self.run_plugins() + + def stop_timer(self): try: - self.check_timer.shutdown() self.logger.out("Stopping monitoring check timer", state="s") + self.timer.shutdown() except Exception: self.logger.out("Failed to stop monitoring check timer", state="w") + def generate_fault(self, fault_time, fault_delta, fault_message): + # Generate a fault ID from the fault_message and fault_delta + fault_str = f"{fault_delta} {fault_message}" + fault_id = int(sha1(fault_str.encode("utf-8")).hexdigest(), 16) % (10**8) + + self.logger.out( + f"Generating fault {fault_id}: {fault_message} @ {fault_time}", state="i" + ) + + # If a fault already exists with this ID, just update the time + if not self.zkhandler.exists("base.faults"): + self.logger.out( + "Skipping fault reporting due to missing Zookeeper schemas", state="w" + ) + return + + if fault_id in self.zkhandler.children("base.faults", retval=[]): + self.zkhandler.write( + [ + (("faults.last_time", fault_id), str(fault_time)), + ] + ) + # Otherwise, generate a new fault event + else: + self.zkhandler.write( + [ + (("faults.id", fault_id), ""), + (("faults.first_time", fault_id), str(fault_time)), + (("faults.last_time", fault_id), str(fault_time)), + (("faults.ack_time", fault_id), ""), + (("faults.status", fault_id), "new"), + (("faults.delta", fault_id), fault_delta), + (("faults.message", fault_id), fault_message), + ] + ) + + def run_faults(self): + if self.this_node.coordinator_state == "primary": + cst_colour = self.logger.fmt_green + elif self.this_node.coordinator_state == "secondary": + cst_colour = self.logger.fmt_blue + else: + cst_colour = self.logger.fmt_cyan + + active_coordinator_state = self.this_node.coordinator_state + + runtime_start = datetime.now() + self.logger.out( + "Starting monitoring fault check run", + state="t", + ) + + fault_count = 0 + for fault_type in self.faults_map.keys(): + fault_details = self.faults_map[fault_type] + + entries = fault_details["entries"]() + for _entry in entries: + entry = _entry[0] + detail = _entry[1] + for condition in fault_details["conditions"]: + if str(condition) in str(detail): + fault_time = datetime.now() + fault_delta = fault_details["delta"] + fault_message = fault_details["message"].format(entry=entry) + fault_count += 1 + self.generate_fault(fault_time, fault_delta, fault_message) + + runtime_end = datetime.now() + runtime_delta = runtime_end - runtime_start + runtime = "{:0.02f}".format(runtime_delta.total_seconds()) + if fault_count > 0: + fault_colour = self.logger.fmt_red + else: + fault_colour = self.logger.fmt_green + + self.logger.out( + "{start_colour}{hostname} fault check @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {fault_colour}{fault_count} faults{nofmt} in {runtime} seconds".format( + start_colour=self.logger.fmt_purple, + cst_colour=self.logger.fmt_bold + cst_colour, + fault_colour=fault_colour, + nofmt=self.logger.fmt_end, + hostname=self.config["node_hostname"], + starttime=runtime_start, + costate=active_coordinator_state, + fault_count=fault_count, + runtime=runtime, + ), + state="t", + ) + def run_plugin(self, plugin): time_start = datetime.now() try: @@ -406,7 +593,7 @@ class MonitoringInstance(object): runtime_start = datetime.now() self.logger.out( - "Starting monitoring healthcheck run", + "Starting monitoring plugin check run", state="t", ) @@ -459,7 +646,7 @@ class MonitoringInstance(object): health_text = "N/A" self.logger.out( - "{start_colour}{hostname} healthcheck @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {health_colour}{health}{nofmt} in {runtime} seconds".format( + "{start_colour}{hostname} plugin check @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] result is {health_colour}{health}{nofmt} in {runtime} seconds".format( start_colour=self.logger.fmt_purple, cst_colour=self.logger.fmt_bold + cst_colour, health_colour=health_colour, @@ -494,3 +681,7 @@ class MonitoringInstance(object): ), ] ) + + def run_checks(self): + self.run_faults() + self.run_plugins()