Split health monitoring into discrete daemon/pkg

This commit is contained in:
Joshua Boniface 2023-11-29 15:36:49 -05:00
parent 74a416165d
commit 41f4e4fb2f
40 changed files with 3032 additions and 33 deletions

View File

@ -65,6 +65,7 @@ for HOST in ${HOSTS[@]}; do
echo -n "Restarting PVC daemons..."
ssh $HOST $SUDO systemctl restart pvcapid &>/dev/null
ssh $HOST $SUDO systemctl restart pvcworkerd &>/dev/null
ssh $HOST $SUDO systemctl restart pvchealthd &>/dev/null
ssh $HOST $SUDO systemctl restart pvcnoded &>/dev/null
echo " done."
echo -n "Waiting for node daemon to be running..."

View File

@ -13,9 +13,10 @@ echo ${new_ver} >&3
tmpdir=$( mktemp -d )
cp -a debian/changelog client-cli/setup.py ${tmpdir}/
cp -a node-daemon/pvcnoded/Daemon.py ${tmpdir}/node-Daemon.py
cp -a health-daemon/pvchealthd/Daemon.py ${tmpdir}/health-Daemon.py
cp -a api-daemon/pvcapid/Daemon.py ${tmpdir}/api-Daemon.py
# Replace the "base" version with the git revision version
sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
sed -i "s/version = \"${base_ver}\"/version = \"${new_ver}\"/" node-daemon/pvcnoded/Daemon.py health-daemon/pvchealthd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py
sed -i "s/${base_ver}-0/${new_ver}/" debian/changelog
cat <<EOF > debian/changelog
pvc (${new_ver}) unstable; urgency=medium
@ -33,6 +34,7 @@ dpkg-buildpackage -us -uc
cp -a ${tmpdir}/changelog debian/changelog
cp -a ${tmpdir}/setup.py client-cli/setup.py
cp -a ${tmpdir}/node-Daemon.py node-daemon/pvcnoded/Daemon.py
cp -a ${tmpdir}/health-Daemon.py health-daemon/pvchealthd/Daemon.py
cp -a ${tmpdir}/api-Daemon.py api-daemon/pvcapid/Daemon.py
# Clean up

View File

@ -20,6 +20,7 @@ changelog="$( cat ${changelog_file} | grep -v '^#' | sed 's/^*/ */' )"
rm ${changelog_file}
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvcnoded/Daemon.py
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," node-daemon/pvchealthd/Daemon.py
sed -i "s,version = \"${current_version}\",version = \"${new_version}\"," api-daemon/pvcapid/Daemon.py
sed -i "s,version=\"${current_version}\",version=\"${new_version}\"," client-cli/setup.py
echo ${new_version} > .version
@ -46,7 +47,7 @@ echo -e "${deb_changelog_new}" >> ${deb_changelog_file}
echo -e "${deb_changelog_orig}" >> ${deb_changelog_file}
mv ${deb_changelog_file} debian/changelog
git add node-daemon/pvcnoded/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
git add node-daemon/pvcnoded/Daemon.py headlth-daemon/pvchealthd/Daemon.py api-daemon/pvcapid/Daemon.py client-cli/setup.py debian/changelog CHANGELOG.md .version
git commit -v
popd &>/dev/null

10
debian/control vendored
View File

@ -8,12 +8,20 @@ X-Python3-Version: >= 3.2
Package: pvc-daemon-node
Architecture: all
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
Depends: systemd, pvc-daemon-common, pvc-daemon-health, python3-kazoo, python3-psutil, python3-apscheduler, python3-libvirt, python3-psycopg2, python3-dnspython, python3-yaml, python3-distutils, python3-rados, python3-gevent, ipmitool, libvirt-daemon-system, arping, vlan, bridge-utils, dnsmasq, nftables, pdns-server, pdns-backend-pgsql
Description: Parallel Virtual Cluster node daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
This package installs the PVC node daemon
Package: pvc-daemon-health
Architecture: all
Depends: systemd, pvc-daemon-common, python3-kazoo, python3-psutil, python3-apscheduler, python3-yaml
Description: Parallel Virtual Cluster health daemon
A KVM/Zookeeper/Ceph-based VM and private cloud manager
.
This package installs the PVC health monitoring daemon
Package: pvc-daemon-api
Architecture: all
Depends: systemd, pvc-daemon-common, python3-yaml, python3-flask, python3-flask-restful, python3-celery, python-celery-common, python3-distutils, python3-redis, python3-lxml, python3-flask-migrate, fio

5
debian/pvc-daemon-api.preinst vendored Normal file
View File

@ -0,0 +1,5 @@
#!/bin/sh
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc/pvcapid -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

4
debian/pvc-daemon-health.install vendored Normal file
View File

@ -0,0 +1,4 @@
health-daemon/pvchealthd.py usr/share/pvc
health-daemon/pvchealthd usr/share/pvc
health-daemon/pvchealthd.service lib/systemd/system
health-daemon/plugins usr/share/pvc

14
debian/pvc-daemon-health.postinst vendored Normal file
View File

@ -0,0 +1,14 @@
#!/bin/sh
# Reload systemd's view of the units
systemctl daemon-reload
# Enable the service and target
systemctl enable /lib/systemd/system/pvchealthd.service
# Inform administrator of the service restart/startup not occurring automatically
if systemctl is-active --quiet pvchealthd.service; then
echo "NOTE: The PVC health daemon (pvchealthd.service) has not been restarted; this is up to the administrator."
else
echo "NOTE: The PVC health daemon (pvchealthd.service) has not been started; create a config file at /etc/pvc/pvc.conf then start it."
fi

5
debian/pvc-daemon-health.preinst vendored Normal file
View File

@ -0,0 +1,5 @@
#!/bin/sh
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc/pvchealthd -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

4
debian/pvc-daemon-health.prerm vendored Normal file
View File

@ -0,0 +1,4 @@
#!/bin/sh
# Disable the services
systemctl disable pvchealthd.service

View File

@ -4,4 +4,3 @@ node-daemon/pvcnoded.service lib/systemd/system
node-daemon/pvc.target lib/systemd/system
node-daemon/pvcautoready.service lib/systemd/system
node-daemon/monitoring usr/share/pvc
node-daemon/plugins usr/share/pvc

View File

@ -2,4 +2,4 @@
# Remove any cached CPython directories or files
echo "Cleaning up existing CPython files"
find /usr/share/pvc -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true
find /usr/share/pvc/pvcnoded -type d -name "__pycache__" -exec rm -rf {} \; &>/dev/null || true

1
health-daemon/daemon_lib Symbolic link
View File

@ -0,0 +1 @@
../daemon-common

View File

@ -39,7 +39,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -40,7 +40,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -39,7 +39,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -39,7 +39,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

View File

@ -38,7 +38,7 @@
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
from pvchealthd.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to

24
health-daemon/pvchealthd.py Executable file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env python3
# pvchealthd.py - Health daemon startup stub
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvchealthd.Daemon # noqa: F401
pvchealthd.Daemon.entrypoint()

View File

@ -0,0 +1,20 @@
# Parallel Virtual Cluster health daemon unit file
[Unit]
Description = Parallel Virtual Cluster health daemon
After = network.target
Wants = network-online.target
PartOf = pvc.target
[Service]
Type = simple
WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVC_CONFIG_FILE=/etc/pvc/pvc.conf
ExecStartPre = /bin/sleep 2
ExecStart = /usr/share/pvc/pvchealthd.py
ExecStopPost = /bin/sleep 2
Restart = on-failure
[Install]
WantedBy = pvc.target

View File

@ -0,0 +1,145 @@
#!/usr/bin/env python3
# Daemon.py - Health daemon main entrypoing
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvchealthd.util.config
import pvchealthd.util.zookeeper
import pvchealthd.objects.MonitoringInstance as MonitoringInstance
import pvchealthd.objects.NodeInstance as NodeInstance
import daemon_lib.log as log
from time import sleep
import os
import signal
# Daemon version
version = "0.9.82"
##########################################################
# Entrypoint
##########################################################
def entrypoint():
monitoring_instance = None
# Get our configuration
config = pvchealthd.util.config.get_configuration()
config["daemon_name"] = "pvchealthd"
config["daemon_version"] = version
# Set up the logger instance
logger = log.Logger(config)
# Print our startup message
logger.out("")
logger.out("|------------------------------------------------------------|")
logger.out("| |")
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
logger.out("| |")
logger.out("|------------------------------------------------------------|")
logger.out("| Parallel Virtual Cluster health daemon v{0: <18} |".format(version))
logger.out("| Debug: {0: <51} |".format(str(config["debug"])))
logger.out("| FQDN: {0: <52} |".format(config["node_fqdn"]))
logger.out("| Host: {0: <52} |".format(config["node_hostname"]))
logger.out("| ID: {0: <54} |".format(config["node_id"]))
logger.out("| IPMI hostname: {0: <43} |".format(config["ipmi_hostname"]))
logger.out("| Machine details: |")
logger.out("| CPUs: {0: <50} |".format(config["static_data"][0]))
logger.out("| Arch: {0: <50} |".format(config["static_data"][3]))
logger.out("| OS: {0: <52} |".format(config["static_data"][2]))
logger.out("| Kernel: {0: <48} |".format(config["static_data"][1]))
logger.out("|------------------------------------------------------------|")
logger.out("")
logger.out(f'Starting pvchealthd on host {config["node_fqdn"]}', state="s")
# Connect to Zookeeper and return our handler and current schema version
zkhandler, _ = pvchealthd.util.zookeeper.connect(logger, config)
# Define a cleanup function
def cleanup(failure=False):
nonlocal logger, zkhandler, monitoring_instance
logger.out("Terminating pvchealthd and cleaning up", state="s")
# Shut down the monitoring system
try:
logger.out("Shutting down monitoring subsystem", state="s")
monitoring_instance.shutdown()
except Exception:
pass
# Close the Zookeeper connection
try:
zkhandler.disconnect(persistent=True)
del zkhandler
except Exception:
pass
logger.out("Terminated health daemon", state="s")
logger.terminate()
if failure:
retcode = 1
else:
retcode = 0
os._exit(retcode)
# Termination function
def term(signum="", frame=""):
cleanup(failure=False)
# Hangup (logrotate) function
def hup(signum="", frame=""):
if config["file_logging"]:
logger.hup()
# Handle signals gracefully
signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
signal.signal(signal.SIGHUP, hup)
this_node = NodeInstance.NodeInstance(
config["node_hostname"],
zkhandler,
config,
logger,
)
# Set up the node monitoring instance and thread
monitoring_instance = MonitoringInstance.MonitoringInstance(
zkhandler, config, logger, this_node
)
# Tick loop; does nothing since everything is async
while True:
try:
sleep(1)
except Exception:
break

View File

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# PluginInstance.py - Class implementing a PVC monitoring instance
# MonitoringInstance.py - Class implementing a PVC monitor in pvchealthd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>

View File

@ -0,0 +1,224 @@
#!/usr/bin/env python3
# NodeInstance.py - Class implementing a PVC node in pvchealthd
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
class NodeInstance(object):
# Initialization function
def __init__(
self,
name,
zkhandler,
config,
logger,
):
# Passed-in variables on creation
self.name = name
self.zkhandler = zkhandler
self.config = config
self.logger = logger
# States
self.daemon_state = "stop"
self.coordinator_state = "client"
self.domain_state = "flushed"
# Node resources
self.health = 100
self.active_domains_count = 0
self.provisioned_domains_count = 0
self.memused = 0
self.memfree = 0
self.memalloc = 0
self.vcpualloc = 0
# Zookeeper handlers for changed states
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.daemon", self.name)
)
def watch_node_daemonstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "stop"
if data != self.daemon_state:
self.daemon_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.router", self.name)
)
def watch_node_routerstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "client"
if data != self.coordinator_state:
self.coordinator_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.state.domain", self.name)
)
def watch_node_domainstate(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = "unknown"
if data != self.domain_state:
self.domain_state = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.monitoring.health", self.name)
)
def watch_node_health(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 100
try:
data = int(data)
except ValueError:
pass
if data != self.health:
self.health = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.free", self.name)
)
def watch_node_memfree(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memfree:
self.memfree = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.used", self.name)
)
def watch_node_memused(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memused:
self.memused = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.memory.allocated", self.name)
)
def watch_node_memalloc(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.memalloc:
self.memalloc = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.vcpu.allocated", self.name)
)
def watch_node_vcpualloc(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.vcpualloc:
self.vcpualloc = data
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.running_domains", self.name)
)
def watch_node_runningdomains(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii").split()
except AttributeError:
data = []
if len(data) != self.active_domains_count:
self.active_domains_count = len(data)
@self.zkhandler.zk_conn.DataWatch(
self.zkhandler.schema.path("node.count.provisioned_domains", self.name)
)
def watch_node_domainscount(data, stat, event=""):
if event and event.type == "DELETED":
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode("ascii")
except AttributeError:
data = 0
if data != self.provisioned_domains_count:
self.provisioned_domains_count = data

View File

@ -0,0 +1,696 @@
#!/usr/bin/env python3
# config.py - Utility functions for pvcnoded configuration parsing
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import daemon_lib.common as common
import os
import subprocess
import yaml
from socket import gethostname
from re import findall
from psutil import cpu_count
from ipaddress import ip_address, ip_network
from json import loads
class MalformedConfigurationError(Exception):
"""
An except when parsing the PVC Node daemon configuration file
"""
def __init__(self, error=None):
self.msg = f"ERROR: Configuration file is malformed: {error}"
def __str__(self):
return str(self.msg)
def get_static_data():
"""
Data that is obtained once at node startup for use later
"""
staticdata = list()
staticdata.append(str(cpu_count())) # CPU count
staticdata.append(
subprocess.run(["uname", "-r"], stdout=subprocess.PIPE)
.stdout.decode("ascii")
.strip()
)
staticdata.append(
subprocess.run(["uname", "-o"], stdout=subprocess.PIPE)
.stdout.decode("ascii")
.strip()
)
staticdata.append(
subprocess.run(["uname", "-m"], stdout=subprocess.PIPE)
.stdout.decode("ascii")
.strip()
)
return staticdata
def get_configuration_path():
config_file = None
try:
_config_file = "/etc/pvc/pvcnoded.yaml"
if not os.path.exists(_config_file):
raise
config_file = _config_file
config_type = "legacy"
except Exception:
pass
try:
_config_file = os.environ["PVC_CONFIG_FILE"]
if not os.path.exists(_config_file):
raise
config_file = _config_file
config_type = "current"
except Exception:
pass
if not config_file:
print('ERROR: The "PVC_CONFIG_FILE" environment variable must be set.')
os._exit(1)
return config_file, config_type
def get_hostname():
node_fqdn = gethostname()
node_hostname = node_fqdn.split(".", 1)[0]
node_domain = "".join(node_fqdn.split(".", 1)[1:])
try:
node_id = findall(r"\d+", node_hostname)[-1]
except IndexError:
node_id = 0
return node_fqdn, node_hostname, node_domain, node_id
def validate_floating_ip(config, network):
if network not in ["cluster", "storage", "upstream"]:
return False, f'Specified network type "{network}" is not valid'
floating_key = f"{network}_floating_ip"
network_key = f"{network}_network"
# Verify the network provided is valid
try:
network = ip_network(config[network_key])
except Exception:
return (
False,
f"Network address {config[network_key]} for {network_key} is not valid",
)
# Verify that the floating IP is valid (and in the network)
try:
floating_address = ip_address(config[floating_key].split("/")[0])
if floating_address not in list(network.hosts()):
raise
except Exception:
return (
False,
f"Floating address {config[floating_key]} for {floating_key} is not valid",
)
return True, ""
def get_configuration_current(config_file):
print('Loading configuration from file "{}"'.format(config_file))
with open(config_file, "r") as cfgfh:
try:
o_config = yaml.load(cfgfh, Loader=yaml.SafeLoader)
except Exception as e:
print(f"ERROR: Failed to parse configuration file: {e}")
os._exit(1)
config = dict()
node_fqdn, node_hostname, node_domain, node_id = get_hostname()
config_thisnode = {
"node": node_hostname,
"node_hostname": node_hostname,
"node_fqdn": node_fqdn,
"node_domain": node_domain,
"node_id": node_id,
}
config = {**config, **config_thisnode}
try:
o_path = o_config["path"]
config_path = {
"plugin_directory": o_path.get(
"plugin_directory", "/usr/share/pvc/plugins"
),
"dynamic_directory": o_path["dynamic_directory"],
"log_directory": o_path["system_log_directory"],
"console_log_directory": o_path["console_log_directory"],
"ceph_directory": o_path["ceph_directory"],
}
# Define our dynamic directory schema
config_path["dnsmasq_dynamic_directory"] = (
config_path["dynamic_directory"] + "/dnsmasq"
)
config_path["pdns_dynamic_directory"] = (
config_path["dynamic_directory"] + "/pdns"
)
config_path["nft_dynamic_directory"] = config_path["dynamic_directory"] + "/nft"
# Define our log directory schema
config_path["dnsmasq_log_directory"] = config_path["log_directory"] + "/dnsmasq"
config_path["pdns_log_directory"] = config_path["log_directory"] + "/pdns"
config_path["nft_log_directory"] = config_path["log_directory"] + "/nft"
config = {**config, **config_path}
o_subsystem = o_config["subsystem"]
config_subsystem = {
"enable_hypervisor": o_subsystem.get("enable_hypervisor", True),
"enable_networking": o_subsystem.get("enable_networking", True),
"enable_storage": o_subsystem.get("enable_storage", True),
"enable_worker": o_subsystem.get("enable_worker", True),
"enable_api": o_subsystem.get("enable_api", True),
}
config = {**config, **config_subsystem}
o_cluster = o_config["cluster"]
config_cluster = {
"cluster_name": o_cluster["name"],
"all_nodes": o_cluster["all_nodes"],
"coordinators": o_cluster["coordinator_nodes"],
}
config = {**config, **config_cluster}
o_cluster_networks = o_cluster["networks"]
for network_type in ["cluster", "storage", "upstream"]:
o_cluster_networks_specific = o_cluster_networks[network_type]
config_cluster_networks_specific = {
f"{network_type}_domain": o_cluster_networks_specific["domain"],
f"{network_type}_dev": o_cluster_networks_specific["device"],
f"{network_type}_mtu": o_cluster_networks_specific["mtu"],
f"{network_type}_network": o_cluster_networks_specific["ipv4"][
"network_address"
]
+ "/"
+ str(o_cluster_networks_specific["ipv4"]["netmask"]),
f"{network_type}_floating_ip": o_cluster_networks_specific["ipv4"][
"floating_address"
]
+ "/"
+ str(o_cluster_networks_specific["ipv4"]["netmask"]),
f"{network_type}_node_ip_selection": o_cluster_networks_specific[
"node_ip_selection"
],
}
if (
o_cluster_networks_specific["ipv4"].get("gateway_address", None)
is not None
):
config[f"{network_type}_gateway"] = o_cluster_networks_specific["ipv4"][
"gateway_address"
]
result, msg = validate_floating_ip(
config_cluster_networks_specific, network_type
)
if not result:
raise MalformedConfigurationError(msg)
network = ip_network(
config_cluster_networks_specific[f"{network_type}_network"]
)
if (
config_cluster_networks_specific[f"{network_type}_node_ip_selection"]
== "by-id"
):
address_id = int(node_id) - 1
else:
# This roundabout solution ensures the given IP is in the subnet and is something valid
address_id = [
idx
for idx, ip in enumerate(list(network.hosts()))
if str(ip)
== config_cluster_networks_specific[
f"{network_type}_node_ip_selection"
]
][0]
config_cluster_networks_specific[
f"{network_type}_dev_ip"
] = f"{list(network.hosts())[address_id]}/{network.prefixlen}"
config = {**config, **config_cluster_networks_specific}
o_database = o_config["database"]
config_database = {
"zookeeper_port": o_database["zookeeper"]["port"],
"keydb_port": o_database["keydb"]["port"],
"keydb_host": o_database["keydb"]["hostname"],
"keydb_path": o_database["keydb"]["path"],
"metadata_postgresql_port": o_database["postgres"]["port"],
"metadata_postgresql_host": o_database["postgres"]["hostname"],
"metadata_postgresql_dbname": o_database["postgres"]["credentials"]["api"][
"database"
],
"metadata_postgresql_user": o_database["postgres"]["credentials"]["api"][
"username"
],
"metadata_postgresql_password": o_database["postgres"]["credentials"][
"api"
]["password"],
"pdns_postgresql_port": o_database["postgres"]["port"],
"pdns_postgresql_host": o_database["postgres"]["hostname"],
"pdns_postgresql_dbname": o_database["postgres"]["credentials"]["dns"][
"database"
],
"pdns_postgresql_user": o_database["postgres"]["credentials"]["dns"][
"username"
],
"pdns_postgresql_password": o_database["postgres"]["credentials"]["dns"][
"password"
],
}
config = {**config, **config_database}
o_timer = o_config["timer"]
config_timer = {
"vm_shutdown_timeout": int(o_timer.get("vm_shutdown_timeout", 180)),
"keepalive_interval": int(o_timer.get("keepalive_interval", 5)),
"monitoring_interval": int(o_timer.get("monitoring_interval", 60)),
}
config = {**config, **config_timer}
o_fencing = o_config["fencing"]
config_fencing = {
"disable_on_ipmi_failure": o_fencing["disable_on_ipmi_failure"],
"fence_intervals": int(o_fencing["intervals"].get("fence_intervals", 6)),
"suicide_intervals": int(o_fencing["intervals"].get("suicide_interval", 0)),
"successful_fence": o_fencing["actions"].get("successful_fence", None),
"failed_fence": o_fencing["actions"].get("failed_fence", None),
"ipmi_hostname": o_fencing["ipmi"]["hostname"].format(node_id=node_id),
"ipmi_username": o_fencing["ipmi"]["username"],
"ipmi_password": o_fencing["ipmi"]["password"],
}
config = {**config, **config_fencing}
o_migration = o_config["migration"]
config_migration = {
"migration_target_selector": o_migration.get("target_selector", "mem"),
}
config = {**config, **config_migration}
o_logging = o_config["logging"]
config_logging = {
"debug": o_logging.get("debug_logging", False),
"file_logging": o_logging.get("file_logging", False),
"stdout_logging": o_logging.get("stdout_logging", False),
"zookeeper_logging": o_logging.get("zookeeper_logging", False),
"log_colours": o_logging.get("log_colours", False),
"log_dates": o_logging.get("log_dates", False),
"log_keepalives": o_logging.get("log_keepalives", False),
"log_keepalive_cluster_details": o_logging.get(
"log_cluster_details", False
),
"log_keepalive_plugin_details": o_logging.get(
"log_monitoring_details", False
),
"console_log_lines": o_logging.get("console_log_lines", False),
"node_log_lines": o_logging.get("node_log_lines", False),
}
config = {**config, **config_logging}
o_guest_networking = o_config["guest_networking"]
config_guest_networking = {
"bridge_dev": o_guest_networking["bridge_device"],
"bridge_mtu": o_guest_networking["bridge_mtu"],
"enable_sriov": o_guest_networking.get("sriov_enable", False),
"sriov_device": o_guest_networking.get("sriov_device", list()),
}
config = {**config, **config_guest_networking}
o_ceph = o_config["ceph"]
config_ceph = {
"ceph_config_file": config["ceph_directory"]
+ "/"
+ o_ceph["ceph_config_file"],
"ceph_admin_keyring": config["ceph_directory"]
+ "/"
+ o_ceph["ceph_keyring_file"],
"ceph_monitor_port": o_ceph["monitor_port"],
"ceph_secret_uuid": o_ceph["secret_uuid"],
}
config = {**config, **config_ceph}
# Add our node static data to the config
config["static_data"] = get_static_data()
except Exception as e:
raise MalformedConfigurationError(e)
return config
def get_configuration_legacy(pvcnoded_config_file):
print('Loading configuration from file "{}"'.format(pvcnoded_config_file))
with open(pvcnoded_config_file, "r") as cfgfile:
try:
o_config = yaml.load(cfgfile, Loader=yaml.SafeLoader)
except Exception as e:
print("ERROR: Failed to parse configuration file: {}".format(e))
os._exit(1)
node_fqdn, node_hostname, node_domain, node_id = get_hostname()
# Create the configuration dictionary
config = dict()
# Get the initial base configuration
try:
o_base = o_config["pvc"]
o_cluster = o_config["pvc"]["cluster"]
except Exception as e:
raise MalformedConfigurationError(e)
config_general = {
"node": o_base.get("node", node_hostname),
"node_hostname": node_hostname,
"node_fqdn": node_fqdn,
"node_domain": node_domain,
"node_id": node_id,
"coordinators": o_cluster.get("coordinators", list()),
"debug": o_base.get("debug", False),
}
config = {**config, **config_general}
# Get the functions configuration
try:
o_functions = o_config["pvc"]["functions"]
except Exception as e:
raise MalformedConfigurationError(e)
config_functions = {
"enable_hypervisor": o_functions.get("enable_hypervisor", False),
"enable_networking": o_functions.get("enable_networking", False),
"enable_storage": o_functions.get("enable_storage", False),
"enable_worker": o_functions.get("enable_worker", True),
"enable_api": o_functions.get("enable_api", False),
}
config = {**config, **config_functions}
# Get the directory configuration
try:
o_directories = o_config["pvc"]["system"]["configuration"]["directories"]
except Exception as e:
raise MalformedConfigurationError(e)
config_directories = {
"plugin_directory": o_directories.get(
"plugin_directory", "/usr/share/pvc/plugins"
),
"dynamic_directory": o_directories.get("dynamic_directory", None),
"log_directory": o_directories.get("log_directory", None),
"console_log_directory": o_directories.get("console_log_directory", None),
}
# Define our dynamic directory schema
config_directories["dnsmasq_dynamic_directory"] = (
config_directories["dynamic_directory"] + "/dnsmasq"
)
config_directories["pdns_dynamic_directory"] = (
config_directories["dynamic_directory"] + "/pdns"
)
config_directories["nft_dynamic_directory"] = (
config_directories["dynamic_directory"] + "/nft"
)
# Define our log directory schema
config_directories["dnsmasq_log_directory"] = (
config_directories["log_directory"] + "/dnsmasq"
)
config_directories["pdns_log_directory"] = (
config_directories["log_directory"] + "/pdns"
)
config_directories["nft_log_directory"] = (
config_directories["log_directory"] + "/nft"
)
config = {**config, **config_directories}
# Get the logging configuration
try:
o_logging = o_config["pvc"]["system"]["configuration"]["logging"]
except Exception as e:
raise MalformedConfigurationError(e)
config_logging = {
"file_logging": o_logging.get("file_logging", False),
"stdout_logging": o_logging.get("stdout_logging", False),
"zookeeper_logging": o_logging.get("zookeeper_logging", False),
"log_colours": o_logging.get("log_colours", False),
"log_dates": o_logging.get("log_dates", False),
"log_keepalives": o_logging.get("log_keepalives", False),
"log_keepalive_cluster_details": o_logging.get(
"log_keepalive_cluster_details", False
),
"log_keepalive_plugin_details": o_logging.get(
"log_keepalive_plugin_details", False
),
"console_log_lines": o_logging.get("console_log_lines", False),
"node_log_lines": o_logging.get("node_log_lines", False),
}
config = {**config, **config_logging}
# Get the interval configuration
try:
o_intervals = o_config["pvc"]["system"]["intervals"]
except Exception as e:
raise MalformedConfigurationError(e)
config_intervals = {
"vm_shutdown_timeout": int(o_intervals.get("vm_shutdown_timeout", 60)),
"keepalive_interval": int(o_intervals.get("keepalive_interval", 5)),
"monitoring_interval": int(o_intervals.get("monitoring_interval", 60)),
"fence_intervals": int(o_intervals.get("fence_intervals", 6)),
"suicide_intervals": int(o_intervals.get("suicide_interval", 0)),
}
config = {**config, **config_intervals}
# Get the fencing configuration
try:
o_fencing = o_config["pvc"]["system"]["fencing"]
o_fencing_actions = o_fencing["actions"]
o_fencing_ipmi = o_fencing["ipmi"]
except Exception as e:
raise MalformedConfigurationError(e)
config_fencing = {
"successful_fence": o_fencing_actions.get("successful_fence", None),
"failed_fence": o_fencing_actions.get("failed_fence", None),
"ipmi_hostname": o_fencing_ipmi.get(
"host", f"{node_hostname}-lom.{node_domain}"
),
"ipmi_username": o_fencing_ipmi.get("user", "null"),
"ipmi_password": o_fencing_ipmi.get("pass", "null"),
}
config = {**config, **config_fencing}
# Get the migration configuration
try:
o_migration = o_config["pvc"]["system"]["migration"]
except Exception as e:
raise MalformedConfigurationError(e)
config_migration = {
"migration_target_selector": o_migration.get("target_selector", "mem"),
}
config = {**config, **config_migration}
if config["enable_networking"]:
# Get the node networks configuration
try:
o_networks = o_config["pvc"]["cluster"]["networks"]
o_network_cluster = o_networks["cluster"]
o_network_storage = o_networks["storage"]
o_network_upstream = o_networks["upstream"]
o_sysnetworks = o_config["pvc"]["system"]["configuration"]["networking"]
o_sysnetwork_cluster = o_sysnetworks["cluster"]
o_sysnetwork_storage = o_sysnetworks["storage"]
o_sysnetwork_upstream = o_sysnetworks["upstream"]
except Exception as e:
raise MalformedConfigurationError(e)
config_networks = {
"cluster_domain": o_network_cluster.get("domain", None),
"cluster_network": o_network_cluster.get("network", None),
"cluster_floating_ip": o_network_cluster.get("floating_ip", None),
"cluster_dev": o_sysnetwork_cluster.get("device", None),
"cluster_mtu": o_sysnetwork_cluster.get("mtu", None),
"cluster_dev_ip": o_sysnetwork_cluster.get("address", None),
"storage_domain": o_network_storage.get("domain", None),
"storage_network": o_network_storage.get("network", None),
"storage_floating_ip": o_network_storage.get("floating_ip", None),
"storage_dev": o_sysnetwork_storage.get("device", None),
"storage_mtu": o_sysnetwork_storage.get("mtu", None),
"storage_dev_ip": o_sysnetwork_storage.get("address", None),
"upstream_domain": o_network_upstream.get("domain", None),
"upstream_network": o_network_upstream.get("network", None),
"upstream_floating_ip": o_network_upstream.get("floating_ip", None),
"upstream_gateway": o_network_upstream.get("gateway", None),
"upstream_dev": o_sysnetwork_upstream.get("device", None),
"upstream_mtu": o_sysnetwork_upstream.get("mtu", None),
"upstream_dev_ip": o_sysnetwork_upstream.get("address", None),
"bridge_dev": o_sysnetworks.get("bridge_device", None),
"bridge_mtu": o_sysnetworks.get("bridge_mtu", None),
"enable_sriov": o_sysnetworks.get("sriov_enable", False),
"sriov_device": o_sysnetworks.get("sriov_device", list()),
}
if config_networks["bridge_mtu"] is None:
# Read the current MTU of bridge_dev and set bridge_mtu to it; avoids weird resets
retcode, stdout, stderr = common.run_os_command(
f"ip -json link show dev {config_networks['bridge_dev']}"
)
current_bridge_mtu = loads(stdout)[0]["mtu"]
print(
f"Config key bridge_mtu not explicitly set; using live MTU {current_bridge_mtu} from {config_networks['bridge_dev']}"
)
config_networks["bridge_mtu"] = current_bridge_mtu
config = {**config, **config_networks}
for network_type in ["cluster", "storage", "upstream"]:
result, msg = validate_floating_ip(config, network_type)
if not result:
raise MalformedConfigurationError(msg)
address_key = "{}_dev_ip".format(network_type)
network_key = f"{network_type}_network"
network = ip_network(config[network_key])
# With autoselection of addresses, construct an IP from the relevant network
if config[address_key] == "by-id":
# The NodeID starts at 1, but indexes start at 0
address_id = int(config["node_id"]) - 1
# Grab the nth address from the network
config[address_key] = "{}/{}".format(
list(network.hosts())[address_id], network.prefixlen
)
# Validate the provided IP instead
else:
try:
address = ip_address(config[address_key].split("/")[0])
if address not in list(network.hosts()):
raise
except Exception:
raise MalformedConfigurationError(
f"IP address {config[address_key]} for {address_key} is not valid"
)
# Get the PowerDNS aggregator database configuration
try:
o_pdnsdb = o_config["pvc"]["coordinator"]["dns"]["database"]
except Exception as e:
raise MalformedConfigurationError(e)
config_pdnsdb = {
"pdns_postgresql_host": o_pdnsdb.get("host", None),
"pdns_postgresql_port": o_pdnsdb.get("port", None),
"pdns_postgresql_dbname": o_pdnsdb.get("name", None),
"pdns_postgresql_user": o_pdnsdb.get("user", None),
"pdns_postgresql_password": o_pdnsdb.get("pass", None),
}
config = {**config, **config_pdnsdb}
# Get the Cloud-Init Metadata database configuration
try:
o_metadatadb = o_config["pvc"]["coordinator"]["metadata"]["database"]
except Exception as e:
raise MalformedConfigurationError(e)
config_metadatadb = {
"metadata_postgresql_host": o_metadatadb.get("host", None),
"metadata_postgresql_port": o_metadatadb.get("port", None),
"metadata_postgresql_dbname": o_metadatadb.get("name", None),
"metadata_postgresql_user": o_metadatadb.get("user", None),
"metadata_postgresql_password": o_metadatadb.get("pass", None),
}
config = {**config, **config_metadatadb}
if config["enable_storage"]:
# Get the storage configuration
try:
o_storage = o_config["pvc"]["system"]["configuration"]["storage"]
except Exception as e:
raise MalformedConfigurationError(e)
config_storage = {
"ceph_config_file": o_storage.get("ceph_config_file", None),
"ceph_admin_keyring": o_storage.get("ceph_admin_keyring", None),
}
config = {**config, **config_storage}
# Add our node static data to the config
config["static_data"] = get_static_data()
return config
def get_configuration():
"""
Parse the configuration of the node daemon.
"""
pvc_config_file, pvc_config_type = get_configuration_path()
if pvc_config_type == "legacy":
config = get_configuration_legacy(pvc_config_file)
else:
config = get_configuration_current(pvc_config_file)
return config
def validate_directories(config):
if not os.path.exists(config["dynamic_directory"]):
os.makedirs(config["dynamic_directory"])
os.makedirs(config["dnsmasq_dynamic_directory"])
os.makedirs(config["pdns_dynamic_directory"])
os.makedirs(config["nft_dynamic_directory"])
if not os.path.exists(config["log_directory"]):
os.makedirs(config["log_directory"])
os.makedirs(config["dnsmasq_log_directory"])
os.makedirs(config["pdns_log_directory"])
os.makedirs(config["nft_log_directory"])

View File

@ -0,0 +1,338 @@
#!/usr/bin/env python3
# fencing.py - Utility functions for pvcnoded fencing
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import time
import daemon_lib.common as common
from daemon_lib.vm import vm_worker_flush_locks
#
# Fence thread entry function
#
def fence_node(node_name, zkhandler, config, logger):
# We allow exactly 6 saving throws (30 seconds) for the host to come back online or we kill it
failcount_limit = 6
failcount = 0
while failcount < failcount_limit:
# Wait 5 seconds
time.sleep(config["keepalive_interval"])
# Get the state
node_daemon_state = zkhandler.read(("node.state.daemon", node_name))
# Is it still 'dead'
if node_daemon_state == "dead":
failcount += 1
logger.out(
f"Node {node_name} failed {failcount}/{failcount_limit} saving throws",
state="s",
prefix=f"fencing {node_name}",
)
# It changed back to something else so it must be alive
else:
logger.out(
f"Node {node_name} passed a saving throw; cancelling fance",
state="o",
prefix=f"fencing {node_name}",
)
return
logger.out(
f"Fencing node {node_name} via IPMI reboot signal",
state="s",
prefix=f"fencing {node_name}",
)
# Get IPMI information
ipmi_hostname = zkhandler.read(("node.ipmi.hostname", node_name))
ipmi_username = zkhandler.read(("node.ipmi.username", node_name))
ipmi_password = zkhandler.read(("node.ipmi.password", node_name))
# Shoot it in the head
fence_status = reboot_via_ipmi(
node_name, ipmi_hostname, ipmi_username, ipmi_password, logger
)
# Hold to ensure the fence takes effect and system stabilizes
logger.out(
f"Waiting {config['keepalive_interval']}s for fence of node {node_name} to take effect",
state="i",
prefix=f"fencing {node_name}",
)
time.sleep(config["keepalive_interval"])
if fence_status:
logger.out(
f"Marking node {node_name} as fenced",
state="i",
prefix=f"fencing {node_name}",
)
while True:
try:
zkhandler.write([(("node.state.daemon", node_name), "fenced")])
break
except Exception:
continue
# Force into secondary network state if needed
if node_name in config["coordinators"]:
logger.out(
f"Forcing secondary coordinator state for node {node_name}",
state="i",
prefix=f"fencing {node_name}",
)
zkhandler.write([(("node.state.router", node_name), "secondary")])
if zkhandler.read("base.config.primary_node") == node_name:
zkhandler.write([("base.config.primary_node", "none")])
# If the fence succeeded and successful_fence is migrate
if fence_status and config["successful_fence"] == "migrate":
migrateFromFencedNode(zkhandler, node_name, config, logger)
# If the fence failed and failed_fence is migrate
if (
not fence_status
and config["failed_fence"] == "migrate"
and config["suicide_intervals"] != "0"
):
migrateFromFencedNode(zkhandler, node_name, config, logger)
# Migrate hosts away from a fenced node
def migrateFromFencedNode(zkhandler, node_name, config, logger):
logger.out(
f"Migrating VMs from dead node {node_name} to new hosts",
state="i",
prefix=f"fencing {node_name}",
)
# Get the list of VMs
dead_node_running_domains = zkhandler.read(
("node.running_domains", node_name)
).split()
# Set the node to a custom domainstate so we know what's happening
zkhandler.write([(("node.state.domain", node_name), "fence-flush")])
# Migrate a VM after a flush
def fence_migrate_vm(dom_uuid):
logger.out(
f"Flushing locks of VM {dom_uuid} due to fence",
state="i",
prefix=f"fencing {node_name}",
)
vm_worker_flush_locks(zkhandler, None, dom_uuid, force_unlock=True)
target_node = common.findTargetNode(zkhandler, dom_uuid)
if target_node is not None:
logger.out(
f"Migrating VM {dom_uuid} to node {target_node}",
state="i",
prefix=f"fencing {node_name}",
)
zkhandler.write(
[
(("domain.state", dom_uuid), "start"),
(("domain.node", dom_uuid), target_node),
(("domain.last_node", dom_uuid), node_name),
]
)
logger.out(
f"Successfully migrated running VM {dom_uuid} to node {target_node}",
state="o",
prefix=f"fencing {node_name}",
)
else:
logger.out(
f"No target node found for VM {dom_uuid}; marking autostart=True on current node",
state="i",
prefix=f"fencing {node_name}",
)
zkhandler.write(
{
(("domain.state", dom_uuid), "stopped"),
(("domain.meta.autostart", dom_uuid), "True"),
}
)
logger.out(
f"Successfully marked autostart for running VM {dom_uuid} on current node",
state="o",
prefix=f"fencing {node_name}",
)
# Loop through the VMs
for dom_uuid in dead_node_running_domains:
try:
fence_migrate_vm(dom_uuid)
except Exception as e:
logger.out(
f"Failed to migrate VM {dom_uuid}, continuing: {e}",
state="w",
prefix=f"fencing {node_name}",
)
# Set node in flushed state for easy remigrating when it comes back
zkhandler.write([(("node.state.domain", node_name), "flushed")])
logger.out(
f"All VMs flushed from dead node {node_name} to other nodes",
state="i",
prefix=f"fencing {node_name}",
)
#
# Perform an IPMI fence
#
def reboot_via_ipmi(node_name, ipmi_hostname, ipmi_user, ipmi_password, logger):
# Power off the node the node
logger.out(
"Sending power off to dead node",
state="i",
prefix=f"fencing {node_name}",
)
ipmi_stop_retcode, ipmi_stop_stdout, ipmi_stop_stderr = common.run_os_command(
f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power off"
)
if ipmi_stop_retcode != 0:
logger.out(
f"Failed to power off dead node: {ipmi_stop_stderr}",
state="e",
prefix=f"fencing {node_name}",
)
logger.out(
"Waiting 5s for power off to take effect",
state="i",
prefix=f"fencing {node_name}",
)
time.sleep(5)
# Check the chassis power state
logger.out(
"Checking power state of dead node",
state="i",
prefix=f"fencing {node_name}",
)
ipmi_status_retcode, ipmi_status_stdout, ipmi_status_stderr = common.run_os_command(
f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status"
)
if ipmi_status_retcode == 0:
logger.out(
f"Current chassis power state is: {ipmi_status_stdout.strip()}",
state="i",
prefix=f"fencing {node_name}",
)
else:
logger.out(
"Current chassis power state is: Unknown",
state="w",
prefix=f"fencing {node_name}",
)
# Power on the node
logger.out(
"Sending power on to dead node",
state="i",
prefix=f"fencing {node_name}",
)
ipmi_start_retcode, ipmi_start_stdout, ipmi_start_stderr = common.run_os_command(
f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power on"
)
if ipmi_start_retcode != 0:
logger.out(
f"Failed to power on dead node: {ipmi_start_stderr}",
state="w",
prefix=f"fencing {node_name}",
)
logger.out(
"Waiting 2s for power on to take effect",
state="i",
prefix=f"fencing {node_name}",
)
time.sleep(2)
# Check the chassis power state
logger.out(
"Checking power state of dead node",
state="i",
prefix=f"fencing {node_name}",
)
ipmi_status_retcode, ipmi_status_stdout, ipmi_status_stderr = common.run_os_command(
f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status"
)
if ipmi_stop_retcode == 0:
if ipmi_status_stdout.strip() == "Chassis Power is on":
# We successfully rebooted the node and it is powered on; this is a succeessful fence
logger.out(
"Successfully rebooted dead node; proceeding with fence recovery action",
state="o",
prefix=f"fencing {node_name}",
)
return True
elif ipmi_status_stdout.strip() == "Chassis Power is off":
# We successfully rebooted the node but it is powered off; this might be expected or not, but the node is confirmed off so we can call it a successful fence
logger.out(
"Chassis power is in confirmed off state after successfuly IPMI reboot; proceeding with fence recovery action",
state="o",
prefix=f"fencing {node_name}",
)
return True
else:
# We successfully rebooted the node but it is in some unknown power state; since this might indicate a silent failure, we must call it a failed fence
logger.out(
f"Chassis power is in an unknown state ({ipmi_status_stdout.strip()}) after successful IPMI reboot; NOT proceeding fence recovery action",
state="e",
prefix=f"fencing {node_name}",
)
return False
else:
if ipmi_status_stdout.strip() == "Chassis Power is off":
# We failed to reboot the node but it is powered off; it has probably suffered a serious hardware failure, but the node is confirmed off so we can call it a successful fence
logger.out(
"Chassis power is in confirmed off state after failed IPMI reboot; proceeding with fence recovery action",
state="o",
prefix=f"fencing {node_name}",
)
return True
else:
# We failed to reboot the node but it is in some unknown power state (including "on"); since this might indicate a silent failure, we must call it a failed fence
logger.out(
"Chassis power is not in confirmed off state after failed IPMI reboot; NOT proceeding wiht fence recovery action",
state="e",
prefix=f"fencing {node_name}",
)
return False
#
# Verify that IPMI connectivity to this host exists (used during node init)
#
def verify_ipmi(ipmi_hostname, ipmi_user, ipmi_password):
ipmi_command = f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status"
retcode, stdout, stderr = common.run_os_command(ipmi_command, timeout=2)
if retcode == 0 and stdout.strip() == "Chassis Power is on":
return True
else:
return False

View File

@ -0,0 +1,968 @@
#!/usr/bin/env python3
# keepalive.py - Utility functions for pvcnoded Keepalives
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvcnoded.util.fencing
import daemon_lib.common as common
from apscheduler.schedulers.background import BackgroundScheduler
from rados import Rados
from xml.etree import ElementTree
from queue import Queue
from threading import Thread
from datetime import datetime
import json
import re
import libvirt
import psutil
import os
import time
# State table for pretty stats
libvirt_vm_states = {
0: "NOSTATE",
1: "RUNNING",
2: "BLOCKED",
3: "PAUSED",
4: "SHUTDOWN",
5: "SHUTOFF",
6: "CRASHED",
7: "PMSUSPENDED",
}
def start_keepalive_timer(logger, config, zkhandler, this_node):
keepalive_interval = config["keepalive_interval"]
logger.out(
f"Starting keepalive timer ({keepalive_interval} second interval)", state="s"
)
keepalive_timer = BackgroundScheduler()
keepalive_timer.add_job(
node_keepalive,
args=(logger, config, zkhandler, this_node),
trigger="interval",
seconds=keepalive_interval,
)
keepalive_timer.start()
return keepalive_timer
def stop_keepalive_timer(logger, keepalive_timer):
try:
keepalive_timer.shutdown()
logger.out("Stopping keepalive timer", state="s")
except Exception:
logger.out("Failed to stop keepalive timer", state="w")
# Ceph stats update function
def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
pool_list = zkhandler.children("base.pool")
osd_list = zkhandler.children("base.osd")
debug = config["debug"]
if debug:
logger.out("Thread starting", state="d", prefix="ceph-thread")
# Connect to the Ceph cluster
try:
ceph_conn = Rados(
conffile=config["ceph_config_file"],
conf=dict(keyring=config["ceph_admin_keyring"]),
)
if debug:
logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
ceph_conn.connect(timeout=1)
except Exception as e:
logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
return
# Primary-only functions
if this_node.coordinator_state == "primary":
# Get Ceph status information (pretty)
if debug:
logger.out(
"Set Ceph status information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
command = {"prefix": "status", "format": "pretty"}
ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
try:
zkhandler.write([("base.storage", str(ceph_status))])
except Exception as e:
logger.out("Failed to set Ceph status data: {}".format(e), state="e")
# Get Ceph health information (JSON)
if debug:
logger.out(
"Set Ceph health information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
command = {"prefix": "health", "format": "json"}
ceph_health = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
try:
zkhandler.write([("base.storage.health", str(ceph_health))])
except Exception as e:
logger.out("Failed to set Ceph health data: {}".format(e), state="e")
# Get Ceph df information (pretty)
if debug:
logger.out(
"Set Ceph rados df information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
# Get rados df info
command = {"prefix": "df", "format": "pretty"}
ceph_df = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1].decode(
"ascii"
)
try:
zkhandler.write([("base.storage.util", str(ceph_df))])
except Exception as e:
logger.out("Failed to set Ceph utilization data: {}".format(e), state="e")
if debug:
logger.out(
"Set pool information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
# Get pool info
command = {"prefix": "df", "format": "json"}
ceph_df_output = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
try:
ceph_pool_df_raw = json.loads(ceph_df_output)["pools"]
except Exception as e:
logger.out("Failed to obtain Pool data (ceph df): {}".format(e), state="w")
ceph_pool_df_raw = []
retcode, stdout, stderr = common.run_os_command(
"rados df --format json", timeout=1
)
try:
rados_pool_df_raw = json.loads(stdout)["pools"]
except Exception as e:
logger.out("Failed to obtain Pool data (rados df): {}".format(e), state="w")
rados_pool_df_raw = []
pool_count = len(ceph_pool_df_raw)
if debug:
logger.out(
"Getting info for {} pools".format(pool_count),
state="d",
prefix="ceph-thread",
)
for pool_idx in range(0, pool_count):
try:
# Combine all the data for this pool
ceph_pool_df = ceph_pool_df_raw[pool_idx]
rados_pool_df = rados_pool_df_raw[pool_idx]
pool = ceph_pool_df
pool.update(rados_pool_df)
# Ignore any pools that aren't in our pool list
if pool["name"] not in pool_list:
if debug:
logger.out(
"Pool {} not in pool list {}".format(
pool["name"], pool_list
),
state="d",
prefix="ceph-thread",
)
continue
else:
if debug:
logger.out(
"Parsing data for pool {}".format(pool["name"]),
state="d",
prefix="ceph-thread",
)
# Assemble a useful data structure
pool_df = {
"id": pool["id"],
"stored_bytes": pool["stats"]["stored"],
"free_bytes": pool["stats"]["max_avail"],
"used_bytes": pool["stats"]["bytes_used"],
"used_percent": pool["stats"]["percent_used"],
"num_objects": pool["stats"]["objects"],
"num_object_clones": pool["num_object_clones"],
"num_object_copies": pool["num_object_copies"],
"num_objects_missing_on_primary": pool[
"num_objects_missing_on_primary"
],
"num_objects_unfound": pool["num_objects_unfound"],
"num_objects_degraded": pool["num_objects_degraded"],
"read_ops": pool["read_ops"],
"read_bytes": pool["read_bytes"],
"write_ops": pool["write_ops"],
"write_bytes": pool["write_bytes"],
}
# Write the pool data to Zookeeper
zkhandler.write(
[(("pool.stats", pool["name"]), str(json.dumps(pool_df)))]
)
except Exception as e:
# One or more of the status commands timed out, just continue
logger.out(
"Failed to format and send pool data: {}".format(e), state="w"
)
pass
# Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs)
osds_this_node = 0
if len(osd_list) > 0:
# Get data from Ceph OSDs
if debug:
logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
# Parse the dump data
osd_dump = dict()
command = {"prefix": "osd dump", "format": "json"}
osd_dump_output = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
try:
osd_dump_raw = json.loads(osd_dump_output)["osds"]
except Exception as e:
logger.out("Failed to obtain OSD data: {}".format(e), state="w")
osd_dump_raw = []
if debug:
logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
for osd in osd_dump_raw:
osd_dump.update(
{
str(osd["osd"]): {
"uuid": osd["uuid"],
"up": osd["up"],
"in": osd["in"],
"primary_affinity": osd["primary_affinity"],
}
}
)
# Parse the df data
if debug:
logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
osd_df = dict()
command = {"prefix": "osd df", "format": "json"}
try:
osd_df_raw = json.loads(
ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1]
)["nodes"]
except Exception as e:
logger.out("Failed to obtain OSD data: {}".format(e), state="w")
osd_df_raw = []
if debug:
logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
for osd in osd_df_raw:
osd_df.update(
{
str(osd["id"]): {
"utilization": osd["utilization"],
"var": osd["var"],
"pgs": osd["pgs"],
"kb": osd["kb"],
"kb_used": osd["kb_used"],
"kb_used_data": osd["kb_used_data"],
"kb_used_omap": osd["kb_used_omap"],
"kb_used_meta": osd["kb_used_meta"],
"kb_avail": osd["kb_avail"],
"weight": osd["crush_weight"],
"reweight": osd["reweight"],
"class": osd["device_class"],
}
}
)
# Parse the status data
if debug:
logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
osd_status = dict()
command = {"prefix": "osd status", "format": "pretty"}
try:
osd_status_raw = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
except Exception as e:
logger.out("Failed to obtain OSD status data: {}".format(e), state="w")
osd_status_raw = []
if debug:
logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
for line in osd_status_raw.split("\n"):
# Strip off colour
line = re.sub(r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))", "", line)
# Split it for parsing
line = line.split()
# Ceph 14 format:
# ['|', '0', '|', 'hv1.p.u.bonilan.net', '|', '318G', '|', '463G', '|', '213', '|', '1430k', '|', '22', '|', '124k', '|', 'exists,up', '|']
# Ceph 16 format:
# ['0', 'hv1.t.u.bonilan.net', '2489M', '236G', '0', '0', '0', '0', 'exists,up']
# Bypass obviously invalid lines
if len(line) < 1:
continue
elif line[0] == "+":
continue
try:
# If line begins with | and second entry is a digit (i.e. OSD ID)
if line[0] == "|" and line[1].isdigit():
# Parse the line in Ceph 14 format
osd_id = line[1]
node = line[3].split(".")[0]
used = line[5]
avail = line[7]
wr_ops = line[9]
wr_data = line[11]
rd_ops = line[13]
rd_data = line[15]
state = line[17]
# If first entry is a digit (i.e. OSD ID)
elif line[0].isdigit():
# Parse the line in Ceph 16 format
osd_id = line[0]
node = line[1].split(".")[0]
used = line[2]
avail = line[3]
wr_ops = line[4]
wr_data = line[5]
rd_ops = line[6]
rd_data = line[7]
state = line[8]
# Otherwise, it's the header line and is ignored
else:
continue
except IndexError:
continue
# I don't know why 2018 me used this construct instead of a normal
# dictionary update, but it works so not changing it.
# ref: bfbe9188ce830381f3f2fa1da11f1973f08eca8c
osd_status.update(
{
str(osd_id): {
"node": node,
"used": used,
"avail": avail,
"wr_ops": wr_ops,
"wr_data": wr_data,
"rd_ops": rd_ops,
"rd_data": rd_data,
"state": state,
}
}
)
# Merge them together into a single meaningful dict
if debug:
logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
osd_stats = dict()
for osd in osd_list:
if zkhandler.read(("osd.node", osd)) == config["node_hostname"]:
osds_this_node += 1
try:
this_dump = osd_dump[osd]
this_dump.update(osd_df[osd])
this_dump.update(osd_status[osd])
osd_stats[osd] = this_dump
except KeyError as e:
# One or more of the status commands timed out, just continue
logger.out(
"Failed to parse OSD stats into dictionary: {}".format(e), state="w"
)
# Upload OSD data for the cluster (primary-only)
if this_node.coordinator_state == "primary":
if debug:
logger.out(
"Trigger updates for each OSD", state="d", prefix="ceph-thread"
)
for osd in osd_list:
try:
stats = json.dumps(osd_stats[osd])
zkhandler.write([(("osd.stats", osd), str(stats))])
except KeyError as e:
# One or more of the status commands timed out, just continue
logger.out(
"Failed to upload OSD stats from dictionary: {}".format(e),
state="w",
)
ceph_conn.shutdown()
queue.put(osds_this_node)
if debug:
logger.out("Thread finished", state="d", prefix="ceph-thread")
# VM stats update function
def collect_vm_stats(logger, config, zkhandler, this_node, queue):
debug = config["debug"]
if debug:
logger.out("Thread starting", state="d", prefix="vm-thread")
# Connect to libvirt
libvirt_name = "qemu:///system"
if debug:
logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
try:
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
raise Exception
except Exception:
logger.out('Failed to open connection to "{}"'.format(libvirt_name), state="e")
return
memalloc = 0
memprov = 0
vcpualloc = 0
# Toggle state management of dead VMs to restart them
if debug:
logger.out(
"Toggle state management of dead VMs to restart them",
state="d",
prefix="vm-thread",
)
# Make a copy of the d_domain; if not, and it changes in flight, this can fail
fixed_d_domain = this_node.d_domain.copy()
for domain, instance in fixed_d_domain.items():
if domain in this_node.domain_list:
if instance.getstate() == "start" and instance.getnode() == this_node.name:
if instance.getdom() is not None:
try:
if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
logger.out(
"VM {} has failed".format(instance.domname),
state="w",
prefix="vm-thread",
)
raise
except Exception:
# Toggle a state "change"
logger.out(
"Resetting state to {} for VM {}".format(
instance.getstate(), instance.domname
),
state="i",
prefix="vm-thread",
)
zkhandler.write(
[(("domain.state", domain), instance.getstate())]
)
elif instance.getnode() == this_node.name:
memprov += instance.getmemory()
# Get list of running domains from Libvirt
running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
# Get statistics from any running VMs
for domain in running_domains:
try:
# Get basic information about the VM
tree = ElementTree.fromstring(domain.XMLDesc())
domain_uuid = domain.UUIDString()
domain_name = domain.name()
# Get all the raw information about the VM
if debug:
logger.out(
"Getting general statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
(
domain_state,
domain_maxmem,
domain_mem,
domain_vcpus,
domain_cputime,
) = domain.info()
# We can't properly gather stats from a non-running VMs so continue
if domain_state != libvirt.VIR_DOMAIN_RUNNING:
continue
domain_memory_stats = domain.memoryStats()
domain_cpu_stats = domain.getCPUStats(True)[0]
# Add the allocated memory to our memalloc value
memalloc += instance.getmemory()
memprov += instance.getmemory()
vcpualloc += instance.getvcpus()
except Exception as e:
if debug:
try:
logger.out(
"Failed getting VM information for {}: {}".format(
domain.name(), e
),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
# Ensure VM is present in the domain_list
if domain_uuid not in this_node.domain_list:
this_node.domain_list.append(domain_uuid)
if debug:
logger.out(
"Getting disk statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
domain_disk_stats = []
try:
for disk in tree.findall("devices/disk"):
disk_name = disk.find("source").get("name")
if not disk_name:
disk_name = disk.find("source").get("file")
disk_stats = domain.blockStats(disk.find("target").get("dev"))
domain_disk_stats.append(
{
"name": disk_name,
"rd_req": disk_stats[0],
"rd_bytes": disk_stats[1],
"wr_req": disk_stats[2],
"wr_bytes": disk_stats[3],
"err": disk_stats[4],
}
)
except Exception as e:
if debug:
try:
logger.out(
"Failed getting disk stats for {}: {}".format(domain.name(), e),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
if debug:
logger.out(
"Getting network statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
domain_network_stats = []
try:
for interface in tree.findall("devices/interface"):
interface_type = interface.get("type")
if interface_type not in ["bridge"]:
continue
interface_name = interface.find("target").get("dev")
interface_bridge = interface.find("source").get("bridge")
interface_stats = domain.interfaceStats(interface_name)
domain_network_stats.append(
{
"name": interface_name,
"bridge": interface_bridge,
"rd_bytes": interface_stats[0],
"rd_packets": interface_stats[1],
"rd_errors": interface_stats[2],
"rd_drops": interface_stats[3],
"wr_bytes": interface_stats[4],
"wr_packets": interface_stats[5],
"wr_errors": interface_stats[6],
"wr_drops": interface_stats[7],
}
)
except Exception as e:
if debug:
try:
logger.out(
"Failed getting network stats for {}: {}".format(
domain.name(), e
),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
# Create the final dictionary
domain_stats = {
"state": libvirt_vm_states[domain_state],
"maxmem": domain_maxmem,
"livemem": domain_mem,
"cpus": domain_vcpus,
"cputime": domain_cputime,
"mem_stats": domain_memory_stats,
"cpu_stats": domain_cpu_stats,
"disk_stats": domain_disk_stats,
"net_stats": domain_network_stats,
}
if debug:
logger.out(
"Writing statistics for VM {} to Zookeeper".format(domain_name),
state="d",
prefix="vm-thread",
)
try:
zkhandler.write(
[(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))]
)
except Exception as e:
if debug:
logger.out(
"Failed to write domain statistics: {}".format(e),
state="d",
prefix="vm-thread",
)
# Close the Libvirt connection
lv_conn.close()
if debug:
logger.out(
f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
state="d",
prefix="vm-thread",
)
queue.put(len(running_domains))
queue.put(memalloc)
queue.put(memprov)
queue.put(vcpualloc)
if debug:
logger.out("Thread finished", state="d", prefix="vm-thread")
# Keepalive update function
def node_keepalive(logger, config, zkhandler, this_node):
debug = config["debug"]
# Display node information to the terminal
if config["log_keepalives"]:
if this_node.coordinator_state == "primary":
cst_colour = logger.fmt_green
elif this_node.coordinator_state == "secondary":
cst_colour = logger.fmt_blue
else:
cst_colour = logger.fmt_cyan
active_coordinator_state = this_node.coordinator_state
runtime_start = datetime.now()
# Set the migration selector in Zookeeper for clients to read
if config["enable_hypervisor"]:
if this_node.coordinator_state == "primary":
try:
if (
zkhandler.read("base.config.migration_target_selector")
!= config["migration_target_selector"]
):
zkhandler.write(
[
(
"base.config.migration_target_selector",
config["migration_target_selector"],
)
]
)
except Exception:
logger.out(
"Failed to set migration target selector in Zookeeper",
state="e",
prefix="main-thread",
)
# Set the upstream IP in Zookeeper for clients to read
if config["enable_networking"]:
if this_node.coordinator_state == "primary":
try:
if (
zkhandler.read("base.config.upstream_ip")
!= config["upstream_floating_ip"]
):
zkhandler.write(
[("base.config.upstream_ip", config["upstream_floating_ip"])]
)
except Exception:
logger.out(
"Failed to set upstream floating IP in Zookeeper",
state="e",
prefix="main-thread",
)
# Get past state and update if needed
if debug:
logger.out(
"Get past state and update if needed", state="d", prefix="main-thread"
)
past_state = zkhandler.read(("node.state.daemon", this_node.name))
if past_state != "run" and past_state != "shutdown":
this_node.daemon_state = "run"
zkhandler.write([(("node.state.daemon", this_node.name), "run")])
else:
this_node.daemon_state = "run"
# Ensure the primary key is properly set
if debug:
logger.out(
"Ensure the primary key is properly set", state="d", prefix="main-thread"
)
if this_node.coordinator_state == "primary":
if zkhandler.read("base.config.primary_node") != this_node.name:
zkhandler.write([("base.config.primary_node", this_node.name)])
# Run VM statistics collection in separate thread for parallelization
if config["enable_hypervisor"]:
vm_thread_queue = Queue()
vm_stats_thread = Thread(
target=collect_vm_stats,
args=(logger, config, zkhandler, this_node, vm_thread_queue),
kwargs={},
)
vm_stats_thread.start()
# Run Ceph status collection in separate thread for parallelization
if config["enable_storage"]:
ceph_thread_queue = Queue()
ceph_stats_thread = Thread(
target=collect_ceph_stats,
args=(logger, config, zkhandler, this_node, ceph_thread_queue),
kwargs={},
)
ceph_stats_thread.start()
# Get node performance statistics
this_node.memtotal = int(psutil.virtual_memory().total / 1024 / 1024)
this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024)
this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
this_node.cpuload = round(os.getloadavg()[0], 2)
# Join against running threads
if config["enable_hypervisor"]:
vm_stats_thread.join(timeout=config["keepalive_interval"])
if vm_stats_thread.is_alive():
logger.out("VM stats gathering exceeded timeout, continuing", state="w")
if config["enable_storage"]:
ceph_stats_thread.join(timeout=config["keepalive_interval"])
if ceph_stats_thread.is_alive():
logger.out("Ceph stats gathering exceeded timeout, continuing", state="w")
# Get information from thread queues
if config["enable_hypervisor"]:
try:
this_node.domains_count = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.memalloc = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.memprov = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.vcpualloc = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
except Exception:
logger.out("VM stats queue get exceeded timeout, continuing", state="w")
else:
this_node.domains_count = 0
this_node.memalloc = 0
this_node.memprov = 0
this_node.vcpualloc = 0
if config["enable_storage"]:
try:
osds_this_node = ceph_thread_queue.get(
timeout=(config["keepalive_interval"] - 1)
)
except Exception:
logger.out("Ceph stats queue get exceeded timeout, continuing", state="w")
osds_this_node = "?"
else:
osds_this_node = "0"
# Set our information in zookeeper
keepalive_time = int(time.time())
if debug:
logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
try:
zkhandler.write(
[
(("node.memory.total", this_node.name), str(this_node.memtotal)),
(("node.memory.used", this_node.name), str(this_node.memused)),
(("node.memory.free", this_node.name), str(this_node.memfree)),
(("node.memory.allocated", this_node.name), str(this_node.memalloc)),
(("node.memory.provisioned", this_node.name), str(this_node.memprov)),
(("node.vcpu.allocated", this_node.name), str(this_node.vcpualloc)),
(("node.cpu.load", this_node.name), str(this_node.cpuload)),
(
("node.count.provisioned_domains", this_node.name),
str(this_node.domains_count),
),
(
("node.running_domains", this_node.name),
" ".join(this_node.domain_list),
),
(("node.keepalive", this_node.name), str(keepalive_time)),
]
)
except Exception:
logger.out("Failed to set keepalive data", state="e")
if config["log_keepalives"]:
runtime_end = datetime.now()
runtime_delta = runtime_end - runtime_start
runtime = "{:0.02f}".format(runtime_delta.total_seconds())
logger.out(
"{start_colour}{hostname} keepalive @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] in {runtime} seconds".format(
start_colour=logger.fmt_purple,
cst_colour=logger.fmt_bold + cst_colour,
nofmt=logger.fmt_end,
hostname=config["node_hostname"],
starttime=runtime_start,
costate=active_coordinator_state,
runtime=runtime,
),
state="t",
)
if this_node.maintenance is True:
maintenance_colour = logger.fmt_blue
else:
maintenance_colour = logger.fmt_green
if isinstance(this_node.health, int):
if this_node.health > 90:
health_colour = logger.fmt_green
elif this_node.health > 50:
health_colour = logger.fmt_yellow
else:
health_colour = logger.fmt_red
health_text = str(this_node.health) + "%"
else:
health_colour = logger.fmt_blue
health_text = "N/A"
if config["log_keepalive_cluster_details"]:
logger.out(
"{bold}Maintenance:{nofmt} {maintenance_colour}{maintenance}{nofmt} "
"{bold}Health:{nofmt} {health_colour}{health}{nofmt} "
"{bold}VMs:{nofmt} {domcount} "
"{bold}OSDs:{nofmt} {osdcount} "
"{bold}Load:{nofmt} {load} "
"{bold}Memory [MiB]: "
"{bold}Used:{nofmt} {usedmem} "
"{bold}Free:{nofmt} {freemem}".format(
bold=logger.fmt_bold,
maintenance_colour=maintenance_colour,
health_colour=health_colour,
nofmt=logger.fmt_end,
maintenance=this_node.maintenance,
health=health_text,
domcount=this_node.domains_count,
osdcount=osds_this_node,
load=this_node.cpuload,
freemem=this_node.memfree,
usedmem=this_node.memused,
),
state="t",
)
# Look for dead nodes and fence them
if not this_node.maintenance:
if debug:
logger.out(
"Look for dead nodes and fence them", state="d", prefix="main-thread"
)
if config["daemon_mode"] == "coordinator":
for node_name in zkhandler.children("base.node"):
try:
node_daemon_state = zkhandler.read(("node.state.daemon", node_name))
node_keepalive = int(zkhandler.read(("node.keepalive", node_name)))
except Exception:
node_daemon_state = "unknown"
node_keepalive = 0
# Handle deadtime and fencng if needed
# (A node is considered dead when its keepalive timer is >6*keepalive_interval seconds
# out-of-date while in 'start' state)
node_deadtime = int(time.time()) - (
int(config["keepalive_interval"]) * int(config["fence_intervals"])
)
if node_keepalive < node_deadtime and node_daemon_state == "run":
logger.out(
"Node {} seems dead - starting monitor for fencing".format(
node_name
),
state="w",
)
zk_lock = zkhandler.writelock(("node.state.daemon", node_name))
with zk_lock:
# Ensures that, if we lost the lock race and come out of waiting,
# we won't try to trigger our own fence thread.
if zkhandler.read(("node.state.daemon", node_name)) != "dead":
fence_thread = Thread(
target=pvcnoded.util.fencing.fence_node,
args=(node_name, zkhandler, config, logger),
kwargs={},
)
fence_thread.start()
# Write the updated data after we start the fence thread
zkhandler.write(
[(("node.state.daemon", node_name), "dead")]
)

View File

@ -0,0 +1,36 @@
#!/usr/bin/env python3
# libvirt.py - Utility functions for pvcnoded libvirt
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import libvirt
def validate_libvirtd(logger, config):
if config["enable_hypervisor"]:
libvirt_check_name = f'qemu+tcp://{config["node_hostname"]}/system'
logger.out(f"Connecting to Libvirt daemon at {libvirt_check_name}", state="i")
try:
lv_conn = libvirt.open(libvirt_check_name)
lv_conn.close()
except Exception as e:
logger.out(f"Failed to connect to Libvirt daemon: {e}", state="e")
return False
return True

View File

@ -0,0 +1,232 @@
#!/usr/bin/env python3
# networking.py - Utility functions for pvcnoded networking
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import daemon_lib.common as common
from time import sleep
from os import makedirs
def setup_sriov(logger, config):
logger.out("Setting up SR-IOV device support", state="i")
# Enable unsafe interrupts for the vfio_iommu_type1 kernel module
try:
common.run_os_command("modprobe vfio_iommu_type1 allow_unsafe_interrupts=1")
with open(
"/sys/module/vfio_iommu_type1/parameters/allow_unsafe_interrupts", "w"
) as mfh:
mfh.write("Y")
except Exception:
logger.out(
"Failed to enable vfio_iommu_type1 kernel module; SR-IOV may fail",
state="w",
)
# Loop through our SR-IOV NICs and enable the numvfs for each
for device in config["sriov_device"]:
logger.out(
f'Preparing SR-IOV PF {device["phy"]} with {device["vfcount"]} VFs',
state="i",
)
try:
with open(
f'/sys/class/net/{device["phy"]}/device/sriov_numvfs', "r"
) as vfh:
current_vf_count = vfh.read().strip()
with open(
f'/sys/class/net/{device["phy"]}/device/sriov_numvfs', "w"
) as vfh:
vfh.write(str(device["vfcount"]))
except FileNotFoundError:
logger.out(
f'Failed to open SR-IOV configuration for PF {device["phy"]}; device may not support SR-IOV',
state="w",
)
except OSError:
logger.out(
f'Failed to set SR-IOV VF count for PF {device["phy"]} to {device["vfcount"]}; already set to {current_vf_count}',
state="w",
)
if device.get("mtu", None) is not None:
logger.out(
f'Setting SR-IOV PF {device["phy"]} to MTU {device["mtu"]}', state="i"
)
common.run_os_command(f'ip link set {device["phy"]} mtu {device["mtu"]} up')
def setup_interfaces(logger, config):
# Set up the Cluster interface
cluster_dev = config["cluster_dev"]
cluster_mtu = config["cluster_mtu"]
cluster_dev_ip = config["cluster_dev_ip"]
logger.out(
f"Setting up Cluster network interface {cluster_dev} with MTU {cluster_mtu}",
state="i",
)
common.run_os_command(f"ip link set {cluster_dev} mtu {cluster_mtu} up")
logger.out(
f"Setting up Cluster network bridge on interface {cluster_dev} with IP {cluster_dev_ip}",
state="i",
)
common.run_os_command("brctl addbr brcluster")
common.run_os_command(f"brctl addif brcluster {cluster_dev}")
common.run_os_command(f"ip link set brcluster mtu {cluster_mtu} up")
common.run_os_command(f"ip address add {cluster_dev_ip} dev brcluster")
# Set up the Storage interface
storage_dev = config["storage_dev"]
storage_mtu = config["storage_mtu"]
storage_dev_ip = config["storage_dev_ip"]
logger.out(
f"Setting up Storage network interface {storage_dev} with MTU {storage_mtu}",
state="i",
)
common.run_os_command(f"ip link set {storage_dev} mtu {storage_mtu} up")
if storage_dev == cluster_dev:
if storage_dev_ip != cluster_dev_ip:
logger.out(
f"Setting up Storage network on Cluster network bridge with IP {storage_dev_ip}",
state="i",
)
common.run_os_command(f"ip address add {storage_dev_ip} dev brcluster")
else:
logger.out(
f"Setting up Storage network bridge on interface {storage_dev} with IP {storage_dev_ip}",
state="i",
)
common.run_os_command("brctl addbr brstorage")
common.run_os_command(f"brctl addif brstorage {storage_dev}")
common.run_os_command(f"ip link set brstorage mtu {storage_mtu} up")
common.run_os_command(f"ip address add {storage_dev_ip} dev brstorage")
# Set up the Upstream interface
upstream_dev = config["upstream_dev"]
upstream_mtu = config["upstream_mtu"]
upstream_dev_ip = config["upstream_dev_ip"]
logger.out(
f"Setting up Upstream network interface {upstream_dev} with MTU {upstream_mtu}",
state="i",
)
if upstream_dev == cluster_dev:
if upstream_dev_ip != cluster_dev_ip:
logger.out(
f"Setting up Upstream network on Cluster network bridge with IP {upstream_dev_ip}",
state="i",
)
common.run_os_command(f"ip address add {upstream_dev_ip} dev brcluster")
else:
logger.out(
f"Setting up Upstream network bridge on interface {upstream_dev} with IP {upstream_dev_ip}",
state="i",
)
common.run_os_command("brctl addbr brupstream")
common.run_os_command(f"brctl addif brupstream {upstream_dev}")
common.run_os_command(f"ip link set brupstream mtu {upstream_mtu} up")
common.run_os_command(f"ip address add {upstream_dev_ip} dev brupstream")
upstream_gateway = config["upstream_gateway"]
if upstream_gateway is not None:
logger.out(
f"Setting up Upstream network default gateway IP {upstream_gateway}",
state="i",
)
if upstream_dev == cluster_dev:
common.run_os_command(
f"ip route add default via {upstream_gateway} dev brcluster"
)
else:
common.run_os_command(
f"ip route add default via {upstream_gateway} dev brupstream"
)
# Set up sysctl tweaks to optimize networking
# Enable routing functions
common.run_os_command("sysctl net.ipv4.ip_forward=1")
common.run_os_command("sysctl net.ipv6.ip_forward=1")
# Enable send redirects
common.run_os_command("sysctl net.ipv4.conf.all.send_redirects=1")
common.run_os_command("sysctl net.ipv4.conf.default.send_redirects=1")
common.run_os_command("sysctl net.ipv6.conf.all.send_redirects=1")
common.run_os_command("sysctl net.ipv6.conf.default.send_redirects=1")
# Accept source routes
common.run_os_command("sysctl net.ipv4.conf.all.accept_source_route=1")
common.run_os_command("sysctl net.ipv4.conf.default.accept_source_route=1")
common.run_os_command("sysctl net.ipv6.conf.all.accept_source_route=1")
common.run_os_command("sysctl net.ipv6.conf.default.accept_source_route=1")
# Disable RP filtering on Cluster and Upstream interfaces (to allow traffic pivoting)
common.run_os_command(f"sysctl net.ipv4.conf.{cluster_dev}.rp_filter=0")
common.run_os_command("sysctl net.ipv4.conf.brcluster.rp_filter=0")
common.run_os_command(f"sysctl net.ipv4.conf.{upstream_dev}.rp_filter=0")
common.run_os_command("sysctl net.ipv4.conf.brupstream.rp_filter=0")
common.run_os_command(f"sysctl net.ipv6.conf.{cluster_dev}.rp_filter=0")
common.run_os_command("sysctl net.ipv6.conf.brcluster.rp_filter=0")
common.run_os_command(f"sysctl net.ipv6.conf.{upstream_dev}.rp_filter=0")
common.run_os_command("sysctl net.ipv6.conf.brupstream.rp_filter=0")
# Stop DNSMasq if it is running
common.run_os_command("systemctl stop dnsmasq.service")
logger.out("Waiting 3 seconds for networking to come up", state="s")
sleep(3)
def create_nft_configuration(logger, config):
if config["enable_networking"]:
logger.out("Creating NFT firewall configuration", state="i")
dynamic_directory = config["nft_dynamic_directory"]
# Create directories
makedirs(f"{dynamic_directory}/networks", exist_ok=True)
makedirs(f"{dynamic_directory}/static", exist_ok=True)
# Set up the base rules
nftables_base_rules = f"""# Base rules
flush ruleset
# Add the filter table and chains
add table inet filter
add chain inet filter forward {{ type filter hook forward priority 0; }}
add chain inet filter input {{ type filter hook input priority 0; }}
# Include static rules and network rules
include "{dynamic_directory}/static/*"
include "{dynamic_directory}/networks/*"
"""
# Write the base firewall config
nftables_base_filename = f"{dynamic_directory}/base.nft"
with open(nftables_base_filename, "w") as nftfh:
nftfh.write(nftables_base_rules)
common.reload_firewall_rules(nftables_base_filename, logger)

View File

@ -0,0 +1,99 @@
#!/usr/bin/env python3
# services.py - Utility functions for pvcnoded external services
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import daemon_lib.common as common
from time import sleep
def start_zookeeper(logger, config):
if config["daemon_mode"] == "coordinator":
logger.out("Starting Zookeeper daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start zookeeper.service")
def start_libvirtd(logger, config):
if config["enable_hypervisor"]:
logger.out("Starting Libvirt daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start libvirtd.service")
def start_patroni(logger, config):
if config["enable_networking"] and config["daemon_mode"] == "coordinator":
logger.out("Starting Patroni daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start patroni.service")
def start_frrouting(logger, config):
if config["enable_networking"] and config["daemon_mode"] == "coordinator":
logger.out("Starting FRRouting daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start frr.service")
def start_ceph_mon(logger, config):
if config["enable_storage"] and config["daemon_mode"] == "coordinator":
logger.out("Starting Ceph Monitor daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command(
f'systemctl start ceph-mon@{config["node_hostname"]}.service'
)
def start_ceph_mgr(logger, config):
if config["enable_storage"] and config["daemon_mode"] == "coordinator":
logger.out("Starting Ceph Manager daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command(
f'systemctl start ceph-mgr@{config["node_hostname"]}.service'
)
def start_keydb(logger, config):
if (config["enable_api"] or config["enable_worker"]) and config[
"daemon_mode"
] == "coordinator":
logger.out("Starting KeyDB daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start keydb-server.service")
def start_worker(logger, config):
if config["enable_worker"]:
logger.out("Starting Celery Worker daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start pvcworkerd.service")
def start_system_services(logger, config):
start_zookeeper(logger, config)
start_libvirtd(logger, config)
start_patroni(logger, config)
start_frrouting(logger, config)
start_ceph_mon(logger, config)
start_ceph_mgr(logger, config)
start_keydb(logger, config)
start_worker(logger, config)
logger.out("Waiting 10 seconds for daemons to start", state="s")
sleep(10)

View File

@ -0,0 +1,187 @@
#!/usr/bin/env python3
# <Filename> - <Description>
# zookeeper.py - Utility functions for pvcnoded Zookeeper connections
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
##############################################################################
from daemon_lib.zkhandler import ZKHandler
import os
import time
def connect(logger, config):
# Create an instance of the handler
zkhandler = ZKHandler(config, logger)
try:
logger.out(
"Connecting to Zookeeper on coordinator nodes {}".format(
config["coordinators"]
),
state="i",
)
# Start connection
zkhandler.connect(persistent=True)
except Exception as e:
logger.out(
"ERROR: Failed to connect to Zookeeper cluster: {}".format(e), state="e"
)
os._exit(1)
logger.out("Validating Zookeeper schema", state="i")
try:
node_schema_version = int(
zkhandler.read(("node.data.active_schema", config["node_hostname"]))
)
except Exception:
node_schema_version = int(zkhandler.read("base.schema.version"))
zkhandler.write(
[
(
("node.data.active_schema", config["node_hostname"]),
node_schema_version,
)
]
)
# Load in the current node schema version
zkhandler.schema.load(node_schema_version)
# Record the latest intalled schema version
latest_schema_version = zkhandler.schema.find_latest()
logger.out("Latest installed schema is {}".format(latest_schema_version), state="i")
zkhandler.write(
[(("node.data.latest_schema", config["node_hostname"]), latest_schema_version)]
)
# If we are the last node to get a schema update, fire the master update
if latest_schema_version > node_schema_version:
node_latest_schema_version = list()
for node in zkhandler.children("base.node"):
node_latest_schema_version.append(
int(zkhandler.read(("node.data.latest_schema", node)))
)
# This is true if all elements of the latest schema version are identical to the latest version,
# i.e. they have all had the latest schema installed and ready to load.
if node_latest_schema_version.count(latest_schema_version) == len(
node_latest_schema_version
):
zkhandler.write([("base.schema.version", latest_schema_version)])
return zkhandler, node_schema_version
def validate_schema(logger, zkhandler):
# Validate our schema against the active version
if not zkhandler.schema.validate(zkhandler, logger):
logger.out("Found schema violations, applying", state="i")
zkhandler.schema.apply(zkhandler)
else:
logger.out("Schema successfully validated", state="o")
def setup_node(logger, config, zkhandler):
# Check if our node exists in Zookeeper, and create it if not
if config["daemon_mode"] == "coordinator":
init_routerstate = "secondary"
else:
init_routerstate = "client"
if zkhandler.exists(("node", config["node_hostname"])):
logger.out(
f"Node is {logger.fmt_green}present{logger.fmt_end} in Zookeeper", state="i"
)
# Update static data just in case it's changed
zkhandler.write(
[
(("node", config["node_hostname"]), config["daemon_mode"]),
(("node.mode", config["node_hostname"]), config["daemon_mode"]),
(("node.state.daemon", config["node_hostname"]), "init"),
(("node.state.router", config["node_hostname"]), init_routerstate),
(
("node.data.static", config["node_hostname"]),
" ".join(config["static_data"]),
),
(
("node.data.pvc_version", config["node_hostname"]),
config["daemon_version"],
),
(
("node.ipmi.hostname", config["node_hostname"]),
config["ipmi_hostname"],
),
(
("node.ipmi.username", config["node_hostname"]),
config["ipmi_username"],
),
(
("node.ipmi.password", config["node_hostname"]),
config["ipmi_password"],
),
]
)
else:
logger.out(
f"Node is {logger.fmt_red}absent{logger.fmt_end} in Zookeeper; adding new node",
state="i",
)
keepalive_time = int(time.time())
zkhandler.write(
[
(("node", config["node_hostname"]), config["daemon_mode"]),
(("node.keepalive", config["node_hostname"]), str(keepalive_time)),
(("node.mode", config["node_hostname"]), config["daemon_mode"]),
(("node.state.daemon", config["node_hostname"]), "init"),
(("node.state.domain", config["node_hostname"]), "flushed"),
(("node.state.router", config["node_hostname"]), init_routerstate),
(
("node.data.static", config["node_hostname"]),
" ".join(config["static_data"]),
),
(
("node.data.pvc_version", config["node_hostname"]),
config["daemon_version"],
),
(
("node.ipmi.hostname", config["node_hostname"]),
config["ipmi_hostname"],
),
(
("node.ipmi.username", config["node_hostname"]),
config["ipmi_username"],
),
(
("node.ipmi.password", config["node_hostname"]),
config["ipmi_password"],
),
(("node.memory.total", config["node_hostname"]), "0"),
(("node.memory.used", config["node_hostname"]), "0"),
(("node.memory.free", config["node_hostname"]), "0"),
(("node.memory.allocated", config["node_hostname"]), "0"),
(("node.memory.provisioned", config["node_hostname"]), "0"),
(("node.vcpu.allocated", config["node_hostname"]), "0"),
(("node.cpu.load", config["node_hostname"]), "0.0"),
(("node.running_domains", config["node_hostname"]), "0"),
(("node.count.provisioned_domains", config["node_hostname"]), "0"),
(("node.count.networks", config["node_hostname"]), "0"),
]
)

View File

@ -27,7 +27,6 @@ import pvcnoded.util.services
import pvcnoded.util.libvirt
import pvcnoded.util.zookeeper
import pvcnoded.objects.MonitoringInstance as MonitoringInstance
import pvcnoded.objects.DNSAggregatorInstance as DNSAggregatorInstance
import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
import pvcnoded.objects.VMInstance as VMInstance
@ -59,7 +58,6 @@ version = "0.9.82"
def entrypoint():
keepalive_timer = None
monitoring_instance = None
# Get our configuration
config = pvcnoded.util.config.get_configuration()
@ -202,7 +200,7 @@ def entrypoint():
# Define a cleanup function
def cleanup(failure=False):
nonlocal logger, zkhandler, keepalive_timer, d_domain, monitoring_instance
nonlocal logger, zkhandler, keepalive_timer, d_domain
logger.out("Terminating pvcnoded and cleaning up", state="s")
@ -251,13 +249,6 @@ def entrypoint():
except Exception:
pass
# Shut down the monitoring system
try:
logger.out("Shutting down monitoring subsystem", state="s")
monitoring_instance.shutdown()
except Exception:
pass
# Set stop state in Zookeeper
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
@ -1009,11 +1000,6 @@ def entrypoint():
state="i",
)
# Set up the node monitoring instance and thread
monitoring_instance = MonitoringInstance.MonitoringInstance(
zkhandler, config, logger, this_node
)
# Start keepalived thread
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
logger, config, zkhandler, this_node