703 lines
32 KiB
Python
703 lines
32 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# Daemon.py - PVC Node daemon main entrypoint
|
|
# Part of the Parallel Virtual Cluster (PVC) system
|
|
#
|
|
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, version 3.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
#
|
|
###############################################################################
|
|
|
|
import pvcnoded.util.keepalive
|
|
import pvcnoded.util.config
|
|
import pvcnoded.util.fencing
|
|
import pvcnoded.util.networking
|
|
import pvcnoded.util.services
|
|
import pvcnoded.util.libvirt
|
|
import pvcnoded.util.zookeeper
|
|
|
|
import pvcnoded.objects.DNSAggregatorInstance as DNSAggregatorInstance
|
|
import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
|
|
import pvcnoded.objects.VMInstance as VMInstance
|
|
import pvcnoded.objects.NodeInstance as NodeInstance
|
|
import pvcnoded.objects.VXNetworkInstance as VXNetworkInstance
|
|
import pvcnoded.objects.SRIOVVFInstance as SRIOVVFInstance
|
|
import pvcnoded.objects.CephInstance as CephInstance
|
|
|
|
import daemon_lib.log as log
|
|
import daemon_lib.common as common
|
|
|
|
from time import sleep
|
|
from distutils.util import strtobool
|
|
|
|
import os
|
|
import sys
|
|
import signal
|
|
import re
|
|
import json
|
|
|
|
# Daemon version
|
|
version = '0.9.35'  # shown in the startup banner and stored as config['pvcnoded_version'] by entrypoint()
|
|
|
|
|
|
##########################################################
|
|
# Entrypoint
|
|
##########################################################
|
|
|
|
def entrypoint():
    """Run the PVC node daemon until it is terminated.

    Loads and validates the configuration, prints the startup banner, brings
    up networking and system services, connects to Zookeeper and registers the
    Zookeeper watchers that create, update and destroy the per-object
    instances (nodes, networks, SR-IOV VFs, VMs, Ceph OSDs/pools/volumes).
    It then starts the keepalive timer and idles; all further work happens in
    watcher callbacks and signal handlers. Shutdown goes through the nested
    ``cleanup()``, which terminates the process with ``os._exit()``.
    """
    keepalive_timer = None

    # this_node and d_domain are only assigned once Zookeeper and the object
    # dictionaries are set up, but cleanup() reads them and can run earlier
    # (libvirt validation failure, or a signal during startup). Binding them
    # to None here keeps cleanup() from dying with a NameError in that window.
    this_node = None
    d_domain = None

    # Get our configuration
    config = pvcnoded.util.config.get_configuration()
    config['pvcnoded_version'] = version

    # Set some useful booleans for later (fewer characters)
    debug = config['debug']
    if debug:
        print('DEBUG MODE ENABLED')

    # Create and validate our directories
    pvcnoded.util.config.validate_directories(config)

    # Set up the logger instance
    logger = log.Logger(config)

    # Print our startup message. Every banner line is 60 columns wide (a
    # 58-column interior between the '|' borders); padding is done with
    # format widths so the box stays aligned.
    logger.out('')
    logger.out('|----------------------------------------------------------|')
    logger.out('| {0: <56} |'.format(''))
    logger.out('| {0: ^56} |'.format('███████████ ▜█▙      ▟█▛ █████ █ █ █'))
    logger.out('| {0: ^56} |'.format('         ██  ▜█▙    ▟█▛  ██         '))
    logger.out('| {0: ^56} |'.format('███████████   ▜█▙  ▟█▛   ██         '))
    logger.out('| {0: ^56} |'.format('██             ▜█▙▟█▛    ███████████'))
    logger.out('| {0: <56} |'.format(''))
    logger.out('|----------------------------------------------------------|')
    logger.out('| Parallel Virtual Cluster node daemon v{0: <18} |'.format(version))
    logger.out('| Debug: {0: <49} |'.format(str(config['debug'])))
    logger.out('| FQDN: {0: <50} |'.format(config['node_fqdn']))
    logger.out('| Host: {0: <50} |'.format(config['node_hostname']))
    logger.out('| ID: {0: <52} |'.format(config['node_id']))
    logger.out('| IPMI hostname: {0: <41} |'.format(config['ipmi_hostname']))
    logger.out('| {0: <56} |'.format('Machine details:'))
    logger.out('|   CPUs: {0: <48} |'.format(config['static_data'][0]))
    logger.out('|   Arch: {0: <48} |'.format(config['static_data'][3]))
    logger.out('|   OS: {0: <50} |'.format(config['static_data'][2]))
    logger.out('|   Kernel: {0: <46} |'.format(config['static_data'][1]))
    logger.out('|----------------------------------------------------------|')
    logger.out('')
    logger.out(f'Starting pvcnoded on host {config["node_fqdn"]}', state='s')

    if config['enable_networking']:
        if config['enable_sriov']:
            # Set up SR-IOV devices
            pvcnoded.util.networking.setup_sriov(logger, config)

        # Set up our interfaces
        pvcnoded.util.networking.setup_interfaces(logger, config)

    # Get list of coordinator nodes
    coordinator_nodes = config['coordinators']

    if config['node_hostname'] in coordinator_nodes:
        # We are indeed a coordinator node
        config['daemon_mode'] = 'coordinator'
        logger.out(f'This node is a {logger.fmt_blue}coordinator{logger.fmt_end}', state='i')
    else:
        # We are a hypervisor node
        config['daemon_mode'] = 'hypervisor'
        logger.out(f'This node is a {logger.fmt_cyan}hypervisor{logger.fmt_end}', state='i')

    pvcnoded.util.services.start_system_services(logger, config)

    # Connect to Zookeeper and return our handler and current schema version
    zkhandler, node_schema_version = pvcnoded.util.zookeeper.connect(logger, config)

    # Watch for a global schema update and fire
    # This will only change by the API when triggered after seeing all nodes can update
    @zkhandler.zk_conn.DataWatch(zkhandler.schema.path('base.schema.version'))
    def update_schema(new_schema_version, stat, event=''):
        # On a schema version change: the primary migrates/rolls back under an
        # exclusive lock, other nodes wait for that lock via a read lock; then
        # every node records the new active schema, restarts the API services
        # and re-execs itself.
        nonlocal zkhandler, keepalive_timer, node_schema_version

        try:
            new_schema_version = int(new_schema_version.decode('ascii'))
        except Exception:
            # Missing or unparsable key data is treated as schema version 0
            new_schema_version = 0

        if new_schema_version == node_schema_version:
            return True

        logger.out('Hot update of schema version started', state='s')
        logger.out(f'Current version: {node_schema_version} New version: {new_schema_version}', state='s')

        # Prevent any keepalive updates while this happens
        if keepalive_timer is not None:
            pvcnoded.util.keepalive.stop_keepalive_timer(logger, keepalive_timer)
            sleep(1)

        # Perform the migration (primary only)
        if zkhandler.read('base.config.primary_node') == config['node_hostname']:
            logger.out('Primary node acquiring exclusive lock', state='s')
            # Wait for things to settle
            sleep(0.5)
            # Acquire a write lock on the root key
            with zkhandler.exclusivelock('base.schema.version'):
                # Perform the schema migration tasks
                logger.out('Performing schema update', state='s')
                if new_schema_version > node_schema_version:
                    zkhandler.schema.migrate(zkhandler, new_schema_version)
                if new_schema_version < node_schema_version:
                    zkhandler.schema.rollback(zkhandler, new_schema_version)
        else:
            # Wait for the exclusive lock to be lifted
            logger.out('Non-primary node acquiring read lock', state='s')
            # Wait for things to settle
            sleep(1)
            # Obtaining the read lock means the primary's exclusive lock is
            # gone; release it again right away so it does not linger until
            # our Zookeeper session goes away.
            lock = zkhandler.readlock('base.schema.version')
            lock.acquire()
            lock.release()
            # Wait a bit more for the primary to return to normal
            sleep(1)

        # Update the local schema version
        logger.out('Updating node target schema version', state='s')
        zkhandler.write([
            (('node.data.active_schema', config['node_hostname']), new_schema_version)
        ])
        node_schema_version = new_schema_version

        # Restart the API daemons if applicable
        logger.out('Restarting services', state='s')
        common.run_os_command('systemctl restart pvcapid-worker.service')
        if zkhandler.read('base.config.primary_node') == config['node_hostname']:
            common.run_os_command('systemctl restart pvcapid.service')

        # Restart ourselves with the new schema
        logger.out('Reloading node daemon', state='s')
        try:
            zkhandler.disconnect(persistent=True)
            del zkhandler
        except Exception:
            pass
        os.execv(sys.argv[0], sys.argv)

    # Validate the schema
    pvcnoded.util.zookeeper.validate_schema(logger, zkhandler)

    # Define a cleanup function
    def cleanup(failure=False):
        """Shut the daemon down and exit the process.

        Marks the node as shutting down in Zookeeper, waits for an active
        flush, stops VM console watchers, hands off the primary coordinator
        role if we hold it, stops the keepalive timer (sending one final
        keepalive), marks the node stopped, kills dnsmasq and disconnects from
        Zookeeper. Never returns: exits via os._exit() with status 1 when
        ``failure`` is true, otherwise 0.
        """
        # zkhandler must be nonlocal because it is deleted below
        nonlocal zkhandler

        logger.out('Terminating pvcnoded and cleaning up', state='s')

        # Set shutdown state in Zookeeper
        zkhandler.write([
            (('node.state.daemon', config['node_hostname']), 'shutdown')
        ])

        # Waiting for any flushes to complete
        logger.out('Waiting for any active flushes', state='s')
        try:
            if this_node is not None:
                while this_node.flush_thread is not None:
                    sleep(0.5)
        except Exception:
            # We really don't care here, just proceed
            pass

        # Stop console logging on all VMs
        logger.out('Stopping domain console watchers', state='s')
        if d_domain is not None:
            # Iterate over a snapshot: the domain watcher may add or remove
            # entries while we are shutting down
            for domain_instance in list(d_domain.values()):
                if domain_instance.getnode() == config['node_hostname']:
                    try:
                        domain_instance.console_log_instance.stop()
                    except Exception:
                        pass

        # Force into secondary coordinator state if needed
        try:
            if this_node.router_state == 'primary':
                zkhandler.write([
                    ('base.config.primary_node', 'none')
                ])
                logger.out('Waiting for primary migration', state='s')
                while this_node.router_state != 'secondary':
                    sleep(0.5)
        except Exception:
            # this_node may still be None during early startup
            pass

        # Stop keepalive thread
        try:
            pvcnoded.util.keepalive.stop_keepalive_timer(logger, keepalive_timer)

            logger.out('Performing final keepalive update', state='s')
            pvcnoded.util.keepalive.node_keepalive(logger, config, zkhandler, this_node)
        except Exception:
            pass

        # Set stop state in Zookeeper
        zkhandler.write([
            (('node.state.daemon', config['node_hostname']), 'stop')
        ])

        # Forcibly terminate dnsmasq because it gets stuck sometimes
        common.run_os_command('killall dnsmasq')

        # Close the Zookeeper connection
        try:
            zkhandler.disconnect(persistent=True)
            del zkhandler
        except Exception:
            pass

        logger.out('Terminated pvc daemon', state='s')
        logger.terminate()

        os._exit(1 if failure else 0)

    # Termination function
    def term(signum='', frame=''):
        cleanup(failure=False)

    # Hangup (logrotate) function
    def hup(signum='', frame=''):
        if config['file_logging']:
            logger.hup()

    # Handle signals gracefully
    signal.signal(signal.SIGTERM, term)
    signal.signal(signal.SIGINT, term)
    signal.signal(signal.SIGQUIT, term)
    signal.signal(signal.SIGHUP, hup)

    # Set up this node in Zookeeper
    pvcnoded.util.zookeeper.setup_node(logger, config, zkhandler)

    # Check that the primary node key exists and create it with us as primary if not
    try:
        current_primary = zkhandler.read('base.config.primary_node')
    except Exception:
        current_primary = 'none'

    if current_primary and current_primary != 'none':
        logger.out(f'Current primary node is {logger.fmt_blue}{current_primary}{logger.fmt_end}', state='i')
    else:
        if config['daemon_mode'] == 'coordinator':
            logger.out('No primary node found; setting us as primary', state='i')
            zkhandler.write([
                ('base.config.primary_node', config['node_hostname'])
            ])

    # Ensure that IPMI is reachable and working
    if not pvcnoded.util.fencing.verify_ipmi(config['ipmi_hostname'], config['ipmi_username'], config['ipmi_password']):
        logger.out('Our IPMI is not reachable; fencing of this node will likely fail', state='w')

    # Validate libvirt
    if not pvcnoded.util.libvirt.validate_libvirtd(logger, config):
        cleanup(failure=True)

    # Set up NFT
    pvcnoded.util.networking.create_nft_configuration(logger, config)

    # Create our object dictionaries
    logger.out('Setting up objects', state='i')

    d_node = dict()
    node_list = list()
    d_network = dict()
    network_list = list()
    sriov_pf_list = list()
    d_sriov_vf = dict()
    sriov_vf_list = list()
    d_domain = dict()
    domain_list = list()
    d_osd = dict()
    osd_list = list()
    d_pool = dict()
    pool_list = list()
    # Both keyed by pool name: volume_list[pool] is a list of volume names,
    # d_volume[pool] maps volume name -> CephVolumeInstance
    d_volume = dict()
    volume_list = dict()

    if config['enable_networking'] and config['daemon_mode'] == 'coordinator':
        # Create an instance of the DNS Aggregator and Metadata API if we're a coordinator
        dns_aggregator = DNSAggregatorInstance.DNSAggregatorInstance(config, logger)
        metadata_api = MetadataAPIInstance.MetadataAPIInstance(zkhandler, config, logger)
    else:
        dns_aggregator = None
        metadata_api = None

    #
    # Zookeeper watchers for objects
    #

    # Node objects
    @zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('base.node'))
    def set_nodes(new_node_list):
        nonlocal d_node, node_list

        # Add missing nodes to list
        for node in [node for node in new_node_list if node not in node_list]:
            d_node[node] = NodeInstance.NodeInstance(node, config['node_hostname'], zkhandler, config, logger, d_node, d_network, d_domain, dns_aggregator, metadata_api)

        # Remove deleted nodes from list
        for node in [node for node in node_list if node not in new_node_list]:
            del(d_node[node])

        node_list = new_node_list
        logger.out(f'{logger.fmt_blue}Node list:{logger.fmt_end} {" ".join(node_list)}', state='i')

        # Update node objects lists
        for node in d_node:
            d_node[node].update_node_list(d_node)

    # Create helpful alias for this node
    this_node = d_node[config['node_hostname']]

    # Maintenance status
    @zkhandler.zk_conn.DataWatch(zkhandler.schema.path('base.config.maintenance'))
    def update_maintenance(_maintenance, stat):
        try:
            maintenance = bool(strtobool(_maintenance.decode('ascii')))
        except Exception:
            maintenance = False

        this_node.maintenance = maintenance

    # Primary node
    @zkhandler.zk_conn.DataWatch(zkhandler.schema.path('base.config.primary_node'))
    def update_primary_node(new_primary, stat, event=''):
        try:
            new_primary = new_primary.decode('ascii')
        except AttributeError:
            new_primary = 'none'
        key_version = stat.version

        # TODO: Move this to the Node structure
        if new_primary != this_node.primary_node:
            if config['daemon_mode'] == 'coordinator':
                # We're a coordinator and there's no primary
                if new_primary == 'none':
                    if this_node.daemon_state == 'run' and this_node.router_state not in ['primary', 'takeover', 'relinquish']:
                        logger.out('Contending for primary coordinator state', state='i')
                        # Acquire an exclusive lock on the primary_node key
                        primary_lock = zkhandler.exclusivelock('base.config.primary_node')
                        try:
                            # This lock times out after 0.4s, which is 0.1s less than the pre-takeover
                            # timeout below. This ensures a primary takeover will not deadlock against
                            # a node which has failed the contention
                            primary_lock.acquire(timeout=0.4)
                            # Ensure that when we get the lock the versions are still consistent and
                            # that another node hasn't already acquired the primary state (maybe we're
                            # extremely slow to respond)
                            if key_version == zkhandler.zk_conn.get(zkhandler.schema.path('base.config.primary_node'))[1].version:
                                # Set the primary to us
                                logger.out('Acquiring primary coordinator state', state='o')
                                zkhandler.write([
                                    ('base.config.primary_node', config['node_hostname'])
                                ])
                        # We timed out acquiring a lock, or failed to write, which means we failed the
                        # contention and should just log that
                        except Exception:
                            logger.out('Timed out contending for primary coordinator state', state='i')
                        finally:
                            # Always release the lock, even if the write failed after acquiring it;
                            # otherwise later contentions would deadlock on it. Releasing a lock we
                            # never acquired is harmless, so this is best-effort.
                            try:
                                primary_lock.release()
                            except Exception:
                                pass
                elif new_primary == config['node_hostname']:
                    if this_node.router_state == 'secondary':
                        # Wait for 0.5s to ensure other contentions time out, then take over
                        sleep(0.5)
                        zkhandler.write([
                            (('node.state.router', config['node_hostname']), 'takeover')
                        ])
                else:
                    if this_node.router_state == 'primary':
                        # Wait for 0.5s to ensure other contentions time out, then relinquish
                        sleep(0.5)
                        zkhandler.write([
                            (('node.state.router', config['node_hostname']), 'relinquish')
                        ])
            else:
                zkhandler.write([
                    (('node.state.router', config['node_hostname']), 'client')
                ])

            # TODO: Turn this into a function like the others for clarity
            for node in d_node:
                d_node[node].primary_node = new_primary

    if config['enable_networking']:
        # Network objects
        @zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('base.network'))
        def update_networks(new_network_list):
            nonlocal network_list, d_network

            # Add any missing networks to the list
            for network in [network for network in new_network_list if network not in network_list]:
                d_network[network] = VXNetworkInstance.VXNetworkInstance(network, zkhandler, config, logger, this_node, dns_aggregator)
                # TODO: Move this to the Network structure
                if config['daemon_mode'] == 'coordinator' and d_network[network].nettype == 'managed':
                    try:
                        dns_aggregator.add_network(d_network[network])
                    except Exception as e:
                        logger.out(f'Failed to create DNS Aggregator for network {network}: {e}', state='w')
                # Start primary functionality
                if this_node.router_state == 'primary' and d_network[network].nettype == 'managed':
                    d_network[network].createGateways()
                    d_network[network].startDHCPServer()

            # Remove any missing networks from the list
            for network in [network for network in network_list if network not in new_network_list]:
                # TODO: Move this to the Network structure
                if d_network[network].nettype == 'managed':
                    # Stop primary functionality
                    if this_node.router_state == 'primary':
                        d_network[network].stopDHCPServer()
                        d_network[network].removeGateways()
                        dns_aggregator.remove_network(d_network[network])
                    # Stop firewalling
                    d_network[network].removeFirewall()
                # Delete the network
                d_network[network].removeNetwork()
                del(d_network[network])

            # Update the new list
            network_list = new_network_list
            logger.out(f'{logger.fmt_blue}Network list:{logger.fmt_end} {" ".join(network_list)}', state='i')

            # Update node objects list
            for node in d_node:
                d_node[node].update_network_list(d_network)

        # Add the SR-IOV PFs and VFs to Zookeeper
        # These do not behave like the objects; they are not dynamic (the API cannot change them), and they
        # exist for the lifetime of this Node instance. The objects are set here in Zookeeper on a per-node
        # basis, under the Node configuration tree.
        # MIGRATION: The schema.schema.get ensures that the current active Schema contains the required keys
        if config['enable_sriov'] and zkhandler.schema.schema.get('sriov_pf', None) is not None:
            for device in config['sriov_device']:
                pf = device['phy']
                vfcount = device['vfcount']
                mtu = device.get('mtu', None)
                if mtu is None:
                    mtu = 1500

                # Create the PF device in Zookeeper
                zkhandler.write([
                    (('node.sriov.pf', config['node_hostname'], 'sriov_pf', pf), ''),
                    (('node.sriov.pf', config['node_hostname'], 'sriov_pf.mtu', pf), mtu),
                    (('node.sriov.pf', config['node_hostname'], 'sriov_pf.vfcount', pf), vfcount),
                ])
                # Append the device to the list of PFs
                sriov_pf_list.append(pf)

                # Get the list of VFs from `ip link show`
                vf_list = json.loads(common.run_os_command(f'ip --json link show {pf}')[1])[0].get('vfinfo_list', [])
                for vf in vf_list:
                    # {
                    #   'vf': 3,
                    #   'link_type': 'ether',
                    #   'address': '00:00:00:00:00:00',
                    #   'broadcast': 'ff:ff:ff:ff:ff:ff',
                    #   'vlan_list': [{'vlan': 101, 'qos': 2}],
                    #   'rate': {'max_tx': 0, 'min_tx': 0},
                    #   'spoofchk': True,
                    #   'link_state': 'auto',
                    #   'trust': False,
                    #   'query_rss_en': False
                    # }
                    vfphy = f'{pf}v{vf["vf"]}'

                    # Get the PCIe bus information
                    dev_pcie_path = None
                    try:
                        with open(f'/sys/class/net/{vfphy}/device/uevent') as vfh:
                            dev_uevent = vfh.readlines()
                        for line in dev_uevent:
                            if re.match(r'^PCI_SLOT_NAME=.*', line):
                                dev_pcie_path = line.rstrip().split('=')[-1]
                    except FileNotFoundError:
                        # Something must already be using the PCIe device
                        pass

                    # Add the VF to Zookeeper if it does not yet exist
                    if not zkhandler.exists(('node.sriov.vf', config['node_hostname'], 'sriov_vf', vfphy)):
                        if dev_pcie_path is not None:
                            pcie_domain, pcie_bus, pcie_slot, pcie_function = re.split(r':|\.', dev_pcie_path)
                        else:
                            # We can't add the device - for some reason we can't get any information on its PCIe bus path,
                            # so just ignore this one, and continue.
                            # This shouldn't happen under any real circumstances, unless the admin tries to attach a non-existent
                            # VF to a VM manually, then goes ahead and adds that VF to the system with the VM running.
                            continue

                        # The VLAN settings normally come as the first entry of 'vlan_list'.
                        # NOTE(review): iproute2 may omit or empty 'vlan_list' (older kernels put
                        # 'vlan'/'qos' on the VF entry itself, or leave them out when zero); fall back
                        # to the VF entry so the '0' defaults apply instead of an IndexError/KeyError.
                        vf_vlan = (vf.get('vlan_list') or [vf])[0]

                        zkhandler.write([
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf', vfphy), ''),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.pf', vfphy), pf),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.mtu', vfphy), mtu),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.mac', vfphy), vf['address']),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.phy_mac', vfphy), vf['address']),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.config', vfphy), ''),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.config.vlan_id', vfphy), vf_vlan.get('vlan', '0')),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.config.vlan_qos', vfphy), vf_vlan.get('qos', '0')),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.config.tx_rate_min', vfphy), vf['rate']['min_tx']),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.config.tx_rate_max', vfphy), vf['rate']['max_tx']),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.config.spoof_check', vfphy), vf['spoofchk']),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.config.link_state', vfphy), vf['link_state']),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.config.trust', vfphy), vf['trust']),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.config.query_rss', vfphy), vf['query_rss_en']),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.pci', vfphy), ''),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.pci.domain', vfphy), pcie_domain),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.pci.bus', vfphy), pcie_bus),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.pci.slot', vfphy), pcie_slot),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.pci.function', vfphy), pcie_function),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.used', vfphy), False),
                            (('node.sriov.vf', config['node_hostname'], 'sriov_vf.used_by', vfphy), ''),
                        ])

                    # Append the device to the list of VFs
                    sriov_vf_list.append(vfphy)

            # Remove any obsolete PFs from Zookeeper if they go away
            for pf in zkhandler.children(('node.sriov.pf', config['node_hostname'])):
                if pf not in sriov_pf_list:
                    zkhandler.delete([
                        ('node.sriov.pf', config['node_hostname'], 'sriov_pf', pf)
                    ])
            # Remove any obsolete VFs from Zookeeper if their PF goes away
            for vf in zkhandler.children(('node.sriov.vf', config['node_hostname'])):
                vf_pf = zkhandler.read(('node.sriov.vf', config['node_hostname'], 'sriov_vf.pf', vf))
                if vf_pf not in sriov_pf_list:
                    zkhandler.delete([
                        ('node.sriov.vf', config['node_hostname'], 'sriov_vf', vf)
                    ])

            # SR-IOV VF objects
            # This is a ChildrenWatch just for consistency; the list never changes at runtime
            @zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('node.sriov.vf', config['node_hostname']))
            def update_sriov_vfs(new_sriov_vf_list):
                nonlocal sriov_vf_list, d_sriov_vf

                # Add VFs to the list
                for vf in common.sortInterfaceNames(new_sriov_vf_list):
                    d_sriov_vf[vf] = SRIOVVFInstance.SRIOVVFInstance(vf, zkhandler, config, logger, this_node)

                sriov_vf_list = sorted(new_sriov_vf_list)
                logger.out(f'{logger.fmt_blue}SR-IOV VF list:{logger.fmt_end} {" ".join(sriov_vf_list)}', state='i')

    if config['enable_hypervisor']:
        # VM command pipeline key
        @zkhandler.zk_conn.DataWatch(zkhandler.schema.path('base.cmd.domain'))
        def run_domain_command(data, stat, event=''):
            if data:
                VMInstance.vm_command(zkhandler, logger, this_node, data.decode('ascii'))

        # VM domain objects
        @zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('base.domain'))
        def update_domains(new_domain_list):
            nonlocal domain_list, d_domain

            # Add missing domains to the list
            for domain in [domain for domain in new_domain_list if domain not in domain_list]:
                d_domain[domain] = VMInstance.VMInstance(domain, zkhandler, config, logger, this_node)

            # Remove any deleted domains from the list
            for domain in [domain for domain in domain_list if domain not in new_domain_list]:
                del(d_domain[domain])

            # Update the new list
            domain_list = new_domain_list
            logger.out(f'{logger.fmt_blue}Domain list:{logger.fmt_end} {" ".join(domain_list)}', state='i')

            # Update node objects' list
            for node in d_node:
                d_node[node].update_domain_list(d_domain)

    if config['enable_storage']:
        # Ceph command pipeline key
        @zkhandler.zk_conn.DataWatch(zkhandler.schema.path('base.cmd.ceph'))
        def run_ceph_command(data, stat, event=''):
            if data:
                CephInstance.ceph_command(zkhandler, logger, this_node, data.decode('ascii'), d_osd)

        # OSD objects
        @zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('base.osd'))
        def update_osds(new_osd_list):
            nonlocal osd_list, d_osd

            # Add any missing OSDs to the list
            for osd in [osd for osd in new_osd_list if osd not in osd_list]:
                d_osd[osd] = CephInstance.CephOSDInstance(zkhandler, this_node, osd)

            # Remove any deleted OSDs from the list
            for osd in [osd for osd in osd_list if osd not in new_osd_list]:
                del(d_osd[osd])

            # Update the new list
            osd_list = new_osd_list
            logger.out(f'{logger.fmt_blue}OSD list:{logger.fmt_end} {" ".join(osd_list)}', state='i')

        # Volume objects (in each pool)
        def watch_pool_volumes(pool):
            # Register a ChildrenWatch that keeps volume_list[pool]/d_volume[pool] in
            # sync with Zookeeper. `pool` is a parameter of this function rather than a
            # loop variable captured from the caller, so every later firing of the
            # watcher still refers to the pool it was registered for.
            @zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('volume', pool))
            def update_volumes(new_volume_list):
                nonlocal volume_list, d_volume

                # Add any missing volumes to the list
                for volume in [volume for volume in new_volume_list if volume not in volume_list[pool]]:
                    d_volume[pool][volume] = CephInstance.CephVolumeInstance(zkhandler, this_node, pool, volume)

                # Remove any deleted volumes from the list
                for volume in [volume for volume in volume_list[pool] if volume not in new_volume_list]:
                    del(d_volume[pool][volume])

                # Update the new list
                volume_list[pool] = new_volume_list
                logger.out(f'{logger.fmt_blue}Volume list [{pool}]:{logger.fmt_end} {" ".join(volume_list[pool])}', state='i')

        # Pool objects
        @zkhandler.zk_conn.ChildrenWatch(zkhandler.schema.path('base.pool'))
        def update_pools(new_pool_list):
            nonlocal pool_list, d_pool, volume_list, d_volume

            added_pools = [pool for pool in new_pool_list if pool not in pool_list]

            # Add any missing pools to the list
            for pool in added_pools:
                d_pool[pool] = CephInstance.CephPoolInstance(zkhandler, this_node, pool)
                # Prepare the volume components for this pool
                volume_list[pool] = list()
                d_volume[pool] = dict()

            # Remove any deleted pools from the list
            for pool in [pool for pool in pool_list if pool not in new_pool_list]:
                del(d_pool[pool])

            # Update the new list
            pool_list = new_pool_list
            logger.out(f'{logger.fmt_blue}Pool list:{logger.fmt_end} {" ".join(pool_list)}', state='i')

            # Only newly added pools need a volume watcher; pools already present keep
            # the watcher registered when they first appeared (re-registering every pool
            # on each change would pile up duplicate watchers)
            for pool in added_pools:
                watch_pool_volumes(pool)

    # Start keepalived thread
    keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(logger, config, zkhandler, this_node)

    # Tick loop; does nothing since everything is async
    while True:
        try:
            sleep(1)
        except Exception:
            break