Convert vmbuilder to use new Celery step structure

This commit is contained in:
Joshua Boniface 2023-11-16 16:05:55 -05:00
parent e92ed245d6
commit f50f170d4e
2 changed files with 214 additions and 157 deletions

View File

@ -26,7 +26,6 @@ import re
import os import os
# import sys # import sys
import time
import importlib.util import importlib.util
import uuid import uuid
@ -35,6 +34,7 @@ from contextlib import contextmanager
from pvcapid.Daemon import config from pvcapid.Daemon import config
from daemon_lib.zkhandler import ZKHandler from daemon_lib.zkhandler import ZKHandler
from daemon_lib.celery import start, fail, log_info, log_warn, update, finish
import daemon_lib.common as pvc_common import daemon_lib.common as pvc_common
import daemon_lib.node as pvc_node import daemon_lib.node as pvc_node
@ -137,7 +137,10 @@ def chroot(destination):
os.fchdir(fake_root) os.fchdir(fake_root)
yield yield
except Exception: except Exception:
raise fail(
None,
f"Failed to chroot into {destination}",
)
finally: finally:
os.fchdir(real_root) os.fchdir(real_root)
os.chroot(".") os.chroot(".")
@ -160,12 +163,19 @@ def open_db(config):
) )
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
except Exception: except Exception:
raise ClusterError("Failed to connect to Postgres") fail(
None,
"Failed to connect to Postgres",
exception=ClusterError,
)
try: try:
yield cur yield cur
except Exception: except Exception:
raise fail(
None,
"Failed to yield database handle",
)
finally: finally:
conn.commit() conn.commit()
cur.close() cur.close()
@ -179,12 +189,19 @@ def open_zk(config):
zkhandler = ZKHandler(config) zkhandler = ZKHandler(config)
zkhandler.connect() zkhandler.connect()
except Exception: except Exception:
raise ClusterError("Failed to connect to Zookeeper") fail(
None,
"Failed to connect to Zookeeper",
exception=ClusterError,
)
try: try:
yield zkhandler yield zkhandler
except Exception: except Exception:
raise fail(
None,
"Failed to yield Zookeeper connection",
)
finally: finally:
zkhandler.disconnect() zkhandler.disconnect()
del zkhandler del zkhandler
@ -194,19 +211,28 @@ def open_zk(config):
# Main VM provisioning function - executed by the Celery worker # Main VM provisioning function - executed by the Celery worker
# #
def create_vm( def create_vm(
self, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[] celery, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
): ):
print(f"Starting provisioning of VM '{vm_name}' with profile '{vm_profile}'") current_stage = 0
total_stages = 10
start(
celery,
f"Starting provisioning of VM '{vm_name}' with profile '{vm_profile}'",
current=current_stage,
total=total_stages,
)
# Phase 1 - setup # Phase 1 - setup
# * Get the profile elements # * Get the profile elements
# * Get the details from these elements # * Get the details from these elements
# * Assemble a VM configuration dictionary # * Assemble a VM configuration dictionary
self.update_state( current_stage += 1
state="RUNNING", update(
meta={"current": 1, "total": 10, "status": "Collecting configuration"}, celery,
"Collecting configuration details",
current=current_stage,
total=total_stages,
) )
time.sleep(1)
vm_id = re.findall(r"/(\d+)$/", vm_name) vm_id = re.findall(r"/(\d+)$/", vm_name)
if not vm_id: if not vm_id:
@ -313,13 +339,14 @@ def create_vm(
argument_name, argument_data = argument.split("=") argument_name, argument_data = argument.split("=")
script_arguments[argument_name] = argument_data script_arguments[argument_name] = argument_data
print("Script arguments: {}".format(script_arguments)) log_info(celery, f"Script arguments: {script_arguments}")
vm_data["script_arguments"] = script_arguments vm_data["script_arguments"] = script_arguments
print( log_info(
celery,
"VM configuration data:\n{}".format( "VM configuration data:\n{}".format(
json.dumps(vm_data, sort_keys=True, indent=2) json.dumps(vm_data, sort_keys=True, indent=2)
) ),
) )
# Phase 2 - verification # Phase 2 - verification
@ -327,21 +354,21 @@ def create_vm(
# * Ensure that all networks are valid # * Ensure that all networks are valid
# * Ensure that there is enough disk space in the Ceph cluster for the disks # * Ensure that there is enough disk space in the Ceph cluster for the disks
# This is the "safe fail" step when an invalid configuration will be caught # This is the "safe fail" step when an invalid configuration will be caught
self.update_state( current_stage += 1
state="RUNNING", update(
meta={ celery,
"current": 2, "Verifying configuration against cluster",
"total": 10, current=current_stage,
"status": "Verifying configuration against cluster", total=total_stages,
},
) )
time.sleep(1)
with open_zk(config) as zkhandler: with open_zk(config) as zkhandler:
# Verify that a VM with this name does not already exist # Verify that a VM with this name does not already exist
if pvc_vm.searchClusterByName(zkhandler, vm_name): if pvc_vm.searchClusterByName(zkhandler, vm_name):
raise ClusterError( fail(
"A VM with the name '{}' already exists in the cluster.".format(vm_name) celery,
f"A VM with the name '{vm_name}' already exists in the cluster.",
exception=ClusterError,
) )
# Verify that at least one host has enough free RAM to run the VM # Verify that at least one host has enough free RAM to run the VM
@ -361,16 +388,15 @@ def create_vm(
target_node = node["name"] target_node = node["name"]
# Raise if no node was found # Raise if no node was found
if not target_node: if not target_node:
raise ClusterError( fail(
"No ready cluster node contains at least {}+512 MB of free RAM.".format( celery,
vm_data["system_details"]["vram_mb"] f"No ready cluster node contains at least {vm_data['system_details']['vram_mb']}+512 MB of free RAM",
) exception=ClusterError,
) )
print( log_info(
'Selecting target node "{}" with "{}" MB free RAM'.format( celery,
target_node, last_free f'Selecting target node "{target_node}" with "{last_free}" MB free RAM',
)
) )
# Verify that all configured networks are present on the cluster # Verify that all configured networks are present on the cluster
@ -382,11 +408,13 @@ def create_vm(
"cluster", "cluster",
"storage", "storage",
]: ]:
raise ClusterError( fail(
'The network VNI "{}" is not present on the cluster.'.format(vni) celery,
f'The network VNI "{vni}" is not present on the cluster.',
exception=ClusterError,
) )
print("All configured networks for VM are valid") log_info(celery, "All configured networks for VM are valid")
# Verify that there is enough disk space free to provision all VM disks # Verify that there is enough disk space free to provision all VM disks
pools = dict() pools = dict()
@ -396,10 +424,10 @@ def create_vm(
zkhandler, volume["pool"], volume["source_volume"] zkhandler, volume["pool"], volume["source_volume"]
) )
if not volume_data: if not volume_data:
raise ClusterError( fail(
"The source volume {}/{} could not be found.".format( celery,
volume["pool"], volume["source_volume"] f"The source volume {volume['pool']}/{volume['source_volume']} could not be found.",
) exception=ClusterError,
) )
if not volume["pool"] in pools: if not volume["pool"] in pools:
pools[volume["pool"]] = int( pools[volume["pool"]] = int(
@ -427,8 +455,10 @@ def create_vm(
if not pool_information: if not pool_information:
raise raise
except Exception: except Exception:
raise ClusterError( fail(
'Pool "{}" is not present on the cluster.'.format(pool) celery,
f'Pool "{pool}" is not present on the cluster.',
exception=ClusterError,
) )
pool_free_space_gb = int( pool_free_space_gb = int(
pool_information["stats"]["free_bytes"] / 1024 / 1024 / 1024 pool_information["stats"]["free_bytes"] / 1024 / 1024 / 1024
@ -436,13 +466,13 @@ def create_vm(
pool_vm_usage_gb = int(pools[pool]) pool_vm_usage_gb = int(pools[pool])
if pool_vm_usage_gb >= pool_free_space_gb: if pool_vm_usage_gb >= pool_free_space_gb:
raise ClusterError( fail(
'Pool "{}" has only {} GB free and VM requires {} GB.'.format( celery,
pool, pool_free_space_gb, pool_vm_usage_gb f'Pool "{pool}" has only {pool_free_space_gb} GB free but VM requires {pool_vm_usage_gb} GB.',
) exception=ClusterError,
) )
print("There is enough space on cluster to store VM volumes") log_info(celery, "There is enough space on cluster to store VM volumes")
# Verify that every specified filesystem is valid # Verify that every specified filesystem is valid
used_filesystems = list() used_filesystems = list()
@ -461,33 +491,44 @@ def create_vm(
elif filesystem == "swap": elif filesystem == "swap":
retcode, stdout, stderr = pvc_common.run_os_command("which mkswap") retcode, stdout, stderr = pvc_common.run_os_command("which mkswap")
if retcode: if retcode:
raise ProvisioningError( fail(
"Failed to find binary for mkswap: {}".format(stderr) celery,
f"Failed to find binary for mkswap: {stderr}",
exception=ProvisioningError,
) )
else: else:
retcode, stdout, stderr = pvc_common.run_os_command( retcode, stdout, stderr = pvc_common.run_os_command(
"which mkfs.{}".format(filesystem) "which mkfs.{}".format(filesystem)
) )
if retcode: if retcode:
raise ProvisioningError( fail(
"Failed to find binary for mkfs.{}: {}".format(filesystem, stderr) celery,
f"Failed to find binary for mkfs.{filesystem}: {stderr}",
exception=ProvisioningError,
) )
print("All selected filesystems are valid") log_info(celery, "All selected filesystems are valid")
# Phase 3 - provisioning script preparation # Phase 3 - provisioning script preparation
# * Import the provisioning script as a library with importlib # * Import the provisioning script as a library with importlib
# * Ensure the required function(s) are present # * Ensure the required function(s) are present
self.update_state( current_stage += 1
state="RUNNING", update(
meta={"current": 3, "total": 10, "status": "Preparing provisioning script"}, celery,
"Preparing provisioning script",
current=current_stage,
total=total_stages,
) )
time.sleep(1)
# Write the script out to a temporary file # Write the script out to a temporary file
retcode, stdout, stderr = pvc_common.run_os_command("mktemp") retcode, stdout, stderr = pvc_common.run_os_command("mktemp")
if retcode: if retcode:
raise ProvisioningError("Failed to create a temporary file: {}".format(stderr)) fail(
celery,
f"Failed to create a temporary file: {stderr}",
exception=ProvisioningError,
)
script_file = stdout.strip() script_file = stdout.strip()
with open(script_file, "w") as fh: with open(script_file, "w") as fh:
fh.write(vm_data["script"]) fh.write(vm_data["script"])
@ -507,12 +548,17 @@ def create_vm(
vm_data=vm_data, vm_data=vm_data,
) )
print("Provisioning script imported successfully") log_info(celery, "Provisioning script imported successfully")
# Create temporary directory for external chroot # Create temporary directory for external chroot
retcode, stdout, stderr = pvc_common.run_os_command("mktemp -d") retcode, stdout, stderr = pvc_common.run_os_command("mktemp -d")
if retcode: if retcode:
raise ProvisioningError(f"Failed to create a temporary directory: {stderr}") fail(
celery,
f"Failed to create a temporary directory: {stderr}",
exception=ProvisioningError,
)
temp_dir = stdout.strip() temp_dir = stdout.strip()
# Bind mount / to the chroot location / # Bind mount / to the chroot location /
@ -520,8 +566,10 @@ def create_vm(
f"mount --bind --options ro / {temp_dir}" f"mount --bind --options ro / {temp_dir}"
) )
if retcode: if retcode:
raise ProvisioningError( fail(
f"Failed to mount rootfs onto {temp_dir} for chroot: {stderr}" celery,
f"Failed to mount rootfs into {temp_dir} for chroot: {stderr}",
exception=ProvisioningError,
) )
# Mount tmpfs to the chroot location /tmp # Mount tmpfs to the chroot location /tmp
@ -529,8 +577,10 @@ def create_vm(
f"mount --type tmpfs tmpfs {temp_dir}/tmp" f"mount --type tmpfs tmpfs {temp_dir}/tmp"
) )
if retcode: if retcode:
raise ProvisioningError( fail(
f"Failed to mount tmpfs onto {temp_dir}/tmp for chroot: {stderr}" celery,
f"Failed to mount tmpfs onto {temp_dir}/tmp for chroot: {stderr}",
exception=ProvisioningError,
) )
# Bind mount /dev to the chroot location /dev # Bind mount /dev to the chroot location /dev
@ -538,8 +588,10 @@ def create_vm(
f"mount --bind --options ro /dev {temp_dir}/dev" f"mount --bind --options ro /dev {temp_dir}/dev"
) )
if retcode: if retcode:
raise ProvisioningError( fail(
f"Failed to mount devfs onto {temp_dir}/dev for chroot: {stderr}" celery,
f"Failed to mount devfs onto {temp_dir}/dev for chroot: {stderr}",
exception=ProvisioningError,
) )
# Bind mount /run to the chroot location /run # Bind mount /run to the chroot location /run
@ -547,8 +599,10 @@ def create_vm(
f"mount --bind --options rw /run {temp_dir}/run" f"mount --bind --options rw /run {temp_dir}/run"
) )
if retcode: if retcode:
raise ProvisioningError( fail(
f"Failed to mount runfs onto {temp_dir}/run for chroot: {stderr}" celery,
f"Failed to mount runfs onto {temp_dir}/run for chroot: {stderr}",
exception=ProvisioningError,
) )
# Bind mount /sys to the chroot location /sys # Bind mount /sys to the chroot location /sys
@ -556,8 +610,10 @@ def create_vm(
f"mount --bind --options rw /sys {temp_dir}/sys" f"mount --bind --options rw /sys {temp_dir}/sys"
) )
if retcode: if retcode:
raise ProvisioningError( fail(
f"Failed to mount sysfs onto {temp_dir}/sys for chroot: {stderr}" celery,
f"Failed to mount sysfs onto {temp_dir}/sys for chroot: {stderr}",
exception=ProvisioningError,
) )
# Bind mount /proc to the chroot location /proc # Bind mount /proc to the chroot location /proc
@ -565,14 +621,16 @@ def create_vm(
f"mount --bind --options rw /proc {temp_dir}/proc" f"mount --bind --options rw /proc {temp_dir}/proc"
) )
if retcode: if retcode:
raise ProvisioningError( fail(
f"Failed to mount procfs onto {temp_dir}/proc for chroot: {stderr}" celery,
f"Failed to mount procfs onto {temp_dir}/proc for chroot: {stderr}",
exception=ProvisioningError,
) )
print("Chroot environment prepared successfully") log_info(celery, "Chroot environment prepared successfully")
def general_cleanup(): def general_cleanup():
print("Running upper cleanup steps") log_info(celery, "Running upper cleanup steps")
try: try:
# Unmount bind-mounted devfs on the chroot # Unmount bind-mounted devfs on the chroot
@ -599,68 +657,65 @@ def create_vm(
retcode, stdout, stderr = pvc_common.run_os_command(f"umount {temp_dir}") retcode, stdout, stderr = pvc_common.run_os_command(f"umount {temp_dir}")
except Exception as e: except Exception as e:
# We don't care about fails during cleanup, log and continue # We don't care about fails during cleanup, log and continue
print(f"Suberror during general cleanup unmounts: {e}") log_warn(celery, f"Suberror during general cleanup unmounts: {e}")
try: try:
# Remove the temp_dir # Remove the temp_dir
os.rmdir(temp_dir) os.rmdir(temp_dir)
except Exception as e: except Exception as e:
# We don't care about fails during cleanup, log and continue # We don't care about fails during cleanup, log and continue
print(f"Suberror during general cleanup directory removal: {e}") log_warn(celery, f"Suberror during general cleanup directory removal: {e}")
try: try:
# Remove temporary script (don't fail if not removed) # Remove temporary script (don't fail if not removed)
os.remove(script_file) os.remove(script_file)
except Exception as e: except Exception as e:
# We don't care about fails during cleanup, log and continue # We don't care about fails during cleanup, log and continue
print(f"Suberror during general cleanup script removal: {e}") log_warn(celery, f"Suberror during general cleanup script removal: {e}")
# Phase 4 - script: setup() # Phase 4 - script: setup()
# * Run pre-setup steps # * Run pre-setup steps
self.update_state( current_stage += 1
state="RUNNING", update(
meta={ celery, "Running script setup() step", current=current_stage, total=total_stages
"current": 4,
"total": 10,
"status": "Running script setup() step",
},
) )
time.sleep(1)
print("Running script setup() step")
try: try:
with chroot(temp_dir): with chroot(temp_dir):
vm_builder.setup() vm_builder.setup()
except Exception as e: except Exception as e:
general_cleanup() general_cleanup()
raise ProvisioningError(f"Error in script setup() step: {e}") fail(
celery,
f"Error in script setup() step: {e}",
exception=ProvisioningError,
)
# Phase 5 - script: create() # Phase 5 - script: create()
# * Prepare the libvirt XML definition for the VM # * Prepare the libvirt XML definition for the VM
self.update_state( current_stage += 1
state="RUNNING", update(
meta={ celery,
"current": 5, "Running script create() step",
"total": 10, current=current_stage,
"status": "Running script create() step", total=total_stages,
},
) )
time.sleep(1)
if define_vm: if define_vm:
print("Running script create() step")
try: try:
with chroot(temp_dir): with chroot(temp_dir):
vm_schema = vm_builder.create() vm_schema = vm_builder.create()
except Exception as e: except Exception as e:
general_cleanup() general_cleanup()
raise ProvisioningError(f"Error in script create() step: {e}") fail(
celery,
f"Error in script create() step: {e}",
exception=ProvisioningError,
)
print("Generated VM schema:\n{}\n".format(vm_schema)) log_info(celery, "Generated VM schema:\n{}\n".format(vm_schema))
print("Defining VM on cluster") log_info(celery, "Defining VM on cluster")
node_limit = vm_data["system_details"]["node_limit"] node_limit = vm_data["system_details"]["node_limit"]
if node_limit: if node_limit:
node_limit = node_limit.split(",") node_limit = node_limit.split(",")
@ -679,23 +734,19 @@ def create_vm(
vm_profile, vm_profile,
initial_state="provision", initial_state="provision",
) )
print(retmsg) log_info(celery, retmsg)
else: else:
print("Skipping VM definition due to define_vm=False") log_info("Skipping VM definition due to define_vm=False")
# Phase 6 - script: prepare() # Phase 6 - script: prepare()
# * Run preparation steps (e.g. disk creation and mapping, filesystem creation, etc.) # * Run preparation steps (e.g. disk creation and mapping, filesystem creation, etc.)
self.update_state( current_stage += 1
state="RUNNING", update(
meta={ celery,
"current": 6, "Running script prepare() step",
"total": 10, current=current_stage,
"status": "Running script prepare() step", total=total_stages,
},
) )
time.sleep(1)
print("Running script prepare() step")
try: try:
with chroot(temp_dir): with chroot(temp_dir):
@ -704,21 +755,21 @@ def create_vm(
with chroot(temp_dir): with chroot(temp_dir):
vm_builder.cleanup() vm_builder.cleanup()
general_cleanup() general_cleanup()
raise ProvisioningError(f"Error in script prepare() step: {e}") fail(
celery,
f"Error in script prepare() step: {e}",
exception=ProvisioningError,
)
# Phase 7 - script: install() # Phase 7 - script: install()
# * Run installation with arguments # * Run installation with arguments
self.update_state( current_stage += 1
state="RUNNING", update(
meta={ celery,
"current": 7, "Running script install() step",
"total": 10, current=current_stage,
"status": "Running script install() step", total=total_stages,
},
) )
time.sleep(1)
print("Running script install() step")
try: try:
with chroot(temp_dir): with chroot(temp_dir):
@ -727,63 +778,63 @@ def create_vm(
with chroot(temp_dir): with chroot(temp_dir):
vm_builder.cleanup() vm_builder.cleanup()
general_cleanup() general_cleanup()
raise ProvisioningError(f"Error in script install() step: {e}") fail(
celery,
f"Error in script install() step: {e}",
exception=ProvisioningError,
)
# Phase 8 - script: cleanup() # Phase 8 - script: cleanup()
# * Run cleanup steps # * Run cleanup steps
self.update_state( current_stage += 1
state="RUNNING", update(
meta={ celery,
"current": 8, "Running script cleanup() step",
"total": 10, current=current_stage,
"status": "Running script cleanup() step", total=total_stages,
},
) )
time.sleep(1)
print("Running script cleanup() step")
try: try:
with chroot(temp_dir): with chroot(temp_dir):
vm_builder.cleanup() vm_builder.cleanup()
except Exception as e: except Exception as e:
general_cleanup() general_cleanup()
raise ProvisioningError(f"Error in script cleanup() step: {e}") fail(
celery,
f"Error in script cleanup() step: {e}",
exception=ProvisioningError,
)
# Phase 9 - general cleanup # Phase 9 - general cleanup
# * Clean up the chroot from earlier # * Clean up the chroot from earlier
self.update_state( current_stage += 1
state="RUNNING", update(
meta={ celery,
"current": 9, "Running general cleanup steps",
"total": 10, current=current_stage,
"status": "Running general cleanup steps", total=total_stages,
},
) )
time.sleep(1)
general_cleanup() general_cleanup()
# Phase 10 - startup # Phase 10 - startup
# * Start the VM in the PVC cluster # * Start the VM in the PVC cluster
self.update_state( current_stage += 1
state="RUNNING", update(celery, "Starting VM", current=current_stage, total=total_stages)
meta={
"current": 10,
"total": 10,
"status": "Starting VM",
},
)
time.sleep(1)
if start_vm: if start_vm:
print("Starting VM")
with open_zk(config) as zkhandler: with open_zk(config) as zkhandler:
success, message = pvc_vm.start_vm(zkhandler, vm_name) success, message = pvc_vm.start_vm(zkhandler, vm_name)
print(message) log_info(celery, message)
end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned and started successfully' end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned and started successfully'
else: else:
end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned successfully' end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned successfully'
return {"status": end_message, "current": 10, "total": 10} current_stage += 1
return finish(
celery,
end_message,
current=current_stage,
total=total_stages,
)

View File

@ -41,11 +41,17 @@ def start(celery, msg, current=0, total=1):
sleep(1) sleep(1)
def fail(celery, msg, exception=None, current=1, total=1):
    """Log a task failure and abort by raising an exception.

    Args:
        celery: Task handle passed by the caller (callers in this commit
            pass either the task object or None); not referenced in this
            function body.
        msg: Human-readable description of the failure.
        exception: Exception class to raise. When None, TaskFailure is used.
        current: Step counter; not referenced in this function body.
        total: Step total; not referenced in this function body.

    Raises:
        exception: Always raised (TaskFailure when no class was given), with
            the message prefixed by the exception class name.
    """
    if exception is None:
        exception = TaskFailure

    # Prefix with the class *name*; interpolating the class object itself
    # would render as "<class 'module.Name'>: ..." in logs and task results.
    msg = f"{getattr(exception, '__name__', exception)}: {msg}"

    logger = getLogger(__name__)
    logger.error(msg)

    # NOTE: this is an interpreter-wide setting, not scoped to this task; it
    # limits how much traceback is printed for uncaught exceptions.
    sys.tracebacklimit = 0
    raise exception(msg)
def log_info(celery, msg): def log_info(celery, msg):