Clean up config naming and dead files

This commit is contained in:
Joshua Boniface 2023-11-29 16:34:17 -05:00
parent 4a2eba0961
commit 97eb63ebab
7 changed files with 3 additions and 1689 deletions

View File

@ -421,10 +421,7 @@ class MonitoringInstance(object):
plugin_results.append(future.result())
for result in sorted(plugin_results, key=lambda x: x.plugin_name):
if (
self.config["log_keepalives"]
and self.config["log_keepalive_plugin_details"]
):
if self.config["log_monitoring_details"]:
self.logger.out(
result.message + f" [-{result.health_delta}]",
state="t",

View File

@ -332,12 +332,7 @@ def get_configuration_current(config_file):
"log_colours": o_logging.get("log_colours", False),
"log_dates": o_logging.get("log_dates", False),
"log_keepalives": o_logging.get("log_keepalives", False),
"log_keepalive_cluster_details": o_logging.get(
"log_cluster_details", False
),
"log_keepalive_plugin_details": o_logging.get(
"log_monitoring_details", False
),
"log_monitoring_details": o_logging.get("log_monitoring_details", False),
"console_log_lines": o_logging.get("console_log_lines", False),
"node_log_lines": o_logging.get("node_log_lines", False),
}
@ -476,12 +471,7 @@ def get_configuration_legacy(pvcnoded_config_file):
"log_colours": o_logging.get("log_colours", False),
"log_dates": o_logging.get("log_dates", False),
"log_keepalives": o_logging.get("log_keepalives", False),
"log_keepalive_cluster_details": o_logging.get(
"log_keepalive_cluster_details", False
),
"log_keepalive_plugin_details": o_logging.get(
"log_keepalive_plugin_details", False
),
"log_monitoring_details": o_logging.get("log_keepalive_plugin_details", False),
"console_log_lines": o_logging.get("console_log_lines", False),
"node_log_lines": o_logging.get("node_log_lines", False),
}

View File

@ -1,338 +0,0 @@
#!/usr/bin/env python3
# fencing.py - Utility functions for pvcnoded fencing
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import time
import daemon_lib.common as common
from daemon_lib.vm import vm_worker_flush_locks
#
# Fence thread entry function
#
def fence_node(node_name, zkhandler, config, logger):
# We allow exactly 6 saving throws (30 seconds) for the host to come back online or we kill it
failcount_limit = 6
failcount = 0
while failcount < failcount_limit:
# Wait 5 seconds
time.sleep(config["keepalive_interval"])
# Get the state
node_daemon_state = zkhandler.read(("node.state.daemon", node_name))
# Is it still 'dead'
if node_daemon_state == "dead":
failcount += 1
logger.out(
f"Node {node_name} failed {failcount}/{failcount_limit} saving throws",
state="s",
prefix=f"fencing {node_name}",
)
# It changed back to something else so it must be alive
else:
logger.out(
f"Node {node_name} passed a saving throw; cancelling fance",
state="o",
prefix=f"fencing {node_name}",
)
return
logger.out(
f"Fencing node {node_name} via IPMI reboot signal",
state="s",
prefix=f"fencing {node_name}",
)
# Get IPMI information
ipmi_hostname = zkhandler.read(("node.ipmi.hostname", node_name))
ipmi_username = zkhandler.read(("node.ipmi.username", node_name))
ipmi_password = zkhandler.read(("node.ipmi.password", node_name))
# Shoot it in the head
fence_status = reboot_via_ipmi(
node_name, ipmi_hostname, ipmi_username, ipmi_password, logger
)
# Hold to ensure the fence takes effect and system stabilizes
logger.out(
f"Waiting {config['keepalive_interval']}s for fence of node {node_name} to take effect",
state="i",
prefix=f"fencing {node_name}",
)
time.sleep(config["keepalive_interval"])
if fence_status:
logger.out(
f"Marking node {node_name} as fenced",
state="i",
prefix=f"fencing {node_name}",
)
while True:
try:
zkhandler.write([(("node.state.daemon", node_name), "fenced")])
break
except Exception:
continue
# Force into secondary network state if needed
if node_name in config["coordinators"]:
logger.out(
f"Forcing secondary coordinator state for node {node_name}",
state="i",
prefix=f"fencing {node_name}",
)
zkhandler.write([(("node.state.router", node_name), "secondary")])
if zkhandler.read("base.config.primary_node") == node_name:
zkhandler.write([("base.config.primary_node", "none")])
# If the fence succeeded and successful_fence is migrate
if fence_status and config["successful_fence"] == "migrate":
migrateFromFencedNode(zkhandler, node_name, config, logger)
# If the fence failed and failed_fence is migrate
if (
not fence_status
and config["failed_fence"] == "migrate"
and config["suicide_intervals"] != "0"
):
migrateFromFencedNode(zkhandler, node_name, config, logger)
# Migrate hosts away from a fenced node
def migrateFromFencedNode(zkhandler, node_name, config, logger):
logger.out(
f"Migrating VMs from dead node {node_name} to new hosts",
state="i",
prefix=f"fencing {node_name}",
)
# Get the list of VMs
dead_node_running_domains = zkhandler.read(
("node.running_domains", node_name)
).split()
# Set the node to a custom domainstate so we know what's happening
zkhandler.write([(("node.state.domain", node_name), "fence-flush")])
# Migrate a VM after a flush
def fence_migrate_vm(dom_uuid):
logger.out(
f"Flushing locks of VM {dom_uuid} due to fence",
state="i",
prefix=f"fencing {node_name}",
)
vm_worker_flush_locks(zkhandler, None, dom_uuid, force_unlock=True)
target_node = common.findTargetNode(zkhandler, dom_uuid)
if target_node is not None:
logger.out(
f"Migrating VM {dom_uuid} to node {target_node}",
state="i",
prefix=f"fencing {node_name}",
)
zkhandler.write(
[
(("domain.state", dom_uuid), "start"),
(("domain.node", dom_uuid), target_node),
(("domain.last_node", dom_uuid), node_name),
]
)
logger.out(
f"Successfully migrated running VM {dom_uuid} to node {target_node}",
state="o",
prefix=f"fencing {node_name}",
)
else:
logger.out(
f"No target node found for VM {dom_uuid}; marking autostart=True on current node",
state="i",
prefix=f"fencing {node_name}",
)
zkhandler.write(
{
(("domain.state", dom_uuid), "stopped"),
(("domain.meta.autostart", dom_uuid), "True"),
}
)
logger.out(
f"Successfully marked autostart for running VM {dom_uuid} on current node",
state="o",
prefix=f"fencing {node_name}",
)
# Loop through the VMs
for dom_uuid in dead_node_running_domains:
try:
fence_migrate_vm(dom_uuid)
except Exception as e:
logger.out(
f"Failed to migrate VM {dom_uuid}, continuing: {e}",
state="w",
prefix=f"fencing {node_name}",
)
# Set node in flushed state for easy remigrating when it comes back
zkhandler.write([(("node.state.domain", node_name), "flushed")])
logger.out(
f"All VMs flushed from dead node {node_name} to other nodes",
state="i",
prefix=f"fencing {node_name}",
)
#
# Perform an IPMI fence
#
def reboot_via_ipmi(node_name, ipmi_hostname, ipmi_user, ipmi_password, logger):
# Power off the node the node
logger.out(
"Sending power off to dead node",
state="i",
prefix=f"fencing {node_name}",
)
ipmi_stop_retcode, ipmi_stop_stdout, ipmi_stop_stderr = common.run_os_command(
f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power off"
)
if ipmi_stop_retcode != 0:
logger.out(
f"Failed to power off dead node: {ipmi_stop_stderr}",
state="e",
prefix=f"fencing {node_name}",
)
logger.out(
"Waiting 5s for power off to take effect",
state="i",
prefix=f"fencing {node_name}",
)
time.sleep(5)
# Check the chassis power state
logger.out(
"Checking power state of dead node",
state="i",
prefix=f"fencing {node_name}",
)
ipmi_status_retcode, ipmi_status_stdout, ipmi_status_stderr = common.run_os_command(
f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status"
)
if ipmi_status_retcode == 0:
logger.out(
f"Current chassis power state is: {ipmi_status_stdout.strip()}",
state="i",
prefix=f"fencing {node_name}",
)
else:
logger.out(
"Current chassis power state is: Unknown",
state="w",
prefix=f"fencing {node_name}",
)
# Power on the node
logger.out(
"Sending power on to dead node",
state="i",
prefix=f"fencing {node_name}",
)
ipmi_start_retcode, ipmi_start_stdout, ipmi_start_stderr = common.run_os_command(
f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power on"
)
if ipmi_start_retcode != 0:
logger.out(
f"Failed to power on dead node: {ipmi_start_stderr}",
state="w",
prefix=f"fencing {node_name}",
)
logger.out(
"Waiting 2s for power on to take effect",
state="i",
prefix=f"fencing {node_name}",
)
time.sleep(2)
# Check the chassis power state
logger.out(
"Checking power state of dead node",
state="i",
prefix=f"fencing {node_name}",
)
ipmi_status_retcode, ipmi_status_stdout, ipmi_status_stderr = common.run_os_command(
f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status"
)
if ipmi_stop_retcode == 0:
if ipmi_status_stdout.strip() == "Chassis Power is on":
# We successfully rebooted the node and it is powered on; this is a succeessful fence
logger.out(
"Successfully rebooted dead node; proceeding with fence recovery action",
state="o",
prefix=f"fencing {node_name}",
)
return True
elif ipmi_status_stdout.strip() == "Chassis Power is off":
# We successfully rebooted the node but it is powered off; this might be expected or not, but the node is confirmed off so we can call it a successful fence
logger.out(
"Chassis power is in confirmed off state after successfuly IPMI reboot; proceeding with fence recovery action",
state="o",
prefix=f"fencing {node_name}",
)
return True
else:
# We successfully rebooted the node but it is in some unknown power state; since this might indicate a silent failure, we must call it a failed fence
logger.out(
f"Chassis power is in an unknown state ({ipmi_status_stdout.strip()}) after successful IPMI reboot; NOT proceeding fence recovery action",
state="e",
prefix=f"fencing {node_name}",
)
return False
else:
if ipmi_status_stdout.strip() == "Chassis Power is off":
# We failed to reboot the node but it is powered off; it has probably suffered a serious hardware failure, but the node is confirmed off so we can call it a successful fence
logger.out(
"Chassis power is in confirmed off state after failed IPMI reboot; proceeding with fence recovery action",
state="o",
prefix=f"fencing {node_name}",
)
return True
else:
# We failed to reboot the node but it is in some unknown power state (including "on"); since this might indicate a silent failure, we must call it a failed fence
logger.out(
"Chassis power is not in confirmed off state after failed IPMI reboot; NOT proceeding wiht fence recovery action",
state="e",
prefix=f"fencing {node_name}",
)
return False
#
# Verify that IPMI connectivity to this host exists (used during node init)
#
def verify_ipmi(ipmi_hostname, ipmi_user, ipmi_password):
ipmi_command = f"/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status"
retcode, stdout, stderr = common.run_os_command(ipmi_command, timeout=2)
if retcode == 0 and stdout.strip() == "Chassis Power is on":
return True
else:
return False

View File

@ -1,968 +0,0 @@
#!/usr/bin/env python3
# keepalive.py - Utility functions for pvcnoded Keepalives
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvcnoded.util.fencing
import daemon_lib.common as common
from apscheduler.schedulers.background import BackgroundScheduler
from rados import Rados
from xml.etree import ElementTree
from queue import Queue
from threading import Thread
from datetime import datetime
import json
import re
import libvirt
import psutil
import os
import time
# State table for pretty stats
libvirt_vm_states = {
0: "NOSTATE",
1: "RUNNING",
2: "BLOCKED",
3: "PAUSED",
4: "SHUTDOWN",
5: "SHUTOFF",
6: "CRASHED",
7: "PMSUSPENDED",
}
def start_keepalive_timer(logger, config, zkhandler, this_node):
keepalive_interval = config["keepalive_interval"]
logger.out(
f"Starting keepalive timer ({keepalive_interval} second interval)", state="s"
)
keepalive_timer = BackgroundScheduler()
keepalive_timer.add_job(
node_keepalive,
args=(logger, config, zkhandler, this_node),
trigger="interval",
seconds=keepalive_interval,
)
keepalive_timer.start()
return keepalive_timer
def stop_keepalive_timer(logger, keepalive_timer):
try:
keepalive_timer.shutdown()
logger.out("Stopping keepalive timer", state="s")
except Exception:
logger.out("Failed to stop keepalive timer", state="w")
# Ceph stats update function
def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
pool_list = zkhandler.children("base.pool")
osd_list = zkhandler.children("base.osd")
debug = config["debug"]
if debug:
logger.out("Thread starting", state="d", prefix="ceph-thread")
# Connect to the Ceph cluster
try:
ceph_conn = Rados(
conffile=config["ceph_config_file"],
conf=dict(keyring=config["ceph_admin_keyring"]),
)
if debug:
logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
ceph_conn.connect(timeout=1)
except Exception as e:
logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
return
# Primary-only functions
if this_node.coordinator_state == "primary":
# Get Ceph status information (pretty)
if debug:
logger.out(
"Set Ceph status information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
command = {"prefix": "status", "format": "pretty"}
ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
try:
zkhandler.write([("base.storage", str(ceph_status))])
except Exception as e:
logger.out("Failed to set Ceph status data: {}".format(e), state="e")
# Get Ceph health information (JSON)
if debug:
logger.out(
"Set Ceph health information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
command = {"prefix": "health", "format": "json"}
ceph_health = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
try:
zkhandler.write([("base.storage.health", str(ceph_health))])
except Exception as e:
logger.out("Failed to set Ceph health data: {}".format(e), state="e")
# Get Ceph df information (pretty)
if debug:
logger.out(
"Set Ceph rados df information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
# Get rados df info
command = {"prefix": "df", "format": "pretty"}
ceph_df = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1].decode(
"ascii"
)
try:
zkhandler.write([("base.storage.util", str(ceph_df))])
except Exception as e:
logger.out("Failed to set Ceph utilization data: {}".format(e), state="e")
if debug:
logger.out(
"Set pool information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
# Get pool info
command = {"prefix": "df", "format": "json"}
ceph_df_output = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
try:
ceph_pool_df_raw = json.loads(ceph_df_output)["pools"]
except Exception as e:
logger.out("Failed to obtain Pool data (ceph df): {}".format(e), state="w")
ceph_pool_df_raw = []
retcode, stdout, stderr = common.run_os_command(
"rados df --format json", timeout=1
)
try:
rados_pool_df_raw = json.loads(stdout)["pools"]
except Exception as e:
logger.out("Failed to obtain Pool data (rados df): {}".format(e), state="w")
rados_pool_df_raw = []
pool_count = len(ceph_pool_df_raw)
if debug:
logger.out(
"Getting info for {} pools".format(pool_count),
state="d",
prefix="ceph-thread",
)
for pool_idx in range(0, pool_count):
try:
# Combine all the data for this pool
ceph_pool_df = ceph_pool_df_raw[pool_idx]
rados_pool_df = rados_pool_df_raw[pool_idx]
pool = ceph_pool_df
pool.update(rados_pool_df)
# Ignore any pools that aren't in our pool list
if pool["name"] not in pool_list:
if debug:
logger.out(
"Pool {} not in pool list {}".format(
pool["name"], pool_list
),
state="d",
prefix="ceph-thread",
)
continue
else:
if debug:
logger.out(
"Parsing data for pool {}".format(pool["name"]),
state="d",
prefix="ceph-thread",
)
# Assemble a useful data structure
pool_df = {
"id": pool["id"],
"stored_bytes": pool["stats"]["stored"],
"free_bytes": pool["stats"]["max_avail"],
"used_bytes": pool["stats"]["bytes_used"],
"used_percent": pool["stats"]["percent_used"],
"num_objects": pool["stats"]["objects"],
"num_object_clones": pool["num_object_clones"],
"num_object_copies": pool["num_object_copies"],
"num_objects_missing_on_primary": pool[
"num_objects_missing_on_primary"
],
"num_objects_unfound": pool["num_objects_unfound"],
"num_objects_degraded": pool["num_objects_degraded"],
"read_ops": pool["read_ops"],
"read_bytes": pool["read_bytes"],
"write_ops": pool["write_ops"],
"write_bytes": pool["write_bytes"],
}
# Write the pool data to Zookeeper
zkhandler.write(
[(("pool.stats", pool["name"]), str(json.dumps(pool_df)))]
)
except Exception as e:
# One or more of the status commands timed out, just continue
logger.out(
"Failed to format and send pool data: {}".format(e), state="w"
)
pass
# Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs)
osds_this_node = 0
if len(osd_list) > 0:
# Get data from Ceph OSDs
if debug:
logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
# Parse the dump data
osd_dump = dict()
command = {"prefix": "osd dump", "format": "json"}
osd_dump_output = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
try:
osd_dump_raw = json.loads(osd_dump_output)["osds"]
except Exception as e:
logger.out("Failed to obtain OSD data: {}".format(e), state="w")
osd_dump_raw = []
if debug:
logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
for osd in osd_dump_raw:
osd_dump.update(
{
str(osd["osd"]): {
"uuid": osd["uuid"],
"up": osd["up"],
"in": osd["in"],
"primary_affinity": osd["primary_affinity"],
}
}
)
# Parse the df data
if debug:
logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
osd_df = dict()
command = {"prefix": "osd df", "format": "json"}
try:
osd_df_raw = json.loads(
ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1]
)["nodes"]
except Exception as e:
logger.out("Failed to obtain OSD data: {}".format(e), state="w")
osd_df_raw = []
if debug:
logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
for osd in osd_df_raw:
osd_df.update(
{
str(osd["id"]): {
"utilization": osd["utilization"],
"var": osd["var"],
"pgs": osd["pgs"],
"kb": osd["kb"],
"kb_used": osd["kb_used"],
"kb_used_data": osd["kb_used_data"],
"kb_used_omap": osd["kb_used_omap"],
"kb_used_meta": osd["kb_used_meta"],
"kb_avail": osd["kb_avail"],
"weight": osd["crush_weight"],
"reweight": osd["reweight"],
"class": osd["device_class"],
}
}
)
# Parse the status data
if debug:
logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
osd_status = dict()
command = {"prefix": "osd status", "format": "pretty"}
try:
osd_status_raw = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
except Exception as e:
logger.out("Failed to obtain OSD status data: {}".format(e), state="w")
osd_status_raw = []
if debug:
logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
for line in osd_status_raw.split("\n"):
# Strip off colour
line = re.sub(r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))", "", line)
# Split it for parsing
line = line.split()
# Ceph 14 format:
# ['|', '0', '|', 'hv1.p.u.bonilan.net', '|', '318G', '|', '463G', '|', '213', '|', '1430k', '|', '22', '|', '124k', '|', 'exists,up', '|']
# Ceph 16 format:
# ['0', 'hv1.t.u.bonilan.net', '2489M', '236G', '0', '0', '0', '0', 'exists,up']
# Bypass obviously invalid lines
if len(line) < 1:
continue
elif line[0] == "+":
continue
try:
# If line begins with | and second entry is a digit (i.e. OSD ID)
if line[0] == "|" and line[1].isdigit():
# Parse the line in Ceph 14 format
osd_id = line[1]
node = line[3].split(".")[0]
used = line[5]
avail = line[7]
wr_ops = line[9]
wr_data = line[11]
rd_ops = line[13]
rd_data = line[15]
state = line[17]
# If first entry is a digit (i.e. OSD ID)
elif line[0].isdigit():
# Parse the line in Ceph 16 format
osd_id = line[0]
node = line[1].split(".")[0]
used = line[2]
avail = line[3]
wr_ops = line[4]
wr_data = line[5]
rd_ops = line[6]
rd_data = line[7]
state = line[8]
# Otherwise, it's the header line and is ignored
else:
continue
except IndexError:
continue
# I don't know why 2018 me used this construct instead of a normal
# dictionary update, but it works so not changing it.
# ref: bfbe9188ce830381f3f2fa1da11f1973f08eca8c
osd_status.update(
{
str(osd_id): {
"node": node,
"used": used,
"avail": avail,
"wr_ops": wr_ops,
"wr_data": wr_data,
"rd_ops": rd_ops,
"rd_data": rd_data,
"state": state,
}
}
)
# Merge them together into a single meaningful dict
if debug:
logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
osd_stats = dict()
for osd in osd_list:
if zkhandler.read(("osd.node", osd)) == config["node_hostname"]:
osds_this_node += 1
try:
this_dump = osd_dump[osd]
this_dump.update(osd_df[osd])
this_dump.update(osd_status[osd])
osd_stats[osd] = this_dump
except KeyError as e:
# One or more of the status commands timed out, just continue
logger.out(
"Failed to parse OSD stats into dictionary: {}".format(e), state="w"
)
# Upload OSD data for the cluster (primary-only)
if this_node.coordinator_state == "primary":
if debug:
logger.out(
"Trigger updates for each OSD", state="d", prefix="ceph-thread"
)
for osd in osd_list:
try:
stats = json.dumps(osd_stats[osd])
zkhandler.write([(("osd.stats", osd), str(stats))])
except KeyError as e:
# One or more of the status commands timed out, just continue
logger.out(
"Failed to upload OSD stats from dictionary: {}".format(e),
state="w",
)
ceph_conn.shutdown()
queue.put(osds_this_node)
if debug:
logger.out("Thread finished", state="d", prefix="ceph-thread")
# VM stats update function
def collect_vm_stats(logger, config, zkhandler, this_node, queue):
debug = config["debug"]
if debug:
logger.out("Thread starting", state="d", prefix="vm-thread")
# Connect to libvirt
libvirt_name = "qemu:///system"
if debug:
logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
try:
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
raise Exception
except Exception:
logger.out('Failed to open connection to "{}"'.format(libvirt_name), state="e")
return
memalloc = 0
memprov = 0
vcpualloc = 0
# Toggle state management of dead VMs to restart them
if debug:
logger.out(
"Toggle state management of dead VMs to restart them",
state="d",
prefix="vm-thread",
)
# Make a copy of the d_domain; if not, and it changes in flight, this can fail
fixed_d_domain = this_node.d_domain.copy()
for domain, instance in fixed_d_domain.items():
if domain in this_node.domain_list:
if instance.getstate() == "start" and instance.getnode() == this_node.name:
if instance.getdom() is not None:
try:
if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
logger.out(
"VM {} has failed".format(instance.domname),
state="w",
prefix="vm-thread",
)
raise
except Exception:
# Toggle a state "change"
logger.out(
"Resetting state to {} for VM {}".format(
instance.getstate(), instance.domname
),
state="i",
prefix="vm-thread",
)
zkhandler.write(
[(("domain.state", domain), instance.getstate())]
)
elif instance.getnode() == this_node.name:
memprov += instance.getmemory()
# Get list of running domains from Libvirt
running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
# Get statistics from any running VMs
for domain in running_domains:
try:
# Get basic information about the VM
tree = ElementTree.fromstring(domain.XMLDesc())
domain_uuid = domain.UUIDString()
domain_name = domain.name()
# Get all the raw information about the VM
if debug:
logger.out(
"Getting general statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
(
domain_state,
domain_maxmem,
domain_mem,
domain_vcpus,
domain_cputime,
) = domain.info()
# We can't properly gather stats from a non-running VMs so continue
if domain_state != libvirt.VIR_DOMAIN_RUNNING:
continue
domain_memory_stats = domain.memoryStats()
domain_cpu_stats = domain.getCPUStats(True)[0]
# Add the allocated memory to our memalloc value
memalloc += instance.getmemory()
memprov += instance.getmemory()
vcpualloc += instance.getvcpus()
except Exception as e:
if debug:
try:
logger.out(
"Failed getting VM information for {}: {}".format(
domain.name(), e
),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
# Ensure VM is present in the domain_list
if domain_uuid not in this_node.domain_list:
this_node.domain_list.append(domain_uuid)
if debug:
logger.out(
"Getting disk statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
domain_disk_stats = []
try:
for disk in tree.findall("devices/disk"):
disk_name = disk.find("source").get("name")
if not disk_name:
disk_name = disk.find("source").get("file")
disk_stats = domain.blockStats(disk.find("target").get("dev"))
domain_disk_stats.append(
{
"name": disk_name,
"rd_req": disk_stats[0],
"rd_bytes": disk_stats[1],
"wr_req": disk_stats[2],
"wr_bytes": disk_stats[3],
"err": disk_stats[4],
}
)
except Exception as e:
if debug:
try:
logger.out(
"Failed getting disk stats for {}: {}".format(domain.name(), e),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
if debug:
logger.out(
"Getting network statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
domain_network_stats = []
try:
for interface in tree.findall("devices/interface"):
interface_type = interface.get("type")
if interface_type not in ["bridge"]:
continue
interface_name = interface.find("target").get("dev")
interface_bridge = interface.find("source").get("bridge")
interface_stats = domain.interfaceStats(interface_name)
domain_network_stats.append(
{
"name": interface_name,
"bridge": interface_bridge,
"rd_bytes": interface_stats[0],
"rd_packets": interface_stats[1],
"rd_errors": interface_stats[2],
"rd_drops": interface_stats[3],
"wr_bytes": interface_stats[4],
"wr_packets": interface_stats[5],
"wr_errors": interface_stats[6],
"wr_drops": interface_stats[7],
}
)
except Exception as e:
if debug:
try:
logger.out(
"Failed getting network stats for {}: {}".format(
domain.name(), e
),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
# Create the final dictionary
domain_stats = {
"state": libvirt_vm_states[domain_state],
"maxmem": domain_maxmem,
"livemem": domain_mem,
"cpus": domain_vcpus,
"cputime": domain_cputime,
"mem_stats": domain_memory_stats,
"cpu_stats": domain_cpu_stats,
"disk_stats": domain_disk_stats,
"net_stats": domain_network_stats,
}
if debug:
logger.out(
"Writing statistics for VM {} to Zookeeper".format(domain_name),
state="d",
prefix="vm-thread",
)
try:
zkhandler.write(
[(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))]
)
except Exception as e:
if debug:
logger.out(
"Failed to write domain statistics: {}".format(e),
state="d",
prefix="vm-thread",
)
# Close the Libvirt connection
lv_conn.close()
if debug:
logger.out(
f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
state="d",
prefix="vm-thread",
)
queue.put(len(running_domains))
queue.put(memalloc)
queue.put(memprov)
queue.put(vcpualloc)
if debug:
logger.out("Thread finished", state="d", prefix="vm-thread")
# Keepalive update function
def node_keepalive(logger, config, zkhandler, this_node):
debug = config["debug"]
# Display node information to the terminal
if config["log_keepalives"]:
if this_node.coordinator_state == "primary":
cst_colour = logger.fmt_green
elif this_node.coordinator_state == "secondary":
cst_colour = logger.fmt_blue
else:
cst_colour = logger.fmt_cyan
active_coordinator_state = this_node.coordinator_state
runtime_start = datetime.now()
# Set the migration selector in Zookeeper for clients to read
if config["enable_hypervisor"]:
if this_node.coordinator_state == "primary":
try:
if (
zkhandler.read("base.config.migration_target_selector")
!= config["migration_target_selector"]
):
zkhandler.write(
[
(
"base.config.migration_target_selector",
config["migration_target_selector"],
)
]
)
except Exception:
logger.out(
"Failed to set migration target selector in Zookeeper",
state="e",
prefix="main-thread",
)
# Set the upstream IP in Zookeeper for clients to read
if config["enable_networking"]:
if this_node.coordinator_state == "primary":
try:
if (
zkhandler.read("base.config.upstream_ip")
!= config["upstream_floating_ip"]
):
zkhandler.write(
[("base.config.upstream_ip", config["upstream_floating_ip"])]
)
except Exception:
logger.out(
"Failed to set upstream floating IP in Zookeeper",
state="e",
prefix="main-thread",
)
# Get past state and update if needed
if debug:
logger.out(
"Get past state and update if needed", state="d", prefix="main-thread"
)
past_state = zkhandler.read(("node.state.daemon", this_node.name))
if past_state != "run" and past_state != "shutdown":
this_node.daemon_state = "run"
zkhandler.write([(("node.state.daemon", this_node.name), "run")])
else:
this_node.daemon_state = "run"
# Ensure the primary key is properly set
if debug:
logger.out(
"Ensure the primary key is properly set", state="d", prefix="main-thread"
)
if this_node.coordinator_state == "primary":
if zkhandler.read("base.config.primary_node") != this_node.name:
zkhandler.write([("base.config.primary_node", this_node.name)])
# Run VM statistics collection in separate thread for parallelization
if config["enable_hypervisor"]:
vm_thread_queue = Queue()
vm_stats_thread = Thread(
target=collect_vm_stats,
args=(logger, config, zkhandler, this_node, vm_thread_queue),
kwargs={},
)
vm_stats_thread.start()
# Run Ceph status collection in separate thread for parallelization
if config["enable_storage"]:
ceph_thread_queue = Queue()
ceph_stats_thread = Thread(
target=collect_ceph_stats,
args=(logger, config, zkhandler, this_node, ceph_thread_queue),
kwargs={},
)
ceph_stats_thread.start()
# Get node performance statistics
this_node.memtotal = int(psutil.virtual_memory().total / 1024 / 1024)
this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024)
this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
this_node.cpuload = round(os.getloadavg()[0], 2)
# Join against running threads
if config["enable_hypervisor"]:
vm_stats_thread.join(timeout=config["keepalive_interval"])
if vm_stats_thread.is_alive():
logger.out("VM stats gathering exceeded timeout, continuing", state="w")
if config["enable_storage"]:
ceph_stats_thread.join(timeout=config["keepalive_interval"])
if ceph_stats_thread.is_alive():
logger.out("Ceph stats gathering exceeded timeout, continuing", state="w")
# Get information from thread queues
if config["enable_hypervisor"]:
try:
this_node.domains_count = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.memalloc = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.memprov = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.vcpualloc = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
except Exception:
logger.out("VM stats queue get exceeded timeout, continuing", state="w")
else:
this_node.domains_count = 0
this_node.memalloc = 0
this_node.memprov = 0
this_node.vcpualloc = 0
if config["enable_storage"]:
try:
osds_this_node = ceph_thread_queue.get(
timeout=(config["keepalive_interval"] - 1)
)
except Exception:
logger.out("Ceph stats queue get exceeded timeout, continuing", state="w")
osds_this_node = "?"
else:
osds_this_node = "0"
# Set our information in zookeeper
keepalive_time = int(time.time())
if debug:
logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
try:
zkhandler.write(
[
(("node.memory.total", this_node.name), str(this_node.memtotal)),
(("node.memory.used", this_node.name), str(this_node.memused)),
(("node.memory.free", this_node.name), str(this_node.memfree)),
(("node.memory.allocated", this_node.name), str(this_node.memalloc)),
(("node.memory.provisioned", this_node.name), str(this_node.memprov)),
(("node.vcpu.allocated", this_node.name), str(this_node.vcpualloc)),
(("node.cpu.load", this_node.name), str(this_node.cpuload)),
(
("node.count.provisioned_domains", this_node.name),
str(this_node.domains_count),
),
(
("node.running_domains", this_node.name),
" ".join(this_node.domain_list),
),
(("node.keepalive", this_node.name), str(keepalive_time)),
]
)
except Exception:
logger.out("Failed to set keepalive data", state="e")
if config["log_keepalives"]:
runtime_end = datetime.now()
runtime_delta = runtime_end - runtime_start
runtime = "{:0.02f}".format(runtime_delta.total_seconds())
logger.out(
"{start_colour}{hostname} keepalive @ {starttime}{nofmt} [{cst_colour}{costate}{nofmt}] in {runtime} seconds".format(
start_colour=logger.fmt_purple,
cst_colour=logger.fmt_bold + cst_colour,
nofmt=logger.fmt_end,
hostname=config["node_hostname"],
starttime=runtime_start,
costate=active_coordinator_state,
runtime=runtime,
),
state="t",
)
if this_node.maintenance is True:
maintenance_colour = logger.fmt_blue
else:
maintenance_colour = logger.fmt_green
if isinstance(this_node.health, int):
if this_node.health > 90:
health_colour = logger.fmt_green
elif this_node.health > 50:
health_colour = logger.fmt_yellow
else:
health_colour = logger.fmt_red
health_text = str(this_node.health) + "%"
else:
health_colour = logger.fmt_blue
health_text = "N/A"
if config["log_keepalive_cluster_details"]:
logger.out(
"{bold}Maintenance:{nofmt} {maintenance_colour}{maintenance}{nofmt} "
"{bold}Health:{nofmt} {health_colour}{health}{nofmt} "
"{bold}VMs:{nofmt} {domcount} "
"{bold}OSDs:{nofmt} {osdcount} "
"{bold}Load:{nofmt} {load} "
"{bold}Memory [MiB]: "
"{bold}Used:{nofmt} {usedmem} "
"{bold}Free:{nofmt} {freemem}".format(
bold=logger.fmt_bold,
maintenance_colour=maintenance_colour,
health_colour=health_colour,
nofmt=logger.fmt_end,
maintenance=this_node.maintenance,
health=health_text,
domcount=this_node.domains_count,
osdcount=osds_this_node,
load=this_node.cpuload,
freemem=this_node.memfree,
usedmem=this_node.memused,
),
state="t",
)
# Look for dead nodes and fence them
if not this_node.maintenance:
if debug:
logger.out(
"Look for dead nodes and fence them", state="d", prefix="main-thread"
)
if config["daemon_mode"] == "coordinator":
for node_name in zkhandler.children("base.node"):
try:
node_daemon_state = zkhandler.read(("node.state.daemon", node_name))
node_keepalive = int(zkhandler.read(("node.keepalive", node_name)))
except Exception:
node_daemon_state = "unknown"
node_keepalive = 0
# Handle deadtime and fencng if needed
# (A node is considered dead when its keepalive timer is >6*keepalive_interval seconds
# out-of-date while in 'start' state)
node_deadtime = int(time.time()) - (
int(config["keepalive_interval"]) * int(config["fence_intervals"])
)
if node_keepalive < node_deadtime and node_daemon_state == "run":
logger.out(
"Node {} seems dead - starting monitor for fencing".format(
node_name
),
state="w",
)
zk_lock = zkhandler.writelock(("node.state.daemon", node_name))
with zk_lock:
# Ensures that, if we lost the lock race and come out of waiting,
# we won't try to trigger our own fence thread.
if zkhandler.read(("node.state.daemon", node_name)) != "dead":
fence_thread = Thread(
target=pvcnoded.util.fencing.fence_node,
args=(node_name, zkhandler, config, logger),
kwargs={},
)
fence_thread.start()
# Write the updated data after we start the fence thread
zkhandler.write(
[(("node.state.daemon", node_name), "dead")]
)

View File

@ -1,36 +0,0 @@
#!/usr/bin/env python3
# libvirt.py - Utility functions for pvcnoded libvirt
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import libvirt
def validate_libvirtd(logger, config):
if config["enable_hypervisor"]:
libvirt_check_name = f'qemu+tcp://{config["node_hostname"]}/system'
logger.out(f"Connecting to Libvirt daemon at {libvirt_check_name}", state="i")
try:
lv_conn = libvirt.open(libvirt_check_name)
lv_conn.close()
except Exception as e:
logger.out(f"Failed to connect to Libvirt daemon: {e}", state="e")
return False
return True

View File

@ -1,232 +0,0 @@
#!/usr/bin/env python3
# networking.py - Utility functions for pvcnoded networking
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import daemon_lib.common as common
from time import sleep
from os import makedirs
def setup_sriov(logger, config):
logger.out("Setting up SR-IOV device support", state="i")
# Enable unsafe interrupts for the vfio_iommu_type1 kernel module
try:
common.run_os_command("modprobe vfio_iommu_type1 allow_unsafe_interrupts=1")
with open(
"/sys/module/vfio_iommu_type1/parameters/allow_unsafe_interrupts", "w"
) as mfh:
mfh.write("Y")
except Exception:
logger.out(
"Failed to enable vfio_iommu_type1 kernel module; SR-IOV may fail",
state="w",
)
# Loop through our SR-IOV NICs and enable the numvfs for each
for device in config["sriov_device"]:
logger.out(
f'Preparing SR-IOV PF {device["phy"]} with {device["vfcount"]} VFs',
state="i",
)
try:
with open(
f'/sys/class/net/{device["phy"]}/device/sriov_numvfs', "r"
) as vfh:
current_vf_count = vfh.read().strip()
with open(
f'/sys/class/net/{device["phy"]}/device/sriov_numvfs', "w"
) as vfh:
vfh.write(str(device["vfcount"]))
except FileNotFoundError:
logger.out(
f'Failed to open SR-IOV configuration for PF {device["phy"]}; device may not support SR-IOV',
state="w",
)
except OSError:
logger.out(
f'Failed to set SR-IOV VF count for PF {device["phy"]} to {device["vfcount"]}; already set to {current_vf_count}',
state="w",
)
if device.get("mtu", None) is not None:
logger.out(
f'Setting SR-IOV PF {device["phy"]} to MTU {device["mtu"]}', state="i"
)
common.run_os_command(f'ip link set {device["phy"]} mtu {device["mtu"]} up')
def setup_interfaces(logger, config):
# Set up the Cluster interface
cluster_dev = config["cluster_dev"]
cluster_mtu = config["cluster_mtu"]
cluster_dev_ip = config["cluster_dev_ip"]
logger.out(
f"Setting up Cluster network interface {cluster_dev} with MTU {cluster_mtu}",
state="i",
)
common.run_os_command(f"ip link set {cluster_dev} mtu {cluster_mtu} up")
logger.out(
f"Setting up Cluster network bridge on interface {cluster_dev} with IP {cluster_dev_ip}",
state="i",
)
common.run_os_command("brctl addbr brcluster")
common.run_os_command(f"brctl addif brcluster {cluster_dev}")
common.run_os_command(f"ip link set brcluster mtu {cluster_mtu} up")
common.run_os_command(f"ip address add {cluster_dev_ip} dev brcluster")
# Set up the Storage interface
storage_dev = config["storage_dev"]
storage_mtu = config["storage_mtu"]
storage_dev_ip = config["storage_dev_ip"]
logger.out(
f"Setting up Storage network interface {storage_dev} with MTU {storage_mtu}",
state="i",
)
common.run_os_command(f"ip link set {storage_dev} mtu {storage_mtu} up")
if storage_dev == cluster_dev:
if storage_dev_ip != cluster_dev_ip:
logger.out(
f"Setting up Storage network on Cluster network bridge with IP {storage_dev_ip}",
state="i",
)
common.run_os_command(f"ip address add {storage_dev_ip} dev brcluster")
else:
logger.out(
f"Setting up Storage network bridge on interface {storage_dev} with IP {storage_dev_ip}",
state="i",
)
common.run_os_command("brctl addbr brstorage")
common.run_os_command(f"brctl addif brstorage {storage_dev}")
common.run_os_command(f"ip link set brstorage mtu {storage_mtu} up")
common.run_os_command(f"ip address add {storage_dev_ip} dev brstorage")
# Set up the Upstream interface
upstream_dev = config["upstream_dev"]
upstream_mtu = config["upstream_mtu"]
upstream_dev_ip = config["upstream_dev_ip"]
logger.out(
f"Setting up Upstream network interface {upstream_dev} with MTU {upstream_mtu}",
state="i",
)
if upstream_dev == cluster_dev:
if upstream_dev_ip != cluster_dev_ip:
logger.out(
f"Setting up Upstream network on Cluster network bridge with IP {upstream_dev_ip}",
state="i",
)
common.run_os_command(f"ip address add {upstream_dev_ip} dev brcluster")
else:
logger.out(
f"Setting up Upstream network bridge on interface {upstream_dev} with IP {upstream_dev_ip}",
state="i",
)
common.run_os_command("brctl addbr brupstream")
common.run_os_command(f"brctl addif brupstream {upstream_dev}")
common.run_os_command(f"ip link set brupstream mtu {upstream_mtu} up")
common.run_os_command(f"ip address add {upstream_dev_ip} dev brupstream")
upstream_gateway = config["upstream_gateway"]
if upstream_gateway is not None:
logger.out(
f"Setting up Upstream network default gateway IP {upstream_gateway}",
state="i",
)
if upstream_dev == cluster_dev:
common.run_os_command(
f"ip route add default via {upstream_gateway} dev brcluster"
)
else:
common.run_os_command(
f"ip route add default via {upstream_gateway} dev brupstream"
)
# Set up sysctl tweaks to optimize networking
# Enable routing functions
common.run_os_command("sysctl net.ipv4.ip_forward=1")
common.run_os_command("sysctl net.ipv6.ip_forward=1")
# Enable send redirects
common.run_os_command("sysctl net.ipv4.conf.all.send_redirects=1")
common.run_os_command("sysctl net.ipv4.conf.default.send_redirects=1")
common.run_os_command("sysctl net.ipv6.conf.all.send_redirects=1")
common.run_os_command("sysctl net.ipv6.conf.default.send_redirects=1")
# Accept source routes
common.run_os_command("sysctl net.ipv4.conf.all.accept_source_route=1")
common.run_os_command("sysctl net.ipv4.conf.default.accept_source_route=1")
common.run_os_command("sysctl net.ipv6.conf.all.accept_source_route=1")
common.run_os_command("sysctl net.ipv6.conf.default.accept_source_route=1")
# Disable RP filtering on Cluster and Upstream interfaces (to allow traffic pivoting)
common.run_os_command(f"sysctl net.ipv4.conf.{cluster_dev}.rp_filter=0")
common.run_os_command("sysctl net.ipv4.conf.brcluster.rp_filter=0")
common.run_os_command(f"sysctl net.ipv4.conf.{upstream_dev}.rp_filter=0")
common.run_os_command("sysctl net.ipv4.conf.brupstream.rp_filter=0")
common.run_os_command(f"sysctl net.ipv6.conf.{cluster_dev}.rp_filter=0")
common.run_os_command("sysctl net.ipv6.conf.brcluster.rp_filter=0")
common.run_os_command(f"sysctl net.ipv6.conf.{upstream_dev}.rp_filter=0")
common.run_os_command("sysctl net.ipv6.conf.brupstream.rp_filter=0")
# Stop DNSMasq if it is running
common.run_os_command("systemctl stop dnsmasq.service")
logger.out("Waiting 3 seconds for networking to come up", state="s")
sleep(3)
def create_nft_configuration(logger, config):
if config["enable_networking"]:
logger.out("Creating NFT firewall configuration", state="i")
dynamic_directory = config["nft_dynamic_directory"]
# Create directories
makedirs(f"{dynamic_directory}/networks", exist_ok=True)
makedirs(f"{dynamic_directory}/static", exist_ok=True)
# Set up the base rules
nftables_base_rules = f"""# Base rules
flush ruleset
# Add the filter table and chains
add table inet filter
add chain inet filter forward {{ type filter hook forward priority 0; }}
add chain inet filter input {{ type filter hook input priority 0; }}
# Include static rules and network rules
include "{dynamic_directory}/static/*"
include "{dynamic_directory}/networks/*"
"""
# Write the base firewall config
nftables_base_filename = f"{dynamic_directory}/base.nft"
with open(nftables_base_filename, "w") as nftfh:
nftfh.write(nftables_base_rules)
common.reload_firewall_rules(nftables_base_filename, logger)

View File

@ -1,99 +0,0 @@
#!/usr/bin/env python3
# services.py - Utility functions for pvcnoded external services
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import daemon_lib.common as common
from time import sleep
def start_zookeeper(logger, config):
if config["daemon_mode"] == "coordinator":
logger.out("Starting Zookeeper daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start zookeeper.service")
def start_libvirtd(logger, config):
if config["enable_hypervisor"]:
logger.out("Starting Libvirt daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start libvirtd.service")
def start_patroni(logger, config):
if config["enable_networking"] and config["daemon_mode"] == "coordinator":
logger.out("Starting Patroni daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start patroni.service")
def start_frrouting(logger, config):
if config["enable_networking"] and config["daemon_mode"] == "coordinator":
logger.out("Starting FRRouting daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start frr.service")
def start_ceph_mon(logger, config):
if config["enable_storage"] and config["daemon_mode"] == "coordinator":
logger.out("Starting Ceph Monitor daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command(
f'systemctl start ceph-mon@{config["node_hostname"]}.service'
)
def start_ceph_mgr(logger, config):
if config["enable_storage"] and config["daemon_mode"] == "coordinator":
logger.out("Starting Ceph Manager daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command(
f'systemctl start ceph-mgr@{config["node_hostname"]}.service'
)
def start_keydb(logger, config):
if (config["enable_api"] or config["enable_worker"]) and config[
"daemon_mode"
] == "coordinator":
logger.out("Starting KeyDB daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start keydb-server.service")
def start_worker(logger, config):
if config["enable_worker"]:
logger.out("Starting Celery Worker daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start pvcworkerd.service")
def start_system_services(logger, config):
start_zookeeper(logger, config)
start_libvirtd(logger, config)
start_patroni(logger, config)
start_frrouting(logger, config)
start_ceph_mon(logger, config)
start_ceph_mgr(logger, config)
start_keydb(logger, config)
start_worker(logger, config)
logger.out("Waiting 10 seconds for daemons to start", state="s")
sleep(10)