1029 lines
42 KiB
Python
1029 lines
42 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# Daemon.py - PVC Node daemon main entrypoing
|
|
# Part of the Parallel Virtual Cluster (PVC) system
|
|
#
|
|
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, version 3.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
#
|
|
###############################################################################
|
|
|
|
import pvcnoded.util.keepalive
|
|
import pvcnoded.util.config
|
|
import pvcnoded.util.fencing
|
|
import pvcnoded.util.networking
|
|
import pvcnoded.util.services
|
|
import pvcnoded.util.libvirt
|
|
import pvcnoded.util.zookeeper
|
|
|
|
import pvcnoded.objects.DNSAggregatorInstance as DNSAggregatorInstance
|
|
import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
|
|
import pvcnoded.objects.VMInstance as VMInstance
|
|
import pvcnoded.objects.NodeInstance as NodeInstance
|
|
import pvcnoded.objects.VXNetworkInstance as VXNetworkInstance
|
|
import pvcnoded.objects.SRIOVVFInstance as SRIOVVFInstance
|
|
import pvcnoded.objects.CephInstance as CephInstance
|
|
|
|
import daemon_lib.log as log
|
|
import daemon_lib.common as common
|
|
|
|
from time import sleep
|
|
from distutils.util import strtobool
|
|
|
|
import os
|
|
import sys
|
|
import signal
|
|
import re
|
|
import json
|
|
|
|
# Daemon version
|
|
version = "0.9.56"
|
|
|
|
|
|
##########################################################
|
|
# Entrypoint
|
|
##########################################################
|
|
|
|
|
|
def entrypoint():
|
|
keepalive_timer = None
|
|
|
|
# Get our configuration
|
|
config = pvcnoded.util.config.get_configuration()
|
|
config["pvcnoded_version"] = version
|
|
|
|
# Set some useful booleans for later (fewer characters)
|
|
debug = config["debug"]
|
|
if debug:
|
|
print("DEBUG MODE ENABLED")
|
|
|
|
# Create and validate our directories
|
|
pvcnoded.util.config.validate_directories(config)
|
|
|
|
# Set up the logger instance
|
|
logger = log.Logger(config)
|
|
|
|
# Print our startup message
|
|
logger.out("")
|
|
logger.out("|----------------------------------------------------------|")
|
|
logger.out("| |")
|
|
logger.out("| ███████████ ▜█▙ ▟█▛ █████ █ █ █ |")
|
|
logger.out("| ██ ▜█▙ ▟█▛ ██ |")
|
|
logger.out("| ███████████ ▜█▙ ▟█▛ ██ |")
|
|
logger.out("| ██ ▜█▙▟█▛ ███████████ |")
|
|
logger.out("| |")
|
|
logger.out("|----------------------------------------------------------|")
|
|
logger.out("| Parallel Virtual Cluster node daemon v{0: <18} |".format(version))
|
|
logger.out("| Debug: {0: <49} |".format(str(config["debug"])))
|
|
logger.out("| FQDN: {0: <50} |".format(config["node_fqdn"]))
|
|
logger.out("| Host: {0: <50} |".format(config["node_hostname"]))
|
|
logger.out("| ID: {0: <52} |".format(config["node_id"]))
|
|
logger.out("| IPMI hostname: {0: <41} |".format(config["ipmi_hostname"]))
|
|
logger.out("| Machine details: |")
|
|
logger.out("| CPUs: {0: <48} |".format(config["static_data"][0]))
|
|
logger.out("| Arch: {0: <48} |".format(config["static_data"][3]))
|
|
logger.out("| OS: {0: <50} |".format(config["static_data"][2]))
|
|
logger.out("| Kernel: {0: <46} |".format(config["static_data"][1]))
|
|
logger.out("|----------------------------------------------------------|")
|
|
logger.out("")
|
|
logger.out(f'Starting pvcnoded on host {config["node_fqdn"]}', state="s")
|
|
|
|
if config["enable_networking"]:
|
|
if config["enable_sriov"]:
|
|
# Set up SR-IOV devices
|
|
pvcnoded.util.networking.setup_sriov(logger, config)
|
|
|
|
# Set up our interfaces
|
|
pvcnoded.util.networking.setup_interfaces(logger, config)
|
|
|
|
# Get list of coordinator nodes
|
|
coordinator_nodes = config["coordinators"]
|
|
|
|
if config["node_hostname"] in coordinator_nodes:
|
|
# We are indeed a coordinator node
|
|
config["daemon_mode"] = "coordinator"
|
|
logger.out(
|
|
f"This node is a {logger.fmt_blue}coordinator{logger.fmt_end}", state="i"
|
|
)
|
|
else:
|
|
# We are a hypervisor node
|
|
config["daemon_mode"] = "hypervisor"
|
|
logger.out(
|
|
f"This node is a {logger.fmt_cyan}hypervisor{logger.fmt_end}", state="i"
|
|
)
|
|
|
|
pvcnoded.util.services.start_system_services(logger, config)
|
|
|
|
# Connect to Zookeeper and return our handler and current schema version
|
|
zkhandler, node_schema_version = pvcnoded.util.zookeeper.connect(logger, config)
|
|
|
|
# Watch for a global schema update and fire
|
|
# This will only change by the API when triggered after seeing all nodes can update
|
|
@zkhandler.zk_conn.DataWatch(zkhandler.schema.path("base.schema.version"))
|
|
def update_schema(new_schema_version, stat, event=""):
|
|
nonlocal zkhandler, keepalive_timer, node_schema_version
|
|
|
|
try:
|
|
new_schema_version = int(new_schema_version.decode("ascii"))
|
|
except Exception:
|
|
new_schema_version = 0
|
|
|
|
if new_schema_version == node_schema_version:
|
|
return True
|
|
|
|
logger.out("Hot update of schema version started", state="s")
|
|
logger.out(
|
|
f"Current version: {node_schema_version,} New version: {new_schema_version}",
|
|
state="s",
|
|
)
|
|
|
|
# Prevent any keepalive updates while this happens
|
|
if keepalive_timer is not None:
|
|
pvcnoded.util.keepalive.stop_keepalive_timer(logger, keepalive_timer)
|
|
sleep(1)
|
|
|
|
# Perform the migration (primary only)
|
|
if zkhandler.read("base.config.primary_node") == config["node_hostname"]:
|
|
logger.out("Primary node acquiring exclusive lock", state="s")
|
|
# Wait for things to settle
|
|
sleep(0.5)
|
|
# Acquire a write lock on the root key
|
|
with zkhandler.exclusivelock("base.schema.version"):
|
|
# Perform the schema migration tasks
|
|
logger.out("Performing schema update", state="s")
|
|
if new_schema_version > node_schema_version:
|
|
zkhandler.schema.migrate(zkhandler, new_schema_version)
|
|
if new_schema_version < node_schema_version:
|
|
zkhandler.schema.rollback(zkhandler, new_schema_version)
|
|
# Wait for the exclusive lock to be lifted
|
|
else:
|
|
logger.out("Non-primary node acquiring read lock", state="s")
|
|
# Wait for things to settle
|
|
sleep(1)
|
|
# Wait for a read lock
|
|
lock = zkhandler.readlock("base.schema.version")
|
|
lock.acquire()
|
|
# Wait a bit more for the primary to return to normal
|
|
sleep(1)
|
|
|
|
# Update the local schema version
|
|
logger.out("Updating node target schema version", state="s")
|
|
zkhandler.write(
|
|
[(("node.data.active_schema", config["node_hostname"]), new_schema_version)]
|
|
)
|
|
node_schema_version = new_schema_version
|
|
|
|
# Restart the API daemons if applicable
|
|
logger.out("Restarting services", state="s")
|
|
common.run_os_command("systemctl restart pvcapid-worker.service")
|
|
if zkhandler.read("base.config.primary_node") == config["node_hostname"]:
|
|
common.run_os_command("systemctl restart pvcapid.service")
|
|
|
|
# Restart ourselves with the new schema
|
|
logger.out("Reloading node daemon", state="s")
|
|
try:
|
|
zkhandler.disconnect(persistent=True)
|
|
del zkhandler
|
|
except Exception:
|
|
pass
|
|
os.execv(sys.argv[0], sys.argv)
|
|
|
|
# Validate the schema
|
|
pvcnoded.util.zookeeper.validate_schema(logger, zkhandler)
|
|
|
|
# Define a cleanup function
|
|
def cleanup(failure=False):
|
|
nonlocal logger, zkhandler, keepalive_timer, d_domain
|
|
|
|
logger.out("Terminating pvcnoded and cleaning up", state="s")
|
|
|
|
# Set shutdown state in Zookeeper
|
|
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "shutdown")])
|
|
|
|
# Waiting for any flushes to complete
|
|
logger.out("Waiting for any active flushes", state="s")
|
|
try:
|
|
if this_node is not None:
|
|
while this_node.flush_thread is not None:
|
|
sleep(0.5)
|
|
except Exception:
|
|
# We really don't care here, just proceed
|
|
pass
|
|
|
|
# Stop console logging on all VMs
|
|
logger.out("Stopping domain console watchers", state="s")
|
|
try:
|
|
if d_domain is not None:
|
|
for domain in d_domain:
|
|
if d_domain[domain].getnode() == config["node_hostname"]:
|
|
d_domain[domain].console_log_instance.stop()
|
|
except Exception:
|
|
pass
|
|
|
|
# Force into secondary coordinator state if needed
|
|
try:
|
|
if this_node.router_state == "primary" and len(d_node) > 1:
|
|
zkhandler.write([("base.config.primary_node", "none")])
|
|
logger.out("Waiting for primary migration", state="s")
|
|
timeout = 240
|
|
count = 0
|
|
while this_node.router_state != "secondary" and count < timeout:
|
|
sleep(0.5)
|
|
count += 1
|
|
except Exception:
|
|
pass
|
|
|
|
# Stop keepalive thread
|
|
try:
|
|
pvcnoded.util.keepalive.stop_keepalive_timer(logger, keepalive_timer)
|
|
|
|
logger.out("Performing final keepalive update", state="s")
|
|
pvcnoded.util.keepalive.node_keepalive(logger, config, zkhandler, this_node)
|
|
except Exception:
|
|
pass
|
|
|
|
# Set stop state in Zookeeper
|
|
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
|
|
|
|
# Forcibly terminate dnsmasq because it gets stuck sometimes
|
|
common.run_os_command("killall dnsmasq")
|
|
|
|
# Close the Zookeeper connection
|
|
try:
|
|
zkhandler.disconnect(persistent=True)
|
|
del zkhandler
|
|
except Exception:
|
|
pass
|
|
|
|
logger.out("Terminated pvc daemon", state="s")
|
|
logger.terminate()
|
|
|
|
if failure:
|
|
retcode = 1
|
|
else:
|
|
retcode = 0
|
|
|
|
os._exit(retcode)
|
|
|
|
# Termination function
|
|
def term(signum="", frame=""):
|
|
cleanup(failure=False)
|
|
|
|
# Hangup (logrotate) function
|
|
def hup(signum="", frame=""):
|
|
if config["file_logging"]:
|
|
logger.hup()
|
|
|
|
# Handle signals gracefully
|
|
signal.signal(signal.SIGTERM, term)
|
|
signal.signal(signal.SIGINT, term)
|
|
signal.signal(signal.SIGQUIT, term)
|
|
signal.signal(signal.SIGHUP, hup)
|
|
|
|
# Set up this node in Zookeeper
|
|
pvcnoded.util.zookeeper.setup_node(logger, config, zkhandler)
|
|
|
|
# Check that the primary node key exists and create it with us as primary if not
|
|
try:
|
|
current_primary = zkhandler.read("base.config.primary_node")
|
|
except Exception:
|
|
current_primary = "none"
|
|
|
|
if current_primary and current_primary != "none":
|
|
logger.out(
|
|
f"Current primary node is {logger.fmt_blue}{current_primary}{logger.fmt_end}",
|
|
state="i",
|
|
)
|
|
else:
|
|
if config["daemon_mode"] == "coordinator":
|
|
logger.out("No primary node found; setting us as primary", state="i")
|
|
zkhandler.write([("base.config.primary_node", config["node_hostname"])])
|
|
|
|
# Ensure that IPMI is reachable and working
|
|
if not pvcnoded.util.fencing.verify_ipmi(
|
|
config["ipmi_hostname"], config["ipmi_username"], config["ipmi_password"]
|
|
):
|
|
logger.out(
|
|
"Our IPMI is not reachable; fencing of this node will likely fail",
|
|
state="w",
|
|
)
|
|
|
|
# Validate libvirt
|
|
if not pvcnoded.util.libvirt.validate_libvirtd(logger, config):
|
|
cleanup(failure=True)
|
|
|
|
# Set up NFT
|
|
pvcnoded.util.networking.create_nft_configuration(logger, config)
|
|
|
|
# Create our object dictionaries
|
|
logger.out("Setting up objects", state="s")
|
|
|
|
d_node = dict()
|
|
node_list = list()
|
|
d_network = dict()
|
|
network_list = list()
|
|
sriov_pf_list = list()
|
|
d_sriov_vf = dict()
|
|
sriov_vf_list = list()
|
|
d_domain = dict()
|
|
domain_list = list()
|
|
d_osd = dict()
|
|
osd_list = list()
|
|
d_pool = dict()
|
|
pool_list = list()
|
|
d_volume = dict()
|
|
volume_list = dict()
|
|
|
|
if config["enable_networking"] and config["daemon_mode"] == "coordinator":
|
|
# Create an instance of the DNS Aggregator and Metadata API if we're a coordinator
|
|
dns_aggregator = DNSAggregatorInstance.DNSAggregatorInstance(config, logger)
|
|
metadata_api = MetadataAPIInstance.MetadataAPIInstance(
|
|
zkhandler, config, logger
|
|
)
|
|
else:
|
|
dns_aggregator = None
|
|
metadata_api = None
|
|
|
|
#
|
|
# Zookeeper watchers for objects
|
|
#
|
|
|
|
# Node objects
|
|
@zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path("base.node"))
|
|
def set_nodes(new_node_list):
|
|
nonlocal d_node, node_list
|
|
|
|
# Add missing nodes to list
|
|
for node in [node for node in new_node_list if node not in node_list]:
|
|
d_node[node] = NodeInstance.NodeInstance(
|
|
node,
|
|
config["node_hostname"],
|
|
zkhandler,
|
|
config,
|
|
logger,
|
|
d_node,
|
|
d_network,
|
|
d_domain,
|
|
dns_aggregator,
|
|
metadata_api,
|
|
)
|
|
|
|
# Remove deleted nodes from list
|
|
for node in [node for node in node_list if node not in new_node_list]:
|
|
del d_node[node]
|
|
|
|
node_list = new_node_list
|
|
logger.out(
|
|
f'{logger.fmt_blue}Node list:{logger.fmt_end} {" ".join(node_list)}',
|
|
state="i",
|
|
)
|
|
|
|
# Update node objects lists
|
|
for node in d_node:
|
|
d_node[node].update_node_list(d_node)
|
|
|
|
# Create helpful alias for this node
|
|
this_node = d_node[config["node_hostname"]]
|
|
|
|
# Maintenance status
|
|
@zkhandler.zk_conn.DataWatch(zkhandler.schema.path("base.config.maintenance"))
|
|
def update_maintenance(_maintenance, stat):
|
|
try:
|
|
maintenance = bool(strtobool(_maintenance.decode("ascii")))
|
|
except Exception:
|
|
maintenance = False
|
|
|
|
this_node.maintenance = maintenance
|
|
|
|
# Primary node
|
|
@zkhandler.zk_conn.DataWatch(zkhandler.schema.path("base.config.primary_node"))
|
|
def update_primary_node(new_primary, stat, event=""):
|
|
try:
|
|
new_primary = new_primary.decode("ascii")
|
|
except AttributeError:
|
|
new_primary = "none"
|
|
key_version = stat.version
|
|
|
|
# TODO: Move this to the Node structure
|
|
if new_primary != this_node.primary_node:
|
|
if config["daemon_mode"] == "coordinator":
|
|
# We're a coordinator and there's no primary
|
|
if new_primary == "none":
|
|
if (
|
|
this_node.daemon_state == "run"
|
|
and this_node.router_state
|
|
not in ["primary", "takeover", "relinquish"]
|
|
):
|
|
logger.out(
|
|
"Contending for primary coordinator state", state="i"
|
|
)
|
|
# Acquire an exclusive lock on the primary_node key
|
|
primary_lock = zkhandler.exclusivelock(
|
|
"base.config.primary_node"
|
|
)
|
|
try:
|
|
# This lock times out after 0.4s, which is 0.1s less than the pre-takeover
|
|
# timeout beow. This ensures a primary takeover will not deadlock against
|
|
# a node which has failed the contention
|
|
primary_lock.acquire(timeout=0.4)
|
|
# Ensure that when we get the lock the versions are still consistent and
|
|
# that another node hasn't already acquired the primary state (maybe we're
|
|
# extremely slow to respond)
|
|
if (
|
|
key_version
|
|
== zkhandler.zk_conn.get(
|
|
zkhandler.schema.path("base.config.primary_node")
|
|
)[1].version
|
|
):
|
|
# Set the primary to us
|
|
logger.out(
|
|
"Acquiring primary coordinator state", state="o"
|
|
)
|
|
zkhandler.write(
|
|
[
|
|
(
|
|
"base.config.primary_node",
|
|
config["node_hostname"],
|
|
)
|
|
]
|
|
)
|
|
# Cleanly release the lock
|
|
primary_lock.release()
|
|
# We timed out acquiring a lock, or failed to write, which means we failed the
|
|
# contention and should just log that
|
|
except Exception:
|
|
logger.out(
|
|
"Timed out contending for primary coordinator state",
|
|
state="i",
|
|
)
|
|
elif new_primary == config["node_hostname"]:
|
|
if this_node.router_state == "secondary":
|
|
# Wait for 0.5s to ensure other contentions time out, then take over
|
|
sleep(0.5)
|
|
zkhandler.write(
|
|
[
|
|
(
|
|
("node.state.router", config["node_hostname"]),
|
|
"takeover",
|
|
)
|
|
]
|
|
)
|
|
else:
|
|
if this_node.router_state == "primary":
|
|
# Wait for 0.5s to ensure other contentions time out, then relinquish
|
|
sleep(0.5)
|
|
zkhandler.write(
|
|
[
|
|
(
|
|
("node.state.router", config["node_hostname"]),
|
|
"relinquish",
|
|
)
|
|
]
|
|
)
|
|
else:
|
|
zkhandler.write(
|
|
[(("node.state.router", config["node_hostname"]), "client")]
|
|
)
|
|
|
|
# TODO: Turn this into a function like the others for clarity
|
|
for node in d_node:
|
|
d_node[node].primary_node = new_primary
|
|
|
|
if config["enable_networking"]:
|
|
# Network objects
|
|
@zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path("base.network"))
|
|
def update_networks(new_network_list):
|
|
nonlocal network_list, d_network
|
|
|
|
# Add any missing networks to the list
|
|
for network in [
|
|
network for network in new_network_list if network not in network_list
|
|
]:
|
|
d_network[network] = VXNetworkInstance.VXNetworkInstance(
|
|
network, zkhandler, config, logger, this_node, dns_aggregator
|
|
)
|
|
# TODO: Move this to the Network structure
|
|
if (
|
|
config["daemon_mode"] == "coordinator"
|
|
and d_network[network].nettype == "managed"
|
|
):
|
|
try:
|
|
dns_aggregator.add_network(d_network[network])
|
|
except Exception as e:
|
|
logger.out(
|
|
f"Failed to create DNS Aggregator for network {network}: {e}",
|
|
state="w",
|
|
)
|
|
# Start primary functionality
|
|
if (
|
|
this_node.router_state == "primary"
|
|
and d_network[network].nettype == "managed"
|
|
):
|
|
d_network[network].createGateways()
|
|
d_network[network].startDHCPServer()
|
|
|
|
# Remove any missing networks from the list
|
|
for network in [
|
|
network for network in network_list if network not in new_network_list
|
|
]:
|
|
# TODO: Move this to the Network structure
|
|
if d_network[network].nettype == "managed":
|
|
# Stop primary functionality
|
|
if this_node.router_state == "primary":
|
|
d_network[network].stopDHCPServer()
|
|
d_network[network].removeGateways()
|
|
dns_aggregator.remove_network(d_network[network])
|
|
# Stop firewalling
|
|
d_network[network].removeFirewall()
|
|
# Delete the network
|
|
d_network[network].removeNetwork()
|
|
del d_network[network]
|
|
|
|
# Update the new list
|
|
network_list = new_network_list
|
|
logger.out(
|
|
f'{logger.fmt_blue}Network list:{logger.fmt_end} {" ".join(network_list)}',
|
|
state="i",
|
|
)
|
|
|
|
# Update node objects list
|
|
for node in d_node:
|
|
d_node[node].update_network_list(d_network)
|
|
|
|
# Add the SR-IOV PFs and VFs to Zookeeper
|
|
# These do not behave like the objects; they are not dynamic (the API cannot change them), and they
|
|
# exist for the lifetime of this Node instance. The objects are set here in Zookeeper on a per-node
|
|
# basis, under the Node configuration tree.
|
|
# MIGRATION: The schema.schema.get ensures that the current active Schema contains the required keys
|
|
if (
|
|
config["enable_sriov"]
|
|
and zkhandler.schema.schema.get("sriov_pf", None) is not None
|
|
):
|
|
vf_list = list()
|
|
for device in config["sriov_device"]:
|
|
pf = device["phy"]
|
|
vfcount = device["vfcount"]
|
|
if device.get("mtu", None) is None:
|
|
mtu = 1500
|
|
else:
|
|
mtu = device["mtu"]
|
|
|
|
# Create the PF device in Zookeeper
|
|
zkhandler.write(
|
|
[
|
|
(
|
|
("node.sriov.pf", config["node_hostname"], "sriov_pf", pf),
|
|
"",
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.pf",
|
|
config["node_hostname"],
|
|
"sriov_pf.mtu",
|
|
pf,
|
|
),
|
|
mtu,
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.pf",
|
|
config["node_hostname"],
|
|
"sriov_pf.vfcount",
|
|
pf,
|
|
),
|
|
vfcount,
|
|
),
|
|
]
|
|
)
|
|
# Append the device to the list of PFs
|
|
sriov_pf_list.append(pf)
|
|
|
|
# Get the list of VFs from `ip link show`
|
|
vf_list = json.loads(
|
|
common.run_os_command(f"ip --json link show {pf}")[1]
|
|
)[0].get("vfinfo_list", [])
|
|
for vf in vf_list:
|
|
# {
|
|
# 'vf': 3,
|
|
# 'link_type': 'ether',
|
|
# 'address': '00:00:00:00:00:00',
|
|
# 'broadcast': 'ff:ff:ff:ff:ff:ff',
|
|
# 'vlan_list': [{'vlan': 101, 'qos': 2}],
|
|
# 'rate': {'max_tx': 0, 'min_tx': 0},
|
|
# 'spoofchk': True,
|
|
# 'link_state': 'auto',
|
|
# 'trust': False,
|
|
# 'query_rss_en': False
|
|
# }
|
|
vfphy = f'{pf}v{vf["vf"]}'
|
|
|
|
# Get the PCIe bus information
|
|
dev_pcie_path = None
|
|
try:
|
|
with open(f"/sys/class/net/{vfphy}/device/uevent") as vfh:
|
|
dev_uevent = vfh.readlines()
|
|
for line in dev_uevent:
|
|
if re.match(r"^PCI_SLOT_NAME=.*", line):
|
|
dev_pcie_path = line.rstrip().split("=")[-1]
|
|
except FileNotFoundError:
|
|
# Something must already be using the PCIe device
|
|
pass
|
|
|
|
# Add the VF to Zookeeper if it does not yet exist
|
|
if not zkhandler.exists(
|
|
("node.sriov.vf", config["node_hostname"], "sriov_vf", vfphy)
|
|
):
|
|
if dev_pcie_path is not None:
|
|
pcie_domain, pcie_bus, pcie_slot, pcie_function = re.split(
|
|
r":|\.", dev_pcie_path
|
|
)
|
|
else:
|
|
# We can't add the device - for some reason we can't get any information on its PCIe bus path,
|
|
# so just ignore this one, and continue.
|
|
# This shouldn't happen under any real circumstances, unless the admin tries to attach a non-existent
|
|
# VF to a VM manually, then goes ahead and adds that VF to the system with the VM running.
|
|
continue
|
|
|
|
zkhandler.write(
|
|
[
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf",
|
|
vfphy,
|
|
),
|
|
"",
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.pf",
|
|
vfphy,
|
|
),
|
|
pf,
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.mtu",
|
|
vfphy,
|
|
),
|
|
mtu,
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.mac",
|
|
vfphy,
|
|
),
|
|
vf["address"],
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.phy_mac",
|
|
vfphy,
|
|
),
|
|
vf["address"],
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.config",
|
|
vfphy,
|
|
),
|
|
"",
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.config.vlan_id",
|
|
vfphy,
|
|
),
|
|
vf["vlan_list"][0].get("vlan", "0"),
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.config.vlan_qos",
|
|
vfphy,
|
|
),
|
|
vf["vlan_list"][0].get("qos", "0"),
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.config.tx_rate_min",
|
|
vfphy,
|
|
),
|
|
vf["rate"]["min_tx"],
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.config.tx_rate_max",
|
|
vfphy,
|
|
),
|
|
vf["rate"]["max_tx"],
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.config.spoof_check",
|
|
vfphy,
|
|
),
|
|
vf["spoofchk"],
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.config.link_state",
|
|
vfphy,
|
|
),
|
|
vf["link_state"],
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.config.trust",
|
|
vfphy,
|
|
),
|
|
vf["trust"],
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.config.query_rss",
|
|
vfphy,
|
|
),
|
|
vf["query_rss_en"],
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.pci",
|
|
vfphy,
|
|
),
|
|
"",
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.pci.domain",
|
|
vfphy,
|
|
),
|
|
pcie_domain,
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.pci.bus",
|
|
vfphy,
|
|
),
|
|
pcie_bus,
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.pci.slot",
|
|
vfphy,
|
|
),
|
|
pcie_slot,
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.pci.function",
|
|
vfphy,
|
|
),
|
|
pcie_function,
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.used",
|
|
vfphy,
|
|
),
|
|
False,
|
|
),
|
|
(
|
|
(
|
|
"node.sriov.vf",
|
|
config["node_hostname"],
|
|
"sriov_vf.used_by",
|
|
vfphy,
|
|
),
|
|
"",
|
|
),
|
|
]
|
|
)
|
|
|
|
# Append the device to the list of VFs
|
|
sriov_vf_list.append(vfphy)
|
|
|
|
# Remove any obsolete PFs from Zookeeper if they go away
|
|
for pf in zkhandler.children(("node.sriov.pf", config["node_hostname"])):
|
|
if pf not in sriov_pf_list:
|
|
zkhandler.delete(
|
|
[("node.sriov.pf", config["node_hostname"], "sriov_pf", pf)]
|
|
)
|
|
# Remove any obsolete VFs from Zookeeper if their PF goes away
|
|
for vf in zkhandler.children(("node.sriov.vf", config["node_hostname"])):
|
|
vf_pf = zkhandler.read(
|
|
("node.sriov.vf", config["node_hostname"], "sriov_vf.pf", vf)
|
|
)
|
|
if vf_pf not in sriov_pf_list:
|
|
zkhandler.delete(
|
|
[("node.sriov.vf", config["node_hostname"], "sriov_vf", vf)]
|
|
)
|
|
|
|
# SR-IOV VF objects
|
|
# This is a ChildrenWatch just for consistency; the list never changes at runtime
|
|
@zkhandler.zk_conn.ChildrenWatch(
|
|
zkhandler.schema.path("node.sriov.vf", config["node_hostname"])
|
|
)
|
|
def update_sriov_vfs(new_sriov_vf_list):
|
|
nonlocal sriov_vf_list, d_sriov_vf
|
|
|
|
# Add VFs to the list
|
|
for vf in common.sortInterfaceNames(new_sriov_vf_list):
|
|
d_sriov_vf[vf] = SRIOVVFInstance.SRIOVVFInstance(
|
|
vf, zkhandler, config, logger, this_node
|
|
)
|
|
|
|
sriov_vf_list = sorted(new_sriov_vf_list)
|
|
logger.out(
|
|
f'{logger.fmt_blue}SR-IOV VF list:{logger.fmt_end} {" ".join(sriov_vf_list)}',
|
|
state="i",
|
|
)
|
|
|
|
if config["enable_hypervisor"]:
|
|
# VM command pipeline key
|
|
@zkhandler.zk_conn.DataWatch(zkhandler.schema.path("base.cmd.domain"))
|
|
def run_domain_command(data, stat, event=""):
|
|
if data:
|
|
VMInstance.vm_command(
|
|
zkhandler, logger, this_node, data.decode("ascii")
|
|
)
|
|
|
|
# VM domain objects
|
|
@zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path("base.domain"))
|
|
def update_domains(new_domain_list):
|
|
nonlocal domain_list, d_domain
|
|
|
|
# Add missing domains to the list
|
|
for domain in [
|
|
domain for domain in new_domain_list if domain not in domain_list
|
|
]:
|
|
d_domain[domain] = VMInstance.VMInstance(
|
|
domain, zkhandler, config, logger, this_node
|
|
)
|
|
|
|
# Remove any deleted domains from the list
|
|
for domain in [
|
|
domain for domain in domain_list if domain not in new_domain_list
|
|
]:
|
|
del d_domain[domain]
|
|
|
|
# Update the new list
|
|
domain_list = new_domain_list
|
|
logger.out(
|
|
f'{logger.fmt_blue}Domain list:{logger.fmt_end} {" ".join(domain_list)}',
|
|
state="i",
|
|
)
|
|
|
|
# Update node objects' list
|
|
for node in d_node:
|
|
d_node[node].update_domain_list(d_domain)
|
|
|
|
if config["enable_storage"]:
|
|
# Ceph command pipeline key
|
|
@zkhandler.zk_conn.DataWatch(zkhandler.schema.path("base.cmd.ceph"))
|
|
def run_ceph_command(data, stat, event=""):
|
|
if data:
|
|
CephInstance.ceph_command(
|
|
zkhandler, logger, this_node, data.decode("ascii"), d_osd
|
|
)
|
|
|
|
# OSD objects
|
|
@zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path("base.osd"))
|
|
def update_osds(new_osd_list):
|
|
nonlocal osd_list, d_osd
|
|
|
|
# Add any missing OSDs to the list
|
|
for osd in [osd for osd in new_osd_list if osd not in osd_list]:
|
|
d_osd[osd] = CephInstance.CephOSDInstance(
|
|
zkhandler, logger, this_node, osd
|
|
)
|
|
|
|
# Remove any deleted OSDs from the list
|
|
for osd in [osd for osd in osd_list if osd not in new_osd_list]:
|
|
del d_osd[osd]
|
|
|
|
# Update the new list
|
|
osd_list = new_osd_list
|
|
logger.out(
|
|
f'{logger.fmt_blue}OSD list:{logger.fmt_end} {" ".join(osd_list)}',
|
|
state="i",
|
|
)
|
|
|
|
# Pool objects
|
|
@zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path("base.pool"))
|
|
def update_pools(new_pool_list):
|
|
nonlocal pool_list, d_pool, volume_list, d_volume
|
|
|
|
# Add any missing pools to the list
|
|
for pool in [pool for pool in new_pool_list if pool not in pool_list]:
|
|
d_pool[pool] = CephInstance.CephPoolInstance(
|
|
zkhandler, logger, this_node, pool
|
|
)
|
|
# Prepare the volume components for this pool
|
|
volume_list[pool] = list()
|
|
d_volume[pool] = dict()
|
|
|
|
# Remove any deleted pools from the list
|
|
for pool in [pool for pool in pool_list if pool not in new_pool_list]:
|
|
del d_pool[pool]
|
|
|
|
# Update the new list
|
|
pool_list = new_pool_list
|
|
logger.out(
|
|
f'{logger.fmt_blue}Pool list:{logger.fmt_end} {" ".join(pool_list)}',
|
|
state="i",
|
|
)
|
|
|
|
# Volume objects (in each pool)
|
|
for pool in pool_list:
|
|
|
|
@zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path("volume", pool))
|
|
def update_volumes(new_volume_list):
|
|
nonlocal volume_list, d_volume
|
|
|
|
# Add any missing volumes to the list
|
|
for volume in [
|
|
volume
|
|
for volume in new_volume_list
|
|
if volume not in volume_list[pool]
|
|
]:
|
|
d_volume[pool][volume] = CephInstance.CephVolumeInstance(
|
|
zkhandler, logger, this_node, pool, volume
|
|
)
|
|
|
|
# Remove any deleted volumes from the list
|
|
for volume in [
|
|
volume
|
|
for volume in volume_list[pool]
|
|
if volume not in new_volume_list
|
|
]:
|
|
del d_volume[pool][volume]
|
|
|
|
# Update the new list
|
|
volume_list[pool] = new_volume_list
|
|
logger.out(
|
|
f'{logger.fmt_blue}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}',
|
|
state="i",
|
|
)
|
|
|
|
# Start keepalived thread
|
|
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
|
|
logger, config, zkhandler, this_node
|
|
)
|
|
|
|
# Tick loop; does nothing since everything is async
|
|
while True:
|
|
try:
|
|
sleep(1)
|
|
except Exception:
|
|
break
|