From f50f170d4eb357a471fa9bcfdc371c4e0c5b3545 Mon Sep 17 00:00:00 2001
From: "Joshua M. Boniface" 
Date: Thu, 16 Nov 2023 16:05:55 -0500
Subject: [PATCH] Convert vmbuilder to use new Celery step structure

---
 api-daemon/pvcapid/vmbuilder.py | 361 ++++++++++++++++++----------
 daemon-common/celery.py         |  10 +-
 2 files changed, 214 insertions(+), 157 deletions(-)

diff --git a/api-daemon/pvcapid/vmbuilder.py b/api-daemon/pvcapid/vmbuilder.py
index 0c71532b..2ae2aece 100755
--- a/api-daemon/pvcapid/vmbuilder.py
+++ b/api-daemon/pvcapid/vmbuilder.py
@@ -26,7 +26,6 @@ import re
 import os
 
 # import sys
-import time
 import importlib.util
 import uuid
 
@@ -35,6 +34,7 @@ from contextlib import contextmanager
 from pvcapid.Daemon import config
 
 from daemon_lib.zkhandler import ZKHandler
+from daemon_lib.celery import start, fail, log_info, log_warn, update, finish
 
 import daemon_lib.common as pvc_common
 import daemon_lib.node as pvc_node
@@ -137,7 +137,10 @@ def chroot(destination):
         os.fchdir(fake_root)
         yield
     except Exception:
-        raise
+        fail(
+            None,
+            f"Failed to chroot into {destination}",
+        )
     finally:
         os.fchdir(real_root)
         os.chroot(".")
@@ -160,12 +163,19 @@ def open_db(config):
         )
         cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
     except Exception:
-        raise ClusterError("Failed to connect to Postgres")
+        fail(
+            None,
+            "Failed to connect to Postgres",
+            exception=ClusterError,
+        )
 
     try:
         yield cur
     except Exception:
-        raise
+        fail(
+            None,
+            "Failed to yield database handle",
+        )
     finally:
         conn.commit()
         cur.close()
@@ -179,12 +189,19 @@ def open_zk(config):
         zkhandler = ZKHandler(config)
         zkhandler.connect()
     except Exception:
-        raise ClusterError("Failed to connect to Zookeeper")
+        fail(
+            None,
+            "Failed to connect to Zookeeper",
+            exception=ClusterError,
+        )
 
     try:
         yield zkhandler
     except Exception:
-        raise
+        fail(
+            None,
+            "Failed to yield Zookeeper connection",
+        )
     finally:
         zkhandler.disconnect()
         del zkhandler
@@ -194,19 +211,28 @@
 # Main VM provisioning function - executed by the Celery worker
 #
 def create_vm(
-    self, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
+    celery, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
 ):
-    print(f"Starting provisioning of VM '{vm_name}' with profile '{vm_profile}'")
+    current_stage = 0
+    total_stages = 11
+    start(
+        celery,
+        f"Starting provisioning of VM '{vm_name}' with profile '{vm_profile}'",
+        current=current_stage,
+        total=total_stages,
+    )
 
     # Phase 1 - setup
     #  * Get the profile elements
     #  * Get the details from these elements
     #  * Assemble a VM configuration dictionary
-    self.update_state(
-        state="RUNNING",
-        meta={"current": 1, "total": 10, "status": "Collecting configuration"},
+    current_stage += 1
+    update(
+        celery,
+        "Collecting configuration details",
+        current=current_stage,
+        total=total_stages,
     )
-    time.sleep(1)
 
     vm_id = re.findall(r"/(\d+)$/", vm_name)
     if not vm_id:
@@ -313,13 +339,14 @@ def create_vm(
             argument_name, argument_data = argument.split("=")
             script_arguments[argument_name] = argument_data
 
-    print("Script arguments: {}".format(script_arguments))
+    log_info(celery, f"Script arguments: {script_arguments}")
     vm_data["script_arguments"] = script_arguments
 
-    print(
+    log_info(
+        celery,
         "VM configuration data:\n{}".format(
             json.dumps(vm_data, sort_keys=True, indent=2)
-        )
+        ),
     )
 
     # Phase 2 - verification
     #  * Ensure that at least one node has enough free RAM to run the VM in the profile
     #  * Ensure that all networks are valid
     #  * Ensure that there is enough disk space in the Ceph cluster for the disks
     # This is the "safe fail" step where an invalid configuration will be caught
"safe fail" step when an invalid configuration will be caught - self.update_state( - state="RUNNING", - meta={ - "current": 2, - "total": 10, - "status": "Verifying configuration against cluster", - }, + current_stage += 1 + update( + celery, + "Verifying configuration against cluster", + current=current_stage, + total=total_stages, ) - time.sleep(1) with open_zk(config) as zkhandler: # Verify that a VM with this name does not already exist if pvc_vm.searchClusterByName(zkhandler, vm_name): - raise ClusterError( - "A VM with the name '{}' already exists in the cluster.".format(vm_name) + fail( + celery, + f"A VM with the name '{vm_name}' already exists in the cluster.", + exception=ClusterError, ) # Verify that at least one host has enough free RAM to run the VM @@ -361,16 +388,15 @@ def create_vm( target_node = node["name"] # Raise if no node was found if not target_node: - raise ClusterError( - "No ready cluster node contains at least {}+512 MB of free RAM.".format( - vm_data["system_details"]["vram_mb"] - ) + fail( + celery, + f"No ready cluster node contains at least {vm_data['system_details']['vram_mb']}+512 MB of free RAM", + exception=ClusterError, ) - print( - 'Selecting target node "{}" with "{}" MB free RAM'.format( - target_node, last_free - ) + log_info( + celery, + f'Selecting target node "{target_node}" with "{last_free}" MB free RAM', ) # Verify that all configured networks are present on the cluster @@ -382,11 +408,13 @@ def create_vm( "cluster", "storage", ]: - raise ClusterError( - 'The network VNI "{}" is not present on the cluster.'.format(vni) + fail( + celery, + f'The network VNI "{vni}" is not present on the cluster.', + exception=ClusterError, ) - print("All configured networks for VM are valid") + log_info(celery, "All configured networks for VM are valid") # Verify that there is enough disk space free to provision all VM disks pools = dict() @@ -396,10 +424,10 @@ def create_vm( zkhandler, volume["pool"], volume["source_volume"] ) if not volume_data: - raise ClusterError( - "The source volume {}/{} could not be found.".format( - volume["pool"], volume["source_volume"] - ) + fail( + celery, + f"The source volume {volume['pool']}/{volume['source_volume']} could not be found.", + exception=ClusterError, ) if not volume["pool"] in pools: pools[volume["pool"]] = int( @@ -427,8 +455,10 @@ def create_vm( if not pool_information: raise except Exception: - raise ClusterError( - 'Pool "{}" is not present on the cluster.'.format(pool) + fail( + celery, + f'Pool "{pool}" is not present on the cluster.', + exception=ClusterError, ) pool_free_space_gb = int( pool_information["stats"]["free_bytes"] / 1024 / 1024 / 1024 @@ -436,13 +466,13 @@ def create_vm( pool_vm_usage_gb = int(pools[pool]) if pool_vm_usage_gb >= pool_free_space_gb: - raise ClusterError( - 'Pool "{}" has only {} GB free and VM requires {} GB.'.format( - pool, pool_free_space_gb, pool_vm_usage_gb - ) + fail( + celery, + f'Pool "{pool}" has only {pool_free_space_gb} GB free but VM requires {pool_vm_usage_gb} GB.', + exception=ClusterError, ) - print("There is enough space on cluster to store VM volumes") + log_info(celery, "There is enough space on cluster to store VM volumes") # Verify that every specified filesystem is valid used_filesystems = list() @@ -461,33 +491,44 @@ def create_vm( elif filesystem == "swap": retcode, stdout, stderr = pvc_common.run_os_command("which mkswap") if retcode: - raise ProvisioningError( - "Failed to find binary for mkswap: {}".format(stderr) + fail( + celery, + f"Failed to find 
+                    exception=ProvisioningError,
                 )
         else:
             retcode, stdout, stderr = pvc_common.run_os_command(
                 "which mkfs.{}".format(filesystem)
             )
             if retcode:
-                raise ProvisioningError(
-                    "Failed to find binary for mkfs.{}: {}".format(filesystem, stderr)
+                fail(
+                    celery,
+                    f"Failed to find binary for mkfs.{filesystem}: {stderr}",
+                    exception=ProvisioningError,
                 )
 
-    print("All selected filesystems are valid")
+    log_info(celery, "All selected filesystems are valid")
 
     # Phase 3 - provisioning script preparation
     #  * Import the provisioning script as a library with importlib
     #  * Ensure the required function(s) are present
-    self.update_state(
-        state="RUNNING",
-        meta={"current": 3, "total": 10, "status": "Preparing provisioning script"},
+    current_stage += 1
+    update(
+        celery,
+        "Preparing provisioning script",
+        current=current_stage,
+        total=total_stages,
     )
-    time.sleep(1)
 
     # Write the script out to a temporary file
     retcode, stdout, stderr = pvc_common.run_os_command("mktemp")
     if retcode:
-        raise ProvisioningError("Failed to create a temporary file: {}".format(stderr))
+        fail(
+            celery,
+            f"Failed to create a temporary file: {stderr}",
+            exception=ProvisioningError,
+        )
+
     script_file = stdout.strip()
     with open(script_file, "w") as fh:
         fh.write(vm_data["script"])
@@ -507,12 +548,17 @@ def create_vm(
         vm_data=vm_data,
     )
 
-    print("Provisioning script imported successfully")
+    log_info(celery, "Provisioning script imported successfully")
 
     # Create temporary directory for external chroot
     retcode, stdout, stderr = pvc_common.run_os_command("mktemp -d")
     if retcode:
-        raise ProvisioningError(f"Failed to create a temporary directory: {stderr}")
+        fail(
+            celery,
+            f"Failed to create a temporary directory: {stderr}",
+            exception=ProvisioningError,
+        )
+
     temp_dir = stdout.strip()
 
     # Bind mount / to the chroot location /
@@ -520,8 +566,10 @@ def create_vm(
         f"mount --bind --options ro / {temp_dir}"
     )
     if retcode:
-        raise ProvisioningError(
-            f"Failed to mount rootfs onto {temp_dir} for chroot: {stderr}"
+        fail(
+            celery,
+            f"Failed to mount rootfs into {temp_dir} for chroot: {stderr}",
+            exception=ProvisioningError,
         )
 
     # Mount tmpfs to the chroot location /tmp
@@ -529,8 +577,10 @@ def create_vm(
         f"mount --type tmpfs tmpfs {temp_dir}/tmp"
    )
     if retcode:
-        raise ProvisioningError(
-            f"Failed to mount tmpfs onto {temp_dir}/tmp for chroot: {stderr}"
+        fail(
+            celery,
+            f"Failed to mount tmpfs onto {temp_dir}/tmp for chroot: {stderr}",
+            exception=ProvisioningError,
         )
 
     # Bind mount /dev to the chroot location /dev
@@ -538,8 +588,10 @@ def create_vm(
         f"mount --bind --options ro /dev {temp_dir}/dev"
     )
     if retcode:
-        raise ProvisioningError(
-            f"Failed to mount devfs onto {temp_dir}/dev for chroot: {stderr}"
+        fail(
+            celery,
+            f"Failed to mount devfs onto {temp_dir}/dev for chroot: {stderr}",
+            exception=ProvisioningError,
         )
 
     # Bind mount /run to the chroot location /run
@@ -547,8 +599,10 @@ def create_vm(
         f"mount --bind --options rw /run {temp_dir}/run"
     )
     if retcode:
-        raise ProvisioningError(
-            f"Failed to mount runfs onto {temp_dir}/run for chroot: {stderr}"
+        fail(
+            celery,
+            f"Failed to mount runfs onto {temp_dir}/run for chroot: {stderr}",
+            exception=ProvisioningError,
         )
 
     # Bind mount /sys to the chroot location /sys
@@ -556,8 +610,10 @@ def create_vm(
         f"mount --bind --options rw /sys {temp_dir}/sys"
     )
     if retcode:
-        raise ProvisioningError(
-            f"Failed to mount sysfs onto {temp_dir}/sys for chroot: {stderr}"
+        fail(
+            celery,
+            f"Failed to mount sysfs onto {temp_dir}/sys for chroot: {stderr}",
+            exception=ProvisioningError,
         )
 
     # Bind mount /proc to the chroot location /proc
@@ -565,14 +621,16 @@ def create_vm(
         f"mount --bind --options rw /proc {temp_dir}/proc"
     )
     if retcode:
-        raise ProvisioningError(
-            f"Failed to mount procfs onto {temp_dir}/proc for chroot: {stderr}"
+        fail(
+            celery,
+            f"Failed to mount procfs onto {temp_dir}/proc for chroot: {stderr}",
+            exception=ProvisioningError,
         )
 
-    print("Chroot environment prepared successfully")
+    log_info(celery, "Chroot environment prepared successfully")
 
     def general_cleanup():
-        print("Running upper cleanup steps")
+        log_info(celery, "Running upper cleanup steps")
 
         try:
             # Unmount bind-mounted devfs on the chroot
@@ -599,68 +657,65 @@ def create_vm(
             retcode, stdout, stderr = pvc_common.run_os_command(f"umount {temp_dir}")
         except Exception as e:
             # We don't care about fails during cleanup, log and continue
-            print(f"Suberror during general cleanup unmounts: {e}")
+            log_warn(celery, f"Suberror during general cleanup unmounts: {e}")
 
         try:
             # Remove the temp_dir
             os.rmdir(temp_dir)
         except Exception as e:
             # We don't care about fails during cleanup, log and continue
-            print(f"Suberror during general cleanup directory removal: {e}")
+            log_warn(celery, f"Suberror during general cleanup directory removal: {e}")
 
         try:
             # Remove temporary script (don't fail if not removed)
             os.remove(script_file)
         except Exception as e:
             # We don't care about fails during cleanup, log and continue
-            print(f"Suberror during general cleanup script removal: {e}")
+            log_warn(celery, f"Suberror during general cleanup script removal: {e}")
 
     # Phase 4 - script: setup()
     #  * Run pre-setup steps
-    self.update_state(
-        state="RUNNING",
-        meta={
-            "current": 4,
-            "total": 10,
-            "status": "Running script setup() step",
-        },
+    current_stage += 1
+    update(
+        celery, "Running script setup() step", current=current_stage, total=total_stages
     )
-    time.sleep(1)
-
-    print("Running script setup() step")
 
     try:
         with chroot(temp_dir):
             vm_builder.setup()
     except Exception as e:
         general_cleanup()
-        raise ProvisioningError(f"Error in script setup() step: {e}")
+        fail(
+            celery,
+            f"Error in script setup() step: {e}",
+            exception=ProvisioningError,
+        )
 
     # Phase 5 - script: create()
     #  * Prepare the libvirt XML definition for the VM
-    self.update_state(
-        state="RUNNING",
-        meta={
-            "current": 5,
-            "total": 10,
-            "status": "Running script create() step",
-        },
+    current_stage += 1
+    update(
+        celery,
+        "Running script create() step",
+        current=current_stage,
+        total=total_stages,
    )
-    time.sleep(1)
 
     if define_vm:
-        print("Running script create() step")
-
         try:
             with chroot(temp_dir):
                 vm_schema = vm_builder.create()
         except Exception as e:
             general_cleanup()
-            raise ProvisioningError(f"Error in script create() step: {e}")
+            fail(
+                celery,
+                f"Error in script create() step: {e}",
+                exception=ProvisioningError,
+            )
 
-        print("Generated VM schema:\n{}\n".format(vm_schema))
+        log_info(celery, "Generated VM schema:\n{}\n".format(vm_schema))
 
-        print("Defining VM on cluster")
+        log_info(celery, "Defining VM on cluster")
         node_limit = vm_data["system_details"]["node_limit"]
         if node_limit:
             node_limit = node_limit.split(",")
@@ -679,23 +734,19 @@ def create_vm(
             vm_profile,
             initial_state="provision",
         )
-        print(retmsg)
+        log_info(celery, retmsg)
     else:
-        print("Skipping VM definition due to define_vm=False")
+        log_info(celery, "Skipping VM definition due to define_vm=False")
 
     # Phase 6 - script: prepare()
     #  * Run preparation steps (e.g. disk creation and mapping, filesystem creation, etc.)
-    self.update_state(
-        state="RUNNING",
-        meta={
-            "current": 6,
-            "total": 10,
-            "status": "Running script prepare() step",
-        },
+    current_stage += 1
+    update(
+        celery,
+        "Running script prepare() step",
+        current=current_stage,
+        total=total_stages,
     )
-    time.sleep(1)
-
-    print("Running script prepare() step")
 
     try:
         with chroot(temp_dir):
@@ -704,21 +755,21 @@ def create_vm(
         with chroot(temp_dir):
             vm_builder.cleanup()
         general_cleanup()
-        raise ProvisioningError(f"Error in script prepare() step: {e}")
+        fail(
+            celery,
+            f"Error in script prepare() step: {e}",
+            exception=ProvisioningError,
+        )
 
     # Phase 7 - script: install()
     #  * Run installation with arguments
-    self.update_state(
-        state="RUNNING",
-        meta={
-            "current": 7,
-            "total": 10,
-            "status": "Running script install() step",
-        },
+    current_stage += 1
+    update(
+        celery,
+        "Running script install() step",
+        current=current_stage,
+        total=total_stages,
     )
-    time.sleep(1)
-
-    print("Running script install() step")
 
     try:
         with chroot(temp_dir):
@@ -727,63 +778,63 @@ def create_vm(
         with chroot(temp_dir):
             vm_builder.cleanup()
         general_cleanup()
-        raise ProvisioningError(f"Error in script install() step: {e}")
+        fail(
+            celery,
+            f"Error in script install() step: {e}",
+            exception=ProvisioningError,
+        )
 
     # Phase 8 - script: cleanup()
     #  * Run cleanup steps
-    self.update_state(
-        state="RUNNING",
-        meta={
-            "current": 8,
-            "total": 10,
-            "status": "Running script cleanup() step",
-        },
+    current_stage += 1
+    update(
+        celery,
+        "Running script cleanup() step",
+        current=current_stage,
+        total=total_stages,
     )
-    time.sleep(1)
-
-    print("Running script cleanup() step")
 
     try:
         with chroot(temp_dir):
             vm_builder.cleanup()
     except Exception as e:
         general_cleanup()
-        raise ProvisioningError(f"Error in script cleanup() step: {e}")
+        fail(
+            celery,
+            f"Error in script cleanup() step: {e}",
+            exception=ProvisioningError,
+        )
 
     # Phase 9 - general cleanup
     #  * Clean up the chroot from earlier
-    self.update_state(
-        state="RUNNING",
-        meta={
-            "current": 9,
-            "total": 10,
-            "status": "Running general cleanup steps",
-        },
+    current_stage += 1
+    update(
+        celery,
+        "Running general cleanup steps",
+        current=current_stage,
+        total=total_stages,
     )
-    time.sleep(1)
 
     general_cleanup()
 
     # Phase 10 - startup
     #  * Start the VM in the PVC cluster
-    self.update_state(
-        state="RUNNING",
-        meta={
-            "current": 10,
-            "total": 10,
-            "status": "Starting VM",
-        },
-    )
-    time.sleep(1)
+    current_stage += 1
+    update(celery, "Starting VM", current=current_stage, total=total_stages)
 
     if start_vm:
-        print("Starting VM")
         with open_zk(config) as zkhandler:
             success, message = pvc_vm.start_vm(zkhandler, vm_name)
-        print(message)
+        log_info(celery, message)
 
         end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned and started successfully'
     else:
         end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned successfully'
 
-    return {"status": end_message, "current": 10, "total": 10}
+    current_stage += 1
+    return finish(
+        celery,
+        end_message,
+        current=current_stage,
+        total=total_stages,
    )
diff --git a/daemon-common/celery.py b/daemon-common/celery.py
index 4888bff0..ae521b2a 100644
--- a/daemon-common/celery.py
+++ b/daemon-common/celery.py
@@ -41,11 +41,17 @@ def start(celery, msg, current=0, total=1):
     sleep(1)
 
 
-def fail(celery, msg, current=1, total=1):
+def fail(celery, msg, exception=None, current=1, total=1):
+    if exception is None:
+        exception = TaskFailure
+
+    msg = f"{exception.__name__}: {msg}"
+
     logger = getLogger(__name__)
     logger.error(msg)
+
     sys.tracebacklimit = 0
-    raise TaskFailure(msg)
+    raise exception(msg)
 
 
 def log_info(celery, msg):
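
For reviewers new to the step structure, the sketch below shows the worker-side pattern this patch converts create_vm() to. It is illustrative only: do_work() and its stage names are invented here, start()'s signature is confirmed by the hunk context above (current=0, total=1 defaults), while the update()/log_info()/finish() signatures are inferred from how this patch calls them rather than from the full daemon-common/celery.py.

    # Illustrative sketch of the new Celery step pattern (hypothetical task)
    from daemon_lib.celery import start, update, log_info, fail, finish


    def do_work(celery, obj_name):
        current_stage = 0
        total_stages = 3  # two update() stages plus the final finish() stage

        # Stage 0: register the task and set the progress baseline
        start(
            celery,
            f"Starting work on '{obj_name}'",
            current=current_stage,
            total=total_stages,
        )

        # Stage 1: validation; fail() logs and raises in a single call
        current_stage += 1
        update(celery, "Validating input", current=current_stage, total=total_stages)
        if not obj_name:
            fail(celery, "No object name given", exception=ValueError)

        # Stage 2: the work itself; log_info() logs without advancing progress
        current_stage += 1
        update(celery, "Doing the work", current=current_stage, total=total_stages)
        log_info(celery, f"Processed '{obj_name}'")

        # Final stage: finish() is counted in total_stages, mirroring create_vm()
        # above (ten numbered phases plus finish gives total_stages = 11)
        current_stage += 1
        return finish(
            celery,
            f"Work on '{obj_name}' complete",
            current=current_stage,
            total=total_stages,
        )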
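Likewise, a minimal sketch of what a caller sees from the reworked fail(), assuming only the daemon-common/celery.py hunk above (TaskFailure as the default class, the exception-name prefix, and the raise of the passed class); ClusterError here stands in for the vmbuilder exception types:

    # Illustrative behavior of the new fail() helper
    class ClusterError(Exception):
        pass


    try:
        fail(None, "Failed to connect to Postgres", exception=ClusterError)
    except ClusterError as e:
        # fail() prefixed the exception class name, logged the message via the
        # standard logger, and raised the requested class with that message
        print(e)  # ClusterError: Failed to connect to Postgres

The net effect is that call sites shrink from a multi-line raise plus manual print() to a single call, progress metadata stays uniform across tasks, and the exception= keyword preserves the existing distinction between ClusterError and ProvisioningError.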