diff --git a/build-and-deploy.sh b/build-and-deploy.sh
index 37c1318a..487ce294 100755
--- a/build-and-deploy.sh
+++ b/build-and-deploy.sh
@@ -65,6 +65,7 @@ for HOST in ${HOSTS[@]}; do
     echo -n "Restarting PVC daemons..."
     ssh $HOST $SUDO systemctl restart pvcapid &>/dev/null
     ssh $HOST $SUDO systemctl restart pvcworkerd &>/dev/null
+    ssh $HOST $SUDO systemctl restart pvchealthd &>/dev/null
     ssh $HOST $SUDO systemctl restart pvcnoded &>/dev/null
    echo " done."
    echo -n "Waiting for node daemon to be running..."
diff --git a/build-unstable-deb.sh b/build-unstable-deb.sh
index cce15a09..146cf5fd 100755
--- a/build-unstable-deb.sh
+++ b/build-unstable-deb.sh
@@ -13,9 +13,10 @@ echo ${new_ver} >&3
 tmpdir=$( mktemp -d )
 cp -a debian/changelog client-cli/setup.py ${tmpdir}/
 cp -a node-daemon/pvcnoded/Daemon.py ${tmpdir}/node-Daemon.py
+cp -a health-daemon/pvchealthd/Daemon.py ${tmpdir}/health-Daemon.py
 cp -a api-daemon/pvcapid/Daemon.py ${tmpdir}/api-Daemon.py
 # Replace the "base" version with the git revision version
-sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
+sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
 sed -i "s/${base_ver}-0/${new_ver}/" debian/changelog
 cat <<EOD > debian/changelog
 pvc (${new_ver}) unstable; urgency=medium
@@ -33,6 +34,7 @@ dpkg-buildpackage -us -uc
 cp -a ${tmpdir}/changelog debian/changelog
 cp -a ${tmpdir}/setup.py client-cli/setup.py
 cp -a ${tmpdir}/node-Daemon.py node-daemon/pvcnoded/Daemon.py
+cp -a ${tmpdir}/health-Daemon.py health-daemon/pvchealthd/Daemon.py
 cp -a ${tmpdir}/api-Daemon.py api-daemon/pvcapid/Daemon.py
 
 # Clean up
diff --git a/bump-version b/bump-version
index 1e7b2b92..6225bf40 100755
--- a/bump-version
+++ b/bump-version
@@ -20,6 +20,7 @@ changelog="$( cat ${changelog_file} | grep -v '^#' | sed 's/^*/ */' )"
 rm ${changelog_file}
 
 sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvcnoded/Daemon.py
+sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," health-daemon/pvchealthd/Daemon.py
 sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," api-daemon/pvcapid/Daemon.py
 sed -i "s,version=\"${current_version}\",version=\"${new_version}\"," client-cli/setup.py
 echo ${new_version} > .version
@@ -46,7 +47,7 @@ echo -e "${deb_changelog_new}" >> ${deb_changelog_file}
 echo -e "${deb_changelog_orig}" >> ${deb_changelog_file}
 mv ${deb_changelog_file} debian/changelog
 
-git add node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
+git add node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
 git commit -v
 popd &>/dev/null
diff --git a/debian/control b/debian/control
index 55278cd5..f642bd2d 100644
--- a/debian/control
+++ b/debian/control
@@ -8,12 +8,20 @@ X-Python3-Version: >= 3.2
 
 Package: pvc-daemon-node
 Architecture: all
-Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
+Depends: systemd, pvc-daemon-common, pvc-daemon-health, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
 Description: Parallel Virtual Cluster node daemon
  A KVM/Zookeeper/Ceph-based VM and private cloud manager
  .
  This package installs the PVC node daemon
 
+Package: pvc-daemon-health
+Architecture: all
+Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-yaml
+Description: Parallel Virtual Cluster health daemon
+ A KVM/Zookeeper/Ceph-based VM and private cloud manager
+ .
+ This package installs the PVC health monitoring daemon
+
 Package: pvc-daemon-api
 Architecture: all
 Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python-celery-common, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate, fio
diff --git a/debian/pvc-daemon-api.preinst b/debian/pvc-daemon-api.preinst
new file mode 100644
index 00000000..3b05fd62
--- /dev/null
+++ b/debian/pvc-daemon-api.preinst
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+# Remove any cached CPython directories or files
+echo "Cleaning up existing CPython files"
+find /usr/share/pvc/pvcapid -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
diff --git a/debian/pvc-daemon-health.install b/debian/pvc-daemon-health.install
new file mode 100644
index 00000000..f2e38228
--- /dev/null
+++ b/debian/pvc-daemon-health.install
@@ -0,0 +1,4 @@
+health-daemon/pvchealthd.py usr/share/pvc
+health-daemon/pvchealthd usr/share/pvc
+health-daemon/pvchealthd.service lib/systemd/system
+health-daemon/plugins usr/share/pvc
diff --git a/debian/pvc-daemon-health.postinst b/debian/pvc-daemon-health.postinst
new file mode 100644
index 00000000..834a6f26
--- /dev/null
+++ b/debian/pvc-daemon-health.postinst
@@ -0,0 +1,14 @@
+#!/bin/sh
+
+# Reload systemd's view of the units
+systemctl daemon-reload
+
+# Enable the service and target
+systemctl enable /lib/systemd/system/pvchealthd.service
+
+# Inform administrator of the service restart/startup not occurring automatically
+if systemctl is-active --quiet pvchealthd.service; then
+    echo "NOTE: The PVC health daemon (pvchealthd.service) has not been restarted; this is up to the administrator."
+else
+    echo "NOTE: The PVC health daemon (pvchealthd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
+fi
diff --git a/debian/pvc-daemon-health.preinst b/debian/pvc-daemon-health.preinst
new file mode 100644
index 00000000..2eb6ac2c
--- /dev/null
+++ b/debian/pvc-daemon-health.preinst
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+# Remove any cached CPython directories or files
+echo "Cleaning up existing CPython files"
+find /usr/share/pvc/pvchealthd -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
diff --git a/debian/pvc-daemon-health.prerm b/debian/pvc-daemon-health.prerm
new file mode 100644
index 00000000..43086aee
--- /dev/null
+++ b/debian/pvc-daemon-health.prerm
@@ -0,0 +1,4 @@
+#!/bin/sh
+
+# Disable the services
+systemctl disable pvchealthd.service
diff --git a/debian/pvc-daemon-node.install b/debian/pvc-daemon-node.install
index 6d2154a7..14ab9b64 100644
--- a/debian/pvc-daemon-node.install
+++ b/debian/pvc-daemon-node.install
@@ -4,4 +4,3 @@ node-daemon/pvcnoded.service lib/systemd/system
 node-daemon/pvc.target lib/systemd/system
 node-daemon/pvcautoready.service lib/systemd/system
 node-daemon/monitoring usr/share/pvc
-node-daemon/plugins usr/share/pvc
diff --git a/debian/pvc-daemon-node.preinst b/debian/pvc-daemon-node.preinst
index 1cda1e03..b213c07d 100644
--- a/debian/pvc-daemon-node.preinst
+++ b/debian/pvc-daemon-node.preinst
@@ -2,4 +2,4 @@
 
 # Remove any cached CPython directories or files
 echo "Cleaning up existing CPython files"
-find /usr/share/pvc -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
+find /usr/share/pvc/pvcnoded -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
diff --git a/health-daemon/daemon_lib b/health-daemon/daemon_lib
new file mode 120000
index 00000000..9df2c591
--- /dev/null
+++ b/health-daemon/daemon_lib
@@ -0,0 +1 @@
+../daemon-common
\ No newline at end of file
diff --git a/node-daemon/plugins/disk b/health-daemon/plugins/disk
similarity index 99%
rename from node-daemon/plugins/disk
rename to health-daemon/plugins/disk
index 9f8eb8a0..77e67cce 100644
--- a/node-daemon/plugins/disk
+++ b/health-daemon/plugins/disk
@@ -39,7 +39,7 @@
 
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 
 
 # A monitoring plugin script must always expose its nice name, which must be identical to
diff --git a/node-daemon/plugins/dpkg b/health-daemon/plugins/dpkg
similarity index 98%
rename from node-daemon/plugins/dpkg
rename to health-daemon/plugins/dpkg
index 3abbdcf0..8497982b 100644
--- a/node-daemon/plugins/dpkg
+++ b/health-daemon/plugins/dpkg
@@ -40,7 +40,7 @@
 
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 
 
 # A monitoring plugin script must always expose its nice name, which must be identical to
diff --git a/node-daemon/plugins/edac b/health-daemon/plugins/edac
similarity index 98%
rename from node-daemon/plugins/edac
rename to health-daemon/plugins/edac
index 6967ee20..a058e2da 100644
--- a/node-daemon/plugins/edac
+++ b/health-daemon/plugins/edac
@@ -38,7 +38,7 @@
 
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 
 
 # A monitoring plugin script must always expose its nice name, which must be identical to
diff --git a/node-daemon/plugins/hwrd b/health-daemon/plugins/hwrd
similarity index 99%
rename from node-daemon/plugins/hwrd
rename to health-daemon/plugins/hwrd
index 0ebff282..aad20724 100644
--- a/node-daemon/plugins/hwrd
+++ b/health-daemon/plugins/hwrd
@@ -39,7 +39,7 @@
 
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 
 
 # A monitoring plugin script must always expose its nice name, which must be identical to
diff --git a/node-daemon/plugins/ipmi b/health-daemon/plugins/ipmi
similarity index 98%
rename from node-daemon/plugins/ipmi
rename to health-daemon/plugins/ipmi
index c7e261f9..a7f73414 100644
--- a/node-daemon/plugins/ipmi
+++ b/health-daemon/plugins/ipmi
@@ -38,7 +38,7 @@
 
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 
 
 # A monitoring plugin script must always expose its nice name, which must be identical to
diff --git a/node-daemon/plugins/kydb b/health-daemon/plugins/kydb
similarity index 98%
rename from node-daemon/plugins/kydb
rename to health-daemon/plugins/kydb
index 407ef64e..9f084c4e 100644
--- a/node-daemon/plugins/kydb
+++ b/health-daemon/plugins/kydb
@@ -38,7 +38,7 @@
 
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 
 
 # A monitoring plugin script must always expose its nice name, which must be identical to
diff --git a/node-daemon/plugins/lbvt b/health-daemon/plugins/lbvt
similarity index 98%
rename from node-daemon/plugins/lbvt
rename to health-daemon/plugins/lbvt
index 6c7bdc88..fb77432d 100644
--- a/node-daemon/plugins/lbvt
+++ b/health-daemon/plugins/lbvt
@@ -38,7 +38,7 @@
 
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 
 
 # A monitoring plugin script must always expose its nice name, which must be identical to
diff --git a/node-daemon/plugins/load b/health-daemon/plugins/load
similarity index 98%
rename from node-daemon/plugins/load
rename to health-daemon/plugins/load
index 8febce50..d7984f41 100644
--- a/node-daemon/plugins/load
+++ b/health-daemon/plugins/load
@@ -38,7 +38,7 @@
 
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 
 
 # A monitoring plugin script must always expose its nice name, which must be identical to
diff --git a/node-daemon/plugins/nics b/health-daemon/plugins/nics
similarity index 99%
rename from node-daemon/plugins/nics
rename to health-daemon/plugins/nics
index a0eaabee..cd3b6779 100644
--- a/node-daemon/plugins/nics
+++ b/health-daemon/plugins/nics
@@ -39,7 +39,7 @@
 
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 
 
 # A monitoring plugin script must always expose its nice name, which must be identical to
diff --git a/node-daemon/plugins/psql b/health-daemon/plugins/psql
similarity index 98%
rename from node-daemon/plugins/psql
rename to health-daemon/plugins/psql
index 53994039..08586316 100644
--- a/node-daemon/plugins/psql
+++ b/health-daemon/plugins/psql
@@ -38,7 +38,7 @@
 
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 
 
 # A monitoring plugin script must always expose its nice name, which must be identical to
diff --git a/node-daemon/plugins/psur b/health-daemon/plugins/psur
similarity index 98%
rename from node-daemon/plugins/psur
rename to health-daemon/plugins/psur
index 7266c0b6..23320880 100644
--- a/node-daemon/plugins/psur
+++ b/health-daemon/plugins/psur
@@ -38,7 +38,7 @@
 
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 
 
 # A monitoring plugin script must always expose its nice name, which must be identical to
diff --git a/node-daemon/plugins/zkpr b/health-daemon/plugins/zkpr
similarity index 98%
rename from node-daemon/plugins/zkpr
rename to health-daemon/plugins/zkpr
index 6a5171a4..5f6cf79b 100644
--- a/node-daemon/plugins/zkpr
+++ b/health-daemon/plugins/zkpr
@@ -38,7 +38,7 @@
 
 # This import is always required here, as MonitoringPlugin is used by the
 # MonitoringPluginScript class
-from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
+from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
 
 
 # A monitoring plugin script must always expose its nice name, which must be identical to
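Every renamed plugin above follows the same skeleton the in-file comments describe: import MonitoringPlugin from the new pvchealthd tree, expose a nice name matching the file name, and subclass MonitoringPluginScript. A hypothetical minimal plugin for the new layout is sketched below; the import and class name come from the plugins themselves, while the method and result-object details are illustrative assumptions, not part of this patch.

#!/usr/bin/env python3
# Hypothetical plugin sketch; method/result names beyond MonitoringPluginScript
# are assumptions for illustration only.
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin

# The nice name; must be identical to the plugin's file name
PLUGIN_NAME = "example"


class MonitoringPluginScript(MonitoringPlugin):
    def setup(self):
        # One-time setup at daemon start (verify required tools exist, etc.)
        pass

    def run(self, coordinator_state=None):
        # Perform the check, then report a health delta (0 = fully healthy)
        self.plugin_result.set_health_delta(0)
        self.plugin_result.set_message("example check passed")
        return self.plugin_result

    def cleanup(self):
        # Teardown at daemon stop
        pass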
diff --git a/health-daemon/pvchealthd.py b/health-daemon/pvchealthd.py
new file mode 100755
index 00000000..f3135673
--- /dev/null
+++ b/health-daemon/pvchealthd.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python3
+
+# pvchealthd.py - Health daemon startup stub
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018-2022 Joshua M. Boniface
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+import pvchealthd.Daemon  # noqa: F401
+
+pvchealthd.Daemon.entrypoint()
diff --git a/health-daemon/pvchealthd.service b/health-daemon/pvchealthd.service
new file mode 100644
index 00000000..f9e4b5af
--- /dev/null
+++ b/health-daemon/pvchealthd.service
@@ -0,0 +1,20 @@
+# Parallel Virtual Cluster health daemon unit file
+
+[Unit]
+Description = Parallel Virtual Cluster health daemon
+After = network.target
+Wants = network-online.target
+PartOf = pvc.target
+
+[Service]
+Type = simple
+WorkingDirectory = /usr/share/pvc
+Environment = PYTHONUNBUFFERED=true
+Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
+ExecStartPre = /bin/sleep 2
+ExecStart = /usr/share/pvc/pvchealthd.py
+ExecStopPost = /bin/sleep 2
+Restart = on-failure
+
+[Install]
+WantedBy = pvc.target
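Note that the postinst above enables this unit but deliberately never starts or restarts it; the unit reads its configuration from the PVC_CONFIG_FILE path it sets itself. A typical first start on a node would therefore look something like this (standard systemctl usage, shown for orientation only):

# After installing pvc-daemon-health and writing /etc/pvc/pvc.conf:
sudo systemctl daemon-reload
sudo systemctl start pvchealthd.service
sudo systemctl status pvchealthd.service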
diff --git a/health-daemon/pvchealthd/Daemon.py b/health-daemon/pvchealthd/Daemon.py
new file mode 100644
index 00000000..39a740fc
--- /dev/null
+++ b/health-daemon/pvchealthd/Daemon.py
@@ -0,0 +1,145 @@
+#!/usr/bin/env python3
+
+# Daemon.py - Health daemon main entrypoint
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018-2022 Joshua M. Boniface
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+import pvchealthd.util.config
+import pvchealthd.util.zookeeper
+
+import pvchealthd.objects.MonitoringInstance as MonitoringInstance
+import pvchealthd.objects.NodeInstance as NodeInstance
+
+import daemon_lib.log as log
+
+from time import sleep
+
+import os
+import signal
+
+# Daemon version
+version = "0.9.82"
+
+
+##########################################################
+# Entrypoint
+##########################################################
+
+
+def entrypoint():
+    monitoring_instance = None
+
+    # Get our configuration
+    config = pvchealthd.util.config.get_configuration()
+    config["daemon_name"] = "pvchealthd"
+    config["daemon_version"] = version
+
+    # Set up the logger instance
+    logger = log.Logger(config)
+
+    # Print our startup message
+    logger.out("")
+    logger.out("|------------------------------------------------------------|")
+    logger.out("| |")
+    logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
+    logger.out("| ██ ▜█▙ ▟█▛ ██ |")
+    logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
+    logger.out("| ██ ▜█▙▟█▛ ███████████ |")
+    logger.out("| |")
+    logger.out("|------------------------------------------------------------|")
+    logger.out("| Parallel Virtual Cluster health daemon v{0: <18} |".format(version))
+    logger.out("| Debug: {0: <51} |".format(str(config["debug"])))
+    logger.out("| FQDN: {0: <52} |".format(config["node_fqdn"]))
+    logger.out("| Host: {0: <52} |".format(config["node_hostname"]))
+    logger.out("| ID: {0: <54} |".format(config["node_id"]))
+    logger.out("| IPMI hostname: {0: <43} |".format(config["ipmi_hostname"]))
+    logger.out("| Machine details: |")
+    logger.out("| CPUs: {0: <50} |".format(config["static_data"][0]))
+    logger.out("| Arch: {0: <50} |".format(config["static_data"][3]))
+    logger.out("| OS: {0: <52} |".format(config["static_data"][2]))
+    logger.out("| Kernel: {0: <48} |".format(config["static_data"][1]))
+    logger.out("|------------------------------------------------------------|")
+    logger.out("")
+    logger.out(f'Starting pvchealthd on host {config["node_fqdn"]}', state="s")
+
+    # Connect to Zookeeper and return our handler and current schema version
+    zkhandler, _ = pvchealthd.util.zookeeper.connect(logger, config)
+
+    # Define a cleanup function
+    def cleanup(failure=False):
+        nonlocal logger, zkhandler, monitoring_instance
+
+        logger.out("Terminating pvchealthd and cleaning up", state="s")
+
+        # Shut down the monitoring system
+        try:
+            logger.out("Shutting down monitoring subsystem", state="s")
+            monitoring_instance.shutdown()
+        except Exception:
+            pass
+
+        # Close the Zookeeper connection
+        try:
+            zkhandler.disconnect(persistent=True)
+            del zkhandler
+        except Exception:
+            pass
+
+        logger.out("Terminated health daemon", state="s")
+        logger.terminate()
+
+        if failure:
+            retcode = 1
+        else:
+            retcode = 0
+
+        os._exit(retcode)
+
+    # Termination function
+    def term(signum="", frame=""):
+        cleanup(failure=False)
+
+    # Hangup (logrotate) function
+    def hup(signum="", frame=""):
+        if config["file_logging"]:
+            logger.hup()
+
+    # Handle signals gracefully
+    signal.signal(signal.SIGTERM, term)
+    signal.signal(signal.SIGINT, term)
+    signal.signal(signal.SIGQUIT, term)
+    signal.signal(signal.SIGHUP, hup)
+
+    this_node = NodeInstance.NodeInstance(
+        config["node_hostname"],
+        zkhandler,
+        config,
+        logger,
+    )
+
+    # Set up the node monitoring instance and thread
+    monitoring_instance = MonitoringInstance.MonitoringInstance(
+        zkhandler, config, logger, this_node
+    )
+
+    # Tick loop; does nothing since everything is async
+    while True:
+        try:
+            sleep(1)
+        except Exception:
+            break
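The entrypoint's shutdown handling is a common daemon pattern: all exit paths funnel through one cleanup() closure registered as the signal handler, and the main thread just idles while the real work happens in background threads. A standalone minimal sketch of that pattern (not part of this patch; the resource dict stands in for the Zookeeper handler and monitoring instance):

#!/usr/bin/env python3
# Minimal sketch of the signal-to-cleanup shutdown pattern used above.
import os
import signal
from time import sleep


def entrypoint():
    resource = {"open": True}  # stands in for zkhandler/monitoring_instance

    def cleanup(failure=False):
        resource["open"] = False  # release connections, stop threads, etc.
        os._exit(1 if failure else 0)  # hard exit; worker threads may be wedged

    def term(signum="", frame=""):
        cleanup(failure=False)

    signal.signal(signal.SIGTERM, term)
    signal.signal(signal.SIGINT, term)

    while True:  # tick loop; everything interesting runs asynchronously
        sleep(1)


if __name__ == "__main__":
    entrypoint()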
diff --git a/health-daemon/pvchealthd/__init__.py b/health-daemon/pvchealthd/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/node-daemon/pvcnoded/objects/MonitoringInstance.py b/health-daemon/pvchealthd/objects/MonitoringInstance.py
similarity index 99%
rename from node-daemon/pvcnoded/objects/MonitoringInstance.py
rename to health-daemon/pvchealthd/objects/MonitoringInstance.py
index c87e01dd..5b55b630 100644
--- a/node-daemon/pvcnoded/objects/MonitoringInstance.py
+++ b/health-daemon/pvchealthd/objects/MonitoringInstance.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 
-# PluginInstance.py - Class implementing a PVC monitoring instance
+# MonitoringInstance.py - Class implementing a PVC monitor in pvchealthd
 # Part of the Parallel Virtual Cluster (PVC) system
 #
 # Copyright (C) 2018-2022 Joshua M. Boniface
diff --git a/health-daemon/pvchealthd/objects/NodeInstance.py b/health-daemon/pvchealthd/objects/NodeInstance.py
new file mode 100644
index 00000000..2edcaf94
--- /dev/null
+++ b/health-daemon/pvchealthd/objects/NodeInstance.py
@@ -0,0 +1,224 @@
+#!/usr/bin/env python3
+
+# NodeInstance.py - Class implementing a PVC node in pvchealthd
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018-2022 Joshua M. Boniface
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+
+class NodeInstance(object):
+    # Initialization function
+    def __init__(
+        self,
+        name,
+        zkhandler,
+        config,
+        logger,
+    ):
+        # Passed-in variables on creation
+        self.name = name
+        self.zkhandler = zkhandler
+        self.config = config
+        self.logger = logger
+        # States
+        self.daemon_state = "stop"
+        self.coordinator_state = "client"
+        self.domain_state = "flushed"
+        # Node resources
+        self.health = 100
+        self.active_domains_count = 0
+        self.provisioned_domains_count = 0
+        self.memused = 0
+        self.memfree = 0
+        self.memalloc = 0
+        self.vcpualloc = 0
+
+        # Zookeeper handlers for changed states
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("node.state.daemon", self.name)
+        )
+        def watch_node_daemonstate(data, stat, event=""):
+            if event and event.type == "DELETED":
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode("ascii")
+            except AttributeError:
+                data = "stop"
+
+            if data != self.daemon_state:
+                self.daemon_state = data
+
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("node.state.router", self.name)
+        )
+        def watch_node_routerstate(data, stat, event=""):
+            if event and event.type == "DELETED":
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode("ascii")
+            except AttributeError:
+                data = "client"
+
+            if data != self.coordinator_state:
+                self.coordinator_state = data
+
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("node.state.domain", self.name)
+        )
+        def watch_node_domainstate(data, stat, event=""):
+            if event and event.type == "DELETED":
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode("ascii")
+            except AttributeError:
+                data = "unknown"
+
+            if data != self.domain_state:
+                self.domain_state = data
+
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("node.monitoring.health", self.name)
+        )
+        def watch_node_health(data, stat, event=""):
+            if event and event.type == "DELETED":
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode("ascii")
+            except AttributeError:
+                data = 100
+
+            try:
+                data = int(data)
+            except ValueError:
+                pass
+
+            if data != self.health:
+                self.health = data
+
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("node.memory.free", self.name)
+        )
+        def watch_node_memfree(data, stat, event=""):
+            if event and event.type == "DELETED":
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode("ascii")
+            except AttributeError:
+                data = 0
+
+            if data != self.memfree:
+                self.memfree = data
+
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("node.memory.used", self.name)
+        )
+        def watch_node_memused(data, stat, event=""):
+            if event and event.type == "DELETED":
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode("ascii")
+            except AttributeError:
+                data = 0
+
+            if data != self.memused:
+                self.memused = data
+
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("node.memory.allocated", self.name)
+        )
+        def watch_node_memalloc(data, stat, event=""):
+            if event and event.type == "DELETED":
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode("ascii")
+            except AttributeError:
+                data = 0
+
+            if data != self.memalloc:
+                self.memalloc = data
+
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("node.vcpu.allocated", self.name)
+        )
+        def watch_node_vcpualloc(data, stat, event=""):
+            if event and event.type == "DELETED":
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode("ascii")
+            except AttributeError:
+                data = 0
+
+            if data != self.vcpualloc:
+                self.vcpualloc = data
+
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("node.running_domains", self.name)
+        )
+        def watch_node_runningdomains(data, stat, event=""):
+            if event and event.type == "DELETED":
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode("ascii").split()
+            except AttributeError:
+                data = []
+
+            if len(data) != self.active_domains_count:
+                self.active_domains_count = len(data)
+
+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("node.count.provisioned_domains", self.name)
+        )
+        def watch_node_domainscount(data, stat, event=""):
+            if event and event.type == "DELETED":
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode("ascii")
+            except AttributeError:
+                data = 0
+
+            if data != self.provisioned_domains_count:
+                self.provisioned_domains_count = data
diff --git a/health-daemon/pvchealthd/objects/__init__.py b/health-daemon/pvchealthd/objects/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/health-daemon/pvchealthd/util/__init__.py b/health-daemon/pvchealthd/util/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/health-daemon/pvchealthd/util/config.py b/health-daemon/pvchealthd/util/config.py
new file mode 100644
index 00000000..cc98e81e
--- /dev/null
+++ b/health-daemon/pvchealthd/util/config.py
@@ -0,0 +1,696 @@
+#!/usr/bin/env python3
+
+# config.py - Utility functions for pvchealthd configuration parsing
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018-2022 Joshua M. Boniface
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+import daemon_lib.common as common
+
+import os
+import subprocess
+import yaml
+
+from socket import gethostname
+from re import findall
+from psutil import cpu_count
+from ipaddress import ip_address, ip_network
+from json import loads
+
+
+class MalformedConfigurationError(Exception):
+    """
+    An exception raised when parsing the PVC daemon configuration file
+    """
+
+    def __init__(self, error=None):
+        self.msg = f"ERROR: Configuration file is malformed: {error}"
+
+    def __str__(self):
+        return str(self.msg)
+
+
+def get_static_data():
+    """
+    Data that is obtained once at node startup for use later
+    """
+    staticdata = list()
+    staticdata.append(str(cpu_count()))  # CPU count
+    staticdata.append(
+        subprocess.run(["uname", "-r"], stdout=subprocess.PIPE)
+        .stdout.decode("ascii")
+        .strip()
+    )
+    staticdata.append(
+        subprocess.run(["uname", "-o"], stdout=subprocess.PIPE)
+        .stdout.decode("ascii")
+        .strip()
+    )
+    staticdata.append(
+        subprocess.run(["uname", "-m"], stdout=subprocess.PIPE)
+        .stdout.decode("ascii")
+        .strip()
+    )
+
+    return staticdata
+
+
+def get_configuration_path():
+    config_file = None
+    try:
+        _config_file = "/etc/pvc/pvcnoded.yaml"
+        if not os.path.exists(_config_file):
+            raise
+        config_file = _config_file
+        config_type = "legacy"
+    except Exception:
+        pass
+    try:
+        _config_file = os.environ["PVC_CONFIG_FILE"]
+        if not os.path.exists(_config_file):
+            raise
+        config_file = _config_file
+        config_type = "current"
+    except Exception:
+        pass
+
+    if not config_file:
+        print('ERROR: The "PVC_CONFIG_FILE" environment variable must be set.')
+        os._exit(1)
+
+    return config_file, config_type
+
+
+def get_hostname():
+    node_fqdn = gethostname()
+    node_hostname = node_fqdn.split(".", 1)[0]
+    node_domain = "".join(node_fqdn.split(".", 1)[1:])
+    try:
+        node_id = findall(r"\d+", node_hostname)[-1]
+    except IndexError:
+        node_id = 0
+
+    return node_fqdn, node_hostname, node_domain, node_id
+
+
+def validate_floating_ip(config, network):
+    if network not in ["cluster", "storage", "upstream"]:
+        return False, f'Specified network type "{network}" is not valid'
+
+    floating_key = f"{network}_floating_ip"
+    network_key = f"{network}_network"
+
+    # Verify the network provided is valid
+    try:
+        network = ip_network(config[network_key])
+    except Exception:
+        return (
+            False,
+            f"Network address {config[network_key]} for {network_key} is not valid",
+        )
+
+    # Verify that the floating IP is valid (and in the network)
+    try:
+        floating_address = ip_address(config[floating_key].split("/")[0])
+        if floating_address not in list(network.hosts()):
+            raise
+    except Exception:
+        return (
+            False,
+            f"Floating address {config[floating_key]} for {floating_key} is not valid",
+        )
+
+    return True, ""
+
+
+def get_configuration_current(config_file):
+    print('Loading configuration from file "{}"'.format(config_file))
+
+    with open(config_file, "r") as cfgfh:
+        try:
+            o_config = yaml.load(cfgfh, Loader=yaml.SafeLoader)
+        except Exception as e:
+            print(f"ERROR: Failed to parse configuration file: {e}")
+            os._exit(1)
+
+    config = dict()
+
+    node_fqdn, node_hostname, node_domain, node_id = get_hostname()
+
+    config_thisnode = {
+        "node": node_hostname,
+        "node_hostname": node_hostname,
+        "node_fqdn": node_fqdn,
+        "node_domain": node_domain,
+        "node_id": node_id,
+    }
+    config = {**config, **config_thisnode}
+
+    try:
+        o_path = o_config["path"]
+        config_path = {
+            "plugin_directory": o_path.get(
+                "plugin_directory", "/usr/share/pvc/plugins"
+            ),
+            "dynamic_directory": o_path["dynamic_directory"],
+            "log_directory": o_path["system_log_directory"],
+            "console_log_directory": o_path["console_log_directory"],
+            "ceph_directory": o_path["ceph_directory"],
+        }
+        # Define our dynamic directory schema
+        config_path["dnsmasq_dynamic_directory"] = (
+            config_path["dynamic_directory"] + "/dnsmasq"
+        )
+        config_path["pdns_dynamic_directory"] = (
+            config_path["dynamic_directory"] + "/pdns"
+        )
+        config_path["nft_dynamic_directory"] = config_path["dynamic_directory"] + "/nft"
+        # Define our log directory schema
+        config_path["dnsmasq_log_directory"] = config_path["log_directory"] + "/dnsmasq"
+        config_path["pdns_log_directory"] = config_path["log_directory"] + "/pdns"
+        config_path["nft_log_directory"] = config_path["log_directory"] + "/nft"
+        config = {**config, **config_path}
+
+        o_subsystem = o_config["subsystem"]
+        config_subsystem = {
+            "enable_hypervisor": o_subsystem.get("enable_hypervisor", True),
+            "enable_networking": o_subsystem.get("enable_networking", True),
+            "enable_storage": o_subsystem.get("enable_storage", True),
+            "enable_worker": o_subsystem.get("enable_worker", True),
+            "enable_api": o_subsystem.get("enable_api", True),
+        }
+        config = {**config, **config_subsystem}
+
+        o_cluster = o_config["cluster"]
+        config_cluster = {
+            "cluster_name": o_cluster["name"],
+            "all_nodes": o_cluster["all_nodes"],
+            "coordinators": o_cluster["coordinator_nodes"],
+        }
+        config = {**config, **config_cluster}
+
+        o_cluster_networks = o_cluster["networks"]
+        for network_type in ["cluster", "storage", "upstream"]:
+            o_cluster_networks_specific = o_cluster_networks[network_type]
+            config_cluster_networks_specific = {
+                f"{network_type}_domain": o_cluster_networks_specific["domain"],
+                f"{network_type}_dev": o_cluster_networks_specific["device"],
+                f"{network_type}_mtu": o_cluster_networks_specific["mtu"],
+                f"{network_type}_network": o_cluster_networks_specific["ipv4"][
+                    "network_address"
+                ]
+                + "/"
+                + str(o_cluster_networks_specific["ipv4"]["netmask"]),
+                f"{network_type}_floating_ip": o_cluster_networks_specific["ipv4"][
+                    "floating_address"
+                ]
+                + "/"
+                + str(o_cluster_networks_specific["ipv4"]["netmask"]),
+                f"{network_type}_node_ip_selection": o_cluster_networks_specific[
+                    "node_ip_selection"
+                ],
+            }
+
+            if (
+                o_cluster_networks_specific["ipv4"].get("gateway_address", None)
+                is not None
+            ):
+                config[f"{network_type}_gateway"] = o_cluster_networks_specific["ipv4"][
+                    "gateway_address"
+                ]
+
+            result, msg = validate_floating_ip(
+                config_cluster_networks_specific, network_type
+            )
+            if not result:
+                raise MalformedConfigurationError(msg)
+
+            network = ip_network(
+                config_cluster_networks_specific[f"{network_type}_network"]
+            )
+
+            if (
+                config_cluster_networks_specific[f"{network_type}_node_ip_selection"]
+                == "by-id"
+            ):
+                address_id = int(node_id) - 1
+            else:
+                # This roundabout solution ensures the given IP is in the subnet and is something valid
+                address_id = [
+                    idx
+                    for idx, ip in enumerate(list(network.hosts()))
+                    if str(ip)
+                    == config_cluster_networks_specific[
+                        f"{network_type}_node_ip_selection"
+                    ]
+                ][0]
+
+            config_cluster_networks_specific[
+                f"{network_type}_dev_ip"
+            ] = f"{list(network.hosts())[address_id]}/{network.prefixlen}"
+
+            config = {**config, **config_cluster_networks_specific}
+
+        o_database = o_config["database"]
+        config_database = {
+            "zookeeper_port": o_database["zookeeper"]["port"],
+            "keydb_port": o_database["keydb"]["port"],
+            "keydb_host": o_database["keydb"]["hostname"],
+            "keydb_path": o_database["keydb"]["path"],
+            "metadata_postgresql_port": o_database["postgres"]["port"],
+            "metadata_postgresql_host": o_database["postgres"]["hostname"],
+            "metadata_postgresql_dbname": o_database["postgres"]["credentials"]["api"][
+                "database"
+            ],
+            "metadata_postgresql_user": o_database["postgres"]["credentials"]["api"][
+                "username"
+            ],
+            "metadata_postgresql_password": o_database["postgres"]["credentials"][
+                "api"
+            ]["password"],
+            "pdns_postgresql_port": o_database["postgres"]["port"],
+            "pdns_postgresql_host": o_database["postgres"]["hostname"],
+            "pdns_postgresql_dbname": o_database["postgres"]["credentials"]["dns"][
+                "database"
+            ],
+            "pdns_postgresql_user": o_database["postgres"]["credentials"]["dns"][
+                "username"
+            ],
+            "pdns_postgresql_password": o_database["postgres"]["credentials"]["dns"][
+                "password"
+            ],
+        }
+        config = {**config, **config_database}
+
+        o_timer = o_config["timer"]
+        config_timer = {
+            "vm_shutdown_timeout": int(o_timer.get("vm_shutdown_timeout", 180)),
+            "keepalive_interval": int(o_timer.get("keepalive_interval", 5)),
+            "monitoring_interval": int(o_timer.get("monitoring_interval", 60)),
+        }
+        config = {**config, **config_timer}
+
+        o_fencing = o_config["fencing"]
+        config_fencing = {
+            "disable_on_ipmi_failure": o_fencing["disable_on_ipmi_failure"],
+            "fence_intervals": int(o_fencing["intervals"].get("fence_intervals", 6)),
+            "suicide_intervals": int(o_fencing["intervals"].get("suicide_interval", 0)),
+            "successful_fence": o_fencing["actions"].get("successful_fence", None),
+            "failed_fence": o_fencing["actions"].get("failed_fence", None),
+            "ipmi_hostname": o_fencing["ipmi"]["hostname"].format(node_id=node_id),
+            "ipmi_username": o_fencing["ipmi"]["username"],
+            "ipmi_password": o_fencing["ipmi"]["password"],
+        }
+        config = {**config, **config_fencing}
+
+        o_migration = o_config["migration"]
+        config_migration = {
+            "migration_target_selector": o_migration.get("target_selector", "mem"),
+        }
+        config = {**config, **config_migration}
+
+        o_logging = o_config["logging"]
+        config_logging = {
+            "debug": o_logging.get("debug_logging", False),
+            "file_logging": o_logging.get("file_logging", False),
+            "stdout_logging": o_logging.get("stdout_logging", False),
+            "zookeeper_logging": o_logging.get("zookeeper_logging", False),
+            "log_colours": o_logging.get("log_colours", False),
+            "log_dates": o_logging.get("log_dates", False),
+            "log_keepalives": o_logging.get("log_keepalives", False),
+            "log_keepalive_cluster_details": o_logging.get(
+                "log_cluster_details", False
+            ),
+            "log_keepalive_plugin_details": o_logging.get(
+                "log_monitoring_details", False
+            ),
+            "console_log_lines": o_logging.get("console_log_lines", False),
+            "node_log_lines": o_logging.get("node_log_lines", False),
+        }
+        config = {**config, **config_logging}
+
+        o_guest_networking = o_config["guest_networking"]
+        config_guest_networking = {
+            "bridge_dev": o_guest_networking["bridge_device"],
+            "bridge_mtu": o_guest_networking["bridge_mtu"],
+            "enable_sriov": o_guest_networking.get("sriov_enable", False),
+            "sriov_device": o_guest_networking.get("sriov_device", list()),
+        }
+        config = {**config, **config_guest_networking}
+
+        o_ceph = o_config["ceph"]
+        config_ceph = {
+            "ceph_config_file": config["ceph_directory"]
+            + "/"
+            + o_ceph["ceph_config_file"],
+            "ceph_admin_keyring": config["ceph_directory"]
+            + "/"
+            + o_ceph["ceph_keyring_file"],
+            "ceph_monitor_port": o_ceph["monitor_port"],
+            "ceph_secret_uuid": o_ceph["secret_uuid"],
+        }
+        config = {**config, **config_ceph}
+
+        # Add our node static data to the config
+        config["static_data"] = get_static_data()
+
+    except Exception as e:
+        raise MalformedConfigurationError(e)
+
+    return config
+
+
+def get_configuration_legacy(pvcnoded_config_file):
+    print('Loading configuration from file "{}"'.format(pvcnoded_config_file))
+
+    with open(pvcnoded_config_file, "r") as cfgfile:
+        try:
+            o_config = yaml.load(cfgfile, Loader=yaml.SafeLoader)
+        except Exception as e:
+            print("ERROR: Failed to parse configuration file: {}".format(e))
+            os._exit(1)
+
+    node_fqdn, node_hostname, node_domain, node_id = get_hostname()
+
+    # Create the configuration dictionary
+    config = dict()
+
+    # Get the initial base configuration
+    try:
+        o_base = o_config["pvc"]
+        o_cluster = o_config["pvc"]["cluster"]
+    except Exception as e:
+        raise MalformedConfigurationError(e)
+
+    config_general = {
+        "node": o_base.get("node", node_hostname),
+        "node_hostname": node_hostname,
+        "node_fqdn": node_fqdn,
+        "node_domain": node_domain,
+        "node_id": node_id,
+        "coordinators": o_cluster.get("coordinators", list()),
+        "debug": o_base.get("debug", False),
+    }
+
+    config = {**config, **config_general}
+
+    # Get the functions configuration
+    try:
+        o_functions = o_config["pvc"]["functions"]
+    except Exception as e:
+        raise MalformedConfigurationError(e)
+
+    config_functions = {
+        "enable_hypervisor": o_functions.get("enable_hypervisor", False),
+        "enable_networking": o_functions.get("enable_networking", False),
+        "enable_storage": o_functions.get("enable_storage", False),
+        "enable_worker": o_functions.get("enable_worker", True),
+        "enable_api": o_functions.get("enable_api", False),
+    }
+
+    config = {**config, **config_functions}
+
+    # Get the directory configuration
+    try:
+        o_directories = o_config["pvc"]["system"]["configuration"]["directories"]
+    except Exception as e:
+        raise MalformedConfigurationError(e)
+
+    config_directories = {
+        "plugin_directory": o_directories.get(
+            "plugin_directory", "/usr/share/pvc/plugins"
+        ),
+        "dynamic_directory": o_directories.get("dynamic_directory", None),
+        "log_directory": o_directories.get("log_directory", None),
+        "console_log_directory": o_directories.get("console_log_directory", None),
+    }
+
+    # Define our dynamic directory schema
+    config_directories["dnsmasq_dynamic_directory"] = (
+        config_directories["dynamic_directory"] + "/dnsmasq"
+    )
+    config_directories["pdns_dynamic_directory"] = (
+        config_directories["dynamic_directory"] + "/pdns"
+    )
+    config_directories["nft_dynamic_directory"] = (
+        config_directories["dynamic_directory"] + "/nft"
+    )
+
+    # Define our log directory schema
+    config_directories["dnsmasq_log_directory"] = (
+        config_directories["log_directory"] + "/dnsmasq"
+    )
+    config_directories["pdns_log_directory"] = (
+        config_directories["log_directory"] + "/pdns"
+    )
+    config_directories["nft_log_directory"] = (
+        config_directories["log_directory"] + "/nft"
+    )
+
+    config = {**config, **config_directories}
+
+    # Get the logging configuration
+    try:
+        o_logging = o_config["pvc"]["system"]["configuration"]["logging"]
+    except Exception as e:
+        raise MalformedConfigurationError(e)
+
+    config_logging = {
+        "file_logging": o_logging.get("file_logging", False),
+        "stdout_logging": o_logging.get("stdout_logging", False),
+        "zookeeper_logging": o_logging.get("zookeeper_logging", False),
+        "log_colours": o_logging.get("log_colours", False),
+        "log_dates": o_logging.get("log_dates", False),
+        "log_keepalives": o_logging.get("log_keepalives", False),
+        "log_keepalive_cluster_details": o_logging.get(
+            "log_keepalive_cluster_details", False
+        ),
+        "log_keepalive_plugin_details": o_logging.get(
+            "log_keepalive_plugin_details", False
+        ),
+        "console_log_lines": o_logging.get("console_log_lines", False),
+        "node_log_lines": o_logging.get("node_log_lines", False),
+    }
+
+    config = {**config, **config_logging}
+
+    # Get the interval configuration
+    try:
+        o_intervals = o_config["pvc"]["system"]["intervals"]
+    except Exception as e:
+        raise MalformedConfigurationError(e)
+
+    config_intervals = {
+        "vm_shutdown_timeout": int(o_intervals.get("vm_shutdown_timeout", 60)),
+        "keepalive_interval": int(o_intervals.get("keepalive_interval", 5)),
+        "monitoring_interval": int(o_intervals.get("monitoring_interval", 60)),
+        "fence_intervals": int(o_intervals.get("fence_intervals", 6)),
+        "suicide_intervals": int(o_intervals.get("suicide_interval", 0)),
+    }
+
+    config = {**config, **config_intervals}
+
+    # Get the fencing configuration
+    try:
+        o_fencing = o_config["pvc"]["system"]["fencing"]
+        o_fencing_actions = o_fencing["actions"]
+        o_fencing_ipmi = o_fencing["ipmi"]
+    except Exception as e:
+        raise MalformedConfigurationError(e)
+
+    config_fencing = {
+        "successful_fence": o_fencing_actions.get("successful_fence", None),
+        "failed_fence": o_fencing_actions.get("failed_fence", None),
+        "ipmi_hostname": o_fencing_ipmi.get(
+            "host", f"{node_hostname}-lom.{node_domain}"
+        ),
+        "ipmi_username": o_fencing_ipmi.get("user", "null"),
+        "ipmi_password": o_fencing_ipmi.get("pass", "null"),
+    }
+
+    config = {**config, **config_fencing}
+
+    # Get the migration configuration
+    try:
+        o_migration = o_config["pvc"]["system"]["migration"]
+    except Exception as e:
+        raise MalformedConfigurationError(e)
+
+    config_migration = {
+        "migration_target_selector": o_migration.get("target_selector", "mem"),
+    }
+
+    config = {**config, **config_migration}
+
+    if config["enable_networking"]:
+        # Get the node networks configuration
+        try:
+            o_networks = o_config["pvc"]["cluster"]["networks"]
+            o_network_cluster = o_networks["cluster"]
+            o_network_storage = o_networks["storage"]
+            o_network_upstream = o_networks["upstream"]
+            o_sysnetworks = o_config["pvc"]["system"]["configuration"]["networking"]
+            o_sysnetwork_cluster = o_sysnetworks["cluster"]
+            o_sysnetwork_storage = o_sysnetworks["storage"]
+            o_sysnetwork_upstream = o_sysnetworks["upstream"]
+        except Exception as e:
+            raise MalformedConfigurationError(e)
+
+        config_networks = {
+            "cluster_domain": o_network_cluster.get("domain", None),
+            "cluster_network": o_network_cluster.get("network", None),
+            "cluster_floating_ip": o_network_cluster.get("floating_ip", None),
+            "cluster_dev": o_sysnetwork_cluster.get("device", None),
+            "cluster_mtu": o_sysnetwork_cluster.get("mtu", None),
+            "cluster_dev_ip": o_sysnetwork_cluster.get("address", None),
+            "storage_domain": o_network_storage.get("domain", None),
+            "storage_network": o_network_storage.get("network", None),
+            "storage_floating_ip": o_network_storage.get("floating_ip", None),
+            "storage_dev": o_sysnetwork_storage.get("device", None),
+            "storage_mtu": o_sysnetwork_storage.get("mtu", None),
+            "storage_dev_ip": o_sysnetwork_storage.get("address", None),
+            "upstream_domain": o_network_upstream.get("domain", None),
+            "upstream_network": o_network_upstream.get("network", None),
+            "upstream_floating_ip": o_network_upstream.get("floating_ip", None),
+            "upstream_gateway": o_network_upstream.get("gateway", None),
+            "upstream_dev": o_sysnetwork_upstream.get("device", None),
+            "upstream_mtu": o_sysnetwork_upstream.get("mtu", None),
+            "upstream_dev_ip": o_sysnetwork_upstream.get("address", None),
+            "bridge_dev": o_sysnetworks.get("bridge_device", None),
+            "bridge_mtu": o_sysnetworks.get("bridge_mtu", None),
+            "enable_sriov": o_sysnetworks.get("sriov_enable", False),
+            "sriov_device": o_sysnetworks.get("sriov_device", list()),
+        }
+
+        if config_networks["bridge_mtu"] is None:
+            # Read the current MTU of bridge_dev and set bridge_mtu to it; avoids weird resets
+            retcode, stdout, stderr = common.run_os_command(
+                f"ip -json link show dev {config_networks['bridge_dev']}"
+            )
+            current_bridge_mtu = loads(stdout)[0]["mtu"]
+            print(
+                f"Config key bridge_mtu not explicitly set; using live MTU {current_bridge_mtu} from {config_networks['bridge_dev']}"
+            )
+            config_networks["bridge_mtu"] = current_bridge_mtu
+
+        config = {**config, **config_networks}
+
+        for network_type in ["cluster", "storage", "upstream"]:
+            result, msg = validate_floating_ip(config, network_type)
+            if not result:
+                raise MalformedConfigurationError(msg)
+
+            address_key = "{}_dev_ip".format(network_type)
+            network_key = f"{network_type}_network"
+            network = ip_network(config[network_key])
+            # With autoselection of addresses, construct an IP from the relevant network
+            if config[address_key] == "by-id":
+                # The NodeID starts at 1, but indexes start at 0
+                address_id = int(config["node_id"]) - 1
+                # Grab the nth address from the network
+                config[address_key] = "{}/{}".format(
+                    list(network.hosts())[address_id], network.prefixlen
+                )
+            # Validate the provided IP instead
+            else:
+                try:
+                    address = ip_address(config[address_key].split("/")[0])
+                    if address not in list(network.hosts()):
+                        raise
+                except Exception:
+                    raise MalformedConfigurationError(
+                        f"IP address {config[address_key]} for {address_key} is not valid"
+                    )
+
+        # Get the PowerDNS aggregator database configuration
+        try:
+            o_pdnsdb = o_config["pvc"]["coordinator"]["dns"]["database"]
+        except Exception as e:
+            raise MalformedConfigurationError(e)
+
+        config_pdnsdb = {
+            "pdns_postgresql_host": o_pdnsdb.get("host", None),
+            "pdns_postgresql_port": o_pdnsdb.get("port", None),
+            "pdns_postgresql_dbname": o_pdnsdb.get("name", None),
+            "pdns_postgresql_user": o_pdnsdb.get("user", None),
+            "pdns_postgresql_password": o_pdnsdb.get("pass", None),
+        }
+
+        config = {**config, **config_pdnsdb}
+
+        # Get the Cloud-Init Metadata database configuration
+        try:
+            o_metadatadb = o_config["pvc"]["coordinator"]["metadata"]["database"]
+        except Exception as e:
+            raise MalformedConfigurationError(e)
+
+        config_metadatadb = {
+            "metadata_postgresql_host": o_metadatadb.get("host", None),
+            "metadata_postgresql_port": o_metadatadb.get("port", None),
+            "metadata_postgresql_dbname": o_metadatadb.get("name", None),
+            "metadata_postgresql_user": o_metadatadb.get("user", None),
+            "metadata_postgresql_password": o_metadatadb.get("pass", None),
+        }
+
+        config = {**config, **config_metadatadb}
+
+    if config["enable_storage"]:
+        # Get the storage configuration
+        try:
+            o_storage = o_config["pvc"]["system"]["configuration"]["storage"]
+        except Exception as e:
+            raise MalformedConfigurationError(e)
+
+        config_storage = {
+            "ceph_config_file": o_storage.get("ceph_config_file", None),
+            "ceph_admin_keyring": o_storage.get("ceph_admin_keyring", None),
+        }
+
+        config = {**config, **config_storage}
+
+    # Add our node static data to the config
+    config["static_data"] = get_static_data()
+
+    return config
+
+
+def get_configuration():
+    """
+    Parse the configuration of the health daemon.
+    """
+    pvc_config_file, pvc_config_type = get_configuration_path()
+
+    if pvc_config_type == "legacy":
+        config = get_configuration_legacy(pvc_config_file)
+    else:
+        config = get_configuration_current(pvc_config_file)
+
+    return config
+
+
+def validate_directories(config):
+    if not os.path.exists(config["dynamic_directory"]):
+        os.makedirs(config["dynamic_directory"])
+        os.makedirs(config["dnsmasq_dynamic_directory"])
+        os.makedirs(config["pdns_dynamic_directory"])
+        os.makedirs(config["nft_dynamic_directory"])
+
+    if not os.path.exists(config["log_directory"]):
+        os.makedirs(config["log_directory"])
+        os.makedirs(config["dnsmasq_log_directory"])
+        os.makedirs(config["pdns_log_directory"])
+        os.makedirs(config["nft_log_directory"])
state="i", + prefix=f"fencing {node_name}", + ) + while True: + try: + zkhandler.write([(("node.state.daemon", node_name), "fenced")]) + break + except Exception: + continue + + # Force into secondary network state if needed + if node_name in config["coordinators"]: + logger.out( + f"Forcing secondary coordinator state for node {node_name}", + state="i", + prefix=f"fencing {node_name}", + ) + zkhandler.write([(("node.state.router", node_name), "secondary")]) + if zkhandler.read("base.config.primary_node") == node_name: + zkhandler.write([("base.config.primary_node", "none")]) + + # If the fence succeeded and successful_fence is migrate + if fence_status and config["successful_fence"] == "migrate": + migrateFromFencedNode(zkhandler, node_name, config, logger) + + # If the fence failed and failed_fence is migrate + if ( + not fence_status + and config["failed_fence"] == "migrate" + and config["suicide_intervals"] != "0" + ): + migrateFromFencedNode(zkhandler, node_name, config, logger) + + +# Migrate hosts away from a fenced node +def migrateFromFencedNode(zkhandler, node_name, config, logger): + logger.out( + f"Migrating VMs from dead node {node_name} to new hosts", + state="i", + prefix=f"fencing {node_name}", + ) + + # Get the list of VMs + dead_node_running_domains = zkhandler.read( + ("node.running_domains", node_name) + ).split() + + # Set the node to a custom domainstate so we know what's happening + zkhandler.write([(("node.state.domain", node_name), "fence-flush")]) + + # Migrate a VM after a flush + def fence_migrate_vm(dom_uuid): + logger.out( + f"Flushing locks of VM {dom_uuid} due to fence", + state="i", + prefix=f"fencing {node_name}", + ) + vm_worker_flush_locks(zkhandler, None, dom_uuid, force_unlock=True) + + target_node = common.findTargetNode(zkhandler, dom_uuid) + + if target_node is not None: + logger.out( + f"Migrating VM {dom_uuid} to node {target_node}", + state="i", + prefix=f"fencing {node_name}", + ) + zkhandler.write( + [ + (("domain.state", dom_uuid), "start"), + (("domain.node", dom_uuid), target_node), + (("domain.last_node", dom_uuid), node_name), + ] + ) + logger.out( + f"Successfully migrated running VM {dom_uuid} to node {target_node}", + state="o", + prefix=f"fencing {node_name}", + ) + else: + logger.out( + f"No target node found for VM {dom_uuid}; marking autostart=True on current node", + state="i", + prefix=f"fencing {node_name}", + ) + zkhandler.write( + { + (("domain.state", dom_uuid), "stopped"), + (("domain.meta.autostart", dom_uuid), "True"), + } + ) + logger.out( + f"Successfully marked autostart for running VM {dom_uuid} on current node", + state="o", + prefix=f"fencing {node_name}", + ) + + # Loop through the VMs + for dom_uuid in dead_node_running_domains: + try: + fence_migrate_vm(dom_uuid) + except Exception as e: + logger.out( + f"Failed to migrate VM {dom_uuid}, continuing: {e}", + state="w", + prefix=f"fencing {node_name}", + ) + + # Set node in flushed state for easy remigrating when it comes back + zkhandler.write([(("node.state.domain", node_name), "flushed")]) + logger.out( + f"All VMs flushed from dead node {node_name} to other nodes", + state="i", + prefix=f"fencing {node_name}", + ) + + +# +# Perform an IPMI fence +# +def reboot_via_ipmi(node_name, ipmi_hostname, ipmi_user, ipmi_password, logger): + # Power off the node the node + logger.out( + "Sending power off to dead node", + state="i", + prefix=f"fencing {node_name}", + ) + ipmi_stop_retcode, ipmi_stop_stdout, ipmi_stop_stderr = common.run_os_command( + f"/usr/bin/ipmitool -I 
+
+
+#
+# Perform an IPMI fence
+#
+def reboot_via_ipmi(node_name, ipmi_hostname, ipmi_user, ipmi_password, logger):
+    # Power off the node
+    logger.out(
+        "Sending power off to dead node",
+        state="i",
+        prefix=f"fencing {node_name}",
+    )
+    ipmi_stop_retcode, ipmi_stop_stdout, ipmi_stop_stderr = common.run_os_command(
+        f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power off"
+    )
+    if ipmi_stop_retcode != 0:
+        logger.out(
+            f"Failed to power off dead node: {ipmi_stop_stderr}",
+            state="e",
+            prefix=f"fencing {node_name}",
+        )
+
+    logger.out(
+        "Waiting 5s for power off to take effect",
+        state="i",
+        prefix=f"fencing {node_name}",
+    )
+    time.sleep(5)
+
+    # Check the chassis power state
+    logger.out(
+        "Checking power state of dead node",
+        state="i",
+        prefix=f"fencing {node_name}",
+    )
+    ipmi_status_retcode, ipmi_status_stdout, ipmi_status_stderr = common.run_os_command(
+        f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status"
+    )
+    if ipmi_status_retcode == 0:
+        logger.out(
+            f"Current chassis power state is: {ipmi_status_stdout.strip()}",
+            state="i",
+            prefix=f"fencing {node_name}",
+        )
+    else:
+        logger.out(
+            "Current chassis power state is: Unknown",
+            state="w",
+            prefix=f"fencing {node_name}",
+        )
+
+    # Power on the node
+    logger.out(
+        "Sending power on to dead node",
+        state="i",
+        prefix=f"fencing {node_name}",
+    )
+    ipmi_start_retcode, ipmi_start_stdout, ipmi_start_stderr = common.run_os_command(
+        f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power on"
+    )
+
+    if ipmi_start_retcode != 0:
+        logger.out(
+            f"Failed to power on dead node: {ipmi_start_stderr}",
+            state="w",
+            prefix=f"fencing {node_name}",
+        )
+
+    logger.out(
+        "Waiting 2s for power on to take effect",
+        state="i",
+        prefix=f"fencing {node_name}",
+    )
+    time.sleep(2)
+
+    # Check the chassis power state
+    logger.out(
+        "Checking power state of dead node",
+        state="i",
+        prefix=f"fencing {node_name}",
+    )
+    ipmi_status_retcode, ipmi_status_stdout, ipmi_status_stderr = common.run_os_command(
+        f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status"
+    )
+
+    if ipmi_stop_retcode == 0:
+        if ipmi_status_stdout.strip() == "Chassis Power is on":
+            # We successfully rebooted the node and it is powered on; this is a successful fence
+            logger.out(
+                "Successfully rebooted dead node; proceeding with fence recovery action",
+                state="o",
+                prefix=f"fencing {node_name}",
+            )
+            return True
+        elif ipmi_status_stdout.strip() == "Chassis Power is off":
+            # We successfully rebooted the node but it is powered off; this might be expected or not, but the node is confirmed off so we can call it a successful fence
+            logger.out(
+                "Chassis power is in confirmed off state after successful IPMI reboot; proceeding with fence recovery action",
+                state="o",
+                prefix=f"fencing {node_name}",
+            )
+            return True
+        else:
+            # We successfully rebooted the node but it is in some unknown power state; since this might indicate a silent failure, we must call it a failed fence
+            logger.out(
+                f"Chassis power is in an unknown state ({ipmi_status_stdout.strip()}) after successful IPMI reboot; NOT proceeding with fence recovery action",
+                state="e",
+                prefix=f"fencing {node_name}",
+            )
+            return False
+    else:
+        if ipmi_status_stdout.strip() == "Chassis Power is off":
+            # We failed to reboot the node but it is powered off; it has probably suffered a serious hardware failure, but the node is confirmed off so we can call it a successful fence
+            logger.out(
+                "Chassis power is in confirmed off state after failed IPMI reboot; proceeding with fence recovery action",
+                state="o",
+                prefix=f"fencing {node_name}",
+            )
+            return True
+        else:
+            # We failed to reboot the node but it is in some unknown power state (including "on"); since this might indicate a silent failure, we must call it a failed fence
+            logger.out(
+                "Chassis power is not in confirmed off state after failed IPMI reboot; NOT proceeding with fence recovery action",
+                state="e",
+                prefix=f"fencing {node_name}",
+            )
+            return False
+
+
+#
+# Verify that IPMI connectivity to this host exists (used during node init)
+#
+def verify_ipmi(ipmi_hostname, ipmi_user, ipmi_password):
+    ipmi_command = f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status"
+    retcode, stdout, stderr = common.run_os_command(ipmi_command, timeout=2)
+    if retcode == 0 and stdout.strip() == "Chassis Power is on":
+        return True
+    else:
+        return False
prefix="ceph-thread") + ceph_conn.connect(timeout=1) + except Exception as e: + logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e") + return + + # Primary-only functions + if this_node.coordinator_state == "primary": + # Get Ceph status information (pretty) + if debug: + logger.out( + "Set Ceph status information in zookeeper (primary only)", + state="d", + prefix="ceph-thread", + ) + + command = {"prefix": "status", "format": "pretty"} + ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[ + 1 + ].decode("ascii") + try: + zkhandler.write([("base.storage", str(ceph_status))]) + except Exception as e: + logger.out("Failed to set Ceph status data: {}".format(e), state="e") + + # Get Ceph health information (JSON) + if debug: + logger.out( + "Set Ceph health information in zookeeper (primary only)", + state="d", + prefix="ceph-thread", + ) + + command = {"prefix": "health", "format": "json"} + ceph_health = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[ + 1 + ].decode("ascii") + try: + zkhandler.write([("base.storage.health", str(ceph_health))]) + except Exception as e: + logger.out("Failed to set Ceph health data: {}".format(e), state="e") + + # Get Ceph df information (pretty) + if debug: + logger.out( + "Set Ceph rados df information in zookeeper (primary only)", + state="d", + prefix="ceph-thread", + ) + + # Get rados df info + command = {"prefix": "df", "format": "pretty"} + ceph_df = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1].decode( + "ascii" + ) + try: + zkhandler.write([("base.storage.util", str(ceph_df))]) + except Exception as e: + logger.out("Failed to set Ceph utilization data: {}".format(e), state="e") + + if debug: + logger.out( + "Set pool information in zookeeper (primary only)", + state="d", + prefix="ceph-thread", + ) + + # Get pool info + command = {"prefix": "df", "format": "json"} + ceph_df_output = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[ + 1 + ].decode("ascii") + try: + ceph_pool_df_raw = json.loads(ceph_df_output)["pools"] + except Exception as e: + logger.out("Failed to obtain Pool data (ceph df): {}".format(e), state="w") + ceph_pool_df_raw = [] + + retcode, stdout, stderr = common.run_os_command( + "rados df --format json", timeout=1 + ) + try: + rados_pool_df_raw = json.loads(stdout)["pools"] + except Exception as e: + logger.out("Failed to obtain Pool data (rados df): {}".format(e), state="w") + rados_pool_df_raw = [] + + pool_count = len(ceph_pool_df_raw) + if debug: + logger.out( + "Getting info for {} pools".format(pool_count), + state="d", + prefix="ceph-thread", + ) + for pool_idx in range(0, pool_count): + try: + # Combine all the data for this pool + ceph_pool_df = ceph_pool_df_raw[pool_idx] + rados_pool_df = rados_pool_df_raw[pool_idx] + pool = ceph_pool_df + pool.update(rados_pool_df) + + # Ignore any pools that aren't in our pool list + if pool["name"] not in pool_list: + if debug: + logger.out( + "Pool {} not in pool list {}".format( + pool["name"], pool_list + ), + state="d", + prefix="ceph-thread", + ) + continue + else: + if debug: + logger.out( + "Parsing data for pool {}".format(pool["name"]), + state="d", + prefix="ceph-thread", + ) + + # Assemble a useful data structure + pool_df = { + "id": pool["id"], + "stored_bytes": pool["stats"]["stored"], + "free_bytes": pool["stats"]["max_avail"], + "used_bytes": pool["stats"]["bytes_used"], + "used_percent": pool["stats"]["percent_used"], + "num_objects": pool["stats"]["objects"], + "num_object_clones": 
pool["num_object_clones"], + "num_object_copies": pool["num_object_copies"], + "num_objects_missing_on_primary": pool[ + "num_objects_missing_on_primary" + ], + "num_objects_unfound": pool["num_objects_unfound"], + "num_objects_degraded": pool["num_objects_degraded"], + "read_ops": pool["read_ops"], + "read_bytes": pool["read_bytes"], + "write_ops": pool["write_ops"], + "write_bytes": pool["write_bytes"], + } + + # Write the pool data to Zookeeper + zkhandler.write( + [(("pool.stats", pool["name"]), str(json.dumps(pool_df)))] + ) + except Exception as e: + # One or more of the status commands timed out, just continue + logger.out( + "Failed to format and send pool data: {}".format(e), state="w" + ) + pass + + # Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs) + osds_this_node = 0 + if len(osd_list) > 0: + # Get data from Ceph OSDs + if debug: + logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread") + + # Parse the dump data + osd_dump = dict() + + command = {"prefix": "osd dump", "format": "json"} + osd_dump_output = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[ + 1 + ].decode("ascii") + try: + osd_dump_raw = json.loads(osd_dump_output)["osds"] + except Exception as e: + logger.out("Failed to obtain OSD data: {}".format(e), state="w") + osd_dump_raw = [] + + if debug: + logger.out("Loop through OSD dump", state="d", prefix="ceph-thread") + for osd in osd_dump_raw: + osd_dump.update( + { + str(osd["osd"]): { + "uuid": osd["uuid"], + "up": osd["up"], + "in": osd["in"], + "primary_affinity": osd["primary_affinity"], + } + } + ) + + # Parse the df data + if debug: + logger.out("Parse the OSD df data", state="d", prefix="ceph-thread") + + osd_df = dict() + + command = {"prefix": "osd df", "format": "json"} + try: + osd_df_raw = json.loads( + ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1] + )["nodes"] + except Exception as e: + logger.out("Failed to obtain OSD data: {}".format(e), state="w") + osd_df_raw = [] + + if debug: + logger.out("Loop through OSD df", state="d", prefix="ceph-thread") + for osd in osd_df_raw: + osd_df.update( + { + str(osd["id"]): { + "utilization": osd["utilization"], + "var": osd["var"], + "pgs": osd["pgs"], + "kb": osd["kb"], + "kb_used": osd["kb_used"], + "kb_used_data": osd["kb_used_data"], + "kb_used_omap": osd["kb_used_omap"], + "kb_used_meta": osd["kb_used_meta"], + "kb_avail": osd["kb_avail"], + "weight": osd["crush_weight"], + "reweight": osd["reweight"], + "class": osd["device_class"], + } + } + ) + + # Parse the status data + if debug: + logger.out("Parse the OSD status data", state="d", prefix="ceph-thread") + + osd_status = dict() + + command = {"prefix": "osd status", "format": "pretty"} + try: + osd_status_raw = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[ + 1 + ].decode("ascii") + except Exception as e: + logger.out("Failed to obtain OSD status data: {}".format(e), state="w") + osd_status_raw = [] + + if debug: + logger.out("Loop through OSD status data", state="d", prefix="ceph-thread") + + for line in osd_status_raw.split("\n"): + # Strip off colour + line = re.sub(r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))", "", line) + # Split it for parsing + line = line.split() + + # Ceph 14 format: + # ['|', '0', '|', 'hv1.p.u.bonilan.net', '|', '318G', '|', '463G', '|', '213', '|', '1430k', '|', '22', '|', '124k', '|', 'exists,up', '|'] + # Ceph 16 format: + # ['0', 'hv1.t.u.bonilan.net', '2489M', '236G', '0', '0', '0', '0', 'exists,up'] + + # Bypass obviously invalid lines + if 
len(line) < 1: + continue + elif line[0] == "+": + continue + + try: + # If line begins with | and second entry is a digit (i.e. OSD ID) + if line[0] == "|" and line[1].isdigit(): + # Parse the line in Ceph 14 format + osd_id = line[1] + node = line[3].split(".")[0] + used = line[5] + avail = line[7] + wr_ops = line[9] + wr_data = line[11] + rd_ops = line[13] + rd_data = line[15] + state = line[17] + # If first entry is a digit (i.e. OSD ID) + elif line[0].isdigit(): + # Parse the line in Ceph 16 format + osd_id = line[0] + node = line[1].split(".")[0] + used = line[2] + avail = line[3] + wr_ops = line[4] + wr_data = line[5] + rd_ops = line[6] + rd_data = line[7] + state = line[8] + # Otherwise, it's the header line and is ignored + else: + continue + except IndexError: + continue + + # I don't know why 2018 me used this construct instead of a normal + # dictionary update, but it works so not changing it. + # ref: bfbe9188ce830381f3f2fa1da11f1973f08eca8c + osd_status.update( + { + str(osd_id): { + "node": node, + "used": used, + "avail": avail, + "wr_ops": wr_ops, + "wr_data": wr_data, + "rd_ops": rd_ops, + "rd_data": rd_data, + "state": state, + } + } + ) + + # Merge them together into a single meaningful dict + if debug: + logger.out("Merge OSD data together", state="d", prefix="ceph-thread") + + osd_stats = dict() + + for osd in osd_list: + if zkhandler.read(("osd.node", osd)) == config["node_hostname"]: + osds_this_node += 1 + try: + this_dump = osd_dump[osd] + this_dump.update(osd_df[osd]) + this_dump.update(osd_status[osd]) + osd_stats[osd] = this_dump + except KeyError as e: + # One or more of the status commands timed out, just continue + logger.out( + "Failed to parse OSD stats into dictionary: {}".format(e), state="w" + ) + + # Upload OSD data for the cluster (primary-only) + if this_node.coordinator_state == "primary": + if debug: + logger.out( + "Trigger updates for each OSD", state="d", prefix="ceph-thread" + ) + + for osd in osd_list: + try: + stats = json.dumps(osd_stats[osd]) + zkhandler.write([(("osd.stats", osd), str(stats))]) + except KeyError as e: + # One or more of the status commands timed out, just continue + logger.out( + "Failed to upload OSD stats from dictionary: {}".format(e), + state="w", + ) + + ceph_conn.shutdown() + + queue.put(osds_this_node) + + if debug: + logger.out("Thread finished", state="d", prefix="ceph-thread") + + +# VM stats update function +def collect_vm_stats(logger, config, zkhandler, this_node, queue): + debug = config["debug"] + if debug: + logger.out("Thread starting", state="d", prefix="vm-thread") + + # Connect to libvirt + libvirt_name = "qemu:///system" + if debug: + logger.out("Connecting to libvirt", state="d", prefix="vm-thread") + try: + lv_conn = libvirt.open(libvirt_name) + if lv_conn is None: + raise Exception + except Exception: + logger.out('Failed to open connection to "{}"'.format(libvirt_name), state="e") + return + + memalloc = 0 + memprov = 0 + vcpualloc = 0 + # Toggle state management of dead VMs to restart them + if debug: + logger.out( + "Toggle state management of dead VMs to restart them", + state="d", + prefix="vm-thread", + ) + # Make a copy of the d_domain; if not, and it changes in flight, this can fail + fixed_d_domain = this_node.d_domain.copy() + for domain, instance in fixed_d_domain.items(): + if domain in this_node.domain_list: + if instance.getstate() == "start" and instance.getnode() == this_node.name: + if instance.getdom() is not None: + try: + if instance.getdom().state()[0] != 
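One detail worth calling out from collect_ceph_stats() above (editorial note, not part of the diff): Rados.mon_command() returns a (retcode, output_bytes, status_string) tuple, which is why the code indexes [1] and decodes. A minimal sketch of the same pattern, assuming an already-connected ceph_conn:

    command = {"prefix": "status", "format": "json"}
    retcode, outbuf, outs = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)
    ceph_status = json.loads(outbuf.decode("ascii"))  # same data, parsed as JSON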
+# VM stats update function
+def collect_vm_stats(logger, config, zkhandler, this_node, queue):
+    debug = config["debug"]
+    if debug:
+        logger.out("Thread starting", state="d", prefix="vm-thread")
+
+    # Connect to libvirt
+    libvirt_name = "qemu:///system"
+    if debug:
+        logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
+    try:
+        lv_conn = libvirt.open(libvirt_name)
+        if lv_conn is None:
+            raise Exception
+    except Exception:
+        logger.out('Failed to open connection to "{}"'.format(libvirt_name), state="e")
+        return
+
+    memalloc = 0
+    memprov = 0
+    vcpualloc = 0
+    # Toggle state management of dead VMs to restart them
+    if debug:
+        logger.out(
+            "Toggle state management of dead VMs to restart them",
+            state="d",
+            prefix="vm-thread",
+        )
+    # Make a copy of the d_domain; if not, and it changes in flight, this can fail
+    fixed_d_domain = this_node.d_domain.copy()
+    for domain, instance in fixed_d_domain.items():
+        if domain in this_node.domain_list:
+            if instance.getstate() == "start" and instance.getnode() == this_node.name:
+                if instance.getdom() is not None:
+                    try:
+                        if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
+                            logger.out(
+                                "VM {} has failed".format(instance.domname),
+                                state="w",
+                                prefix="vm-thread",
+                            )
+                            raise
+                    except Exception:
+                        # Toggle a state "change"
+                        logger.out(
+                            "Resetting state to {} for VM {}".format(
+                                instance.getstate(), instance.domname
+                            ),
+                            state="i",
+                            prefix="vm-thread",
+                        )
+                        zkhandler.write(
+                            [(("domain.state", domain), instance.getstate())]
+                        )
+            elif instance.getnode() == this_node.name:
+                memprov += instance.getmemory()
+
+    # Get list of running domains from Libvirt
+    running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
+
+    # Get statistics from any running VMs
+    for domain in running_domains:
+        try:
+            # Get basic information about the VM
+            tree = ElementTree.fromstring(domain.XMLDesc())
+            domain_uuid = domain.UUIDString()
+            domain_name = domain.name()
+
+            # Get all the raw information about the VM
+            if debug:
+                logger.out(
+                    "Getting general statistics for VM {}".format(domain_name),
+                    state="d",
+                    prefix="vm-thread",
+                )
+            (
+                domain_state,
+                domain_maxmem,
+                domain_mem,
+                domain_vcpus,
+                domain_cputime,
+            ) = domain.info()
+            # We can't properly gather stats from a non-running VM, so continue
+            if domain_state != libvirt.VIR_DOMAIN_RUNNING:
+                continue
+            domain_memory_stats = domain.memoryStats()
+            domain_cpu_stats = domain.getCPUStats(True)[0]
+
+            # Add the allocated memory to our memalloc value
+            memalloc += instance.getmemory()
+            memprov += instance.getmemory()
+            vcpualloc += instance.getvcpus()
+        except Exception as e:
+            if debug:
+                try:
+                    logger.out(
+                        "Failed getting VM information for {}: {}".format(
+                            domain.name(), e
+                        ),
+                        state="d",
+                        prefix="vm-thread",
+                    )
+                except Exception:
+                    pass
+            continue
+
+        # Ensure VM is present in the domain_list
+        if domain_uuid not in this_node.domain_list:
+            this_node.domain_list.append(domain_uuid)
+
+        if debug:
+            logger.out(
+                "Getting disk statistics for VM {}".format(domain_name),
+                state="d",
+                prefix="vm-thread",
+            )
+        domain_disk_stats = []
+        try:
+            for disk in tree.findall("devices/disk"):
+                disk_name = disk.find("source").get("name")
+                if not disk_name:
+                    disk_name = disk.find("source").get("file")
+                disk_stats = domain.blockStats(disk.find("target").get("dev"))
+                domain_disk_stats.append(
+                    {
+                        "name": disk_name,
+                        "rd_req": disk_stats[0],
+                        "rd_bytes": disk_stats[1],
+                        "wr_req": disk_stats[2],
+                        "wr_bytes": disk_stats[3],
+                        "err": disk_stats[4],
+                    }
+                )
+        except Exception as e:
+            if debug:
+                try:
+                    logger.out(
+                        "Failed getting disk stats for {}: {}".format(domain.name(), e),
+                        state="d",
+                        prefix="vm-thread",
+                    )
+                except Exception:
+                    pass
+            continue
+
+        if debug:
+            logger.out(
+                "Getting network statistics for VM {}".format(domain_name),
+                state="d",
+                prefix="vm-thread",
+            )
+        domain_network_stats = []
+        try:
+            for interface in tree.findall("devices/interface"):
+                interface_type = interface.get("type")
+                if interface_type not in ["bridge"]:
+                    continue
+                interface_name = interface.find("target").get("dev")
+                interface_bridge = interface.find("source").get("bridge")
+                interface_stats = domain.interfaceStats(interface_name)
+                domain_network_stats.append(
+                    {
+                        "name": interface_name,
+                        "bridge": interface_bridge,
+                        "rd_bytes": interface_stats[0],
+                        "rd_packets": interface_stats[1],
+                        "rd_errors": interface_stats[2],
+                        "rd_drops": interface_stats[3],
+                        "wr_bytes": interface_stats[4],
+                        "wr_packets": interface_stats[5],
+                        "wr_errors": interface_stats[6],
+                        "wr_drops": interface_stats[7],
+                    }
+                )
+        except Exception as e:
+            if debug:
+                try:
+                    logger.out(
+                        "Failed getting network stats for {}: {}".format(
+                            domain.name(), e
+                        ),
+                        state="d",
+                        prefix="vm-thread",
+                    )
+                except Exception:
+                    pass
+            continue
+
+        # Create the final dictionary
+        domain_stats = {
+            "state": libvirt_vm_states[domain_state],
+            "maxmem": domain_maxmem,
+            "livemem": domain_mem,
+            "cpus": domain_vcpus,
+            "cputime": domain_cputime,
+            "mem_stats": domain_memory_stats,
+            "cpu_stats": domain_cpu_stats,
+            "disk_stats": domain_disk_stats,
+            "net_stats": domain_network_stats,
+        }
+
+        if debug:
+            logger.out(
+                "Writing statistics for VM {} to Zookeeper".format(domain_name),
+                state="d",
+                prefix="vm-thread",
+            )
+
+        try:
+            zkhandler.write(
+                [(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))]
+            )
+        except Exception as e:
+            if debug:
+                logger.out(
+                    "Failed to write domain statistics: {}".format(e),
+                    state="d",
+                    prefix="vm-thread",
+                )
+
+    # Close the Libvirt connection
+    lv_conn.close()
+
+    if debug:
+        logger.out(
+            f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
+            state="d",
+            prefix="vm-thread",
+        )
+
+    queue.put(len(running_domains))
+    queue.put(memalloc)
+    queue.put(memprov)
+    queue.put(vcpualloc)
+
+    if debug:
+        logger.out("Thread finished", state="d", prefix="vm-thread")
+
+
+# Keepalive update function
+def node_keepalive(logger, config, zkhandler, this_node):
+    debug = config["debug"]
+
+    # Display node information to the terminal
+    if config["log_keepalives"]:
+        if this_node.coordinator_state == "primary":
+            cst_colour = logger.fmt_green
+        elif this_node.coordinator_state == "secondary":
+            cst_colour = logger.fmt_blue
+        else:
+            cst_colour = logger.fmt_cyan
+
+    active_coordinator_state = this_node.coordinator_state
+
+    runtime_start = datetime.now()
+
+    # Set the migration selector in Zookeeper for clients to read
+    if config["enable_hypervisor"]:
+        if this_node.coordinator_state == "primary":
+            try:
+                if (
+                    zkhandler.read("base.config.migration_target_selector")
+                    != config["migration_target_selector"]
+                ):
+                    zkhandler.write(
+                        [
+                            (
+                                "base.config.migration_target_selector",
+                                config["migration_target_selector"],
+                            )
+                        ]
+                    )
+            except Exception:
+                logger.out(
+                    "Failed to set migration target selector in Zookeeper",
+                    state="e",
+                    prefix="main-thread",
+                )
+
+    # Set the upstream IP in Zookeeper for clients to read
+    if config["enable_networking"]:
+        if this_node.coordinator_state == "primary":
+            try:
+                if (
+                    zkhandler.read("base.config.upstream_ip")
+                    != config["upstream_floating_ip"]
+                ):
+                    zkhandler.write(
+                        [("base.config.upstream_ip", config["upstream_floating_ip"])]
+                    )
+            except Exception:
+                logger.out(
+                    "Failed to set upstream floating IP in Zookeeper",
+                    state="e",
+                    prefix="main-thread",
+                )
+
+    # Get past state and update if needed
+    if debug:
+        logger.out(
+            "Get past state and update if needed", state="d", prefix="main-thread"
+        )
+
+    past_state = zkhandler.read(("node.state.daemon", this_node.name))
+    if past_state != "run" and past_state != "shutdown":
+        this_node.daemon_state = "run"
+        zkhandler.write([(("node.state.daemon", this_node.name), "run")])
+    else:
+        this_node.daemon_state = "run"
+
+    # Ensure the primary key is properly set
+    if debug:
+        logger.out(
+            "Ensure the primary key is properly set", state="d", prefix="main-thread"
+        )
+    if this_node.coordinator_state == "primary":
+        if zkhandler.read("base.config.primary_node") != this_node.name:
+            zkhandler.write([("base.config.primary_node", this_node.name)])
+
+    # Run VM statistics collection in separate thread for parallelization
+    if config["enable_hypervisor"]:
+        vm_thread_queue = Queue()
+        vm_stats_thread = Thread(
+            target=collect_vm_stats,
+            args=(logger, config, zkhandler, this_node, vm_thread_queue),
+            kwargs={},
+        )
+        vm_stats_thread.start()
+
+    # Run Ceph status collection in separate thread for parallelization
+    if config["enable_storage"]:
+        ceph_thread_queue = Queue()
+        ceph_stats_thread = Thread(
+            target=collect_ceph_stats,
+            args=(logger, config, zkhandler, this_node, ceph_thread_queue),
+            kwargs={},
+        )
+        ceph_stats_thread.start()
+
+    # Get node performance statistics
+    this_node.memtotal = int(psutil.virtual_memory().total / 1024 / 1024)
+    this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024)
+    this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
+    this_node.cpuload = round(os.getloadavg()[0], 2)
+
+    # Join against running threads
+    if config["enable_hypervisor"]:
+        vm_stats_thread.join(timeout=config["keepalive_interval"])
+        if vm_stats_thread.is_alive():
+            logger.out("VM stats gathering exceeded timeout, continuing", state="w")
+    if config["enable_storage"]:
+        ceph_stats_thread.join(timeout=config["keepalive_interval"])
+        if ceph_stats_thread.is_alive():
+            logger.out("Ceph stats gathering exceeded timeout, continuing", state="w")
+
+    # Get information from thread queues
+    if config["enable_hypervisor"]:
+        try:
+            this_node.domains_count = vm_thread_queue.get(
+                timeout=config["keepalive_interval"]
+            )
+            this_node.memalloc = vm_thread_queue.get(
+                timeout=config["keepalive_interval"]
+            )
+            this_node.memprov = vm_thread_queue.get(
+                timeout=config["keepalive_interval"]
+            )
+            this_node.vcpualloc = vm_thread_queue.get(
+                timeout=config["keepalive_interval"]
+            )
+        except Exception:
+            logger.out("VM stats queue get exceeded timeout, continuing", state="w")
+    else:
+        this_node.domains_count = 0
+        this_node.memalloc = 0
+        this_node.memprov = 0
+        this_node.vcpualloc = 0
+
+    if config["enable_storage"]:
+        try:
+            osds_this_node = ceph_thread_queue.get(
+                timeout=(config["keepalive_interval"] - 1)
+            )
+        except Exception:
+            logger.out("Ceph stats queue get exceeded timeout, continuing", state="w")
+            osds_this_node = "?"
+    else:
+        osds_this_node = "0"
+
+    # Set our information in zookeeper
+    keepalive_time = int(time.time())
+    if debug:
+        logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
+    try:
+        zkhandler.write(
+            [
+                (("node.memory.total", this_node.name), str(this_node.memtotal)),
+                (("node.memory.used", this_node.name), str(this_node.memused)),
+                (("node.memory.free", this_node.name), str(this_node.memfree)),
+                (("node.memory.allocated", this_node.name), str(this_node.memalloc)),
+                (("node.memory.provisioned", this_node.name), str(this_node.memprov)),
+                (("node.vcpu.allocated", this_node.name), str(this_node.vcpualloc)),
+                (("node.cpu.load", this_node.name), str(this_node.cpuload)),
+                (
+                    ("node.count.provisioned_domains", this_node.name),
+                    str(this_node.domains_count),
+                ),
+                (
+                    ("node.running_domains", this_node.name),
+                    " ".join(this_node.domain_list),
+                ),
+                (("node.keepalive", this_node.name), str(keepalive_time)),
+            ]
+        )
+    except Exception:
+        logger.out("Failed to set keepalive data", state="e")
+
+    if config["log_keepalives"]:
+        runtime_end = datetime.now()
+        runtime_delta = runtime_end - runtime_start
+        runtime = "{:0.02f}".format(runtime_delta.total_seconds())
+
+        logger.out(
+            "{start_colour}{hostname} keepalive @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] in {runtime} seconds".format(
+                start_colour=logger.fmt_purple,
+                cst_colour=logger.fmt_bold + cst_colour,
+                nofmt=logger.fmt_end,
+                hostname=config["node_hostname"],
+                starttime=runtime_start,
+                costate=active_coordinator_state,
+                runtime=runtime,
+            ),
+            state="t",
+        )
+
+        if this_node.maintenance is True:
+            maintenance_colour = logger.fmt_blue
+        else:
+            maintenance_colour = logger.fmt_green
+
+        if isinstance(this_node.health, int):
+            if this_node.health > 90:
+                health_colour = logger.fmt_green
+            elif this_node.health > 50:
+                health_colour = logger.fmt_yellow
+            else:
+                health_colour = logger.fmt_red
+            health_text = str(this_node.health) + "%"
+
+        else:
+            health_colour = logger.fmt_blue
+            health_text = "N/A"
+
+        if config["log_keepalive_cluster_details"]:
+            logger.out(
+                "{bold}Maintenance:{nofmt} {maintenance_colour}{maintenance}{nofmt} "
+                "{bold}Health:{nofmt} {health_colour}{health}{nofmt} "
+                "{bold}VMs:{nofmt} {domcount} "
+                "{bold}OSDs:{nofmt} {osdcount} "
+                "{bold}Load:{nofmt} {load} "
+                "{bold}Memory [MiB]: "
+                "{bold}Used:{nofmt} {usedmem} "
+                "{bold}Free:{nofmt} {freemem}".format(
+                    bold=logger.fmt_bold,
+                    maintenance_colour=maintenance_colour,
+                    health_colour=health_colour,
+                    nofmt=logger.fmt_end,
+                    maintenance=this_node.maintenance,
+                    health=health_text,
+                    domcount=this_node.domains_count,
+                    osdcount=osds_this_node,
+                    load=this_node.cpuload,
+                    freemem=this_node.memfree,
+                    usedmem=this_node.memused,
+                ),
+                state="t",
+            )
+
+    # Look for dead nodes and fence them
+    if not this_node.maintenance:
+        if debug:
+            logger.out(
+                "Look for dead nodes and fence them", state="d", prefix="main-thread"
+            )
+        if config["daemon_mode"] == "coordinator":
+            for node_name in zkhandler.children("base.node"):
+                try:
+                    node_daemon_state = zkhandler.read(("node.state.daemon", node_name))
+                    node_keepalive = int(zkhandler.read(("node.keepalive", node_name)))
+                except Exception:
+                    node_daemon_state = "unknown"
+                    node_keepalive = 0
+
+                # Handle deadtime and fencing if needed
+                # (A node is considered dead when its keepalive timer is more than
+                # fence_intervals * keepalive_interval seconds out-of-date while in
+                # 'run' state)
+                node_deadtime = int(time.time()) - (
+                    int(config["keepalive_interval"]) * int(config["fence_intervals"])
+                )
+                if node_keepalive < node_deadtime and node_daemon_state == "run":
+                    logger.out(
+                        "Node {} seems dead - starting monitor for fencing".format(
+                            node_name
+                        ),
+                        state="w",
+                    )
+                    zk_lock = zkhandler.writelock(("node.state.daemon", node_name))
+                    with zk_lock:
+                        # Ensures that, if we lost the lock race and come out of waiting,
+                        # we won't try to trigger our own fence thread.
+                        if zkhandler.read(("node.state.daemon", node_name)) != "dead":
+                            fence_thread = Thread(
+                                target=pvcnoded.util.fencing.fence_node,
+                                args=(node_name, zkhandler, config, logger),
+                                kwargs={},
+                            )
+                            fence_thread.start()
+                            # Write the updated data after we start the fence thread
+                            zkhandler.write(
+                                [(("node.state.daemon", node_name), "dead")]
+                            )
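A worked example of the dead-node test above (editorial note, not part of the diff), assuming the usual defaults of keepalive_interval = 5 and fence_intervals = 6 — both are configuration values, so actual deployments may differ:

    node_deadtime = int(time.time()) - (5 * 6)  # keepalives older than 30 seconds
    # A node in "run" daemon state whose last keepalive predates node_deadtime
    # is declared dead, and a fence thread is spawned for it under a write lock.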
diff --git a/health-daemon/pvchealthd/util/libvirt.py b/health-daemon/pvchealthd/util/libvirt.py
new file mode 100644
index 00000000..b769b9bb
--- /dev/null
+++ b/health-daemon/pvchealthd/util/libvirt.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python3
+
+# libvirt.py - Utility functions for pvcnoded libvirt
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018-2022 Joshua M. Boniface
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+import libvirt
+
+
+def validate_libvirtd(logger, config):
+    if config["enable_hypervisor"]:
+        libvirt_check_name = f'qemu+tcp://{config["node_hostname"]}/system'
+        logger.out(f"Connecting to Libvirt daemon at {libvirt_check_name}", state="i")
+        try:
+            lv_conn = libvirt.open(libvirt_check_name)
+            lv_conn.close()
+        except Exception as e:
+            logger.out(f"Failed to connect to Libvirt daemon: {e}", state="e")
+            return False
+
+    return True
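A hedged usage sketch for validate_libvirtd() above (editorial, not part of the diff), mirroring how a daemon entrypoint might gate startup on it; the module path is assumed:

    import pvchealthd.util.libvirt  # hypothetical import location
    if not pvchealthd.util.libvirt.validate_libvirtd(logger, config):
        logger.out("Cannot connect to Libvirt; refusing to start", state="e")
        os._exit(1)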
diff --git a/health-daemon/pvchealthd/util/networking.py b/health-daemon/pvchealthd/util/networking.py
new file mode 100644
index 00000000..0b6cfb79
--- /dev/null
+++ b/health-daemon/pvchealthd/util/networking.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env python3
+
+# networking.py - Utility functions for pvcnoded networking
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018-2022 Joshua M. Boniface
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+import daemon_lib.common as common
+
+from time import sleep
+from os import makedirs
+
+
+def setup_sriov(logger, config):
+    logger.out("Setting up SR-IOV device support", state="i")
+
+    # Enable unsafe interrupts for the vfio_iommu_type1 kernel module
+    try:
+        common.run_os_command("modprobe vfio_iommu_type1 allow_unsafe_interrupts=1")
+        with open(
+            "/sys/module/vfio_iommu_type1/parameters/allow_unsafe_interrupts", "w"
+        ) as mfh:
+            mfh.write("Y")
+    except Exception:
+        logger.out(
+            "Failed to enable vfio_iommu_type1 kernel module; SR-IOV may fail",
+            state="w",
+        )
+
+    # Loop through our SR-IOV NICs and enable the numvfs for each
+    for device in config["sriov_device"]:
+        logger.out(
+            f'Preparing SR-IOV PF {device["phy"]} with {device["vfcount"]} VFs',
+            state="i",
+        )
+        try:
+            with open(
+                f'/sys/class/net/{device["phy"]}/device/sriov_numvfs', "r"
+            ) as vfh:
+                current_vf_count = vfh.read().strip()
+            with open(
+                f'/sys/class/net/{device["phy"]}/device/sriov_numvfs', "w"
+            ) as vfh:
+                vfh.write(str(device["vfcount"]))
+        except FileNotFoundError:
+            logger.out(
+                f'Failed to open SR-IOV configuration for PF {device["phy"]}; device may not support SR-IOV',
+                state="w",
+            )
+        except OSError:
+            logger.out(
+                f'Failed to set SR-IOV VF count for PF {device["phy"]} to {device["vfcount"]}; already set to {current_vf_count}',
+                state="w",
+            )
+
+        if device.get("mtu", None) is not None:
+            logger.out(
+                f'Setting SR-IOV PF {device["phy"]} to MTU {device["mtu"]}', state="i"
+            )
+            common.run_os_command(f'ip link set {device["phy"]} mtu {device["mtu"]} up')
+
+
+def setup_interfaces(logger, config):
+    # Set up the Cluster interface
+    cluster_dev = config["cluster_dev"]
+    cluster_mtu = config["cluster_mtu"]
+    cluster_dev_ip = config["cluster_dev_ip"]
+
+    logger.out(
+        f"Setting up Cluster network interface {cluster_dev} with MTU {cluster_mtu}",
+        state="i",
+    )
+
+    common.run_os_command(f"ip link set {cluster_dev} mtu {cluster_mtu} up")
+
+    logger.out(
+        f"Setting up Cluster network bridge on interface {cluster_dev} with IP {cluster_dev_ip}",
+        state="i",
+    )
+
+    common.run_os_command("brctl addbr brcluster")
+    common.run_os_command(f"brctl addif brcluster {cluster_dev}")
+    common.run_os_command(f"ip link set brcluster mtu {cluster_mtu} up")
+    common.run_os_command(f"ip address add {cluster_dev_ip} dev brcluster")
+
+    # Set up the Storage interface
+    storage_dev = config["storage_dev"]
+    storage_mtu = config["storage_mtu"]
+    storage_dev_ip = config["storage_dev_ip"]
+
+    logger.out(
+        f"Setting up Storage network interface {storage_dev} with MTU {storage_mtu}",
+        state="i",
+    )
+
+    common.run_os_command(f"ip link set {storage_dev} mtu {storage_mtu} up")
+
+    if storage_dev == cluster_dev:
+        if storage_dev_ip != cluster_dev_ip:
+            logger.out(
+                f"Setting up Storage network on Cluster network bridge with IP {storage_dev_ip}",
+                state="i",
+            )
+
+            common.run_os_command(f"ip address add {storage_dev_ip} dev brcluster")
+    else:
+        logger.out(
+            f"Setting up Storage network bridge on interface {storage_dev} with IP {storage_dev_ip}",
+            state="i",
+        )
+
+        common.run_os_command("brctl addbr brstorage")
+        common.run_os_command(f"brctl addif brstorage {storage_dev}")
+        common.run_os_command(f"ip link set brstorage mtu {storage_mtu} up")
+        common.run_os_command(f"ip address add {storage_dev_ip} dev brstorage")
+
+    # Set up the Upstream interface
+    upstream_dev = config["upstream_dev"]
+    upstream_mtu = config["upstream_mtu"]
+    upstream_dev_ip = config["upstream_dev_ip"]
+
+    logger.out(
+        f"Setting up Upstream network interface {upstream_dev} with MTU {upstream_mtu}",
+        state="i",
+    )
+
+    if upstream_dev == cluster_dev:
+        if upstream_dev_ip != cluster_dev_ip:
+            logger.out(
+                f"Setting up Upstream network on Cluster network bridge with IP {upstream_dev_ip}",
+                state="i",
+            )
+
+            common.run_os_command(f"ip address add {upstream_dev_ip} dev brcluster")
+    else:
+        logger.out(
+            f"Setting up Upstream network bridge on interface {upstream_dev} with IP {upstream_dev_ip}",
+            state="i",
+        )
+
+        common.run_os_command("brctl addbr brupstream")
+        common.run_os_command(f"brctl addif brupstream {upstream_dev}")
+        common.run_os_command(f"ip link set brupstream mtu {upstream_mtu} up")
+        common.run_os_command(f"ip address add {upstream_dev_ip} dev brupstream")
+
+    upstream_gateway = config["upstream_gateway"]
+    if upstream_gateway is not None:
+        logger.out(
+            f"Setting up Upstream network default gateway IP {upstream_gateway}",
+            state="i",
+        )
+        if upstream_dev == cluster_dev:
+            common.run_os_command(
+                f"ip route add default via {upstream_gateway} dev brcluster"
+            )
+        else:
+            common.run_os_command(
+                f"ip route add default via {upstream_gateway} dev brupstream"
+            )
+
+    # Set up sysctl tweaks to optimize networking
+    # Enable routing functions
+    common.run_os_command("sysctl net.ipv4.ip_forward=1")
+    common.run_os_command("sysctl net.ipv6.ip_forward=1")
+    # Enable send redirects
+    common.run_os_command("sysctl net.ipv4.conf.all.send_redirects=1")
+    common.run_os_command("sysctl net.ipv4.conf.default.send_redirects=1")
+    common.run_os_command("sysctl net.ipv6.conf.all.send_redirects=1")
+    common.run_os_command("sysctl net.ipv6.conf.default.send_redirects=1")
+    # Accept source routes
+    common.run_os_command("sysctl net.ipv4.conf.all.accept_source_route=1")
+    common.run_os_command("sysctl net.ipv4.conf.default.accept_source_route=1")
+    common.run_os_command("sysctl net.ipv6.conf.all.accept_source_route=1")
+    common.run_os_command("sysctl net.ipv6.conf.default.accept_source_route=1")
+    # Disable RP filtering on Cluster and Upstream interfaces (to allow traffic pivoting)
+    common.run_os_command(f"sysctl net.ipv4.conf.{cluster_dev}.rp_filter=0")
+    common.run_os_command("sysctl net.ipv4.conf.brcluster.rp_filter=0")
+    common.run_os_command(f"sysctl net.ipv4.conf.{upstream_dev}.rp_filter=0")
+    common.run_os_command("sysctl net.ipv4.conf.brupstream.rp_filter=0")
+    common.run_os_command(f"sysctl net.ipv6.conf.{cluster_dev}.rp_filter=0")
+    common.run_os_command("sysctl net.ipv6.conf.brcluster.rp_filter=0")
+    common.run_os_command(f"sysctl net.ipv6.conf.{upstream_dev}.rp_filter=0")
+    common.run_os_command("sysctl net.ipv6.conf.brupstream.rp_filter=0")
+
+    # Stop DNSMasq if it is running
+    common.run_os_command("systemctl stop dnsmasq.service")
+
+    logger.out("Waiting 3 seconds for networking to come up", state="s")
+    sleep(3)
+
+
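For illustration (editorial, not part of the diff), a minimal config fragment that exercises the shared-device path in setup_interfaces() above; the device names and IPs are hypothetical. Because storage_dev equals cluster_dev, no brstorage bridge is created and the storage IP is simply added to brcluster:

    config = {
        "cluster_dev": "bond0", "cluster_mtu": 9000, "cluster_dev_ip": "10.0.0.1/24",
        "storage_dev": "bond0", "storage_mtu": 9000, "storage_dev_ip": "10.0.1.1/24",
        "upstream_dev": "eno1", "upstream_mtu": 1500, "upstream_dev_ip": "192.0.2.10/24",
        "upstream_gateway": "192.0.2.1",
    }
    setup_interfaces(logger, config)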
+def create_nft_configuration(logger, config):
+    if config["enable_networking"]:
+        logger.out("Creating NFT firewall configuration", state="i")
+
+        dynamic_directory = config["nft_dynamic_directory"]
+
+        # Create directories
+        makedirs(f"{dynamic_directory}/networks", exist_ok=True)
+        makedirs(f"{dynamic_directory}/static", exist_ok=True)
+
+        # Set up the base rules
+        nftables_base_rules = f"""# Base rules
+        flush ruleset
+        # Add the filter table and chains
+        add table inet filter
+        add chain inet filter forward {{ type filter hook forward priority 0; }}
+        add chain inet filter input {{ type filter hook input priority 0; }}
+        # Include static rules and network rules
+        include "{dynamic_directory}/static/*"
+        include "{dynamic_directory}/networks/*"
+        """
+
+        # Write the base firewall config
+        nftables_base_filename = f"{dynamic_directory}/base.nft"
+        with open(nftables_base_filename, "w") as nftfh:
+            nftfh.write(nftables_base_rules)
+        common.reload_firewall_rules(nftables_base_filename, logger)
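To make the template above concrete (editorial note): with a hypothetical nft_dynamic_directory of /run/pvc/nft, the generated base.nft would read roughly as follows; nft ignores the leading indentation, and the doubled {{ }} in the f-string render as literal braces:

    # Base rules
    flush ruleset
    # Add the filter table and chains
    add table inet filter
    add chain inet filter forward { type filter hook forward priority 0; }
    add chain inet filter input { type filter hook input priority 0; }
    # Include static rules and network rules
    include "/run/pvc/nft/static/*"
    include "/run/pvc/nft/networks/*"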
diff --git a/health-daemon/pvchealthd/util/services.py b/health-daemon/pvchealthd/util/services.py
new file mode 100644
index 00000000..f2f12888
--- /dev/null
+++ b/health-daemon/pvchealthd/util/services.py
@@ -0,0 +1,99 @@
+#!/usr/bin/env python3
+
+# services.py - Utility functions for pvcnoded external services
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018-2022 Joshua M. Boniface
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+import daemon_lib.common as common
+from time import sleep
+
+
+def start_zookeeper(logger, config):
+    if config["daemon_mode"] == "coordinator":
+        logger.out("Starting Zookeeper daemon", state="i")
+        # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
+        common.run_os_command("systemctl start zookeeper.service")
+
+
+def start_libvirtd(logger, config):
+    if config["enable_hypervisor"]:
+        logger.out("Starting Libvirt daemon", state="i")
+        # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
+        common.run_os_command("systemctl start libvirtd.service")
+
+
+def start_patroni(logger, config):
+    if config["enable_networking"] and config["daemon_mode"] == "coordinator":
+        logger.out("Starting Patroni daemon", state="i")
+        # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
+        common.run_os_command("systemctl start patroni.service")
+
+
+def start_frrouting(logger, config):
+    if config["enable_networking"] and config["daemon_mode"] == "coordinator":
+        logger.out("Starting FRRouting daemon", state="i")
+        # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
+        common.run_os_command("systemctl start frr.service")
+
+
+def start_ceph_mon(logger, config):
+    if config["enable_storage"] and config["daemon_mode"] == "coordinator":
+        logger.out("Starting Ceph Monitor daemon", state="i")
+        # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
+        common.run_os_command(
+            f'systemctl start ceph-mon@{config["node_hostname"]}.service'
+        )
+
+
+def start_ceph_mgr(logger, config):
+    if config["enable_storage"] and config["daemon_mode"] == "coordinator":
+        logger.out("Starting Ceph Manager daemon", state="i")
+        # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
+        common.run_os_command(
+            f'systemctl start ceph-mgr@{config["node_hostname"]}.service'
+        )
+
+
+def start_keydb(logger, config):
+    if (config["enable_api"] or config["enable_worker"]) and config[
+        "daemon_mode"
+    ] == "coordinator":
+        logger.out("Starting KeyDB daemon", state="i")
+        # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
+        common.run_os_command("systemctl start keydb-server.service")
+
+
+def start_worker(logger, config):
+    if config["enable_worker"]:
+        logger.out("Starting Celery Worker daemon", state="i")
+        # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
+        common.run_os_command("systemctl start pvcworkerd.service")
+
+
+def start_system_services(logger, config):
+    start_zookeeper(logger, config)
+    start_libvirtd(logger, config)
+    start_patroni(logger, config)
+    start_frrouting(logger, config)
+    start_ceph_mon(logger, config)
+    start_ceph_mgr(logger, config)
+    start_keydb(logger, config)
+    start_worker(logger, config)
+
+    logger.out("Waiting 10 seconds for daemons to start", state="s")
+    sleep(10)
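As a hedged illustration of the gating in services.py above (editorial, not part of the diff): on a plain hypervisor node — daemon_mode "hypervisor" with the hypervisor and worker features enabled but no coordinator role — start_system_services() effectively reduces to:

    start_libvirtd(logger, config)  # enable_hypervisor is true
    start_worker(logger, config)    # enable_worker is true
    sleep(10)                       # the coordinator-only helpers return without acting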
diff --git a/health-daemon/pvchealthd/util/zookeeper.py b/health-daemon/pvchealthd/util/zookeeper.py
new file mode 100644
index 00000000..c4b7d447
--- /dev/null
+++ b/health-daemon/pvchealthd/util/zookeeper.py
@@ -0,0 +1,187 @@
+#!/usr/bin/env python3
+
+# zookeeper.py - Utility functions for pvcnoded Zookeeper connections
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018-2022 Joshua M. Boniface
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+from daemon_lib.zkhandler import ZKHandler
+
+import os
+import time
+
+
+def connect(logger, config):
+    # Create an instance of the handler
+    zkhandler = ZKHandler(config, logger)
+
+    try:
+        logger.out(
+            "Connecting to Zookeeper on coordinator nodes {}".format(
+                config["coordinators"]
+            ),
+            state="i",
+        )
+        # Start connection
+        zkhandler.connect(persistent=True)
+    except Exception as e:
+        logger.out(
+            "ERROR: Failed to connect to Zookeeper cluster: {}".format(e), state="e"
+        )
+        os._exit(1)
+
+    logger.out("Validating Zookeeper schema", state="i")
+
+    try:
+        node_schema_version = int(
+            zkhandler.read(("node.data.active_schema", config["node_hostname"]))
+        )
+    except Exception:
+        node_schema_version = int(zkhandler.read("base.schema.version"))
+        zkhandler.write(
+            [
+                (
+                    ("node.data.active_schema", config["node_hostname"]),
+                    node_schema_version,
+                )
+            ]
+        )
+
+    # Load in the current node schema version
+    zkhandler.schema.load(node_schema_version)
+
+    # Record the latest installed schema version
+    latest_schema_version = zkhandler.schema.find_latest()
+    logger.out("Latest installed schema is {}".format(latest_schema_version), state="i")
+    zkhandler.write(
+        [(("node.data.latest_schema", config["node_hostname"]), latest_schema_version)]
+    )
+
+    # If we are the last node to get a schema update, fire the master update
+    if latest_schema_version > node_schema_version:
+        node_latest_schema_version = list()
+        for node in zkhandler.children("base.node"):
+            node_latest_schema_version.append(
+                int(zkhandler.read(("node.data.latest_schema", node)))
+            )
+
+        # This is true if every node reports the latest schema version, i.e.
+        # all nodes have the latest schema installed and ready to load.
+        if node_latest_schema_version.count(latest_schema_version) == len(
+            node_latest_schema_version
+        ):
+            zkhandler.write([("base.schema.version", latest_schema_version)])
+
+    return zkhandler, node_schema_version
+
+
+def validate_schema(logger, zkhandler):
+    # Validate our schema against the active version
+    if not zkhandler.schema.validate(zkhandler, logger):
+        logger.out("Found schema violations, applying", state="i")
+        zkhandler.schema.apply(zkhandler)
+    else:
+        logger.out("Schema successfully validated", state="o")
+
+
+def setup_node(logger, config, zkhandler):
+    # Check if our node exists in Zookeeper, and create it if not
+    if config["daemon_mode"] == "coordinator":
+        init_routerstate = "secondary"
+    else:
+        init_routerstate = "client"
+
+    if zkhandler.exists(("node", config["node_hostname"])):
+        logger.out(
+            f"Node is {logger.fmt_green}present{logger.fmt_end} in Zookeeper", state="i"
+        )
+        # Update static data just in case it's changed
+        zkhandler.write(
+            [
+                (("node", config["node_hostname"]), config["daemon_mode"]),
+                (("node.mode", config["node_hostname"]), config["daemon_mode"]),
+                (("node.state.daemon", config["node_hostname"]), "init"),
+                (("node.state.router", config["node_hostname"]), init_routerstate),
+                (
+                    ("node.data.static", config["node_hostname"]),
+                    " ".join(config["static_data"]),
+                ),
+                (
+                    ("node.data.pvc_version", config["node_hostname"]),
+                    config["daemon_version"],
+                ),
+                (
+                    ("node.ipmi.hostname", config["node_hostname"]),
+                    config["ipmi_hostname"],
+                ),
+                (
+                    ("node.ipmi.username", config["node_hostname"]),
+                    config["ipmi_username"],
+                ),
+                (
+                    ("node.ipmi.password", config["node_hostname"]),
+                    config["ipmi_password"],
+                ),
+            ]
+        )
+    else:
+        logger.out(
+            f"Node is {logger.fmt_red}absent{logger.fmt_end} in Zookeeper; adding new node",
+            state="i",
+        )
+        keepalive_time = int(time.time())
+        zkhandler.write(
+            [
+                (("node", config["node_hostname"]), config["daemon_mode"]),
+                (("node.keepalive", config["node_hostname"]), str(keepalive_time)),
+                (("node.mode", config["node_hostname"]), config["daemon_mode"]),
+                (("node.state.daemon", config["node_hostname"]), "init"),
+                (("node.state.domain", config["node_hostname"]), "flushed"),
+                (("node.state.router", config["node_hostname"]), init_routerstate),
+                (
+                    ("node.data.static", config["node_hostname"]),
+                    " ".join(config["static_data"]),
+                ),
+                (
+                    ("node.data.pvc_version", config["node_hostname"]),
+                    config["daemon_version"],
+                ),
+                (
+                    ("node.ipmi.hostname", config["node_hostname"]),
+                    config["ipmi_hostname"],
+                ),
+                (
+                    ("node.ipmi.username", config["node_hostname"]),
+                    config["ipmi_username"],
+                ),
+                (
+                    ("node.ipmi.password", config["node_hostname"]),
+                    config["ipmi_password"],
+                ),
+                (("node.memory.total", config["node_hostname"]), "0"),
+                (("node.memory.used", config["node_hostname"]), "0"),
+                (("node.memory.free", config["node_hostname"]), "0"),
+                (("node.memory.allocated", config["node_hostname"]), "0"),
+                (("node.memory.provisioned", config["node_hostname"]), "0"),
+                (("node.vcpu.allocated", config["node_hostname"]), "0"),
+                (("node.cpu.load", config["node_hostname"]), "0.0"),
+                (("node.running_domains", config["node_hostname"]), "0"),
+                (("node.count.provisioned_domains", config["node_hostname"]), "0"),
+                (("node.count.networks", config["node_hostname"]), "0"),
+            ]
+        )
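A hedged sketch of the intended call order for the helpers above during daemon startup (editorial, not part of the diff; this mirrors how pvcnoded wires them together, and the exact sequence in pvchealthd may differ):

    zkhandler, node_schema_version = connect(logger, config)
    validate_schema(logger, zkhandler)
    setup_node(logger, config, zkhandler)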
diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py
index 9fb4f338..49356afe 100644
--- a/node-daemon/pvcnoded/Daemon.py
+++ b/node-daemon/pvcnoded/Daemon.py
@@ -27,7 +27,6 @@ import pvcnoded.util.services
 import pvcnoded.util.libvirt
 import pvcnoded.util.zookeeper
 
-import pvcnoded.objects.MonitoringInstance as MonitoringInstance
 import pvcnoded.objects.DNSAggregatorInstance as DNSAggregatorInstance
 import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
 import pvcnoded.objects.VMInstance as VMInstance
@@ -59,7 +58,6 @@ version = "0.9.82"
 
 def entrypoint():
     keepalive_timer = None
-    monitoring_instance = None
 
     # Get our configuration
     config = pvcnoded.util.config.get_configuration()
@@ -202,7 +200,7 @@
 
     # Define a cleanup function
     def cleanup(failure=False):
-        nonlocal logger, zkhandler, keepalive_timer, d_domain, monitoring_instance
+        nonlocal logger, zkhandler, keepalive_timer, d_domain
 
         logger.out("Terminating pvcnoded and cleaning up", state="s")
@@ -251,13 +249,6 @@
         except Exception:
             pass
 
-        # Shut down the monitoring system
-        try:
-            logger.out("Shutting down monitoring subsystem", state="s")
-            monitoring_instance.shutdown()
-        except Exception:
-            pass
-
         # Set stop state in Zookeeper
         zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
@@ -1009,11 +1000,6 @@
         state="i",
     )
 
-    # Set up the node monitoring instance and thread
-    monitoring_instance = MonitoringInstance.MonitoringInstance(
-        zkhandler, config, logger, this_node
-    )
-
     # Start keepalived thread
     keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
         logger, config, zkhandler, this_node
     )