diff --git a/health-daemon/pvchealthd/objects/MonitoringInstance.py b/health-daemon/pvchealthd/objects/MonitoringInstance.py index b5e231ab..ead8fd03 100644 --- a/health-daemon/pvchealthd/objects/MonitoringInstance.py +++ b/health-daemon/pvchealthd/objects/MonitoringInstance.py @@ -421,10 +421,7 @@ class MonitoringInstance(object): plugin_results.append(future.result()) for result in sorted(plugin_results, key=lambda x: x.plugin_name): - if ( - self.config["log_keepalives"] - and self.config["log_keepalive_plugin_details"] - ): + if self.config["log_monitoring_details"]: self.logger.out( result.message + f" [-{result.health_delta}]", state="t", diff --git a/health-daemon/pvchealthd/util/config.py b/health-daemon/pvchealthd/util/config.py index cc98e81e..e9548d43 100644 --- a/health-daemon/pvchealthd/util/config.py +++ b/health-daemon/pvchealthd/util/config.py @@ -332,12 +332,7 @@ def get_configuration_current(config_file): "log_colours": o_logging.get("log_colours", False), "log_dates": o_logging.get("log_dates", False), "log_keepalives": o_logging.get("log_keepalives", False), - "log_keepalive_cluster_details": o_logging.get( - "log_cluster_details", False - ), - "log_keepalive_plugin_details": o_logging.get( - "log_monitoring_details", False - ), + "log_monitoring_details": o_logging.get("log_monitoring_details", False), "console_log_lines": o_logging.get("console_log_lines", False), "node_log_lines": o_logging.get("node_log_lines", False), } @@ -476,12 +471,7 @@ def get_configuration_legacy(pvcnoded_config_file): "log_colours": o_logging.get("log_colours", False), "log_dates": o_logging.get("log_dates", False), "log_keepalives": o_logging.get("log_keepalives", False), - "log_keepalive_cluster_details": o_logging.get( - "log_keepalive_cluster_details", False - ), - "log_keepalive_plugin_details": o_logging.get( - "log_keepalive_plugin_details", False - ), + "log_monitoring_details": o_logging.get("log_keepalive_plugin_details", False), "console_log_lines": o_logging.get("console_log_lines", False), "node_log_lines": o_logging.get("node_log_lines", False), } diff --git a/health-daemon/pvchealthd/util/fencing.py b/health-daemon/pvchealthd/util/fencing.py deleted file mode 100644 index 15956428..00000000 --- a/health-daemon/pvchealthd/util/fencing.py +++ /dev/null @@ -1,338 +0,0 @@ -#!/usr/bin/env python3 - -# fencing.py - Utility functions for pvcnoded fencing -# Part of the Parallel Virtual Cluster (PVC) system -# -# Copyright (C) 2018-2022 Joshua M. Boniface -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, version 3. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -############################################################################### - -import time - -import daemon_lib.common as common - -from daemon_lib.vm import vm_worker_flush_locks - - -# -# Fence thread entry function -# -def fence_node(node_name, zkhandler, config, logger): - # We allow exactly 6 saving throws (30 seconds) for the host to come back online or we kill it - failcount_limit = 6 - failcount = 0 - while failcount < failcount_limit: - # Wait 5 seconds - time.sleep(config["keepalive_interval"]) - # Get the state - node_daemon_state = zkhandler.read(("node.state.daemon", node_name)) - # Is it still 'dead' - if node_daemon_state == "dead": - failcount += 1 - logger.out( - f"Node {node_name} failed {failcount}/{failcount_limit} saving throws", - state="s", - prefix=f"fencing {node_name}", - ) - # It changed back to something else so it must be alive - else: - logger.out( - f"Node {node_name} passed a saving throw; cancelling fance", - state="o", - prefix=f"fencing {node_name}", - ) - return - - logger.out( - f"Fencing node {node_name} via IPMI reboot signal", - state="s", - prefix=f"fencing {node_name}", - ) - - # Get IPMI information - ipmi_hostname = zkhandler.read(("node.ipmi.hostname", node_name)) - ipmi_username = zkhandler.read(("node.ipmi.username", node_name)) - ipmi_password = zkhandler.read(("node.ipmi.password", node_name)) - - # Shoot it in the head - fence_status = reboot_via_ipmi( - node_name, ipmi_hostname, ipmi_username, ipmi_password, logger - ) - - # Hold to ensure the fence takes effect and system stabilizes - logger.out( - f"Waiting {config['keepalive_interval']}s for fence of node {node_name} to take effect", - state="i", - prefix=f"fencing {node_name}", - ) - time.sleep(config["keepalive_interval"]) - - if fence_status: - logger.out( - f"Marking node {node_name} as fenced", - state="i", - prefix=f"fencing {node_name}", - ) - while True: - try: - zkhandler.write([(("node.state.daemon", node_name), "fenced")]) - break - except Exception: - continue - - # Force into secondary network state if needed - if node_name in config["coordinators"]: - logger.out( - f"Forcing secondary coordinator state for node {node_name}", - state="i", - prefix=f"fencing {node_name}", - ) - zkhandler.write([(("node.state.router", node_name), "secondary")]) - if zkhandler.read("base.config.primary_node") == node_name: - zkhandler.write([("base.config.primary_node", "none")]) - - # If the fence succeeded and successful_fence is migrate - if fence_status and config["successful_fence"] == "migrate": - migrateFromFencedNode(zkhandler, node_name, config, logger) - - # If the fence failed and failed_fence is migrate - if ( - not fence_status - and config["failed_fence"] == "migrate" - and config["suicide_intervals"] != "0" - ): - migrateFromFencedNode(zkhandler, node_name, config, logger) - - -# Migrate hosts away from a fenced node -def migrateFromFencedNode(zkhandler, node_name, config, logger): - logger.out( - f"Migrating VMs from dead node {node_name} to new hosts", - state="i", - prefix=f"fencing {node_name}", - ) - - # Get the list of VMs - dead_node_running_domains = zkhandler.read( - ("node.running_domains", node_name) - ).split() - - # Set the node to a custom domainstate so we know what's happening - zkhandler.write([(("node.state.domain", node_name), "fence-flush")]) - - # Migrate a VM after a flush - def fence_migrate_vm(dom_uuid): - logger.out( - f"Flushing locks of VM {dom_uuid} due to fence", - state="i", - prefix=f"fencing {node_name}", - ) - vm_worker_flush_locks(zkhandler, None, dom_uuid, force_unlock=True) - - target_node = common.findTargetNode(zkhandler, dom_uuid) - - if target_node is not None: - logger.out( - f"Migrating VM {dom_uuid} to node {target_node}", - state="i", - prefix=f"fencing {node_name}", - ) - zkhandler.write( - [ - (("domain.state", dom_uuid), "start"), - (("domain.node", dom_uuid), target_node), - (("domain.last_node", dom_uuid), node_name), - ] - ) - logger.out( - f"Successfully migrated running VM {dom_uuid} to node {target_node}", - state="o", - prefix=f"fencing {node_name}", - ) - else: - logger.out( - f"No target node found for VM {dom_uuid}; marking autostart=True on current node", - state="i", - prefix=f"fencing {node_name}", - ) - zkhandler.write( - { - (("domain.state", dom_uuid), "stopped"), - (("domain.meta.autostart", dom_uuid), "True"), - } - ) - logger.out( - f"Successfully marked autostart for running VM {dom_uuid} on current node", - state="o", - prefix=f"fencing {node_name}", - ) - - # Loop through the VMs - for dom_uuid in dead_node_running_domains: - try: - fence_migrate_vm(dom_uuid) - except Exception as e: - logger.out( - f"Failed to migrate VM {dom_uuid}, continuing: {e}", - state="w", - prefix=f"fencing {node_name}", - ) - - # Set node in flushed state for easy remigrating when it comes back - zkhandler.write([(("node.state.domain", node_name), "flushed")]) - logger.out( - f"All VMs flushed from dead node {node_name} to other nodes", - state="i", - prefix=f"fencing {node_name}", - ) - - -# -# Perform an IPMI fence -# -def reboot_via_ipmi(node_name, ipmi_hostname, ipmi_user, ipmi_password, logger): - # Power off the node the node - logger.out( - "Sending power off to dead node", - state="i", - prefix=f"fencing {node_name}", - ) - ipmi_stop_retcode, ipmi_stop_stdout, ipmi_stop_stderr = common.run_os_command( - f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power off" - ) - if ipmi_stop_retcode != 0: - logger.out( - f"Failed to power off dead node: {ipmi_stop_stderr}", - state="e", - prefix=f"fencing {node_name}", - ) - - logger.out( - "Waiting 5s for power off to take effect", - state="i", - prefix=f"fencing {node_name}", - ) - time.sleep(5) - - # Check the chassis power state - logger.out( - "Checking power state of dead node", - state="i", - prefix=f"fencing {node_name}", - ) - ipmi_status_retcode, ipmi_status_stdout, ipmi_status_stderr = common.run_os_command( - f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status" - ) - if ipmi_status_retcode == 0: - logger.out( - f"Current chassis power state is: {ipmi_status_stdout.strip()}", - state="i", - prefix=f"fencing {node_name}", - ) - else: - logger.out( - "Current chassis power state is: Unknown", - state="w", - prefix=f"fencing {node_name}", - ) - - # Power on the node - logger.out( - "Sending power on to dead node", - state="i", - prefix=f"fencing {node_name}", - ) - ipmi_start_retcode, ipmi_start_stdout, ipmi_start_stderr = common.run_os_command( - f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power on" - ) - - if ipmi_start_retcode != 0: - logger.out( - f"Failed to power on dead node: {ipmi_start_stderr}", - state="w", - prefix=f"fencing {node_name}", - ) - - logger.out( - "Waiting 2s for power on to take effect", - state="i", - prefix=f"fencing {node_name}", - ) - time.sleep(2) - - # Check the chassis power state - logger.out( - "Checking power state of dead node", - state="i", - prefix=f"fencing {node_name}", - ) - ipmi_status_retcode, ipmi_status_stdout, ipmi_status_stderr = common.run_os_command( - f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status" - ) - - if ipmi_stop_retcode == 0: - if ipmi_status_stdout.strip() == "Chassis Power is on": - # We successfully rebooted the node and it is powered on; this is a succeessful fence - logger.out( - "Successfully rebooted dead node; proceeding with fence recovery action", - state="o", - prefix=f"fencing {node_name}", - ) - return True - elif ipmi_status_stdout.strip() == "Chassis Power is off": - # We successfully rebooted the node but it is powered off; this might be expected or not, but the node is confirmed off so we can call it a successful fence - logger.out( - "Chassis power is in confirmed off state after successfuly IPMI reboot; proceeding with fence recovery action", - state="o", - prefix=f"fencing {node_name}", - ) - return True - else: - # We successfully rebooted the node but it is in some unknown power state; since this might indicate a silent failure, we must call it a failed fence - logger.out( - f"Chassis power is in an unknown state ({ipmi_status_stdout.strip()}) after successful IPMI reboot; NOT proceeding fence recovery action", - state="e", - prefix=f"fencing {node_name}", - ) - return False - else: - if ipmi_status_stdout.strip() == "Chassis Power is off": - # We failed to reboot the node but it is powered off; it has probably suffered a serious hardware failure, but the node is confirmed off so we can call it a successful fence - logger.out( - "Chassis power is in confirmed off state after failed IPMI reboot; proceeding with fence recovery action", - state="o", - prefix=f"fencing {node_name}", - ) - return True - else: - # We failed to reboot the node but it is in some unknown power state (including "on"); since this might indicate a silent failure, we must call it a failed fence - logger.out( - "Chassis power is not in confirmed off state after failed IPMI reboot; NOT proceeding wiht fence recovery action", - state="e", - prefix=f"fencing {node_name}", - ) - return False - - -# -# Verify that IPMI connectivity to this host exists (used during node init) -# -def verify_ipmi(ipmi_hostname, ipmi_user, ipmi_password): - ipmi_command = f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status" - retcode, stdout, stderr = common.run_os_command(ipmi_command, timeout=2) - if retcode == 0 and stdout.strip() == "Chassis Power is on": - return True - else: - return False diff --git a/health-daemon/pvchealthd/util/keepalive.py b/health-daemon/pvchealthd/util/keepalive.py deleted file mode 100644 index 96da259f..00000000 --- a/health-daemon/pvchealthd/util/keepalive.py +++ /dev/null @@ -1,968 +0,0 @@ -#!/usr/bin/env python3 - -# keepalive.py - Utility functions for pvcnoded Keepalives -# Part of the Parallel Virtual Cluster (PVC) system -# -# Copyright (C) 2018-2022 Joshua M. Boniface -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, version 3. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -############################################################################### - -import pvcnoded.util.fencing - -import daemon_lib.common as common - -from apscheduler.schedulers.background import BackgroundScheduler -from rados import Rados -from xml.etree import ElementTree -from queue import Queue -from threading import Thread -from datetime import datetime - -import json -import re -import libvirt -import psutil -import os -import time - - -# State table for pretty stats -libvirt_vm_states = { - 0: "NOSTATE", - 1: "RUNNING", - 2: "BLOCKED", - 3: "PAUSED", - 4: "SHUTDOWN", - 5: "SHUTOFF", - 6: "CRASHED", - 7: "PMSUSPENDED", -} - - -def start_keepalive_timer(logger, config, zkhandler, this_node): - keepalive_interval = config["keepalive_interval"] - logger.out( - f"Starting keepalive timer ({keepalive_interval} second interval)", state="s" - ) - keepalive_timer = BackgroundScheduler() - keepalive_timer.add_job( - node_keepalive, - args=(logger, config, zkhandler, this_node), - trigger="interval", - seconds=keepalive_interval, - ) - keepalive_timer.start() - return keepalive_timer - - -def stop_keepalive_timer(logger, keepalive_timer): - try: - keepalive_timer.shutdown() - logger.out("Stopping keepalive timer", state="s") - except Exception: - logger.out("Failed to stop keepalive timer", state="w") - - -# Ceph stats update function -def collect_ceph_stats(logger, config, zkhandler, this_node, queue): - pool_list = zkhandler.children("base.pool") - osd_list = zkhandler.children("base.osd") - - debug = config["debug"] - if debug: - logger.out("Thread starting", state="d", prefix="ceph-thread") - - # Connect to the Ceph cluster - try: - ceph_conn = Rados( - conffile=config["ceph_config_file"], - conf=dict(keyring=config["ceph_admin_keyring"]), - ) - if debug: - logger.out("Connecting to cluster", state="d", prefix="ceph-thread") - ceph_conn.connect(timeout=1) - except Exception as e: - logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e") - return - - # Primary-only functions - if this_node.coordinator_state == "primary": - # Get Ceph status information (pretty) - if debug: - logger.out( - "Set Ceph status information in zookeeper (primary only)", - state="d", - prefix="ceph-thread", - ) - - command = {"prefix": "status", "format": "pretty"} - ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[ - 1 - ].decode("ascii") - try: - zkhandler.write([("base.storage", str(ceph_status))]) - except Exception as e: - logger.out("Failed to set Ceph status data: {}".format(e), state="e") - - # Get Ceph health information (JSON) - if debug: - logger.out( - "Set Ceph health information in zookeeper (primary only)", - state="d", - prefix="ceph-thread", - ) - - command = {"prefix": "health", "format": "json"} - ceph_health = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[ - 1 - ].decode("ascii") - try: - zkhandler.write([("base.storage.health", str(ceph_health))]) - except Exception as e: - logger.out("Failed to set Ceph health data: {}".format(e), state="e") - - # Get Ceph df information (pretty) - if debug: - logger.out( - "Set Ceph rados df information in zookeeper (primary only)", - state="d", - prefix="ceph-thread", - ) - - # Get rados df info - command = {"prefix": "df", "format": "pretty"} - ceph_df = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1].decode( - "ascii" - ) - try: - zkhandler.write([("base.storage.util", str(ceph_df))]) - except Exception as e: - logger.out("Failed to set Ceph utilization data: {}".format(e), state="e") - - if debug: - logger.out( - "Set pool information in zookeeper (primary only)", - state="d", - prefix="ceph-thread", - ) - - # Get pool info - command = {"prefix": "df", "format": "json"} - ceph_df_output = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[ - 1 - ].decode("ascii") - try: - ceph_pool_df_raw = json.loads(ceph_df_output)["pools"] - except Exception as e: - logger.out("Failed to obtain Pool data (ceph df): {}".format(e), state="w") - ceph_pool_df_raw = [] - - retcode, stdout, stderr = common.run_os_command( - "rados df --format json", timeout=1 - ) - try: - rados_pool_df_raw = json.loads(stdout)["pools"] - except Exception as e: - logger.out("Failed to obtain Pool data (rados df): {}".format(e), state="w") - rados_pool_df_raw = [] - - pool_count = len(ceph_pool_df_raw) - if debug: - logger.out( - "Getting info for {} pools".format(pool_count), - state="d", - prefix="ceph-thread", - ) - for pool_idx in range(0, pool_count): - try: - # Combine all the data for this pool - ceph_pool_df = ceph_pool_df_raw[pool_idx] - rados_pool_df = rados_pool_df_raw[pool_idx] - pool = ceph_pool_df - pool.update(rados_pool_df) - - # Ignore any pools that aren't in our pool list - if pool["name"] not in pool_list: - if debug: - logger.out( - "Pool {} not in pool list {}".format( - pool["name"], pool_list - ), - state="d", - prefix="ceph-thread", - ) - continue - else: - if debug: - logger.out( - "Parsing data for pool {}".format(pool["name"]), - state="d", - prefix="ceph-thread", - ) - - # Assemble a useful data structure - pool_df = { - "id": pool["id"], - "stored_bytes": pool["stats"]["stored"], - "free_bytes": pool["stats"]["max_avail"], - "used_bytes": pool["stats"]["bytes_used"], - "used_percent": pool["stats"]["percent_used"], - "num_objects": pool["stats"]["objects"], - "num_object_clones": pool["num_object_clones"], - "num_object_copies": pool["num_object_copies"], - "num_objects_missing_on_primary": pool[ - "num_objects_missing_on_primary" - ], - "num_objects_unfound": pool["num_objects_unfound"], - "num_objects_degraded": pool["num_objects_degraded"], - "read_ops": pool["read_ops"], - "read_bytes": pool["read_bytes"], - "write_ops": pool["write_ops"], - "write_bytes": pool["write_bytes"], - } - - # Write the pool data to Zookeeper - zkhandler.write( - [(("pool.stats", pool["name"]), str(json.dumps(pool_df)))] - ) - except Exception as e: - # One or more of the status commands timed out, just continue - logger.out( - "Failed to format and send pool data: {}".format(e), state="w" - ) - pass - - # Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs) - osds_this_node = 0 - if len(osd_list) > 0: - # Get data from Ceph OSDs - if debug: - logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread") - - # Parse the dump data - osd_dump = dict() - - command = {"prefix": "osd dump", "format": "json"} - osd_dump_output = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[ - 1 - ].decode("ascii") - try: - osd_dump_raw = json.loads(osd_dump_output)["osds"] - except Exception as e: - logger.out("Failed to obtain OSD data: {}".format(e), state="w") - osd_dump_raw = [] - - if debug: - logger.out("Loop through OSD dump", state="d", prefix="ceph-thread") - for osd in osd_dump_raw: - osd_dump.update( - { - str(osd["osd"]): { - "uuid": osd["uuid"], - "up": osd["up"], - "in": osd["in"], - "primary_affinity": osd["primary_affinity"], - } - } - ) - - # Parse the df data - if debug: - logger.out("Parse the OSD df data", state="d", prefix="ceph-thread") - - osd_df = dict() - - command = {"prefix": "osd df", "format": "json"} - try: - osd_df_raw = json.loads( - ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1] - )["nodes"] - except Exception as e: - logger.out("Failed to obtain OSD data: {}".format(e), state="w") - osd_df_raw = [] - - if debug: - logger.out("Loop through OSD df", state="d", prefix="ceph-thread") - for osd in osd_df_raw: - osd_df.update( - { - str(osd["id"]): { - "utilization": osd["utilization"], - "var": osd["var"], - "pgs": osd["pgs"], - "kb": osd["kb"], - "kb_used": osd["kb_used"], - "kb_used_data": osd["kb_used_data"], - "kb_used_omap": osd["kb_used_omap"], - "kb_used_meta": osd["kb_used_meta"], - "kb_avail": osd["kb_avail"], - "weight": osd["crush_weight"], - "reweight": osd["reweight"], - "class": osd["device_class"], - } - } - ) - - # Parse the status data - if debug: - logger.out("Parse the OSD status data", state="d", prefix="ceph-thread") - - osd_status = dict() - - command = {"prefix": "osd status", "format": "pretty"} - try: - osd_status_raw = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[ - 1 - ].decode("ascii") - except Exception as e: - logger.out("Failed to obtain OSD status data: {}".format(e), state="w") - osd_status_raw = [] - - if debug: - logger.out("Loop through OSD status data", state="d", prefix="ceph-thread") - - for line in osd_status_raw.split("\n"): - # Strip off colour - line = re.sub(r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))", "", line) - # Split it for parsing - line = line.split() - - # Ceph 14 format: - # ['|', '0', '|', 'hv1.p.u.bonilan.net', '|', '318G', '|', '463G', '|', '213', '|', '1430k', '|', '22', '|', '124k', '|', 'exists,up', '|'] - # Ceph 16 format: - # ['0', 'hv1.t.u.bonilan.net', '2489M', '236G', '0', '0', '0', '0', 'exists,up'] - - # Bypass obviously invalid lines - if len(line) < 1: - continue - elif line[0] == "+": - continue - - try: - # If line begins with | and second entry is a digit (i.e. OSD ID) - if line[0] == "|" and line[1].isdigit(): - # Parse the line in Ceph 14 format - osd_id = line[1] - node = line[3].split(".")[0] - used = line[5] - avail = line[7] - wr_ops = line[9] - wr_data = line[11] - rd_ops = line[13] - rd_data = line[15] - state = line[17] - # If first entry is a digit (i.e. OSD ID) - elif line[0].isdigit(): - # Parse the line in Ceph 16 format - osd_id = line[0] - node = line[1].split(".")[0] - used = line[2] - avail = line[3] - wr_ops = line[4] - wr_data = line[5] - rd_ops = line[6] - rd_data = line[7] - state = line[8] - # Otherwise, it's the header line and is ignored - else: - continue - except IndexError: - continue - - # I don't know why 2018 me used this construct instead of a normal - # dictionary update, but it works so not changing it. - # ref: bfbe9188ce830381f3f2fa1da11f1973f08eca8c - osd_status.update( - { - str(osd_id): { - "node": node, - "used": used, - "avail": avail, - "wr_ops": wr_ops, - "wr_data": wr_data, - "rd_ops": rd_ops, - "rd_data": rd_data, - "state": state, - } - } - ) - - # Merge them together into a single meaningful dict - if debug: - logger.out("Merge OSD data together", state="d", prefix="ceph-thread") - - osd_stats = dict() - - for osd in osd_list: - if zkhandler.read(("osd.node", osd)) == config["node_hostname"]: - osds_this_node += 1 - try: - this_dump = osd_dump[osd] - this_dump.update(osd_df[osd]) - this_dump.update(osd_status[osd]) - osd_stats[osd] = this_dump - except KeyError as e: - # One or more of the status commands timed out, just continue - logger.out( - "Failed to parse OSD stats into dictionary: {}".format(e), state="w" - ) - - # Upload OSD data for the cluster (primary-only) - if this_node.coordinator_state == "primary": - if debug: - logger.out( - "Trigger updates for each OSD", state="d", prefix="ceph-thread" - ) - - for osd in osd_list: - try: - stats = json.dumps(osd_stats[osd]) - zkhandler.write([(("osd.stats", osd), str(stats))]) - except KeyError as e: - # One or more of the status commands timed out, just continue - logger.out( - "Failed to upload OSD stats from dictionary: {}".format(e), - state="w", - ) - - ceph_conn.shutdown() - - queue.put(osds_this_node) - - if debug: - logger.out("Thread finished", state="d", prefix="ceph-thread") - - -# VM stats update function -def collect_vm_stats(logger, config, zkhandler, this_node, queue): - debug = config["debug"] - if debug: - logger.out("Thread starting", state="d", prefix="vm-thread") - - # Connect to libvirt - libvirt_name = "qemu:///system" - if debug: - logger.out("Connecting to libvirt", state="d", prefix="vm-thread") - try: - lv_conn = libvirt.open(libvirt_name) - if lv_conn is None: - raise Exception - except Exception: - logger.out('Failed to open connection to "{}"'.format(libvirt_name), state="e") - return - - memalloc = 0 - memprov = 0 - vcpualloc = 0 - # Toggle state management of dead VMs to restart them - if debug: - logger.out( - "Toggle state management of dead VMs to restart them", - state="d", - prefix="vm-thread", - ) - # Make a copy of the d_domain; if not, and it changes in flight, this can fail - fixed_d_domain = this_node.d_domain.copy() - for domain, instance in fixed_d_domain.items(): - if domain in this_node.domain_list: - if instance.getstate() == "start" and instance.getnode() == this_node.name: - if instance.getdom() is not None: - try: - if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING: - logger.out( - "VM {} has failed".format(instance.domname), - state="w", - prefix="vm-thread", - ) - raise - except Exception: - # Toggle a state "change" - logger.out( - "Resetting state to {} for VM {}".format( - instance.getstate(), instance.domname - ), - state="i", - prefix="vm-thread", - ) - zkhandler.write( - [(("domain.state", domain), instance.getstate())] - ) - elif instance.getnode() == this_node.name: - memprov += instance.getmemory() - - # Get list of running domains from Libvirt - running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE) - - # Get statistics from any running VMs - for domain in running_domains: - try: - # Get basic information about the VM - tree = ElementTree.fromstring(domain.XMLDesc()) - domain_uuid = domain.UUIDString() - domain_name = domain.name() - - # Get all the raw information about the VM - if debug: - logger.out( - "Getting general statistics for VM {}".format(domain_name), - state="d", - prefix="vm-thread", - ) - ( - domain_state, - domain_maxmem, - domain_mem, - domain_vcpus, - domain_cputime, - ) = domain.info() - # We can't properly gather stats from a non-running VMs so continue - if domain_state != libvirt.VIR_DOMAIN_RUNNING: - continue - domain_memory_stats = domain.memoryStats() - domain_cpu_stats = domain.getCPUStats(True)[0] - - # Add the allocated memory to our memalloc value - memalloc += instance.getmemory() - memprov += instance.getmemory() - vcpualloc += instance.getvcpus() - except Exception as e: - if debug: - try: - logger.out( - "Failed getting VM information for {}: {}".format( - domain.name(), e - ), - state="d", - prefix="vm-thread", - ) - except Exception: - pass - continue - - # Ensure VM is present in the domain_list - if domain_uuid not in this_node.domain_list: - this_node.domain_list.append(domain_uuid) - - if debug: - logger.out( - "Getting disk statistics for VM {}".format(domain_name), - state="d", - prefix="vm-thread", - ) - domain_disk_stats = [] - try: - for disk in tree.findall("devices/disk"): - disk_name = disk.find("source").get("name") - if not disk_name: - disk_name = disk.find("source").get("file") - disk_stats = domain.blockStats(disk.find("target").get("dev")) - domain_disk_stats.append( - { - "name": disk_name, - "rd_req": disk_stats[0], - "rd_bytes": disk_stats[1], - "wr_req": disk_stats[2], - "wr_bytes": disk_stats[3], - "err": disk_stats[4], - } - ) - except Exception as e: - if debug: - try: - logger.out( - "Failed getting disk stats for {}: {}".format(domain.name(), e), - state="d", - prefix="vm-thread", - ) - except Exception: - pass - continue - - if debug: - logger.out( - "Getting network statistics for VM {}".format(domain_name), - state="d", - prefix="vm-thread", - ) - domain_network_stats = [] - try: - for interface in tree.findall("devices/interface"): - interface_type = interface.get("type") - if interface_type not in ["bridge"]: - continue - interface_name = interface.find("target").get("dev") - interface_bridge = interface.find("source").get("bridge") - interface_stats = domain.interfaceStats(interface_name) - domain_network_stats.append( - { - "name": interface_name, - "bridge": interface_bridge, - "rd_bytes": interface_stats[0], - "rd_packets": interface_stats[1], - "rd_errors": interface_stats[2], - "rd_drops": interface_stats[3], - "wr_bytes": interface_stats[4], - "wr_packets": interface_stats[5], - "wr_errors": interface_stats[6], - "wr_drops": interface_stats[7], - } - ) - except Exception as e: - if debug: - try: - logger.out( - "Failed getting network stats for {}: {}".format( - domain.name(), e - ), - state="d", - prefix="vm-thread", - ) - except Exception: - pass - continue - - # Create the final dictionary - domain_stats = { - "state": libvirt_vm_states[domain_state], - "maxmem": domain_maxmem, - "livemem": domain_mem, - "cpus": domain_vcpus, - "cputime": domain_cputime, - "mem_stats": domain_memory_stats, - "cpu_stats": domain_cpu_stats, - "disk_stats": domain_disk_stats, - "net_stats": domain_network_stats, - } - - if debug: - logger.out( - "Writing statistics for VM {} to Zookeeper".format(domain_name), - state="d", - prefix="vm-thread", - ) - - try: - zkhandler.write( - [(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))] - ) - except Exception as e: - if debug: - logger.out( - "Failed to write domain statistics: {}".format(e), - state="d", - prefix="vm-thread", - ) - - # Close the Libvirt connection - lv_conn.close() - - if debug: - logger.out( - f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}", - state="d", - prefix="vm-thread", - ) - - queue.put(len(running_domains)) - queue.put(memalloc) - queue.put(memprov) - queue.put(vcpualloc) - - if debug: - logger.out("Thread finished", state="d", prefix="vm-thread") - - -# Keepalive update function -def node_keepalive(logger, config, zkhandler, this_node): - debug = config["debug"] - - # Display node information to the terminal - if config["log_keepalives"]: - if this_node.coordinator_state == "primary": - cst_colour = logger.fmt_green - elif this_node.coordinator_state == "secondary": - cst_colour = logger.fmt_blue - else: - cst_colour = logger.fmt_cyan - - active_coordinator_state = this_node.coordinator_state - - runtime_start = datetime.now() - - # Set the migration selector in Zookeeper for clients to read - if config["enable_hypervisor"]: - if this_node.coordinator_state == "primary": - try: - if ( - zkhandler.read("base.config.migration_target_selector") - != config["migration_target_selector"] - ): - zkhandler.write( - [ - ( - "base.config.migration_target_selector", - config["migration_target_selector"], - ) - ] - ) - except Exception: - logger.out( - "Failed to set migration target selector in Zookeeper", - state="e", - prefix="main-thread", - ) - - # Set the upstream IP in Zookeeper for clients to read - if config["enable_networking"]: - if this_node.coordinator_state == "primary": - try: - if ( - zkhandler.read("base.config.upstream_ip") - != config["upstream_floating_ip"] - ): - zkhandler.write( - [("base.config.upstream_ip", config["upstream_floating_ip"])] - ) - except Exception: - logger.out( - "Failed to set upstream floating IP in Zookeeper", - state="e", - prefix="main-thread", - ) - - # Get past state and update if needed - if debug: - logger.out( - "Get past state and update if needed", state="d", prefix="main-thread" - ) - - past_state = zkhandler.read(("node.state.daemon", this_node.name)) - if past_state != "run" and past_state != "shutdown": - this_node.daemon_state = "run" - zkhandler.write([(("node.state.daemon", this_node.name), "run")]) - else: - this_node.daemon_state = "run" - - # Ensure the primary key is properly set - if debug: - logger.out( - "Ensure the primary key is properly set", state="d", prefix="main-thread" - ) - if this_node.coordinator_state == "primary": - if zkhandler.read("base.config.primary_node") != this_node.name: - zkhandler.write([("base.config.primary_node", this_node.name)]) - - # Run VM statistics collection in separate thread for parallelization - if config["enable_hypervisor"]: - vm_thread_queue = Queue() - vm_stats_thread = Thread( - target=collect_vm_stats, - args=(logger, config, zkhandler, this_node, vm_thread_queue), - kwargs={}, - ) - vm_stats_thread.start() - - # Run Ceph status collection in separate thread for parallelization - if config["enable_storage"]: - ceph_thread_queue = Queue() - ceph_stats_thread = Thread( - target=collect_ceph_stats, - args=(logger, config, zkhandler, this_node, ceph_thread_queue), - kwargs={}, - ) - ceph_stats_thread.start() - - # Get node performance statistics - this_node.memtotal = int(psutil.virtual_memory().total / 1024 / 1024) - this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024) - this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024) - this_node.cpuload = round(os.getloadavg()[0], 2) - - # Join against running threads - if config["enable_hypervisor"]: - vm_stats_thread.join(timeout=config["keepalive_interval"]) - if vm_stats_thread.is_alive(): - logger.out("VM stats gathering exceeded timeout, continuing", state="w") - if config["enable_storage"]: - ceph_stats_thread.join(timeout=config["keepalive_interval"]) - if ceph_stats_thread.is_alive(): - logger.out("Ceph stats gathering exceeded timeout, continuing", state="w") - - # Get information from thread queues - if config["enable_hypervisor"]: - try: - this_node.domains_count = vm_thread_queue.get( - timeout=config["keepalive_interval"] - ) - this_node.memalloc = vm_thread_queue.get( - timeout=config["keepalive_interval"] - ) - this_node.memprov = vm_thread_queue.get( - timeout=config["keepalive_interval"] - ) - this_node.vcpualloc = vm_thread_queue.get( - timeout=config["keepalive_interval"] - ) - except Exception: - logger.out("VM stats queue get exceeded timeout, continuing", state="w") - else: - this_node.domains_count = 0 - this_node.memalloc = 0 - this_node.memprov = 0 - this_node.vcpualloc = 0 - - if config["enable_storage"]: - try: - osds_this_node = ceph_thread_queue.get( - timeout=(config["keepalive_interval"] - 1) - ) - except Exception: - logger.out("Ceph stats queue get exceeded timeout, continuing", state="w") - osds_this_node = "?" - else: - osds_this_node = "0" - - # Set our information in zookeeper - keepalive_time = int(time.time()) - if debug: - logger.out("Set our information in zookeeper", state="d", prefix="main-thread") - try: - zkhandler.write( - [ - (("node.memory.total", this_node.name), str(this_node.memtotal)), - (("node.memory.used", this_node.name), str(this_node.memused)), - (("node.memory.free", this_node.name), str(this_node.memfree)), - (("node.memory.allocated", this_node.name), str(this_node.memalloc)), - (("node.memory.provisioned", this_node.name), str(this_node.memprov)), - (("node.vcpu.allocated", this_node.name), str(this_node.vcpualloc)), - (("node.cpu.load", this_node.name), str(this_node.cpuload)), - ( - ("node.count.provisioned_domains", this_node.name), - str(this_node.domains_count), - ), - ( - ("node.running_domains", this_node.name), - " ".join(this_node.domain_list), - ), - (("node.keepalive", this_node.name), str(keepalive_time)), - ] - ) - except Exception: - logger.out("Failed to set keepalive data", state="e") - - if config["log_keepalives"]: - runtime_end = datetime.now() - runtime_delta = runtime_end - runtime_start - runtime = "{:0.02f}".format(runtime_delta.total_seconds()) - - logger.out( - "{start_colour}{hostname} keepalive @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] in {runtime} seconds".format( - start_colour=logger.fmt_purple, - cst_colour=logger.fmt_bold + cst_colour, - nofmt=logger.fmt_end, - hostname=config["node_hostname"], - starttime=runtime_start, - costate=active_coordinator_state, - runtime=runtime, - ), - state="t", - ) - - if this_node.maintenance is True: - maintenance_colour = logger.fmt_blue - else: - maintenance_colour = logger.fmt_green - - if isinstance(this_node.health, int): - if this_node.health > 90: - health_colour = logger.fmt_green - elif this_node.health > 50: - health_colour = logger.fmt_yellow - else: - health_colour = logger.fmt_red - health_text = str(this_node.health) + "%" - - else: - health_colour = logger.fmt_blue - health_text = "N/A" - - if config["log_keepalive_cluster_details"]: - logger.out( - "{bold}Maintenance:{nofmt} {maintenance_colour}{maintenance}{nofmt} " - "{bold}Health:{nofmt} {health_colour}{health}{nofmt} " - "{bold}VMs:{nofmt} {domcount} " - "{bold}OSDs:{nofmt} {osdcount} " - "{bold}Load:{nofmt} {load} " - "{bold}Memory [MiB]: " - "{bold}Used:{nofmt} {usedmem} " - "{bold}Free:{nofmt} {freemem}".format( - bold=logger.fmt_bold, - maintenance_colour=maintenance_colour, - health_colour=health_colour, - nofmt=logger.fmt_end, - maintenance=this_node.maintenance, - health=health_text, - domcount=this_node.domains_count, - osdcount=osds_this_node, - load=this_node.cpuload, - freemem=this_node.memfree, - usedmem=this_node.memused, - ), - state="t", - ) - - # Look for dead nodes and fence them - if not this_node.maintenance: - if debug: - logger.out( - "Look for dead nodes and fence them", state="d", prefix="main-thread" - ) - if config["daemon_mode"] == "coordinator": - for node_name in zkhandler.children("base.node"): - try: - node_daemon_state = zkhandler.read(("node.state.daemon", node_name)) - node_keepalive = int(zkhandler.read(("node.keepalive", node_name))) - except Exception: - node_daemon_state = "unknown" - node_keepalive = 0 - - # Handle deadtime and fencng if needed - # (A node is considered dead when its keepalive timer is >6*keepalive_interval seconds - # out-of-date while in 'start' state) - node_deadtime = int(time.time()) - ( - int(config["keepalive_interval"]) * int(config["fence_intervals"]) - ) - if node_keepalive < node_deadtime and node_daemon_state == "run": - logger.out( - "Node {} seems dead - starting monitor for fencing".format( - node_name - ), - state="w", - ) - zk_lock = zkhandler.writelock(("node.state.daemon", node_name)) - with zk_lock: - # Ensures that, if we lost the lock race and come out of waiting, - # we won't try to trigger our own fence thread. - if zkhandler.read(("node.state.daemon", node_name)) != "dead": - fence_thread = Thread( - target=pvcnoded.util.fencing.fence_node, - args=(node_name, zkhandler, config, logger), - kwargs={}, - ) - fence_thread.start() - # Write the updated data after we start the fence thread - zkhandler.write( - [(("node.state.daemon", node_name), "dead")] - ) diff --git a/health-daemon/pvchealthd/util/libvirt.py b/health-daemon/pvchealthd/util/libvirt.py deleted file mode 100644 index b769b9bb..00000000 --- a/health-daemon/pvchealthd/util/libvirt.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python3 - -# libvirt.py - Utility functions for pvcnoded libvirt -# Part of the Parallel Virtual Cluster (PVC) system -# -# Copyright (C) 2018-2022 Joshua M. Boniface -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, version 3. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -############################################################################### - -import libvirt - - -def validate_libvirtd(logger, config): - if config["enable_hypervisor"]: - libvirt_check_name = f'qemu+tcp://{config["node_hostname"]}/system' - logger.out(f"Connecting to Libvirt daemon at {libvirt_check_name}", state="i") - try: - lv_conn = libvirt.open(libvirt_check_name) - lv_conn.close() - except Exception as e: - logger.out(f"Failed to connect to Libvirt daemon: {e}", state="e") - return False - - return True diff --git a/health-daemon/pvchealthd/util/networking.py b/health-daemon/pvchealthd/util/networking.py deleted file mode 100644 index 0b6cfb79..00000000 --- a/health-daemon/pvchealthd/util/networking.py +++ /dev/null @@ -1,232 +0,0 @@ -#!/usr/bin/env python3 - -# networking.py - Utility functions for pvcnoded networking -# Part of the Parallel Virtual Cluster (PVC) system -# -# Copyright (C) 2018-2022 Joshua M. Boniface -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, version 3. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -############################################################################### - -import daemon_lib.common as common - -from time import sleep -from os import makedirs - - -def setup_sriov(logger, config): - logger.out("Setting up SR-IOV device support", state="i") - - # Enable unsafe interrupts for the vfio_iommu_type1 kernel module - try: - common.run_os_command("modprobe vfio_iommu_type1 allow_unsafe_interrupts=1") - with open( - "/sys/module/vfio_iommu_type1/parameters/allow_unsafe_interrupts", "w" - ) as mfh: - mfh.write("Y") - except Exception: - logger.out( - "Failed to enable vfio_iommu_type1 kernel module; SR-IOV may fail", - state="w", - ) - - # Loop through our SR-IOV NICs and enable the numvfs for each - for device in config["sriov_device"]: - logger.out( - f'Preparing SR-IOV PF {device["phy"]} with {device["vfcount"]} VFs', - state="i", - ) - try: - with open( - f'/sys/class/net/{device["phy"]}/device/sriov_numvfs', "r" - ) as vfh: - current_vf_count = vfh.read().strip() - with open( - f'/sys/class/net/{device["phy"]}/device/sriov_numvfs', "w" - ) as vfh: - vfh.write(str(device["vfcount"])) - except FileNotFoundError: - logger.out( - f'Failed to open SR-IOV configuration for PF {device["phy"]}; device may not support SR-IOV', - state="w", - ) - except OSError: - logger.out( - f'Failed to set SR-IOV VF count for PF {device["phy"]} to {device["vfcount"]}; already set to {current_vf_count}', - state="w", - ) - - if device.get("mtu", None) is not None: - logger.out( - f'Setting SR-IOV PF {device["phy"]} to MTU {device["mtu"]}', state="i" - ) - common.run_os_command(f'ip link set {device["phy"]} mtu {device["mtu"]} up') - - -def setup_interfaces(logger, config): - # Set up the Cluster interface - cluster_dev = config["cluster_dev"] - cluster_mtu = config["cluster_mtu"] - cluster_dev_ip = config["cluster_dev_ip"] - - logger.out( - f"Setting up Cluster network interface {cluster_dev} with MTU {cluster_mtu}", - state="i", - ) - - common.run_os_command(f"ip link set {cluster_dev} mtu {cluster_mtu} up") - - logger.out( - f"Setting up Cluster network bridge on interface {cluster_dev} with IP {cluster_dev_ip}", - state="i", - ) - - common.run_os_command("brctl addbr brcluster") - common.run_os_command(f"brctl addif brcluster {cluster_dev}") - common.run_os_command(f"ip link set brcluster mtu {cluster_mtu} up") - common.run_os_command(f"ip address add {cluster_dev_ip} dev brcluster") - - # Set up the Storage interface - storage_dev = config["storage_dev"] - storage_mtu = config["storage_mtu"] - storage_dev_ip = config["storage_dev_ip"] - - logger.out( - f"Setting up Storage network interface {storage_dev} with MTU {storage_mtu}", - state="i", - ) - - common.run_os_command(f"ip link set {storage_dev} mtu {storage_mtu} up") - - if storage_dev == cluster_dev: - if storage_dev_ip != cluster_dev_ip: - logger.out( - f"Setting up Storage network on Cluster network bridge with IP {storage_dev_ip}", - state="i", - ) - - common.run_os_command(f"ip address add {storage_dev_ip} dev brcluster") - else: - logger.out( - f"Setting up Storage network bridge on interface {storage_dev} with IP {storage_dev_ip}", - state="i", - ) - - common.run_os_command("brctl addbr brstorage") - common.run_os_command(f"brctl addif brstorage {storage_dev}") - common.run_os_command(f"ip link set brstorage mtu {storage_mtu} up") - common.run_os_command(f"ip address add {storage_dev_ip} dev brstorage") - - # Set up the Upstream interface - upstream_dev = config["upstream_dev"] - upstream_mtu = config["upstream_mtu"] - upstream_dev_ip = config["upstream_dev_ip"] - - logger.out( - f"Setting up Upstream network interface {upstream_dev} with MTU {upstream_mtu}", - state="i", - ) - - if upstream_dev == cluster_dev: - if upstream_dev_ip != cluster_dev_ip: - logger.out( - f"Setting up Upstream network on Cluster network bridge with IP {upstream_dev_ip}", - state="i", - ) - - common.run_os_command(f"ip address add {upstream_dev_ip} dev brcluster") - else: - logger.out( - f"Setting up Upstream network bridge on interface {upstream_dev} with IP {upstream_dev_ip}", - state="i", - ) - - common.run_os_command("brctl addbr brupstream") - common.run_os_command(f"brctl addif brupstream {upstream_dev}") - common.run_os_command(f"ip link set brupstream mtu {upstream_mtu} up") - common.run_os_command(f"ip address add {upstream_dev_ip} dev brupstream") - - upstream_gateway = config["upstream_gateway"] - if upstream_gateway is not None: - logger.out( - f"Setting up Upstream network default gateway IP {upstream_gateway}", - state="i", - ) - if upstream_dev == cluster_dev: - common.run_os_command( - f"ip route add default via {upstream_gateway} dev brcluster" - ) - else: - common.run_os_command( - f"ip route add default via {upstream_gateway} dev brupstream" - ) - - # Set up sysctl tweaks to optimize networking - # Enable routing functions - common.run_os_command("sysctl net.ipv4.ip_forward=1") - common.run_os_command("sysctl net.ipv6.ip_forward=1") - # Enable send redirects - common.run_os_command("sysctl net.ipv4.conf.all.send_redirects=1") - common.run_os_command("sysctl net.ipv4.conf.default.send_redirects=1") - common.run_os_command("sysctl net.ipv6.conf.all.send_redirects=1") - common.run_os_command("sysctl net.ipv6.conf.default.send_redirects=1") - # Accept source routes - common.run_os_command("sysctl net.ipv4.conf.all.accept_source_route=1") - common.run_os_command("sysctl net.ipv4.conf.default.accept_source_route=1") - common.run_os_command("sysctl net.ipv6.conf.all.accept_source_route=1") - common.run_os_command("sysctl net.ipv6.conf.default.accept_source_route=1") - # Disable RP filtering on Cluster and Upstream interfaces (to allow traffic pivoting) - common.run_os_command(f"sysctl net.ipv4.conf.{cluster_dev}.rp_filter=0") - common.run_os_command("sysctl net.ipv4.conf.brcluster.rp_filter=0") - common.run_os_command(f"sysctl net.ipv4.conf.{upstream_dev}.rp_filter=0") - common.run_os_command("sysctl net.ipv4.conf.brupstream.rp_filter=0") - common.run_os_command(f"sysctl net.ipv6.conf.{cluster_dev}.rp_filter=0") - common.run_os_command("sysctl net.ipv6.conf.brcluster.rp_filter=0") - common.run_os_command(f"sysctl net.ipv6.conf.{upstream_dev}.rp_filter=0") - common.run_os_command("sysctl net.ipv6.conf.brupstream.rp_filter=0") - - # Stop DNSMasq if it is running - common.run_os_command("systemctl stop dnsmasq.service") - - logger.out("Waiting 3 seconds for networking to come up", state="s") - sleep(3) - - -def create_nft_configuration(logger, config): - if config["enable_networking"]: - logger.out("Creating NFT firewall configuration", state="i") - - dynamic_directory = config["nft_dynamic_directory"] - - # Create directories - makedirs(f"{dynamic_directory}/networks", exist_ok=True) - makedirs(f"{dynamic_directory}/static", exist_ok=True) - - # Set up the base rules - nftables_base_rules = f"""# Base rules - flush ruleset - # Add the filter table and chains - add table inet filter - add chain inet filter forward {{ type filter hook forward priority 0; }} - add chain inet filter input {{ type filter hook input priority 0; }} - # Include static rules and network rules - include "{dynamic_directory}/static/*" - include "{dynamic_directory}/networks/*" - """ - - # Write the base firewall config - nftables_base_filename = f"{dynamic_directory}/base.nft" - with open(nftables_base_filename, "w") as nftfh: - nftfh.write(nftables_base_rules) - common.reload_firewall_rules(nftables_base_filename, logger) diff --git a/health-daemon/pvchealthd/util/services.py b/health-daemon/pvchealthd/util/services.py deleted file mode 100644 index f2f12888..00000000 --- a/health-daemon/pvchealthd/util/services.py +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/env python3 - -# services.py - Utility functions for pvcnoded external services -# Part of the Parallel Virtual Cluster (PVC) system -# -# Copyright (C) 2018-2022 Joshua M. Boniface -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, version 3. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -############################################################################### - -import daemon_lib.common as common -from time import sleep - - -def start_zookeeper(logger, config): - if config["daemon_mode"] == "coordinator": - logger.out("Starting Zookeeper daemon", state="i") - # TODO: Move our handling out of Systemd and integrate it directly as a subprocess? - common.run_os_command("systemctl start zookeeper.service") - - -def start_libvirtd(logger, config): - if config["enable_hypervisor"]: - logger.out("Starting Libvirt daemon", state="i") - # TODO: Move our handling out of Systemd and integrate it directly as a subprocess? - common.run_os_command("systemctl start libvirtd.service") - - -def start_patroni(logger, config): - if config["enable_networking"] and config["daemon_mode"] == "coordinator": - logger.out("Starting Patroni daemon", state="i") - # TODO: Move our handling out of Systemd and integrate it directly as a subprocess? - common.run_os_command("systemctl start patroni.service") - - -def start_frrouting(logger, config): - if config["enable_networking"] and config["daemon_mode"] == "coordinator": - logger.out("Starting FRRouting daemon", state="i") - # TODO: Move our handling out of Systemd and integrate it directly as a subprocess? - common.run_os_command("systemctl start frr.service") - - -def start_ceph_mon(logger, config): - if config["enable_storage"] and config["daemon_mode"] == "coordinator": - logger.out("Starting Ceph Monitor daemon", state="i") - # TODO: Move our handling out of Systemd and integrate it directly as a subprocess? - common.run_os_command( - f'systemctl start ceph-mon@{config["node_hostname"]}.service' - ) - - -def start_ceph_mgr(logger, config): - if config["enable_storage"] and config["daemon_mode"] == "coordinator": - logger.out("Starting Ceph Manager daemon", state="i") - # TODO: Move our handling out of Systemd and integrate it directly as a subprocess? - common.run_os_command( - f'systemctl start ceph-mgr@{config["node_hostname"]}.service' - ) - - -def start_keydb(logger, config): - if (config["enable_api"] or config["enable_worker"]) and config[ - "daemon_mode" - ] == "coordinator": - logger.out("Starting KeyDB daemon", state="i") - # TODO: Move our handling out of Systemd and integrate it directly as a subprocess? - common.run_os_command("systemctl start keydb-server.service") - - -def start_worker(logger, config): - if config["enable_worker"]: - logger.out("Starting Celery Worker daemon", state="i") - # TODO: Move our handling out of Systemd and integrate it directly as a subprocess? - common.run_os_command("systemctl start pvcworkerd.service") - - -def start_system_services(logger, config): - start_zookeeper(logger, config) - start_libvirtd(logger, config) - start_patroni(logger, config) - start_frrouting(logger, config) - start_ceph_mon(logger, config) - start_ceph_mgr(logger, config) - start_keydb(logger, config) - start_worker(logger, config) - - logger.out("Waiting 10 seconds for daemons to start", state="s") - sleep(10)