#!/usr/bin/env python3 # cluster.py - PVC client function library, cluster management # Part of the Parallel Virtual Cluster (PVC) system # # Copyright (C) 2018-2022 Joshua M. Boniface # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, version 3. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # ############################################################################### from distutils.util import strtobool from json import loads import daemon_lib.common as common import daemon_lib.faults as faults import daemon_lib.node as pvc_node def set_maintenance(zkhandler, maint_state): current_maint_state = zkhandler.read("base.config.maintenance") if maint_state == current_maint_state: if maint_state == "true": return True, "Cluster is already in maintenance mode" else: return True, "Cluster is already in normal mode" if maint_state == "true": zkhandler.write([("base.config.maintenance", "true")]) return True, "Successfully set cluster in maintenance mode" else: zkhandler.write([("base.config.maintenance", "false")]) return True, "Successfully set cluster in normal mode" def getClusterHealthFromFaults(zkhandler, faults_list): unacknowledged_faults = [fault for fault in faults_list if fault["status"] != "ack"] # Generate total cluster health numbers cluster_health_value = 100 cluster_health_messages = list() for fault in sorted( unacknowledged_faults, key=lambda x: (x["health_delta"], x["last_reported"]), reverse=True, ): cluster_health_value -= fault["health_delta"] message = { "id": fault["id"], "health_delta": fault["health_delta"], "text": fault["message"], } cluster_health_messages.append(message) if cluster_health_value < 0: cluster_health_value = 0 cluster_health = { "health": cluster_health_value, "messages": cluster_health_messages, } return cluster_health def getClusterHealth(zkhandler, node_list, vm_list, ceph_osd_list): health_delta_map = { "node_stopped": 50, "node_flushed": 10, "vm_stopped": 10, "osd_out": 50, "osd_down": 10, "osd_full": 50, "osd_nearfull": 10, "memory_overprovisioned": 50, "ceph_err": 50, "ceph_warn": 10, } # Generate total cluster health numbers cluster_health_value = 100 cluster_health_messages = list() for index, node in enumerate(node_list): # Apply node health values to total health number try: node_health_int = int(node["health"]) except Exception: node_health_int = 100 cluster_health_value -= 100 - node_health_int for entry in node["health_details"]: if entry["health_delta"] > 0: cluster_health_messages.append( f"{node['name']}: plugin '{entry['name']}': {entry['message']}" ) # Handle unhealthy node states if node["daemon_state"] not in ["run"]: cluster_health_value -= health_delta_map["node_stopped"] cluster_health_messages.append( f"cluster: Node {node['name']} in {node['daemon_state'].upper()} daemon state" ) elif node["domain_state"] not in ["ready"]: cluster_health_value -= health_delta_map["node_flushed"] cluster_health_messages.append( f"cluster: Node {node['name']} in {node['domain_state'].upper()} domain state" ) for index, vm in enumerate(vm_list): # Handle unhealthy VM states if vm["state"] in ["stop", "fail"]: cluster_health_value -= health_delta_map["vm_stopped"] cluster_health_messages.append( f"cluster: VM {vm['name']} in {vm['state'].upper()} state" ) for index, ceph_osd in enumerate(ceph_osd_list): in_texts = {1: "in", 0: "out"} up_texts = {1: "up", 0: "down"} # Handle unhealthy OSD states if in_texts[ceph_osd["stats"]["in"]] not in ["in"]: cluster_health_value -= health_delta_map["osd_out"] cluster_health_messages.append( f"cluster: Ceph OSD {ceph_osd['id']} in {in_texts[ceph_osd['stats']['in']].upper()} state" ) elif up_texts[ceph_osd["stats"]["up"]] not in ["up"]: cluster_health_value -= health_delta_map["osd_down"] cluster_health_messages.append( f"cluster: Ceph OSD {ceph_osd['id']} in {up_texts[ceph_osd['stats']['up']].upper()} state" ) # Handle full or nearfull OSDs (>85%) if ceph_osd["stats"]["utilization"] >= 90: cluster_health_value -= health_delta_map["osd_full"] cluster_health_messages.append( f"cluster: Ceph OSD {ceph_osd['id']} is FULL ({ceph_osd['stats']['utilization']:.1f}% > 90%)" ) elif ceph_osd["stats"]["utilization"] >= 85: cluster_health_value -= health_delta_map["osd_nearfull"] cluster_health_messages.append( f"cluster: Ceph OSD {ceph_osd['id']} is NEARFULL ({ceph_osd['stats']['utilization']:.1f}% > 85%)" ) # Check for (n-1) overprovisioning # Assume X nodes. If the total VM memory allocation (counting only running VMss) is greater than # the total memory of the (n-1) smallest nodes, trigger this warning. n_minus_1_total = 0 alloc_total = 0 node_largest_index = None node_largest_count = 0 for index, node in enumerate(node_list): node_mem_total = node["memory"]["total"] node_mem_alloc = node["memory"]["allocated"] alloc_total += node_mem_alloc # Determine if this node is the largest seen so far if node_mem_total > node_largest_count: node_largest_index = index node_largest_count = node_mem_total n_minus_1_node_list = list() for index, node in enumerate(node_list): if index == node_largest_index: continue n_minus_1_node_list.append(node) for index, node in enumerate(n_minus_1_node_list): n_minus_1_total += node["memory"]["total"] if alloc_total > n_minus_1_total: cluster_health_value -= health_delta_map["memory_overprovisioned"] cluster_health_messages.append( f"cluster: Total memory is OVERPROVISIONED ({alloc_total} > {n_minus_1_total} @ N-1)" ) # Check Ceph cluster health ceph_health = loads(zkhandler.read("base.storage.health")) ceph_health_status = ceph_health["status"] ceph_health_entries = ceph_health["checks"].keys() ceph_health_status_map = { "HEALTH_ERR": "ERROR", "HEALTH_WARN": "WARNING", } for entry in ceph_health_entries: cluster_health_messages.append( f"cluster: Ceph {ceph_health_status_map[ceph_health['checks'][entry]['severity']]} {entry}: {ceph_health['checks'][entry]['summary']['message']}" ) if ceph_health_status == "HEALTH_ERR": cluster_health_value -= health_delta_map["ceph_err"] elif ceph_health_status == "HEALTH_WARN": cluster_health_value -= health_delta_map["ceph_warn"] if cluster_health_value < 0: cluster_health_value = 0 cluster_health = { "health": cluster_health_value, "messages": cluster_health_messages, } return cluster_health def getNodeHealth(zkhandler, node_list): # Get the health state of all nodes node_health_reads = list() for node in node_list: node_health_reads += [ ("node.monitoring.health", node), ("node.monitoring.plugins", node), ] all_node_health_details = zkhandler.read_many(node_health_reads) # Parse out the Node health details node_health = dict() for nidx, node in enumerate(node_list): # Split the large list of return values by the IDX of this node # Each node result is 2 fields long pos_start = nidx * 2 pos_end = nidx * 2 + 2 node_health_value, node_health_plugins = tuple( all_node_health_details[pos_start:pos_end] ) node_health_details = pvc_node.getNodeHealthDetails( zkhandler, node, node_health_plugins.split() ) node_health_messages = list() for entry in node_health_details: if entry["health_delta"] > 0: node_health_messages.append(f"'{entry['name']}': {entry['message']}") node_health_entry = { "health": int(node_health_value), "messages": node_health_messages, } node_health[node] = node_health_entry return node_health def getClusterInformation(zkhandler): # Get cluster maintenance state maintenance_state = zkhandler.read("base.config.maintenance") # Get primary node maintenance_state, primary_node = zkhandler.read_many( [ ("base.config.maintenance"), ("base.config.primary_node"), ] ) # Get PVC version of primary node pvc_version = zkhandler.read(("node.data.pvc_version", primary_node)) # Get the list of Nodes node_list = zkhandler.children("base.node") node_count = len(node_list) # Get the daemon and domain states of all Nodes node_state_reads = list() for node in node_list: node_state_reads += [ ("node.state.daemon", node), ("node.state.domain", node), ] all_node_states = zkhandler.read_many(node_state_reads) # Parse out the Node states node_data = list() formatted_node_states = {"total": node_count} for nidx, node in enumerate(node_list): # Split the large list of return values by the IDX of this node # Each node result is 2 fields long pos_start = nidx * 2 pos_end = nidx * 2 + 2 node_daemon_state, node_domain_state = tuple(all_node_states[pos_start:pos_end]) node_data.append( { "name": node, "daemon_state": node_daemon_state, "domain_state": node_domain_state, } ) node_state = f"{node_daemon_state},{node_domain_state}" # Add to the count for this node's state if node_state in common.node_state_combinations: if formatted_node_states.get(node_state) is not None: formatted_node_states[node_state] += 1 else: formatted_node_states[node_state] = 1 # Get the list of VMs vm_list = zkhandler.children("base.domain") vm_count = len(vm_list) # Get the states of all VMs vm_state_reads = list() for vm in vm_list: vm_state_reads += [ ("domain", vm), ("domain.state", vm), ] all_vm_states = zkhandler.read_many(vm_state_reads) # Parse out the VM states vm_data = list() formatted_vm_states = {"total": vm_count} for vidx, vm in enumerate(vm_list): # Split the large list of return values by the IDX of this VM # Each VM result is 2 field long pos_start = vidx * 2 pos_end = vidx * 2 + 2 vm_name, vm_state = tuple(all_vm_states[pos_start:pos_end]) vm_data.append( { "uuid": vm, "name": vm_name, "state": vm_state, } ) # Add to the count for this VM's state if vm_state in common.vm_state_combinations: if formatted_vm_states.get(vm_state) is not None: formatted_vm_states[vm_state] += 1 else: formatted_vm_states[vm_state] = 1 # Get the list of Ceph OSDs ceph_osd_list = zkhandler.children("base.osd") ceph_osd_count = len(ceph_osd_list) # Get the states of all OSDs ("stat" is not a typo since we're reading stats; states are in # the stats JSON object) osd_stat_reads = list() for osd in ceph_osd_list: osd_stat_reads += [("osd.stats", osd)] all_osd_stats = zkhandler.read_many(osd_stat_reads) # Parse out the OSD states osd_data = list() formatted_osd_states = {"total": ceph_osd_count} up_texts = {1: "up", 0: "down"} in_texts = {1: "in", 0: "out"} for oidx, osd in enumerate(ceph_osd_list): # Split the large list of return values by the IDX of this OSD # Each OSD result is 1 field long, so just use the IDX _osd_stats = all_osd_stats[oidx] # We have to load this JSON object and get our up/in states from it osd_stats = loads(_osd_stats) # Get our states osd_up = up_texts[osd_stats["up"]] osd_in = in_texts[osd_stats["in"]] osd_data.append( { "id": osd, "up": osd_up, "in": osd_in, } ) osd_state = f"{osd_up},{osd_in}" # Add to the count for this OSD's state if osd_state in common.ceph_osd_state_combinations: if formatted_osd_states.get(osd_state) is not None: formatted_osd_states[osd_state] += 1 else: formatted_osd_states[osd_state] = 1 # Get the list of Networks network_list = zkhandler.children("base.network") network_count = len(network_list) # Get the list of Ceph pools ceph_pool_list = zkhandler.children("base.pool") ceph_pool_count = len(ceph_pool_list) # Get the list of Ceph volumes ceph_volume_list = list() for pool in ceph_pool_list: ceph_volume_list_pool = zkhandler.children(("volume", pool)) if ceph_volume_list_pool is not None: ceph_volume_list += [f"{pool}/{volume}" for volume in ceph_volume_list_pool] ceph_volume_count = len(ceph_volume_list) # Get the list of Ceph snapshots ceph_snapshot_list = list() for volume in ceph_volume_list: ceph_snapshot_list_volume = zkhandler.children(("snapshot", volume)) if ceph_snapshot_list_volume is not None: ceph_snapshot_list += [ f"{volume}@{snapshot}" for snapshot in ceph_snapshot_list_volume ] ceph_snapshot_count = len(ceph_snapshot_list) # Get the list of faults faults_data = faults.getAllFaults(zkhandler) # Format the status data cluster_information = { "cluster_health": getClusterHealthFromFaults(zkhandler, faults_data), "node_health": getNodeHealth(zkhandler, node_list), "maintenance": maintenance_state, "primary_node": primary_node, "pvc_version": pvc_version, "upstream_ip": zkhandler.read("base.config.upstream_ip"), "nodes": formatted_node_states, "vms": formatted_vm_states, "networks": network_count, "osds": formatted_osd_states, "pools": ceph_pool_count, "volumes": ceph_volume_count, "snapshots": ceph_snapshot_count, "detail": { "node": node_data, "vm": vm_data, "osd": osd_data, "faults": faults_data, }, } return cluster_information def get_info(zkhandler): # This is a thin wrapper function for naming purposes cluster_information = getClusterInformation(zkhandler) if cluster_information: return True, cluster_information else: return False, "ERROR: Failed to obtain cluster information!" def get_metrics(zkhandler): # Get general cluster information status_retflag, status_data = get_info(zkhandler) if not status_retflag: return False, "Error: Status data threw error" faults_data = status_data["detail"]["faults"] node_data = status_data["detail"]["node"] vm_data = status_data["detail"]["vm"] osd_data = status_data["detail"]["osd"] output_lines = list() output_lines.append("# HELP pvc_info PVC cluster information") output_lines.append("# TYPE pvc_info gauge") output_lines.append( f"pvc_info{{primary_node=\"{status_data['primary_node']}\", version=\"{status_data['pvc_version']}\", upstream_ip=\"{status_data['upstream_ip']}\"}} 1" ) output_lines.append("# HELP pvc_cluster_maintenance PVC cluster maintenance state") output_lines.append("# TYPE pvc_cluster_maintenance gauge") output_lines.append( f"pvc_cluster_maintenance {1 if bool(strtobool(status_data['maintenance'])) else 0}" ) output_lines.append("# HELP pvc_cluster_health PVC cluster health status") output_lines.append("# TYPE pvc_cluster_health gauge") output_lines.append(f"pvc_cluster_health {status_data['cluster_health']['health']}") output_lines.append("# HELP pvc_cluster_faults PVC cluster new faults") output_lines.append("# TYPE pvc_cluster_faults gauge") fault_map = dict() for fault_type in common.fault_state_combinations: fault_map[fault_type] = 0 for fault in faults_data: fault_map[fault["status"]] += 1 for fault_type in fault_map: output_lines.append( f'pvc_cluster_faults{{status="{fault_type}"}} {fault_map[fault_type]}' ) # output_lines.append("# HELP pvc_cluster_faults PVC cluster health faults") # output_lines.append("# TYPE pvc_cluster_faults gauge") # for fault_msg in status_data["cluster_health"]["messages"]: # output_lines.append( # f"pvc_cluster_faults{{id=\"{fault_msg['id']}\", message=\"{fault_msg['text']}\"}} {fault_msg['health_delta']}" # ) output_lines.append("# HELP pvc_node_health PVC cluster node health status") output_lines.append("# TYPE pvc_node_health gauge") for node in status_data["node_health"]: if isinstance(status_data["node_health"][node]["health"], int): output_lines.append( f"pvc_node_health{{node=\"{node}\"}} {status_data['node_health'][node]['health']}" ) output_lines.append("# HELP pvc_node_daemon_states PVC Node daemon state counts") output_lines.append("# TYPE pvc_node_daemon_states gauge") node_daemon_state_map = dict() for state in set([s.split(",")[0] for s in common.node_state_combinations]): node_daemon_state_map[state] = 0 for node in node_data: node_daemon_state_map[node["daemon_state"]] += 1 for state in node_daemon_state_map: output_lines.append( f'pvc_node_daemon_states{{state="{state}"}} {node_daemon_state_map[state]}' ) output_lines.append("# HELP pvc_node_domain_states PVC Node domain state counts") output_lines.append("# TYPE pvc_node_domain_states gauge") node_domain_state_map = dict() for state in set([s.split(",")[1] for s in common.node_state_combinations]): node_domain_state_map[state] = 0 for node in node_data: node_domain_state_map[node["domain_state"]] += 1 for state in node_domain_state_map: output_lines.append( f'pvc_node_domain_states{{state="{state}"}} {node_domain_state_map[state]}' ) output_lines.append("# HELP pvc_vm_states PVC VM state counts") output_lines.append("# TYPE pvc_vm_states gauge") vm_state_map = dict() for state in set(common.vm_state_combinations): vm_state_map[state] = 0 for vm in vm_data: vm_state_map[vm["state"]] += 1 for state in vm_state_map: output_lines.append(f'pvc_vm_states{{state="{state}"}} {vm_state_map[state]}') output_lines.append("# HELP pvc_osd_up_states PVC OSD up state counts") output_lines.append("# TYPE pvc_osd_up_states gauge") osd_up_state_map = dict() for state in set([s.split(",")[0] for s in common.ceph_osd_state_combinations]): osd_up_state_map[state] = 0 for osd in osd_data: if osd["up"] == "up": osd_up_state_map["up"] += 1 else: osd_up_state_map["down"] += 1 for state in osd_up_state_map: output_lines.append( f'pvc_osd_up_states{{state="{state}"}} {osd_up_state_map[state]}' ) output_lines.append("# HELP pvc_osd_in_states PVC OSD in state counts") output_lines.append("# TYPE pvc_osd_in_states gauge") osd_in_state_map = dict() for state in set([s.split(",")[1] for s in common.ceph_osd_state_combinations]): osd_in_state_map[state] = 0 for osd in osd_data: if osd["in"] == "in": osd_in_state_map["in"] += 1 else: osd_in_state_map["out"] += 1 for state in osd_in_state_map: output_lines.append( f'pvc_osd_in_states{{state="{state}"}} {osd_in_state_map[state]}' ) output_lines.append("# HELP pvc_nodes PVC Node count") output_lines.append("# TYPE pvc_nodes gauge") output_lines.append(f"pvc_nodes {status_data['nodes']['total']}") output_lines.append("# HELP pvc_vms PVC VM count") output_lines.append("# TYPE pvc_vms gauge") output_lines.append(f"pvc_vms {status_data['vms']['total']}") output_lines.append("# HELP pvc_osds PVC OSD count") output_lines.append("# TYPE pvc_osds gauge") output_lines.append(f"pvc_osds {status_data['osds']['total']}") output_lines.append("# HELP pvc_networks PVC Network count") output_lines.append("# TYPE pvc_networks gauge") output_lines.append(f"pvc_networks {status_data['networks']}") output_lines.append("# HELP pvc_pools PVC Storage Pool count") output_lines.append("# TYPE pvc_pools gauge") output_lines.append(f"pvc_pools {status_data['pools']}") output_lines.append("# HELP pvc_volumes PVC Storage Volume count") output_lines.append("# TYPE pvc_volumes gauge") output_lines.append(f"pvc_volumes {status_data['volumes']}") output_lines.append("# HELP pvc_snapshots PVC Storage Snapshot count") output_lines.append("# TYPE pvc_snapshots gauge") output_lines.append(f"pvc_snapshots {status_data['snapshots']}") return True, "\n".join(output_lines) + "\n" def cluster_initialize(zkhandler, overwrite=False): # Abort if we've initialized the cluster before if zkhandler.exists("base.config.primary_node") and not overwrite: return False, "ERROR: Cluster contains data and overwrite not set." if overwrite: # Delete the existing keys for key in zkhandler.schema.keys("base"): if key == "root": # Don't delete the root key continue status = zkhandler.delete("base.{}".format(key), recursive=True) if not status: return ( False, "ERROR: Failed to delete data in cluster; running nodes perhaps?", ) # Create the root keys zkhandler.schema.apply(zkhandler) return True, "Successfully initialized cluster" def cluster_backup(zkhandler): # Dictionary of values to come cluster_data = dict() def get_data(path): data = zkhandler.read(path) children = zkhandler.children(path) cluster_data[path] = data if children: if path == "/": child_prefix = "/" else: child_prefix = path + "/" for child in children: if child_prefix + child == "/zookeeper": # We must skip the built-in /zookeeper tree continue if child_prefix + child == "/patroni": # We must skip the /patroni tree continue get_data(child_prefix + child) try: get_data("/") except Exception as e: return False, "ERROR: Failed to obtain backup: {}".format(e) return True, cluster_data def cluster_restore(zkhandler, cluster_data): # Build a key+value list kv = [] schema_version = None for key in cluster_data: if key == zkhandler.schema.path("base.schema.version"): schema_version = cluster_data[key] data = cluster_data[key] kv.append((key, data)) if int(schema_version) != int(zkhandler.schema.version): return ( False, "ERROR: Schema version of backup ({}) does not match cluster schema version ({}).".format( schema_version, zkhandler.schema.version ), ) # Close the Zookeeper connection result = zkhandler.write(kv) if result: return True, "Restore completed successfully." else: return False, "Restore failed."