Refactor pvcnoded to reduce Daemon.py size

This branch commit refactors the pvcnoded component to better adhere to
good programming practices. The previous Daemon.py was a massive file
which contained almost 2000 lines of direct, root-level code which was
directly imported. Not only was this poor practice, but this resulted
in a nigh-unmaintainable file which was hard even for me to understand.

This refactoring splits a large section of the code from Daemon.py into
separate small modules and functions in the `util/` directory. This will
hopefully make most of the functionality easy to find and modify without
having to dig through a single large file.

Further the existing subcomponents have been moved to the `objects/`
directory which clearly separates them.

Finally, the Daemon.py code has mostly been moved into a function,
`entrypoint()`, which is then called from the `pvcnoded.py` stub.

An additional item is that most format strings have been replaced by
f-strings to make use of the Python 3.6 features in Daemon.py and the
utility files.
This commit is contained in:
Joshua Boniface 2021-08-21 02:46:11 -04:00
parent 4014ef7714
commit 534c7cd7f0
24 changed files with 2667 additions and 2433 deletions

2
lint
View File

@ -6,7 +6,7 @@ if ! which flake8 &>/dev/null; then
fi
flake8 \
--ignore=E501 \
--ignore=E501,E241 \
--exclude=debian,api-daemon/migrations/versions,api-daemon/provisioner/examples
ret=$?
if [[ $ret -eq 0 ]]; then

View File

@ -20,3 +20,5 @@
###############################################################################
import pvcnoded.Daemon # noqa: F401
pvcnoded.Daemon.entrypoint()

View File

@ -182,15 +182,15 @@ pvc:
device: ens4
# mtu: Upstream interface MTU; use 9000 for jumbo frames (requires switch support)
mtu: 1500
# address: Upstream interface IP address, options: None, by-id, <static>/<mask>
address: None
# address: Upstream interface IP address, options: by-id, <static>/<mask>
address: by-id
# cluster: Cluster (VNIC) physical interface device
cluster:
# device: Cluster (VNIC) interface device name
device: ens4
# mtu: Cluster (VNIC) interface MTU; use 9000 for jumbo frames (requires switch support)
mtu: 1500
# address: Cluster (VNIC) interface IP address, options: None, by-id, <static>/<mask>
# address: Cluster (VNIC) interface IP address, options: by-id, <static>/<mask>
address: by-id
# storage: Storage (Ceph OSD) physical interface device
storage:
@ -198,7 +198,7 @@ pvc:
device: ens4
# mtu: Storage (Ceph OSD) interface MTU; use 9000 for jumbo frames (requires switch support)
mtu: 1500
# address: Storage (Ceph OSD) interface IP address, options: None, by-id, <static>/<mask>
# address: Storage (Ceph OSD) interface IP address, options: by-id, <static>/<mask>
address: by-id
# storage; PVC storage configuration
# OPTIONAL if enable_storage: False

View File

@ -1,428 +0,0 @@
#!/usr/bin/env python3
# CephInstance.py - Class implementing a PVC node Ceph instance
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import time
import json
import psutil
import daemon_lib.common as common
class CephOSDInstance(object):
def __init__(self, zkhandler, this_node, osd_id):
self.zkhandler = zkhandler
self.this_node = this_node
self.osd_id = osd_id
self.node = None
self.size = None
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.node', self.osd_id))
def watch_osd_node(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.node:
self.node = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.stats', self.osd_id))
def watch_osd_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
def add_osd(zkhandler, logger, node, device, weight):
# We are ready to create a new OSD on this node
logger.out('Creating new OSD disk on block device {}'.format(device), state='i')
try:
# 1. Create an OSD; we do this so we know what ID will be gen'd
retcode, stdout, stderr = common.run_os_command('ceph osd create')
if retcode:
print('ceph osd create')
print(stdout)
print(stderr)
raise
osd_id = stdout.rstrip()
# 2. Remove that newly-created OSD
retcode, stdout, stderr = common.run_os_command('ceph osd rm {}'.format(osd_id))
if retcode:
print('ceph osd rm')
print(stdout)
print(stderr)
raise
# 3a. Zap the disk to ensure it is ready to go
logger.out('Zapping disk {}'.format(device), state='i')
retcode, stdout, stderr = common.run_os_command('ceph-volume lvm zap --destroy {}'.format(device))
if retcode:
print('ceph-volume lvm zap')
print(stdout)
print(stderr)
raise
# 3b. Create the OSD for real
logger.out('Preparing LVM for new OSD disk with ID {} on {}'.format(osd_id, device), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph-volume lvm prepare --bluestore --data {device}'.format(
osdid=osd_id,
device=device
)
)
if retcode:
print('ceph-volume lvm prepare')
print(stdout)
print(stderr)
raise
# 4a. Get OSD FSID
logger.out('Getting OSD FSID for ID {} on {}'.format(osd_id, device), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph-volume lvm list {device}'.format(
osdid=osd_id,
device=device
)
)
for line in stdout.split('\n'):
if 'osd fsid' in line:
osd_fsid = line.split()[-1]
if not osd_fsid:
print('ceph-volume lvm list')
print('Could not find OSD fsid in data:')
print(stdout)
print(stderr)
raise
# 4b. Activate the OSD
logger.out('Activating new OSD disk with ID {}'.format(osd_id, device), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph-volume lvm activate --bluestore {osdid} {osdfsid}'.format(
osdid=osd_id,
osdfsid=osd_fsid
)
)
if retcode:
print('ceph-volume lvm activate')
print(stdout)
print(stderr)
raise
# 5. Add it to the crush map
logger.out('Adding new OSD disk with ID {} to CRUSH map'.format(osd_id), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph osd crush add osd.{osdid} {weight} root=default host={node}'.format(
osdid=osd_id,
weight=weight,
node=node
)
)
if retcode:
print('ceph osd crush add')
print(stdout)
print(stderr)
raise
time.sleep(0.5)
# 6. Verify it started
retcode, stdout, stderr = common.run_os_command(
'systemctl status ceph-osd@{osdid}'.format(
osdid=osd_id
)
)
if retcode:
print('systemctl status')
print(stdout)
print(stderr)
raise
# 7. Add the new OSD to the list
logger.out('Adding new OSD disk with ID {} to Zookeeper'.format(osd_id), state='i')
zkhandler.write([
(('osd', osd_id), ''),
(('osd.node', osd_id), node),
(('osd.device', osd_id), device),
(('osd.stats', osd_id), '{}'),
])
# Log it
logger.out('Created new OSD disk with ID {}'.format(osd_id), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to create new OSD disk: {}'.format(e), state='e')
return False
def remove_osd(zkhandler, logger, osd_id, osd_obj):
logger.out('Removing OSD disk {}'.format(osd_id), state='i')
try:
# 1. Verify the OSD is present
retcode, stdout, stderr = common.run_os_command('ceph osd ls')
osd_list = stdout.split('\n')
if osd_id not in osd_list:
logger.out('Could not find OSD {} in the cluster'.format(osd_id), state='e')
return True
# 1. Set the OSD out so it will flush
logger.out('Setting out OSD disk with ID {}'.format(osd_id), state='i')
retcode, stdout, stderr = common.run_os_command('ceph osd out {}'.format(osd_id))
if retcode:
print('ceph osd out')
print(stdout)
print(stderr)
raise
# 2. Wait for the OSD to flush
logger.out('Flushing OSD disk with ID {}'.format(osd_id), state='i')
osd_string = str()
while True:
try:
retcode, stdout, stderr = common.run_os_command('ceph pg dump osds --format json')
dump_string = json.loads(stdout)
for osd in dump_string:
if str(osd['osd']) == osd_id:
osd_string = osd
num_pgs = osd_string['num_pgs']
if num_pgs > 0:
time.sleep(5)
else:
raise
except Exception:
break
# 3. Stop the OSD process and wait for it to be terminated
logger.out('Stopping OSD disk with ID {}'.format(osd_id), state='i')
retcode, stdout, stderr = common.run_os_command('systemctl stop ceph-osd@{}'.format(osd_id))
if retcode:
print('systemctl stop')
print(stdout)
print(stderr)
raise
# FIXME: There has to be a better way to do this /shrug
while True:
is_osd_up = False
# Find if there is a process named ceph-osd with arg '--id {id}'
for p in psutil.process_iter(attrs=['name', 'cmdline']):
if 'ceph-osd' == p.info['name'] and '--id {}'.format(osd_id) in ' '.join(p.info['cmdline']):
is_osd_up = True
# If there isn't, continue
if not is_osd_up:
break
# 4. Determine the block devices
retcode, stdout, stderr = common.run_os_command('readlink /var/lib/ceph/osd/ceph-{}/block'.format(osd_id))
vg_name = stdout.split('/')[-2] # e.g. /dev/ceph-<uuid>/osd-block-<uuid>
retcode, stdout, stderr = common.run_os_command('vgs --separator , --noheadings -o pv_name {}'.format(vg_name))
pv_block = stdout.strip()
# 5. Zap the volumes
logger.out('Zapping OSD disk with ID {} on {}'.format(osd_id, pv_block), state='i')
retcode, stdout, stderr = common.run_os_command('ceph-volume lvm zap --destroy {}'.format(pv_block))
if retcode:
print('ceph-volume lvm zap')
print(stdout)
print(stderr)
raise
# 6. Purge the OSD from Ceph
logger.out('Purging OSD disk with ID {}'.format(osd_id), state='i')
retcode, stdout, stderr = common.run_os_command('ceph osd purge {} --yes-i-really-mean-it'.format(osd_id))
if retcode:
print('ceph osd purge')
print(stdout)
print(stderr)
raise
# 7. Delete OSD from ZK
logger.out('Deleting OSD disk with ID {} from Zookeeper'.format(osd_id), state='i')
zkhandler.delete(('osd', osd_id), recursive=True)
# Log it
logger.out('Removed OSD disk with ID {}'.format(osd_id), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e')
return False
class CephPoolInstance(object):
def __init__(self, zkhandler, this_node, name):
self.zkhandler = zkhandler
self.this_node = this_node
self.name = name
self.pgs = ''
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('pool.pgs', self.name))
def watch_pool_node(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.pgs:
self.pgs = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('pool.stats', self.name))
def watch_pool_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
class CephVolumeInstance(object):
def __init__(self, zkhandler, this_node, pool, name):
self.zkhandler = zkhandler
self.this_node = this_node
self.pool = pool
self.name = name
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('volume.stats', f'{self.pool}/{self.name}'))
def watch_volume_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
class CephSnapshotInstance(object):
def __init__(self, zkhandler, this_node, pool, volume, name):
self.zkhandler = zkhandler
self.this_node = this_node
self.pool = pool
self.volume = volume
self.name = name
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('snapshot.stats', f'{self.pool}/{self.volume}/{self.name}'))
def watch_snapshot_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
# Primary command function
# This command pipe is only used for OSD adds and removes
def run_command(zkhandler, logger, this_node, data, d_osd):
# Get the command and args
command, args = data.split()
# Adding a new OSD
if command == 'osd_add':
node, device, weight = args.split(',')
if node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock('base.cmd.ceph')
with zk_lock:
# Add the OSD
result = add_osd(zkhandler, logger, node, device, weight)
# Command succeeded
if result:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'success-{}'.format(data))
])
# Command failed
else:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'failure-{}'.format(data))
])
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Removing an OSD
elif command == 'osd_remove':
osd_id = args
# Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock('base.cmd.ceph')
with zk_lock:
# Remove the OSD
result = remove_osd(zkhandler, logger, osd_id, d_osd[osd_id])
# Command succeeded
if result:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'success-{}'.format(data))
])
# Command failed
else:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'failure-{}'.format(data))
])
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
#!/usr/bin/python3
#!/usr/bin/env python3
# dnsmasq-zookeeper-leases.py - DNSMASQ leases script for Zookeeper
# Part of the Parallel Virtual Cluster (PVC) system

View File

@ -0,0 +1,428 @@
#!/usr/bin/env python3
# CephInstance.py - Class implementing a PVC node Ceph instance
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import time
import json
import psutil
import daemon_lib.common as common
class CephOSDInstance(object):
def __init__(self, zkhandler, this_node, osd_id):
self.zkhandler = zkhandler
self.this_node = this_node
self.osd_id = osd_id
self.node = None
self.size = None
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.node', self.osd_id))
def watch_osd_node(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.node:
self.node = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('osd.stats', self.osd_id))
def watch_osd_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
@staticmethod
def add_osd(zkhandler, logger, node, device, weight):
# We are ready to create a new OSD on this node
logger.out('Creating new OSD disk on block device {}'.format(device), state='i')
try:
# 1. Create an OSD; we do this so we know what ID will be gen'd
retcode, stdout, stderr = common.run_os_command('ceph osd create')
if retcode:
print('ceph osd create')
print(stdout)
print(stderr)
raise
osd_id = stdout.rstrip()
# 2. Remove that newly-created OSD
retcode, stdout, stderr = common.run_os_command('ceph osd rm {}'.format(osd_id))
if retcode:
print('ceph osd rm')
print(stdout)
print(stderr)
raise
# 3a. Zap the disk to ensure it is ready to go
logger.out('Zapping disk {}'.format(device), state='i')
retcode, stdout, stderr = common.run_os_command('ceph-volume lvm zap --destroy {}'.format(device))
if retcode:
print('ceph-volume lvm zap')
print(stdout)
print(stderr)
raise
# 3b. Create the OSD for real
logger.out('Preparing LVM for new OSD disk with ID {} on {}'.format(osd_id, device), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph-volume lvm prepare --bluestore --data {device}'.format(
osdid=osd_id,
device=device
)
)
if retcode:
print('ceph-volume lvm prepare')
print(stdout)
print(stderr)
raise
# 4a. Get OSD FSID
logger.out('Getting OSD FSID for ID {} on {}'.format(osd_id, device), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph-volume lvm list {device}'.format(
osdid=osd_id,
device=device
)
)
for line in stdout.split('\n'):
if 'osd fsid' in line:
osd_fsid = line.split()[-1]
if not osd_fsid:
print('ceph-volume lvm list')
print('Could not find OSD fsid in data:')
print(stdout)
print(stderr)
raise
# 4b. Activate the OSD
logger.out('Activating new OSD disk with ID {}'.format(osd_id, device), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph-volume lvm activate --bluestore {osdid} {osdfsid}'.format(
osdid=osd_id,
osdfsid=osd_fsid
)
)
if retcode:
print('ceph-volume lvm activate')
print(stdout)
print(stderr)
raise
# 5. Add it to the crush map
logger.out('Adding new OSD disk with ID {} to CRUSH map'.format(osd_id), state='i')
retcode, stdout, stderr = common.run_os_command(
'ceph osd crush add osd.{osdid} {weight} root=default host={node}'.format(
osdid=osd_id,
weight=weight,
node=node
)
)
if retcode:
print('ceph osd crush add')
print(stdout)
print(stderr)
raise
time.sleep(0.5)
# 6. Verify it started
retcode, stdout, stderr = common.run_os_command(
'systemctl status ceph-osd@{osdid}'.format(
osdid=osd_id
)
)
if retcode:
print('systemctl status')
print(stdout)
print(stderr)
raise
# 7. Add the new OSD to the list
logger.out('Adding new OSD disk with ID {} to Zookeeper'.format(osd_id), state='i')
zkhandler.write([
(('osd', osd_id), ''),
(('osd.node', osd_id), node),
(('osd.device', osd_id), device),
(('osd.stats', osd_id), '{}'),
])
# Log it
logger.out('Created new OSD disk with ID {}'.format(osd_id), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to create new OSD disk: {}'.format(e), state='e')
return False
@staticmethod
def remove_osd(zkhandler, logger, osd_id, osd_obj):
logger.out('Removing OSD disk {}'.format(osd_id), state='i')
try:
# 1. Verify the OSD is present
retcode, stdout, stderr = common.run_os_command('ceph osd ls')
osd_list = stdout.split('\n')
if osd_id not in osd_list:
logger.out('Could not find OSD {} in the cluster'.format(osd_id), state='e')
return True
# 1. Set the OSD out so it will flush
logger.out('Setting out OSD disk with ID {}'.format(osd_id), state='i')
retcode, stdout, stderr = common.run_os_command('ceph osd out {}'.format(osd_id))
if retcode:
print('ceph osd out')
print(stdout)
print(stderr)
raise
# 2. Wait for the OSD to flush
logger.out('Flushing OSD disk with ID {}'.format(osd_id), state='i')
osd_string = str()
while True:
try:
retcode, stdout, stderr = common.run_os_command('ceph pg dump osds --format json')
dump_string = json.loads(stdout)
for osd in dump_string:
if str(osd['osd']) == osd_id:
osd_string = osd
num_pgs = osd_string['num_pgs']
if num_pgs > 0:
time.sleep(5)
else:
raise
except Exception:
break
# 3. Stop the OSD process and wait for it to be terminated
logger.out('Stopping OSD disk with ID {}'.format(osd_id), state='i')
retcode, stdout, stderr = common.run_os_command('systemctl stop ceph-osd@{}'.format(osd_id))
if retcode:
print('systemctl stop')
print(stdout)
print(stderr)
raise
# FIXME: There has to be a better way to do this /shrug
while True:
is_osd_up = False
# Find if there is a process named ceph-osd with arg '--id {id}'
for p in psutil.process_iter(attrs=['name', 'cmdline']):
if 'ceph-osd' == p.info['name'] and '--id {}'.format(osd_id) in ' '.join(p.info['cmdline']):
is_osd_up = True
# If there isn't, continue
if not is_osd_up:
break
# 4. Determine the block devices
retcode, stdout, stderr = common.run_os_command('readlink /var/lib/ceph/osd/ceph-{}/block'.format(osd_id))
vg_name = stdout.split('/')[-2] # e.g. /dev/ceph-<uuid>/osd-block-<uuid>
retcode, stdout, stderr = common.run_os_command('vgs --separator , --noheadings -o pv_name {}'.format(vg_name))
pv_block = stdout.strip()
# 5. Zap the volumes
logger.out('Zapping OSD disk with ID {} on {}'.format(osd_id, pv_block), state='i')
retcode, stdout, stderr = common.run_os_command('ceph-volume lvm zap --destroy {}'.format(pv_block))
if retcode:
print('ceph-volume lvm zap')
print(stdout)
print(stderr)
raise
# 6. Purge the OSD from Ceph
logger.out('Purging OSD disk with ID {}'.format(osd_id), state='i')
retcode, stdout, stderr = common.run_os_command('ceph osd purge {} --yes-i-really-mean-it'.format(osd_id))
if retcode:
print('ceph osd purge')
print(stdout)
print(stderr)
raise
# 7. Delete OSD from ZK
logger.out('Deleting OSD disk with ID {} from Zookeeper'.format(osd_id), state='i')
zkhandler.delete(('osd', osd_id), recursive=True)
# Log it
logger.out('Removed OSD disk with ID {}'.format(osd_id), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e')
return False
class CephPoolInstance(object):
def __init__(self, zkhandler, this_node, name):
self.zkhandler = zkhandler
self.this_node = this_node
self.name = name
self.pgs = ''
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('pool.pgs', self.name))
def watch_pool_node(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.pgs:
self.pgs = data
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('pool.stats', self.name))
def watch_pool_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
class CephVolumeInstance(object):
def __init__(self, zkhandler, this_node, pool, name):
self.zkhandler = zkhandler
self.this_node = this_node
self.pool = pool
self.name = name
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('volume.stats', f'{self.pool}/{self.name}'))
def watch_volume_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
class CephSnapshotInstance(object):
def __init__(self, zkhandler, this_node, pool, volume, name):
self.zkhandler = zkhandler
self.this_node = this_node
self.pool = pool
self.volume = volume
self.name = name
self.stats = dict()
@self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('snapshot.stats', f'{self.pool}/{self.volume}/{self.name}'))
def watch_snapshot_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
# Primary command function
# This command pipe is only used for OSD adds and removes
def ceph_command(zkhandler, logger, this_node, data, d_osd):
# Get the command and args
command, args = data.split()
# Adding a new OSD
if command == 'osd_add':
node, device, weight = args.split(',')
if node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock('base.cmd.ceph')
with zk_lock:
# Add the OSD
result = CephOSDInstance.add_osd(zkhandler, logger, node, device, weight)
# Command succeeded
if result:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'success-{}'.format(data))
])
# Command failed
else:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'failure-{}'.format(data))
])
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Removing an OSD
elif command == 'osd_remove':
osd_id = args
# Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock('base.cmd.ceph')
with zk_lock:
# Remove the OSD
result = CephOSDInstance.remove_osd(zkhandler, logger, osd_id, d_osd[osd_id])
# Command succeeded
if result:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'success-{}'.format(data))
])
# Command failed
else:
# Update the command queue
zkhandler.write([
('base.cmd.ceph', 'failure-{}'.format(data))
])
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)

View File

@ -74,7 +74,7 @@ class PowerDNSInstance(object):
self.dns_server_daemon = None
# Floating upstreams
self.vni_floatingipaddr, self.vni_cidrnetmask = self.config['vni_floating_ip'].split('/')
self.cluster_floatingipaddr, self.cluster_cidrnetmask = self.config['cluster_floating_ip'].split('/')
self.upstream_floatingipaddr, self.upstream_cidrnetmask = self.config['upstream_floating_ip'].split('/')
def start(self):
@ -91,7 +91,7 @@ class PowerDNSInstance(object):
'--disable-syslog=yes', # Log only to stdout (which is then captured)
'--disable-axfr=no', # Allow AXFRs
'--allow-axfr-ips=0.0.0.0/0', # Allow AXFRs to anywhere
'--local-address={},{}'.format(self.vni_floatingipaddr, self.upstream_floatingipaddr), # Listen on floating IPs
'--local-address={},{}'.format(self.cluster_floatingipaddr, self.upstream_floatingipaddr), # Listen on floating IPs
'--local-port=53', # On port 53
'--log-dns-details=on', # Log details
'--loglevel=3', # Log info

View File

@ -65,9 +65,9 @@ class NodeInstance(object):
self.upstream_dev = self.config['upstream_dev']
self.upstream_floatingipaddr = self.config['upstream_floating_ip'].split('/')[0]
self.upstream_ipaddr, self.upstream_cidrnetmask = self.config['upstream_dev_ip'].split('/')
self.vni_dev = self.config['vni_dev']
self.vni_floatingipaddr = self.config['vni_floating_ip'].split('/')[0]
self.vni_ipaddr, self.vni_cidrnetmask = self.config['vni_dev_ip'].split('/')
self.cluster_dev = self.config['cluster_dev']
self.cluster_floatingipaddr = self.config['cluster_floating_ip'].split('/')[0]
self.cluster_ipaddr, self.cluster_cidrnetmask = self.config['cluster_dev_ip'].split('/')
self.storage_dev = self.config['storage_dev']
self.storage_floatingipaddr = self.config['storage_floating_ip'].split('/')[0]
self.storage_ipaddr, self.storage_cidrnetmask = self.config['storage_dev_ip'].split('/')
@ -76,10 +76,10 @@ class NodeInstance(object):
self.upstream_floatingipaddr = None
self.upstream_ipaddr = None
self.upstream_cidrnetmask = None
self.vni_dev = None
self.vni_floatingipaddr = None
self.vni_ipaddr = None
self.vni_cidrnetmask = None
self.cluster_dev = None
self.cluster_floatingipaddr = None
self.cluster_ipaddr = None
self.cluster_cidrnetmask = None
self.storage_dev = None
self.storage_floatingipaddr = None
self.storage_ipaddr = None
@ -387,13 +387,13 @@ class NodeInstance(object):
# 2. Add Cluster & Storage floating IP
self.logger.out(
'Creating floating management IP {}/{} on interface {}'.format(
self.vni_floatingipaddr,
self.vni_cidrnetmask,
self.cluster_floatingipaddr,
self.cluster_cidrnetmask,
'brcluster'
),
state='o'
)
common.createIPAddress(self.vni_floatingipaddr, self.vni_cidrnetmask, 'brcluster')
common.createIPAddress(self.cluster_floatingipaddr, self.cluster_cidrnetmask, 'brcluster')
self.logger.out(
'Creating floating storage IP {}/{} on interface {}'.format(
self.storage_floatingipaddr,
@ -599,13 +599,13 @@ class NodeInstance(object):
# 6. Remove Cluster & Storage floating IP
self.logger.out(
'Removing floating management IP {}/{} from interface {}'.format(
self.vni_floatingipaddr,
self.vni_cidrnetmask,
self.cluster_floatingipaddr,
self.cluster_cidrnetmask,
'brcluster'
),
state='o'
)
common.removeIPAddress(self.vni_floatingipaddr, self.vni_cidrnetmask, 'brcluster')
common.removeIPAddress(self.cluster_floatingipaddr, self.cluster_cidrnetmask, 'brcluster')
self.logger.out(
'Removing floating storage IP {}/{} from interface {}'.format(
self.storage_floatingipaddr,

View File

@ -30,86 +30,11 @@ from xml.etree import ElementTree
import daemon_lib.common as common
import pvcnoded.VMConsoleWatcherInstance as VMConsoleWatcherInstance
import pvcnoded.objects.VMConsoleWatcherInstance as VMConsoleWatcherInstance
import daemon_lib.common as daemon_common
def flush_locks(zkhandler, logger, dom_uuid, this_node=None):
logger.out('Flushing RBD locks for VM "{}"'.format(dom_uuid), state='i')
# Get the list of RBD images
rbd_list = zkhandler.read(('domain.storage.volumes', dom_uuid)).split(',')
for rbd in rbd_list:
# Check if a lock exists
lock_list_retcode, lock_list_stdout, lock_list_stderr = common.run_os_command('rbd lock list --format json {}'.format(rbd))
if lock_list_retcode != 0:
logger.out('Failed to obtain lock list for volume "{}"'.format(rbd), state='e')
continue
try:
lock_list = json.loads(lock_list_stdout)
except Exception as e:
logger.out('Failed to parse lock list for volume "{}": {}'.format(rbd, e), state='e')
continue
# If there's at least one lock
if lock_list:
# Loop through the locks
for lock in lock_list:
if this_node is not None and zkhandler.read(('domain.state', dom_uuid)) != 'stop' and lock['address'].split(':')[0] != this_node.storage_ipaddr:
logger.out('RBD lock does not belong to this host (lock owner: {}): freeing this lock would be unsafe, aborting'.format(lock['address'].split(':')[0], state='e'))
zkhandler.write([
(('domain.state', dom_uuid), 'fail'),
(('domain.failed_reason', dom_uuid), 'Could not safely free RBD lock {} ({}) on volume {}; stop VM and flush locks manually'.format(lock['id'], lock['address'], rbd)),
])
break
# Free the lock
lock_remove_retcode, lock_remove_stdout, lock_remove_stderr = common.run_os_command('rbd lock remove {} "{}" "{}"'.format(rbd, lock['id'], lock['locker']))
if lock_remove_retcode != 0:
logger.out('Failed to free RBD lock "{}" on volume "{}": {}'.format(lock['id'], rbd, lock_remove_stderr), state='e')
zkhandler.write([
(('domain.state', dom_uuid), 'fail'),
(('domain.failed_reason', dom_uuid), 'Could not free RBD lock {} ({}) on volume {}: {}'.format(lock['id'], lock['address'], rbd, lock_remove_stderr)),
])
break
logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock['id'], rbd), state='o')
return True
# Primary command function
def run_command(zkhandler, logger, this_node, data):
# Get the command and args
command, args = data.split()
# Flushing VM RBD locks
if command == 'flush_locks':
dom_uuid = args
# Verify that the VM is set to run on this node
if this_node.d_domain[dom_uuid].getnode() == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock('base.cmd.domain')
with zk_lock:
# Flush the lock
result = flush_locks(zkhandler, logger, dom_uuid, this_node)
# Command succeeded
if result:
# Update the command queue
zkhandler.write([
('base.cmd.domain', 'success-{}'.format(data))
])
# Command failed
else:
# Update the command queue
zkhandler.write([
('base.cmd.domain', 'failure-{}'.format(data))
])
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
class VMInstance(object):
# Initialization function
def __init__(self, domuuid, zkhandler, config, logger, this_node):
@ -265,7 +190,7 @@ class VMInstance(object):
if self.getdom() is None or self.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
# Flush locks
self.logger.out('Flushing RBD locks', state='i', prefix='Domain {}'.format(self.domuuid))
flush_locks(self.zkhandler, self.logger, self.domuuid, self.this_node)
VMInstance.flush_locks(self.zkhandler, self.logger, self.domuuid, self.this_node)
if self.zkhandler.read(('domain.state', self.domuuid)) == 'fail':
lv_conn.close()
self.dom = None
@ -877,3 +802,79 @@ class VMInstance(object):
# Return the dom object (or None)
return dom
# Flush the locks of a VM based on UUID
@staticmethod
def flush_locks(zkhandler, logger, dom_uuid, this_node=None):
logger.out('Flushing RBD locks for VM "{}"'.format(dom_uuid), state='i')
# Get the list of RBD images
rbd_list = zkhandler.read(('domain.storage.volumes', dom_uuid)).split(',')
for rbd in rbd_list:
# Check if a lock exists
lock_list_retcode, lock_list_stdout, lock_list_stderr = common.run_os_command('rbd lock list --format json {}'.format(rbd))
if lock_list_retcode != 0:
logger.out('Failed to obtain lock list for volume "{}"'.format(rbd), state='e')
continue
try:
lock_list = json.loads(lock_list_stdout)
except Exception as e:
logger.out('Failed to parse lock list for volume "{}": {}'.format(rbd, e), state='e')
continue
# If there's at least one lock
if lock_list:
# Loop through the locks
for lock in lock_list:
if this_node is not None and zkhandler.read(('domain.state', dom_uuid)) != 'stop' and lock['address'].split(':')[0] != this_node.storage_ipaddr:
logger.out('RBD lock does not belong to this host (lock owner: {}): freeing this lock would be unsafe, aborting'.format(lock['address'].split(':')[0], state='e'))
zkhandler.write([
(('domain.state', dom_uuid), 'fail'),
(('domain.failed_reason', dom_uuid), 'Could not safely free RBD lock {} ({}) on volume {}; stop VM and flush locks manually'.format(lock['id'], lock['address'], rbd)),
])
break
# Free the lock
lock_remove_retcode, lock_remove_stdout, lock_remove_stderr = common.run_os_command('rbd lock remove {} "{}" "{}"'.format(rbd, lock['id'], lock['locker']))
if lock_remove_retcode != 0:
logger.out('Failed to free RBD lock "{}" on volume "{}": {}'.format(lock['id'], rbd, lock_remove_stderr), state='e')
zkhandler.write([
(('domain.state', dom_uuid), 'fail'),
(('domain.failed_reason', dom_uuid), 'Could not free RBD lock {} ({}) on volume {}: {}'.format(lock['id'], lock['address'], rbd, lock_remove_stderr)),
])
break
logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock['id'], rbd), state='o')
return True
# Primary command function
def vm_command(zkhandler, logger, this_node, data):
# Get the command and args
command, args = data.split()
# Flushing VM RBD locks
if command == 'flush_locks':
dom_uuid = args
# Verify that the VM is set to run on this node
if this_node.d_domain[dom_uuid].getnode() == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock('base.cmd.domain')
with zk_lock:
# Flush the lock
result = VMInstance.flush_locks(zkhandler, logger, dom_uuid, this_node)
# Command succeeded
if result:
# Update the command queue
zkhandler.write([
('base.cmd.domain', 'success-{}'.format(data))
])
# Command failed
else:
# Update the command queue
zkhandler.write([
('base.cmd.domain', 'failure-{}'.format(data))
])
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)

View File

@ -36,8 +36,8 @@ class VXNetworkInstance(object):
self.logger = logger
self.this_node = this_node
self.dns_aggregator = dns_aggregator
self.vni_dev = config['vni_dev']
self.vni_mtu = config['vni_mtu']
self.cluster_dev = config['cluster_dev']
self.cluster_mtu = config['cluster_mtu']
self.bridge_dev = config['bridge_dev']
self.nettype = self.zkhandler.read(('network.type', self.vni))
@ -481,7 +481,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
)
# Set MTU of vLAN and bridge NICs
vx_mtu = self.vni_mtu
vx_mtu = self.cluster_mtu
common.run_os_command(
'ip link set {} mtu {} up'.format(
self.vlan_nic,
@ -521,7 +521,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
def createNetworkManaged(self):
self.logger.out(
'Creating VXLAN device on interface {}'.format(
self.vni_dev
self.cluster_dev
),
prefix='VNI {}'.format(self.vni),
state='o'
@ -532,7 +532,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
'ip link add {} type vxlan id {} dstport 4789 dev {}'.format(
self.vxlan_nic,
self.vni,
self.vni_dev
self.cluster_dev
)
)
# Create bridge interface
@ -543,7 +543,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
)
# Set MTU of VXLAN and bridge NICs
vx_mtu = self.vni_mtu - 50
vx_mtu = self.cluster_mtu - 50
common.run_os_command(
'ip link set {} mtu {} up'.format(
self.vxlan_nic,
@ -716,7 +716,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
def removeNetworkBridged(self):
self.logger.out(
'Removing VNI device on interface {}'.format(
self.vni_dev
self.cluster_dev
),
prefix='VNI {}'.format(self.vni),
state='o'
@ -752,7 +752,7 @@ add rule inet filter forward ip6 saddr {netaddr6} counter jump {vxlannic}-out
def removeNetworkManaged(self):
self.logger.out(
'Removing VNI device on interface {}'.format(
self.vni_dev
self.cluster_dev
),
prefix='VNI {}'.format(self.vni),
state='o'

View File

View File

View File

@ -0,0 +1,384 @@
#!/usr/bin/env python3
# config.py - Utility functions for pvcnoded configuration parsing
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import subprocess
import yaml
from socket import gethostname
from re import findall
from psutil import cpu_count
from ipaddress import ip_address, ip_network
class MalformedConfigurationError(Exception):
"""
An except when parsing the PVC Node daemon configuration file
"""
def __init__(self, error=None):
self.msg = f'ERROR: Configuration file is malformed: {error}'
def __str__(self):
return str(self.msg)
def get_static_data():
"""
Data that is obtained once at node startup for use later
"""
staticdata = list()
staticdata.append(str(cpu_count())) # CPU count
staticdata.append(
subprocess.run(
['uname', '-r'], stdout=subprocess.PIPE
).stdout.decode('ascii').strip()
)
staticdata.append(
subprocess.run(
['uname', '-o'], stdout=subprocess.PIPE
).stdout.decode('ascii').strip()
)
staticdata.append(
subprocess.run(
['uname', '-m'], stdout=subprocess.PIPE
).stdout.decode('ascii').strip()
)
return staticdata
def get_configuration_path():
try:
return os.environ['PVCD_CONFIG_FILE']
except KeyError:
print('ERROR: The "PVCD_CONFIG_FILE" environment variable must be set.')
os._exit(1)
def get_hostname():
node_fqdn = gethostname()
node_hostname = node_fqdn.split('.', 1)[0]
node_domain = ''.join(node_fqdn.split('.', 1)[1:])
try:
node_id = findall(r'\d+', node_hostname)[-1]
except IndexError:
node_id = 0
return node_fqdn, node_hostname, node_domain, node_id
def validate_floating_ip(config, network):
if network not in ['cluster', 'storage', 'upstream']:
return False, f'Specified network type "{network}" is not valid'
floating_key = f'{network}_floating_ip'
network_key = f'{network}_network'
# Verify the network provided is valid
try:
network = ip_network(config[network_key])
except Exception:
return False, f'Network address {config[network_key]} for {network_key} is not valid'
# Verify that the floating IP is valid (and in the network)
try:
floating_address = ip_address(config[floating_key].split('/')[0])
if floating_address not in list(network.hosts()):
raise
except Exception:
return False, f'Floating address {config[floating_key]} for {floating_key} is not valid'
return True, ''
def get_configuration():
"""
Parse the configuration of the node daemon.
"""
pvcnoded_config_file = get_configuration_path()
print('Loading configuration from file "{}"'.format(pvcnoded_config_file))
with open(pvcnoded_config_file, 'r') as cfgfile:
try:
o_config = yaml.load(cfgfile, Loader=yaml.SafeLoader)
except Exception as e:
print('ERROR: Failed to parse configuration file: {}'.format(e))
os._exit(1)
node_fqdn, node_hostname, node_domain, node_id = get_hostname()
# Create the configuration dictionary
config = dict()
# Get the initial base configuration
try:
o_base = o_config['pvc']
o_cluster = o_config['pvc']['cluster']
except Exception as e:
raise MalformedConfigurationError(e)
config_general = {
'node': o_base.get('node', node_hostname),
'node_hostname': node_hostname,
'node_fqdn': node_fqdn,
'node_domain': node_domain,
'node_id': node_id,
'coordinators': o_cluster.get('coordinators', list()),
'debug': o_base.get('debug', False),
}
config = {**config, **config_general}
# Get the functions configuration
try:
o_functions = o_config['pvc']['functions']
except Exception as e:
raise MalformedConfigurationError(e)
config_functions = {
'enable_hypervisor': o_functions.get('enable_hypervisor', False),
'enable_networking': o_functions.get('enable_networking', False),
'enable_storage': o_functions.get('enable_storage', False),
'enable_api': o_functions.get('enable_api', False),
}
config = {**config, **config_functions}
# Get the directory configuration
try:
o_directories = o_config['pvc']['system']['configuration']['directories']
except Exception as e:
raise MalformedConfigurationError(e)
config_directories = {
'dynamic_directory': o_directories.get('dynamic_directory', None),
'log_directory': o_directories.get('log_directory', None),
'console_log_directory': o_directories.get('console_log_directory', None),
}
# Define our dynamic directory schema
config_directories['dnsmasq_dynamic_directory'] = config_directories['dynamic_directory'] + '/dnsmasq'
config_directories['pdns_dynamic_directory'] = config_directories['dynamic_directory'] + '/pdns'
config_directories['nft_dynamic_directory'] = config_directories['dynamic_directory'] + '/nft'
# Define our log directory schema
config_directories['dnsmasq_log_directory'] = config_directories['log_directory'] + '/dnsmasq'
config_directories['pdns_log_directory'] = config_directories['log_directory'] + '/pdns'
config_directories['nft_log_directory'] = config_directories['log_directory'] + '/nft'
config = {**config, **config_directories}
# Get the logging configuration
try:
o_logging = o_config['pvc']['system']['configuration']['logging']
except Exception as e:
raise MalformedConfigurationError(e)
config_logging = {
'file_logging': o_logging.get('file_logging', False),
'stdout_logging': o_logging.get('stdout_logging', False),
'zookeeper_logging': o_logging.get('zookeeper_logging', False),
'log_colours': o_logging.get('log_colours', False),
'log_dates': o_logging.get('log_dates', False),
'log_keepalives': o_logging.get('log_keepalives', False),
'log_keepalive_cluster_details': o_logging.get('log_keepalive_cluster_details', False),
'log_keepalive_storage_details': o_logging.get('log_keepalive_storage_details', False),
'console_log_lines': o_logging.get('console_log_lines', False),
'node_log_lines': o_logging.get('node_log_lines', False),
}
config = {**config, **config_logging}
# Get the interval configuration
try:
o_intervals = o_config['pvc']['system']['intervals']
except Exception as e:
raise MalformedConfigurationError(e)
config_intervals = {
'vm_shutdown_timeout': int(o_intervals.get('vm_shutdown_timeout', 60)),
'keepalive_interval': int(o_intervals.get('keepalive_interval', 5)),
'fence_intervals': int(o_intervals.get('fence_intervals', 6)),
'suicide_intervals': int(o_intervals.get('suicide_interval', 0)),
}
config = {**config, **config_intervals}
# Get the fencing configuration
try:
o_fencing = o_config['pvc']['system']['fencing']
o_fencing_actions = o_fencing['actions']
o_fencing_ipmi = o_fencing['ipmi']
except Exception as e:
raise MalformedConfigurationError(e)
config_fencing = {
'successful_fence': o_fencing_actions.get('successful_fence', None),
'failed_fence': o_fencing_actions.get('failed_fence', None),
'ipmi_hostname': o_fencing_ipmi.get('host', f'{node_hostname}-lom.{node_domain}'),
'ipmi_username': o_fencing_ipmi.get('user', 'null'),
'ipmi_password': o_fencing_ipmi.get('pass', 'null'),
}
config = {**config, **config_fencing}
# Get the migration configuration
try:
o_migration = o_config['pvc']['system']['migration']
except Exception as e:
raise MalformedConfigurationError(e)
config_migration = {
'migration_target_selector': o_migration.get('target_selector', 'mem'),
}
config = {**config, **config_migration}
if config['enable_networking']:
# Get the node networks configuration
try:
o_networks = o_config['pvc']['cluster']['networks']
o_network_cluster = o_networks['cluster']
o_network_storage = o_networks['storage']
o_network_upstream = o_networks['upstream']
o_sysnetworks = o_config['pvc']['system']['configuration']['networking']
o_sysnetwork_cluster = o_sysnetworks['cluster']
o_sysnetwork_storage = o_sysnetworks['storage']
o_sysnetwork_upstream = o_sysnetworks['upstream']
except Exception as e:
raise MalformedConfigurationError(e)
config_networks = {
'cluster_domain': o_network_cluster.get('domain', None),
'cluster_network': o_network_cluster.get('network', None),
'cluster_floating_ip': o_network_cluster.get('floating_ip', None),
'cluster_dev': o_sysnetwork_cluster.get('device', None),
'cluster_mtu': o_sysnetwork_cluster.get('mtu', None),
'cluster_dev_ip': o_sysnetwork_cluster.get('address', None),
'storage_domain': o_network_storage.get('domain', None),
'storage_network': o_network_storage.get('network', None),
'storage_floating_ip': o_network_storage.get('floating_ip', None),
'storage_dev': o_sysnetwork_storage.get('device', None),
'storage_mtu': o_sysnetwork_storage.get('mtu', None),
'storage_dev_ip': o_sysnetwork_storage.get('address', None),
'upstream_domain': o_network_upstream.get('domain', None),
'upstream_network': o_network_upstream.get('network', None),
'upstream_floating_ip': o_network_upstream.get('floating_ip', None),
'upstream_gateway': o_network_upstream.get('gateway', None),
'upstream_dev': o_sysnetwork_upstream.get('device', None),
'upstream_mtu': o_sysnetwork_upstream.get('mtu', None),
'upstream_dev_ip': o_sysnetwork_upstream.get('address', None),
'bridge_dev': o_sysnetworks.get('bridge_device', None),
'enable_sriov': o_sysnetworks.get('sriov_enable', False),
'sriov_device': o_sysnetworks.get('sriov_device', list())
}
config = {**config, **config_networks}
for network_type in ['cluster', 'storage', 'upstream']:
result, msg = validate_floating_ip(config, network_type)
if not result:
raise MalformedConfigurationError(msg)
address_key = '{}_dev_ip'.format(network_type)
network_key = f'{network_type}_network'
network = ip_network(config[network_key])
# With autoselection of addresses, construct an IP from the relevant network
if config[address_key] == 'by-id':
# The NodeID starts at 1, but indexes start at 0
address_id = int(config['node_id']) - 1
# Grab the nth address from the network
config[address_key] = '{}/{}'.format(list(network.hosts())[address_id], network.prefixlen)
# Validate the provided IP instead
else:
try:
address = ip_address(config[address_key].split('/')[0])
if address not in list(network.hosts()):
raise
except Exception:
raise MalformedConfigurationError(
f'IP address {config[address_key]} for {address_key} is not valid'
)
# Get the PowerDNS aggregator database configuration
try:
o_pdnsdb = o_config['pvc']['coordinator']['dns']['database']
except Exception as e:
raise MalformedConfigurationError(e)
config_pdnsdb = {
'pdns_postgresql_host': o_pdnsdb.get('host', None),
'pdns_postgresql_port': o_pdnsdb.get('port', None),
'pdns_postgresql_dbname': o_pdnsdb.get('name', None),
'pdns_postgresql_user': o_pdnsdb.get('user', None),
'pdns_postgresql_password': o_pdnsdb.get('pass', None),
}
config = {**config, **config_pdnsdb}
# Get the Cloud-Init Metadata database configuration
try:
o_metadatadb = o_config['pvc']['coordinator']['metadata']['database']
except Exception as e:
raise MalformedConfigurationError(e)
config_metadatadb = {
'metadata_postgresql_host': o_metadatadb.get('host', None),
'metadata_postgresql_port': o_metadatadb.get('port', None),
'metadata_postgresql_dbname': o_metadatadb.get('name', None),
'metadata_postgresql_user': o_metadatadb.get('user', None),
'metadata_postgresql_password': o_metadatadb.get('pass', None),
}
config = {**config, **config_metadatadb}
if config['enable_storage']:
# Get the storage configuration
try:
o_storage = o_config['pvc']['system']['configuration']['storage']
except Exception as e:
raise MalformedConfigurationError(e)
config_storage = {
'ceph_config_file': o_storage.get('ceph_config_file', None),
'ceph_admin_keyring': o_storage.get('ceph_admin_keyring', None),
}
config = {**config, **config_storage}
# Add our node static data to the config
config['static_data'] = get_static_data()
return config
def validate_directories(config):
if not os.path.exists(config['dynamic_directory']):
os.makedirs(config['dynamic_directory'])
os.makedirs(config['dnsmasq_dynamic_directory'])
os.makedirs(config['pdns_dynamic_directory'])
os.makedirs(config['nft_dynamic_directory'])
if not os.path.exists(config['log_directory']):
os.makedirs(config['log_directory'])
os.makedirs(config['dnsmasq_log_directory'])
os.makedirs(config['pdns_log_directory'])
os.makedirs(config['nft_log_directory'])

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# fencing.py - PVC daemon function library, node fencing functions
# fencing.py - Utility functions for pvcnoded fencing
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
@ -22,13 +22,14 @@
import time
import daemon_lib.common as common
import pvcnoded.VMInstance as VMInstance
from pvcnoded.objects.VMInstance import VMInstance
#
# Fence thread entry function
#
def fenceNode(node_name, zkhandler, config, logger):
def fence_node(node_name, zkhandler, config, logger):
# We allow exactly 6 saving throws (30 seconds) for the host to come back online or we kill it
failcount_limit = 6
failcount = 0
@ -40,13 +41,13 @@ def fenceNode(node_name, zkhandler, config, logger):
# Is it still 'dead'
if node_daemon_state == 'dead':
failcount += 1
logger.out('Node "{}" failed {}/{} saving throws'.format(node_name, failcount, failcount_limit), state='w')
logger.out('Node "{}" failed {}/{} saving throws'.format(node_name, failcount, failcount_limit), state='s')
# It changed back to something else so it must be alive
else:
logger.out('Node "{}" passed a saving throw; canceling fence'.format(node_name), state='o')
return
logger.out('Fencing node "{}" via IPMI reboot signal'.format(node_name), state='w')
logger.out('Fencing node "{}" via IPMI reboot signal'.format(node_name), state='s')
# Get IPMI information
ipmi_hostname = zkhandler.read(('node.ipmi.hostname', node_name))
@ -54,7 +55,7 @@ def fenceNode(node_name, zkhandler, config, logger):
ipmi_password = zkhandler.read(('node.ipmi.password', node_name))
# Shoot it in the head
fence_status = rebootViaIPMI(ipmi_hostname, ipmi_username, ipmi_password, logger)
fence_status = reboot_via_ipmi(ipmi_hostname, ipmi_username, ipmi_password, logger)
# Hold to ensure the fence takes effect and system stabilizes
time.sleep(config['keepalive_interval'] * 2)
@ -123,7 +124,7 @@ def migrateFromFencedNode(zkhandler, node_name, config, logger):
#
# Perform an IPMI fence
#
def rebootViaIPMI(ipmi_hostname, ipmi_user, ipmi_password, logger):
def reboot_via_ipmi(ipmi_hostname, ipmi_user, ipmi_password, logger):
# Forcibly reboot the node
ipmi_command_reset = '/usr/bin/ipmitool -I lanplus -H {} -U {} -P {} chassis power reset'.format(
ipmi_hostname, ipmi_user, ipmi_password
@ -131,8 +132,7 @@ def rebootViaIPMI(ipmi_hostname, ipmi_user, ipmi_password, logger):
ipmi_reset_retcode, ipmi_reset_stdout, ipmi_reset_stderr = common.run_os_command(ipmi_command_reset)
if ipmi_reset_retcode != 0:
logger.out('Failed to reboot dead node', state='e')
print(ipmi_reset_stderr)
logger.out(f'Failed to reboot dead node: {ipmi_reset_stderr}', state='e')
time.sleep(1)
@ -178,12 +178,10 @@ def rebootViaIPMI(ipmi_hostname, ipmi_user, ipmi_password, logger):
#
# Verify that IPMI connectivity to this host exists (used during node init)
#
def verifyIPMI(ipmi_hostname, ipmi_user, ipmi_password):
ipmi_command_status = '/usr/bin/ipmitool -I lanplus -H {} -U {} -P {} chassis power status'.format(
ipmi_hostname, ipmi_user, ipmi_password
)
ipmi_status_retcode, ipmi_status_stdout, ipmi_status_stderr = common.run_os_command(ipmi_command_status, timeout=2)
if ipmi_status_retcode == 0 and ipmi_status_stdout != "Chassis Power is on":
def verify_ipmi(ipmi_hostname, ipmi_user, ipmi_password):
ipmi_command = f'/usr/bin/ipmitool -I lanplus -H {ipmi_hostname} -U {ipmi_user} -P {ipmi_password} chassis power status'
retcode, stdout, stderr = common.run_os_command(ipmi_command, timeout=2)
if retcode == 0 and stdout != "Chassis Power is on":
return True
else:
return False

View File

@ -0,0 +1,718 @@
#!/usr/bin/env python3
# keepalive.py - Utility functions for pvcnoded Keepalives
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvcnoded.util.fencing
import daemon_lib.common as common
from apscheduler.schedulers.background import BackgroundScheduler
from rados import Rados
from xml.etree import ElementTree
from queue import Queue
from threading import Thread
from datetime import datetime
import json
import re
import libvirt
import psutil
import os
import time
# State table for pretty stats
libvirt_vm_states = {
0: "NOSTATE",
1: "RUNNING",
2: "BLOCKED",
3: "PAUSED",
4: "SHUTDOWN",
5: "SHUTOFF",
6: "CRASHED",
7: "PMSUSPENDED"
}
def start_keepalive_timer(logger, config, zkhandler, this_node):
keepalive_interval = config['keepalive_interval']
logger.out(f'Starting keepalive timer ({keepalive_interval} second interval)', state='s')
keepalive_timer = BackgroundScheduler()
keepalive_timer.add_job(
node_keepalive,
args=(logger, config, zkhandler, this_node),
trigger='interval',
seconds=keepalive_interval)
keepalive_timer.start()
return keepalive_timer
def stop_keepalive_timer(logger, keepalive_timer):
try:
keepalive_timer.shutdown()
logger.out('Stopping keepalive timer', state='s')
except Exception:
logger.out('Failed to stop keepalive timer', state='w')
# Ceph stats update function
def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
pool_list = zkhandler.children('base.pool')
osd_list = zkhandler.children('base.osd')
debug = config['debug']
if debug:
logger.out("Thread starting", state='d', prefix='ceph-thread')
# Connect to the Ceph cluster
try:
ceph_conn = Rados(conffile=config['ceph_config_file'], conf=dict(keyring=config['ceph_admin_keyring']))
if debug:
logger.out("Connecting to cluster", state='d', prefix='ceph-thread')
ceph_conn.connect(timeout=1)
except Exception as e:
logger.out('Failed to open connection to Ceph cluster: {}'.format(e), state='e')
return
if debug:
logger.out("Getting health stats from monitor", state='d', prefix='ceph-thread')
# Get Ceph cluster health for local status output
command = {"prefix": "health", "format": "json"}
try:
health_status = json.loads(ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1])
ceph_health = health_status['status']
except Exception as e:
logger.out('Failed to obtain Ceph health data: {}'.format(e), state='e')
ceph_health = 'HEALTH_UNKN'
if ceph_health in ['HEALTH_OK']:
ceph_health_colour = logger.fmt_green
elif ceph_health in ['HEALTH_UNKN']:
ceph_health_colour = logger.fmt_cyan
elif ceph_health in ['HEALTH_WARN']:
ceph_health_colour = logger.fmt_yellow
else:
ceph_health_colour = logger.fmt_red
# Primary-only functions
if this_node.router_state == 'primary':
if debug:
logger.out("Set ceph health information in zookeeper (primary only)", state='d', prefix='ceph-thread')
command = {"prefix": "status", "format": "pretty"}
ceph_status = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii')
try:
zkhandler.write([
('base.storage', str(ceph_status))
])
except Exception as e:
logger.out('Failed to set Ceph status data: {}'.format(e), state='e')
if debug:
logger.out("Set ceph rados df information in zookeeper (primary only)", state='d', prefix='ceph-thread')
# Get rados df info
command = {"prefix": "df", "format": "pretty"}
ceph_df = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii')
try:
zkhandler.write([
('base.storage.util', str(ceph_df))
])
except Exception as e:
logger.out('Failed to set Ceph utilization data: {}'.format(e), state='e')
if debug:
logger.out("Set pool information in zookeeper (primary only)", state='d', prefix='ceph-thread')
# Get pool info
command = {"prefix": "df", "format": "json"}
ceph_df_output = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii')
try:
ceph_pool_df_raw = json.loads(ceph_df_output)['pools']
except Exception as e:
logger.out('Failed to obtain Pool data (ceph df): {}'.format(e), state='w')
ceph_pool_df_raw = []
retcode, stdout, stderr = common.run_os_command('rados df --format json', timeout=1)
try:
rados_pool_df_raw = json.loads(stdout)['pools']
except Exception as e:
logger.out('Failed to obtain Pool data (rados df): {}'.format(e), state='w')
rados_pool_df_raw = []
pool_count = len(ceph_pool_df_raw)
if debug:
logger.out("Getting info for {} pools".format(pool_count), state='d', prefix='ceph-thread')
for pool_idx in range(0, pool_count):
try:
# Combine all the data for this pool
ceph_pool_df = ceph_pool_df_raw[pool_idx]
rados_pool_df = rados_pool_df_raw[pool_idx]
pool = ceph_pool_df
pool.update(rados_pool_df)
# Ignore any pools that aren't in our pool list
if pool['name'] not in pool_list:
if debug:
logger.out("Pool {} not in pool list {}".format(pool['name'], pool_list), state='d', prefix='ceph-thread')
continue
else:
if debug:
logger.out("Parsing data for pool {}".format(pool['name']), state='d', prefix='ceph-thread')
# Assemble a useful data structure
pool_df = {
'id': pool['id'],
'stored_bytes': pool['stats']['stored'],
'free_bytes': pool['stats']['max_avail'],
'used_bytes': pool['stats']['bytes_used'],
'used_percent': pool['stats']['percent_used'],
'num_objects': pool['stats']['objects'],
'num_object_clones': pool['num_object_clones'],
'num_object_copies': pool['num_object_copies'],
'num_objects_missing_on_primary': pool['num_objects_missing_on_primary'],
'num_objects_unfound': pool['num_objects_unfound'],
'num_objects_degraded': pool['num_objects_degraded'],
'read_ops': pool['read_ops'],
'read_bytes': pool['read_bytes'],
'write_ops': pool['write_ops'],
'write_bytes': pool['write_bytes']
}
# Write the pool data to Zookeeper
zkhandler.write([
(('pool.stats', pool['name']), str(json.dumps(pool_df)))
])
except Exception as e:
# One or more of the status commands timed out, just continue
logger.out('Failed to format and send pool data: {}'.format(e), state='w')
pass
# Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs)
osds_this_node = 0
if len(osd_list) > 0:
# Get data from Ceph OSDs
if debug:
logger.out("Get data from Ceph OSDs", state='d', prefix='ceph-thread')
# Parse the dump data
osd_dump = dict()
command = {"prefix": "osd dump", "format": "json"}
osd_dump_output = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii')
try:
osd_dump_raw = json.loads(osd_dump_output)['osds']
except Exception as e:
logger.out('Failed to obtain OSD data: {}'.format(e), state='w')
osd_dump_raw = []
if debug:
logger.out("Loop through OSD dump", state='d', prefix='ceph-thread')
for osd in osd_dump_raw:
osd_dump.update({
str(osd['osd']): {
'uuid': osd['uuid'],
'up': osd['up'],
'in': osd['in'],
'primary_affinity': osd['primary_affinity']
}
})
# Parse the df data
if debug:
logger.out("Parse the OSD df data", state='d', prefix='ceph-thread')
osd_df = dict()
command = {"prefix": "osd df", "format": "json"}
try:
osd_df_raw = json.loads(ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1])['nodes']
except Exception as e:
logger.out('Failed to obtain OSD data: {}'.format(e), state='w')
osd_df_raw = []
if debug:
logger.out("Loop through OSD df", state='d', prefix='ceph-thread')
for osd in osd_df_raw:
osd_df.update({
str(osd['id']): {
'utilization': osd['utilization'],
'var': osd['var'],
'pgs': osd['pgs'],
'kb': osd['kb'],
'weight': osd['crush_weight'],
'reweight': osd['reweight'],
}
})
# Parse the status data
if debug:
logger.out("Parse the OSD status data", state='d', prefix='ceph-thread')
osd_status = dict()
command = {"prefix": "osd status", "format": "pretty"}
try:
osd_status_raw = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii')
except Exception as e:
logger.out('Failed to obtain OSD status data: {}'.format(e), state='w')
osd_status_raw = []
if debug:
logger.out("Loop through OSD status data", state='d', prefix='ceph-thread')
for line in osd_status_raw.split('\n'):
# Strip off colour
line = re.sub(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))', '', line)
# Split it for parsing
line = line.split()
if len(line) > 1 and line[1].isdigit():
# This is an OSD line so parse it
osd_id = line[1]
node = line[3].split('.')[0]
used = line[5]
avail = line[7]
wr_ops = line[9]
wr_data = line[11]
rd_ops = line[13]
rd_data = line[15]
state = line[17]
osd_status.update({
str(osd_id): {
'node': node,
'used': used,
'avail': avail,
'wr_ops': wr_ops,
'wr_data': wr_data,
'rd_ops': rd_ops,
'rd_data': rd_data,
'state': state
}
})
# Merge them together into a single meaningful dict
if debug:
logger.out("Merge OSD data together", state='d', prefix='ceph-thread')
osd_stats = dict()
for osd in osd_list:
if zkhandler.read(('osd.node', osd)) == config['node_hostname']:
osds_this_node += 1
try:
this_dump = osd_dump[osd]
this_dump.update(osd_df[osd])
this_dump.update(osd_status[osd])
osd_stats[osd] = this_dump
except KeyError as e:
# One or more of the status commands timed out, just continue
logger.out('Failed to parse OSD stats into dictionary: {}'.format(e), state='w')
# Upload OSD data for the cluster (primary-only)
if this_node.router_state == 'primary':
if debug:
logger.out("Trigger updates for each OSD", state='d', prefix='ceph-thread')
for osd in osd_list:
try:
stats = json.dumps(osd_stats[osd])
zkhandler.write([
(('osd.stats', osd), str(stats))
])
except KeyError as e:
# One or more of the status commands timed out, just continue
logger.out('Failed to upload OSD stats from dictionary: {}'.format(e), state='w')
ceph_conn.shutdown()
queue.put(ceph_health_colour)
queue.put(ceph_health)
queue.put(osds_this_node)
if debug:
logger.out("Thread finished", state='d', prefix='ceph-thread')
# VM stats update function
def collect_vm_stats(logger, config, zkhandler, this_node, queue):
debug = config['debug']
if debug:
logger.out("Thread starting", state='d', prefix='vm-thread')
# Connect to libvirt
libvirt_name = "qemu:///system"
if debug:
logger.out("Connecting to libvirt", state='d', prefix='vm-thread')
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e')
memalloc = 0
memprov = 0
vcpualloc = 0
# Toggle state management of dead VMs to restart them
if debug:
logger.out("Toggle state management of dead VMs to restart them", state='d', prefix='vm-thread')
# Make a copy of the d_domain; if not, and it changes in flight, this can fail
fixed_d_domain = this_node.d_domain.copy()
for domain, instance in fixed_d_domain.items():
if domain in this_node.domain_list:
# Add the allocated memory to our memalloc value
memalloc += instance.getmemory()
memprov += instance.getmemory()
vcpualloc += instance.getvcpus()
if instance.getstate() == 'start' and instance.getnode() == this_node.name:
if instance.getdom() is not None:
try:
if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
logger.out("VM {} has failed".format(instance.domname), state='w', prefix='vm-thread')
raise
except Exception:
# Toggle a state "change"
logger.out("Resetting state to {} for VM {}".format(instance.getstate(), instance.domname), state='i', prefix='vm-thread')
zkhandler.write([
(('domain.state', domain), instance.getstate())
])
elif instance.getnode() == this_node.name:
memprov += instance.getmemory()
# Get list of running domains from Libvirt
running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
# Get statistics from any running VMs
for domain in running_domains:
try:
# Get basic information about the VM
tree = ElementTree.fromstring(domain.XMLDesc())
domain_uuid = domain.UUIDString()
domain_name = domain.name()
# Get all the raw information about the VM
if debug:
logger.out("Getting general statistics for VM {}".format(domain_name), state='d', prefix='vm-thread')
domain_state, domain_maxmem, domain_mem, domain_vcpus, domain_cputime = domain.info()
# We can't properly gather stats from a non-running VMs so continue
if domain_state != libvirt.VIR_DOMAIN_RUNNING:
continue
domain_memory_stats = domain.memoryStats()
domain_cpu_stats = domain.getCPUStats(True)[0]
except Exception as e:
if debug:
try:
logger.out("Failed getting VM information for {}: {}".format(domain.name(), e), state='d', prefix='vm-thread')
except Exception:
pass
continue
# Ensure VM is present in the domain_list
if domain_uuid not in this_node.domain_list:
this_node.domain_list.append(domain_uuid)
if debug:
logger.out("Getting disk statistics for VM {}".format(domain_name), state='d', prefix='vm-thread')
domain_disk_stats = []
for disk in tree.findall('devices/disk'):
disk_name = disk.find('source').get('name')
if not disk_name:
disk_name = disk.find('source').get('file')
disk_stats = domain.blockStats(disk.find('target').get('dev'))
domain_disk_stats.append({
"name": disk_name,
"rd_req": disk_stats[0],
"rd_bytes": disk_stats[1],
"wr_req": disk_stats[2],
"wr_bytes": disk_stats[3],
"err": disk_stats[4]
})
if debug:
logger.out("Getting network statistics for VM {}".format(domain_name), state='d', prefix='vm-thread')
domain_network_stats = []
for interface in tree.findall('devices/interface'):
interface_type = interface.get('type')
if interface_type not in ['bridge']:
continue
interface_name = interface.find('target').get('dev')
interface_bridge = interface.find('source').get('bridge')
interface_stats = domain.interfaceStats(interface_name)
domain_network_stats.append({
"name": interface_name,
"bridge": interface_bridge,
"rd_bytes": interface_stats[0],
"rd_packets": interface_stats[1],
"rd_errors": interface_stats[2],
"rd_drops": interface_stats[3],
"wr_bytes": interface_stats[4],
"wr_packets": interface_stats[5],
"wr_errors": interface_stats[6],
"wr_drops": interface_stats[7]
})
# Create the final dictionary
domain_stats = {
"state": libvirt_vm_states[domain_state],
"maxmem": domain_maxmem,
"livemem": domain_mem,
"cpus": domain_vcpus,
"cputime": domain_cputime,
"mem_stats": domain_memory_stats,
"cpu_stats": domain_cpu_stats,
"disk_stats": domain_disk_stats,
"net_stats": domain_network_stats
}
if debug:
logger.out("Writing statistics for VM {} to Zookeeper".format(domain_name), state='d', prefix='vm-thread')
try:
zkhandler.write([
(('domain.stats', domain_uuid), str(json.dumps(domain_stats)))
])
except Exception as e:
if debug:
logger.out("{}".format(e), state='d', prefix='vm-thread')
# Close the Libvirt connection
lv_conn.close()
queue.put(len(running_domains))
queue.put(memalloc)
queue.put(memprov)
queue.put(vcpualloc)
if debug:
logger.out("Thread finished", state='d', prefix='vm-thread')
# Keepalive update function
def node_keepalive(logger, config, zkhandler, this_node):
debug = config['debug']
if debug:
logger.out("Keepalive starting", state='d', prefix='main-thread')
# Set the migration selector in Zookeeper for clients to read
if config['enable_hypervisor']:
if this_node.router_state == 'primary':
try:
if zkhandler.read('base.config.migration_target_selector') != config['migration_target_selector']:
raise
except Exception:
zkhandler.write([
('base.config.migration_target_selector', config['migration_target_selector'])
])
# Set the upstream IP in Zookeeper for clients to read
if config['enable_networking']:
if this_node.router_state == 'primary':
try:
if zkhandler.read('base.config.upstream_ip') != config['upstream_floating_ip']:
raise
except Exception:
zkhandler.write([
('base.config.upstream_ip', config['upstream_floating_ip'])
])
# Get past state and update if needed
if debug:
logger.out("Get past state and update if needed", state='d', prefix='main-thread')
past_state = zkhandler.read(('node.state.daemon', this_node.name))
if past_state != 'run' and past_state != 'shutdown':
this_node.daemon_state = 'run'
zkhandler.write([
(('node.state.daemon', this_node.name), 'run')
])
else:
this_node.daemon_state = 'run'
# Ensure the primary key is properly set
if debug:
logger.out("Ensure the primary key is properly set", state='d', prefix='main-thread')
if this_node.router_state == 'primary':
if zkhandler.read('base.config.primary_node') != this_node.name:
zkhandler.write([
('base.config.primary_node', this_node.name)
])
# Run VM statistics collection in separate thread for parallelization
if config['enable_hypervisor']:
vm_thread_queue = Queue()
vm_stats_thread = Thread(target=collect_vm_stats, args=(logger, config, zkhandler, this_node, vm_thread_queue), kwargs={})
vm_stats_thread.start()
# Run Ceph status collection in separate thread for parallelization
if config['enable_storage']:
ceph_thread_queue = Queue()
ceph_stats_thread = Thread(target=collect_ceph_stats, args=(logger, config, zkhandler, this_node, ceph_thread_queue), kwargs={})
ceph_stats_thread.start()
# Get node performance statistics
this_node.memtotal = int(psutil.virtual_memory().total / 1024 / 1024)
this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024)
this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
this_node.cpuload = os.getloadavg()[0]
# Join against running threads
if config['enable_hypervisor']:
vm_stats_thread.join(timeout=4.0)
if vm_stats_thread.is_alive():
logger.out('VM stats gathering exceeded 4s timeout, continuing', state='w')
if config['enable_storage']:
ceph_stats_thread.join(timeout=4.0)
if ceph_stats_thread.is_alive():
logger.out('Ceph stats gathering exceeded 4s timeout, continuing', state='w')
# Get information from thread queues
if config['enable_hypervisor']:
try:
this_node.domains_count = vm_thread_queue.get()
this_node.memalloc = vm_thread_queue.get()
this_node.memprov = vm_thread_queue.get()
this_node.vcpualloc = vm_thread_queue.get()
except Exception:
pass
else:
this_node.domains_count = 0
this_node.memalloc = 0
this_node.memprov = 0
this_node.vcpualloc = 0
if config['enable_storage']:
try:
ceph_health_colour = ceph_thread_queue.get()
ceph_health = ceph_thread_queue.get()
osds_this_node = ceph_thread_queue.get()
except Exception:
ceph_health_colour = logger.fmt_cyan
ceph_health = 'UNKNOWN'
osds_this_node = '?'
# Set our information in zookeeper
keepalive_time = int(time.time())
if debug:
logger.out("Set our information in zookeeper", state='d', prefix='main-thread')
try:
zkhandler.write([
(('node.memory.total', this_node.name), str(this_node.memtotal)),
(('node.memory.used', this_node.name), str(this_node.memused)),
(('node.memory.free', this_node.name), str(this_node.memfree)),
(('node.memory.allocated', this_node.name), str(this_node.memalloc)),
(('node.memory.provisioned', this_node.name), str(this_node.memprov)),
(('node.vcpu.allocated', this_node.name), str(this_node.vcpualloc)),
(('node.cpu.load', this_node.name), str(this_node.cpuload)),
(('node.count.provisioned_domains', this_node.name), str(this_node.domains_count)),
(('node.running_domains', this_node.name), ' '.join(this_node.domain_list)),
(('node.keepalive', this_node.name), str(keepalive_time)),
])
except Exception:
logger.out('Failed to set keepalive data', state='e')
# Display node information to the terminal
if config['log_keepalives']:
if this_node.router_state == 'primary':
cst_colour = logger.fmt_green
elif this_node.router_state == 'secondary':
cst_colour = logger.fmt_blue
else:
cst_colour = logger.fmt_cyan
logger.out(
'{}{} keepalive @ {}{} [{}{}{}]'.format(
logger.fmt_purple,
config['node_hostname'],
datetime.now(),
logger.fmt_end,
logger.fmt_bold + cst_colour,
this_node.router_state,
logger.fmt_end
),
state='t'
)
if config['log_keepalive_cluster_details']:
logger.out(
'{bold}Maintenance:{nofmt} {maint} '
'{bold}Active VMs:{nofmt} {domcount} '
'{bold}Networks:{nofmt} {netcount} '
'{bold}Load:{nofmt} {load} '
'{bold}Memory [MiB]: VMs:{nofmt} {allocmem} '
'{bold}Used:{nofmt} {usedmem} '
'{bold}Free:{nofmt} {freemem}'.format(
bold=logger.fmt_bold,
nofmt=logger.fmt_end,
maint=this_node.maintenance,
domcount=this_node.domains_count,
netcount=len(zkhandler.children('base.network')),
load=this_node.cpuload,
freemem=this_node.memfree,
usedmem=this_node.memused,
allocmem=this_node.memalloc,
),
state='t'
)
if config['enable_storage'] and config['log_keepalive_storage_details']:
logger.out(
'{bold}Ceph cluster status:{nofmt} {health_colour}{health}{nofmt} '
'{bold}Total OSDs:{nofmt} {total_osds} '
'{bold}Node OSDs:{nofmt} {node_osds} '
'{bold}Pools:{nofmt} {total_pools} '.format(
bold=logger.fmt_bold,
health_colour=ceph_health_colour,
nofmt=logger.fmt_end,
health=ceph_health,
total_osds=len(zkhandler.children('base.osd')),
node_osds=osds_this_node,
total_pools=len(zkhandler.children('base.pool'))
),
state='t'
)
# Look for dead nodes and fence them
if not this_node.maintenance:
if debug:
logger.out("Look for dead nodes and fence them", state='d', prefix='main-thread')
if config['daemon_mode'] == 'coordinator':
for node_name in zkhandler.children('base.node'):
try:
node_daemon_state = zkhandler.read(('node.state.daemon', node_name))
node_keepalive = int(zkhandler.read(('node.keepalive', node_name)))
except Exception:
node_daemon_state = 'unknown'
node_keepalive = 0
# Handle deadtime and fencng if needed
# (A node is considered dead when its keepalive timer is >6*keepalive_interval seconds
# out-of-date while in 'start' state)
node_deadtime = int(time.time()) - (int(config['keepalive_interval']) * int(config['fence_intervals']))
if node_keepalive < node_deadtime and node_daemon_state == 'run':
logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w')
zk_lock = zkhandler.writelock(('node.state.daemon', node_name))
with zk_lock:
# Ensures that, if we lost the lock race and come out of waiting,
# we won't try to trigger our own fence thread.
if zkhandler.read(('node.state.daemon', node_name)) != 'dead':
fence_thread = Thread(target=pvcnoded.util.fencing.fence_node, args=(node_name, zkhandler, config, logger), kwargs={})
fence_thread.start()
# Write the updated data after we start the fence thread
zkhandler.write([
(('node.state.daemon', node_name), 'dead')
])
if debug:
logger.out("Keepalive finished", state='d', prefix='main-thread')

View File

@ -0,0 +1,36 @@
#!/usr/bin/env python3
# libvirt.py - Utility functions for pvcnoded libvirt
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import libvirt
def validate_libvirtd(logger, config):
if config['enable_hypervisor']:
libvirt_check_name = f'qemu+tcp://{config["node_hostname"]}/system'
logger.out(f'Connecting to Libvirt daemon at {libvirt_check_name}', state='i')
try:
lv_conn = libvirt.open(libvirt_check_name)
lv_conn.close()
except Exception as e:
logger.out(f'Failed to connect to Libvirt daemon: {e}', state='e')
return False
return True

View File

@ -0,0 +1,181 @@
#!/usr/bin/env python3
# networking.py - Utility functions for pvcnoded networking
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import daemon_lib.common as common
from time import sleep
from os import makedirs
def setup_sriov(logger, config):
logger.out('Setting up SR-IOV device support', state='i')
# Enable unsafe interrupts for the vfio_iommu_type1 kernel module
try:
common.run_os_command('modprobe vfio_iommu_type1 allow_unsafe_interrupts=1')
with open('/sys/module/vfio_iommu_type1/parameters/allow_unsafe_interrupts', 'w') as mfh:
mfh.write('Y')
except Exception:
logger.out('Failed to enable vfio_iommu_type1 kernel module; SR-IOV may fail', state='w')
# Loop through our SR-IOV NICs and enable the numvfs for each
for device in config['sriov_device']:
logger.out(f'Preparing SR-IOV PF {device["phy"]} with {device["vfcount"]} VFs', state='i')
try:
with open(f'/sys/class/net/{device["phy"]}/device/sriov_numvfs', 'r') as vfh:
current_vf_count = vfh.read().strip()
with open(f'/sys/class/net/{device["phy"]}/device/sriov_numvfs', 'w') as vfh:
vfh.write(str(device['vfcount']))
except FileNotFoundError:
logger.out(f'Failed to open SR-IOV configuration for PF {device["phy"]}; device may not support SR-IOV', state='w')
except OSError:
logger.out(f'Failed to set SR-IOV VF count for PF {device["phy"]} to {device["vfcount"]}; already set to {current_vf_count}', state='w')
if device.get('mtu', None) is not None:
logger.out(f'Setting SR-IOV PF {device["phy"]} to MTU {device["mtu"]}', state='i')
common.run_os_command(f'ip link set {device["phy"]} mtu {device["mtu"]} up')
def setup_interfaces(logger, config):
# Set up the Cluster interface
cluster_dev = config['cluster_dev']
cluster_mtu = config['cluster_mtu']
cluster_dev_ip = config['cluster_dev_ip']
logger.out(f'Setting up Cluster network interface {cluster_dev} with MTU {cluster_mtu}', state='i')
common.run_os_command(f'ip link set {cluster_dev} mtu {cluster_mtu} up')
logger.out(f'Setting up Cluster network bridge on interface {cluster_dev} with IP {cluster_dev_ip}', state='i')
common.run_os_command(f'brctl addbr brcluster')
common.run_os_command(f'brctl addif brcluster {cluster_dev}')
common.run_os_command(f'ip link set brcluster mtu {cluster_mtu} up')
common.run_os_command(f'ip address add {cluster_dev_ip} dev brcluster')
# Set up the Storage interface
storage_dev = config['storage_dev']
storage_mtu = config['storage_mtu']
storage_dev_ip = config['storage_dev_ip']
logger.out(f'Setting up Storage network interface {storage_dev} with MTU {storage_mtu}', state='i')
common.run_os_command(f'ip link set {storage_dev} mtu {storage_mtu} up')
if storage_dev == cluster_dev:
if storage_dev_ip != cluster_dev_ip:
logger.out(f'Setting up Storage network on Cluster network bridge with IP {storage_dev_ip}', state='i')
common.run_os_command(f'ip address add {storage_dev_ip} dev brcluster')
else:
logger.out(f'Setting up Storage network bridge on interface {storage_dev} with IP {storage_dev_ip}', state='i')
common.run_os_command(f'brctl addbr brstorage')
common.run_os_command(f'brctl addif brstorage {storage_dev}')
common.run_os_command(f'ip link set brstorage mtu {storage_mtu} up')
common.run_os_command(f'ip address add {storage_dev_ip} dev brstorage')
# Set up the Upstream interface
upstream_dev = config['upstream_dev']
upstream_mtu = config['upstream_mtu']
upstream_dev_ip = config['upstream_dev_ip']
logger.out(f'Setting up Upstream network interface {upstream_dev} with MTU {upstream_mtu}', state='i')
if upstream_dev == cluster_dev:
if upstream_dev_ip != cluster_dev_ip:
logger.out(f'Setting up Upstream network on Cluster network bridge with IP {upstream_dev_ip}', state='i')
common.run_os_command(f'ip address add {upstream_dev_ip} dev brcluster')
else:
logger.out(f'Setting up Upstream network bridge on interface {upstream_dev} with IP {upstream_dev_ip}', state='i')
common.run_os_command(f'brctl addbr brupstream')
common.run_os_command(f'brctl addif brupstream {upstream_dev}')
common.run_os_command(f'ip link set brupstream mtu {upstream_mtu} up')
common.run_os_command(f'ip address add {upstream_dev_ip} dev brupstream')
upstream_gateway = config['upstream_gateway']
if upstream_gateway is not None:
logger.out(f'Setting up Upstream networok default gateway IP {upstream_gateway}', state='i')
if upstream_dev == cluster_dev:
common.run_os_command(f'ip route add default via {upstream_gateway} dev brcluster')
else:
common.run_os_command(f'ip route add default via {upstream_gateway} dev brupstream')
# Set up sysctl tweaks to optimize networking
# Enable routing functions
common.run_os_command('sysctl net.ipv4.ip_forward=1')
common.run_os_command('sysctl net.ipv6.ip_forward=1')
# Enable send redirects
common.run_os_command('sysctl net.ipv4.conf.all.send_redirects=1')
common.run_os_command('sysctl net.ipv4.conf.default.send_redirects=1')
common.run_os_command('sysctl net.ipv6.conf.all.send_redirects=1')
common.run_os_command('sysctl net.ipv6.conf.default.send_redirects=1')
# Accept source routes
common.run_os_command('sysctl net.ipv4.conf.all.accept_source_route=1')
common.run_os_command('sysctl net.ipv4.conf.default.accept_source_route=1')
common.run_os_command('sysctl net.ipv6.conf.all.accept_source_route=1')
common.run_os_command('sysctl net.ipv6.conf.default.accept_source_route=1')
# Disable RP filtering on Cluster and Upstream interfaces (to allow traffic pivoting)
common.run_os_command(f'sysctl net.ipv4.conf.{cluster_dev}.rp_filter=0')
common.run_os_command(f'sysctl net.ipv4.conf.brcluster.rp_filter=0')
common.run_os_command(f'sysctl net.ipv4.conf.{upstream_dev}.rp_filter=0')
common.run_os_command(f'sysctl net.ipv4.conf.brupstream.rp_filter=0')
common.run_os_command(f'sysctl net.ipv6.conf.{cluster_dev}.rp_filter=0')
common.run_os_command(f'sysctl net.ipv6.conf.brcluster.rp_filter=0')
common.run_os_command(f'sysctl net.ipv6.conf.{upstream_dev}.rp_filter=0')
common.run_os_command(f'sysctl net.ipv6.conf.brupstream.rp_filter=0')
# Stop DNSMasq if it is running
common.run_os_command('systemctl stop dnsmasq.service')
logger.out('Waiting 3 seconds for networking to come up', state='s')
sleep(3)
def create_nft_configuration(logger, config):
if config['enable_networking']:
logger.out('Creating NFT firewall configuration', state='i')
dynamic_directory = config['nft_dynamic_directory']
# Create directories
makedirs(f'{dynamic_directory}/networks', exist_ok=True)
makedirs(f'{dynamic_directory}/static', exist_ok=True)
# Set up the base rules
nftables_base_rules = f"""# Base rules
flush ruleset
# Add the filter table and chains
add table inet filter
add chain inet filter forward {{ type filter hook forward priority 0; }}
add chain inet filter input {{ type filter hook input priority 0; }}
# Include static rules and network rules
include "{dynamic_directory}/static/*"
include "{dynamic_directory}/networks/*"
"""
# Write the base firewall config
nftables_base_filename = f'{dynamic_directory}/base.nft'
with open(nftables_base_filename, 'w') as nftfh:
nftfh.write(nftables_base_rules)
common.reload_firewall_rules(nftables_base_filename, logger)

View File

@ -0,0 +1,77 @@
#!/usr/bin/env python3
# services.py - Utility functions for pvcnoded external services
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import daemon_lib.common as common
from time import sleep
def start_zookeeper(logger, config):
if config['daemon_mode'] == 'coordinator':
logger.out('Starting Zookeeper daemon', state='i')
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command('systemctl start zookeeper.service')
def start_libvirtd(logger, config):
if config['enable_hypervisor']:
logger.out('Starting Libvirt daemon', state='i')
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command('systemctl start libvirtd.service')
def start_patroni(logger, config):
if config['enable_networking'] and config['daemon_mode'] == 'coordinator':
logger.out('Starting Patroni daemon', state='i')
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command('systemctl start patroni.service')
def start_frrouting(logger, config):
if config['enable_networking'] and config['daemon_mode'] == 'coordinator':
logger.out('Starting FRRouting daemon', state='i')
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command('systemctl start frr.service')
def start_ceph_mon(logger, config):
if config['enable_storage'] and config['daemon_mode'] == 'coordinator':
logger.out('Starting Ceph Monitor daemon', state='i')
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command(f'systemctl start ceph-mon@{config["node_hostname"]}.service')
def start_ceph_mgr(logger, config):
if config['enable_storage'] and config['daemon_mode'] == 'coordinator':
logger.out('Starting Ceph Manager daemon', state='i')
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command(f'systemctl start ceph-mgr@{config["node_hostname"]}.service')
def start_system_services(logger, config):
start_zookeeper(logger, config)
start_libvirtd(logger, config)
start_patroni(logger, config)
start_frrouting(logger, config)
start_ceph_mon(logger, config)
start_ceph_mgr(logger, config)
logger.out('Waiting 3 seconds for daemons to start', state='s')
sleep(3)

View File

@ -0,0 +1,132 @@
#!/usr/bin/env python3
# <Filename> - <Description>
# zookeeper.py - Utility functions for pvcnoded Zookeeper connections
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
##############################################################################
from daemon_lib.zkhandler import ZKHandler
import os
import time
def connect(logger, config):
# Create an instance of the handler
zkhandler = ZKHandler(config, logger)
try:
logger.out('Connecting to Zookeeper on coordinator nodes {}'.format(config['coordinators']), state='i')
# Start connection
zkhandler.connect(persistent=True)
except Exception as e:
logger.out('ERROR: Failed to connect to Zookeeper cluster: {}'.format(e), state='e')
os._exit(1)
logger.out('Validating Zookeeper schema', state='i')
try:
node_schema_version = int(zkhandler.read(('node.data.active_schema', config['node_hostname'])))
except Exception:
node_schema_version = int(zkhandler.read('base.schema.version'))
zkhandler.write([
(('node.data.active_schema', config['node_hostname']), node_schema_version)
])
# Load in the current node schema version
zkhandler.schema.load(node_schema_version)
# Record the latest intalled schema version
latest_schema_version = zkhandler.schema.find_latest()
logger.out('Latest installed schema is {}'.format(latest_schema_version), state='i')
zkhandler.write([
(('node.data.latest_schema', config['node_hostname']), latest_schema_version)
])
# If we are the last node to get a schema update, fire the master update
if latest_schema_version > node_schema_version:
node_latest_schema_version = list()
for node in zkhandler.children('base.node'):
node_latest_schema_version.append(int(zkhandler.read(('node.data.latest_schema', node))))
# This is true if all elements of the latest schema version are identical to the latest version,
# i.e. they have all had the latest schema installed and ready to load.
if node_latest_schema_version.count(latest_schema_version) == len(node_latest_schema_version):
zkhandler.write([
('base.schema.version', latest_schema_version)
])
return zkhandler, node_schema_version
def validate_schema(logger, zkhandler):
# Validate our schema against the active version
if not zkhandler.schema.validate(zkhandler, logger):
logger.out('Found schema violations, applying', state='i')
zkhandler.schema.apply(zkhandler)
else:
logger.out('Schema successfully validated', state='o')
def setup_node(logger, config, zkhandler):
# Check if our node exists in Zookeeper, and create it if not
if config['daemon_mode'] == 'coordinator':
init_routerstate = 'secondary'
else:
init_routerstate = 'client'
if zkhandler.exists(('node', config['node_hostname'])):
logger.out(f'Node is {logger.fmt_green}present{logger.fmt_end} in Zookeeper', state='i')
# Update static data just in case it's changed
zkhandler.write([
(('node', config['node_hostname']), config['daemon_mode']),
(('node.mode', config['node_hostname']), config['daemon_mode']),
(('node.state.daemon', config['node_hostname']), 'init'),
(('node.state.router', config['node_hostname']), init_routerstate),
(('node.data.static', config['node_hostname']), ' '.join(config['static_data'])),
(('node.data.pvc_version', config['node_hostname']), config['pvcnoded_version']),
(('node.ipmi.hostname', config['node_hostname']), config['ipmi_hostname']),
(('node.ipmi.username', config['node_hostname']), config['ipmi_username']),
(('node.ipmi.password', config['node_hostname']), config['ipmi_password']),
])
else:
logger.out(f'Node is {logger.fmt_red}absent{logger.fmt_end} in Zookeeper; adding new node', state='i')
keepalive_time = int(time.time())
zkhandler.write([
(('node', config['node_hostname']), config['daemon_mode']),
(('node.keepalive', config['node_hostname']), str(keepalive_time)),
(('node.mode', config['node_hostname']), config['daemon_mode']),
(('node.state.daemon', config['node_hostname']), 'init'),
(('node.state.domain', config['node_hostname']), 'flushed'),
(('node.state.router', config['node_hostname']), init_routerstate),
(('node.data.static', config['node_hostname']), ' '.join(config['static_data'])),
(('node.data.pvc_version', config['node_hostname']), config['pvcnoded_version']),
(('node.ipmi.hostname', config['node_hostname']), config['ipmi_hostname']),
(('node.ipmi.username', config['node_hostname']), config['ipmi_username']),
(('node.ipmi.password', config['node_hostname']), config['ipmi_password']),
(('node.memory.total', config['node_hostname']), '0'),
(('node.memory.used', config['node_hostname']), '0'),
(('node.memory.free', config['node_hostname']), '0'),
(('node.memory.allocated', config['node_hostname']), '0'),
(('node.memory.provisioned', config['node_hostname']), '0'),
(('node.vcpu.allocated', config['node_hostname']), '0'),
(('node.cpu.load', config['node_hostname']), '0.0'),
(('node.running_domains', config['node_hostname']), '0'),
(('node.count.provisioned_domains', config['node_hostname']), '0'),
(('node.count.networks', config['node_hostname']), '0'),
])

View File

@ -26,44 +26,44 @@ rm ${backup_tmp} || true
# Provisioner tests
_pvc provisioner profile list test
_pvc provisioner create --wait testX test
_pvc provisioner create --wait testx test
sleep 30
# VM tests
vm_tmp=$(mktemp)
_pvc vm dump testX --file ${vm_tmp}
_pvc vm shutdown --yes --wait testX
_pvc vm start testX
_pvc vm dump testx --file ${vm_tmp}
_pvc vm shutdown --yes --wait testx
_pvc vm start testx
sleep 30
_pvc vm stop --yes testX
_pvc vm disable testX
_pvc vm undefine --yes testX
_pvc vm stop --yes testx
_pvc vm disable testx
_pvc vm undefine --yes testx
_pvc vm define --target hv3 --tag pvc-test ${vm_tmp}
_pvc vm start testX
_pvc vm start testx
sleep 30
_pvc vm restart --yes --wait testX
_pvc vm restart --yes --wait testx
sleep 30
_pvc vm migrate --wait testX
_pvc vm migrate --wait testx
sleep 5
_pvc vm unmigrate --wait testX
_pvc vm unmigrate --wait testx
sleep 5
_pvc vm move --wait --target hv1 testX
_pvc vm move --wait --target hv1 testx
sleep 5
_pvc vm meta testX --limit hv1 --selector vms --method live --profile test --no-autostart
_pvc vm tag add testX mytag
_pvc vm tag get testX
_pvc vm meta testx --limit hv1 --selector vms --method live --profile test --no-autostart
_pvc vm tag add testx mytag
_pvc vm tag get testx
_pvc vm list --tag mytag
_pvc vm tag remove testX mytag
_pvc vm network get testX
_pvc vm vcpu set testX 4
_pvc vm vcpu get testX
_pvc vm memory set testX 4096
_pvc vm memory get testX
_pvc vm vcpu set testX 2
_pvc vm memory set testX 2048 --restart --yes
_pvc vm tag remove testx mytag
_pvc vm network get testx
_pvc vm vcpu set testx 4
_pvc vm vcpu get testx
_pvc vm memory set testx 4096
_pvc vm memory get testx
_pvc vm vcpu set testx 2
_pvc vm memory set testx 2048 --restart --yes
sleep 5
_pvc vm list testX
_pvc vm info --long testX
_pvc vm list testx
_pvc vm info --long testx
rm ${vm_tmp} || true
# Node tests
@ -81,9 +81,9 @@ _pvc node info hv1
# Network tests
_pvc network add 10001 --description testing --type managed --domain testing.local --ipnet 10.100.100.0/24 --gateway 10.100.100.1 --dhcp --dhcp-start 10.100.100.100 --dhcp-end 10.100.100.199
sleep 5
_pvc vm network add --restart --yes testX 10001
_pvc vm network add --restart --yes testx 10001
sleep 30
_pvc vm network remove --restart --yes testX 10001
_pvc vm network remove --restart --yes testx 10001
sleep 5
_pvc network acl add 10001 --in --description test-acl --order 0 --rule "'ip daddr 10.0.0.0/8 counter'"
@ -98,10 +98,10 @@ _pvc network list
_pvc network info --long 10001
# Network-VM interaction tests
_pvc vm network add testX 10001 --model virtio --restart --yes
_pvc vm network add testx 10001 --model virtio --restart --yes
sleep 30
_pvc vm network get testX
_pvc vm network remove testX 10001 --restart --yes
_pvc vm network get testx
_pvc vm network remove testx 10001 --restart --yes
sleep 5
_pvc network remove --yes 10001
@ -117,9 +117,9 @@ _pvc storage osd list
_pvc storage pool add testing 64 --replcfg "copies=3,mincopies=2"
sleep 5
_pvc storage pool list
_pvc storage volume add testing testX 1G
_pvc storage volume resize testing testX 2G
_pvc storage volume rename testing testX testerX
_pvc storage volume add testing testx 1G
_pvc storage volume resize testing testx 2G
_pvc storage volume rename testing testx testerX
_pvc storage volume clone testing testerX testerY
_pvc storage volume list --pool testing
_pvc storage volume snapshot add testing testerX asnapshotX
@ -128,10 +128,10 @@ _pvc storage volume snapshot list
_pvc storage volume snapshot remove --yes testing testerX asnapshotY
# Storage-VM interaction tests
_pvc vm volume add testX --type rbd --disk-id sdh --bus scsi testing/testerY --restart --yes
_pvc vm volume add testx --type rbd --disk-id sdh --bus scsi testing/testerY --restart --yes
sleep 30
_pvc vm volume get testX
_pvc vm volume remove testX testing/testerY --restart --yes
_pvc vm volume get testx
_pvc vm volume remove testx testing/testerY --restart --yes
sleep 5
_pvc storage volume remove --yes testing testerY
@ -139,8 +139,8 @@ _pvc storage volume remove --yes testing testerX
_pvc storage pool remove --yes testing
# Remove the VM
_pvc vm stop --yes testX
_pvc vm remove --yes testX
_pvc vm stop --yes testx
_pvc vm remove --yes testx
time_end=$(date +%s)