Compare commits


26 Commits

Author SHA1 Message Date
dd6a38d5ea Properly pass the name of the exception 2023-11-16 18:05:52 -05:00
73a4795967 Avoid fail during yields
This just causes a double-exception, so don't do it.
2023-11-16 17:22:53 -05:00
2a637c62e8 Port provisioner scripts to updated framework
Updates all the example provisioner scripts to use the new functions
exposed by the VMBuilder class as an illustration of how best to use
them.

Also adds a wrapper fail() handler to ensure the cleanup of the script,
as well as the global cleanup, are run on an exception.
2023-11-16 17:04:46 -05:00
618a1c1c10 Add helper functions to VMBuilder instances 2023-11-16 16:17:17 -05:00
f50f170d4e Convert vmbuilder to use new Celery step structure 2023-11-16 16:08:49 -05:00
e92ed245d6 Pass proper task details into wait function 2023-11-16 15:31:25 -05:00
9ab505ec98 Return and show task_name 2023-11-16 14:50:02 -05:00
9958d1cfe8 Add name to task output 2023-11-16 13:23:31 -05:00
0cb81f96e6 Use custom task IDs for Celery tasks
Full UUIDs were obnoxiously long, so switch to using just the first
8-character section of a UUID instead. Keeps the list nice and short,
makes them easier to copy, and is just generally nicer.

Could this cause uniqueness problems? Perhaps, but I don't see that
happening nearly frequently enough to matter.
2023-11-16 13:22:14 -05:00
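
The shortened IDs referenced here are produced by keeping only the first hyphen-delimited section of a standard UUID4, as the run_celery_task() helper added later in this diff does. A minimal sketch of the approach (the function name here is illustrative, not part of the codebase):

from uuid import uuid4

def make_short_task_id() -> str:
    # Keep only the first 8-character section of a UUID4,
    # e.g. "3f9c2b1a-...-..." becomes "3f9c2b1a". Collisions are possible
    # in principle but considered unlikely enough to matter here.
    return str(uuid4()).split("-")[0]

# The short ID is then handed to Celery as a custom task_id, e.g.:
# task = task_def.apply_async((), kwargs, task_id=make_short_task_id())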
3651885954 Add --events to workers 2023-11-16 12:35:54 -05:00
d226e9f4e5 Enable extended Celery results 2023-11-16 12:02:57 -05:00
fa361a55d9 Explicitly use kwargs in Celery task calls 2023-11-16 11:55:30 -05:00
8915864fa9 Lower truncation size and add ellipses 2023-11-16 11:47:36 -05:00
79f7e8f82e Skip "run_on" argument in output
This isn't required knowledge; it's internal.
2023-11-16 11:46:15 -05:00
0d818017e8 Name the celery workers pvcworkerd@<hostname> 2023-11-16 11:43:17 -05:00
eb1d61a8b9 Generalize task status output 2023-11-16 11:39:08 -05:00
262babc63d Use kwargs for all task arguments
This will help ensure that the CLI frontend can properly parse the args
in a consistent way.
2023-11-16 10:10:48 -05:00
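
Concretely, with keyword-only task signatures (as the API diff below converts them to), a dispatch looks roughly like the following sketch; the VM and node names are placeholders, and the real code path is the run_celery_task() wrapper shown later in this diff:

from pvcapid.flaskapi import vm_flush_locks  # task module per the worker args below

# Illustrative only: every argument is passed by name, so the queue router can
# inspect kwargs["run_on"] and the CLI frontend sees consistently named args.
task = vm_flush_locks.apply_async(
    (),                                                    # no positional args
    {"domain": "testvm", "force_unlock": False, "run_on": "node1"},
    task_id="3f9c2b1a",                                    # shortened custom ID
)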
63773a3061 Allow watching existing task via cluster task 2023-11-16 03:06:13 -05:00
289049d223 Properly handle a "primary" run_on value 2023-11-16 02:49:29 -05:00
e818df5dae Use enable/disable --now instead of two commands
Avoids needing two calls here, especially for the stop.
2023-11-16 02:40:35 -05:00
c76a5afd04 Avoid waits during node secondary
Waiting for the daemons to stop took too much time on some nodes and
could throw off the lockstep. Instead, leverage background=True to run
the systemctl os_commands in the background (when they complete is
irrelevant), stop the Metadata API first, and don't delay during its
stop at all.
2023-11-16 02:34:12 -05:00
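
The background execution mentioned here is handled by PVC's own run_os_command(background=True); purely as an illustration of the idea (not the PVC helper itself), a fire-and-forget service stop can be sketched with the standard library:

import subprocess

def stop_unit_in_background(unit: str) -> None:
    # Launch "systemctl stop <unit>" and return immediately; when the
    # daemon actually finishes stopping is irrelevant to the caller.
    subprocess.Popen(
        ["systemctl", "stop", unit],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )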
0bec6abe71 Return proper run_on for ported tasks 2023-11-16 02:28:57 -05:00
18e43a9377 Adjust name in worker log output 2023-11-16 02:25:14 -05:00
4555f5a20a Remove warnings when switching coordinator state
Tasks are no longer bound to the primary coordinator for state updates
due to using KeyDB and a proper shared queue and result backend, so this
warning is now obsolete and no longer required.

This would interrupt "--wait" commands on provisioner tasks, but we no
longer believe that this warrants a warning, as the affected user could
simply run "pvc cluster task" to validate or resume the watcher.
2023-11-16 02:15:01 -05:00
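
The shared queue and result backend referred to here are the single Redis-protocol (KeyDB) URI that the API diff below assigns to both roles; schematically (the host, port, and path are placeholders for the real queue settings):

from celery import Celery

celery_task_uri = "redis://{}:{}{}".format("localhost", 6379, "/0")

celery = Celery(
    "pvcapid",
    broker=celery_task_uri,          # shared task queue
    result_backend=celery_task_uri,  # shared result store
    result_extended=True,            # keep task name and kwargs in results
)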
d727764ebc Remove obsolete status and add cluster task
Removes the obsoleted "pvc provisioner status" command and replaces it
with a generalized "pvc cluster task" command to show all
currently-active or pending tasks on the cluster workers.
2023-11-16 02:13:26 -05:00
484e6542c2 Port remaining tasks to new task handler
Move the create_vm and run_benchmark tasks to use the new Celery
subsystem, handlers, and wait command. Remove the obsolete, dedicated
API endpoints.

Standardize the CLI client and move the repeated handler code into a
separate common function.
2023-11-16 02:00:23 -05:00
19 changed files with 798 additions and 922 deletions

View File

@ -31,6 +31,20 @@
# function is provided in context of the example; see the other examples for
# more potential uses.
# Within the VMBuilderScript class, several helper functions are exposed through
# the parent VMBuilder class:
# self.log_info(message):
# Use this function to log an "informational" message instead of "print()"
# self.log_warn(message):
# Use this function to log a "warning" message
# self.log_err(message):
# Use this function to log an "error" message outside of an exception (see below)
# self.fail(message, exception=<ExceptionClass>):
# Use this function to bail out of the script safely instead of raising a
# normal Python exception. You may pass an optional exception class keyword
# argument for posterity in the logs if you wish; otherwise, ProvisioningError
# is used. This function implicitly calls "self.log_err" with the passed message
# Within the VMBuilderScript class, several common variables are exposed through
# the parent VMBuilder class:
# self.vm_name: The name of the VM from PVC's perspective
@ -132,9 +146,8 @@
# since they could still do destructive things to /dev and the like!
# This import is always required here, as VMBuilder is used by the VMBuilderScript class
# and ProvisioningError is the primary exception that should be raised within the class.
from pvcapid.vmbuilder import VMBuilder, ProvisioningError
# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
from pvcapid.vmbuilder import VMBuilder
# The VMBuilderScript class must be named as such, and extend VMBuilder.
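
Taken together, the helper interface documented at the top of this script lets a step read roughly like the following hedged sketch (the dependency check mirrors the debootstrap example below; only the structure matters here):

from pvcapid.vmbuilder import VMBuilder
import daemon_lib.common as pvc_common

class VMBuilderScript(VMBuilder):
    def setup(self):
        self.log_info("Checking for required tools")  # instead of print()
        retcode, stdout, stderr = pvc_common.run_os_command("which debootstrap")
        if retcode:
            # Logs the message and raises ProvisioningError unless another
            # class is passed via the exception= keyword argument.
            self.fail("Failed to find critical dependency: debootstrap")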

View File

@ -32,6 +32,20 @@
# function is provided in context of the example; see the other examples for
# more potential uses.
# Within the VMBuilderScript class, several helper functions are exposed through
# the parent VMBuilder class:
# self.log_info(message):
# Use this function to log an "informational" message instead of "print()"
# self.log_warn(message):
# Use this function to log a "warning" message
# self.log_err(message):
# Use this function to log an "error" message outside of an exception (see below)
# self.fail(message, exception=<ExceptionClass>):
# Use this function to bail out of the script safely instead of raising a
# normal Python exception. You may pass an optional exception class keyword
# argument for posterity in the logs if you wish; otherwise, ProvisioningError
# is used. This function implicitly calls "self.log_err" with the passed message
# Within the VMBuilderScript class, several common variables are exposed through
# the parent VMBuilder class:
# self.vm_name: The name of the VM from PVC's perspective
@ -133,9 +147,8 @@
# since they could still do destructive things to /dev and the like!
# This import is always required here, as VMBuilder is used by the VMBuilderScript class
# and ProvisioningError is the primary exception that should be raised within the class.
from pvcapid.vmbuilder import VMBuilder, ProvisioningError
# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
from pvcapid.vmbuilder import VMBuilder
# The VMBuilderScript class must be named as such, and extend VMBuilder.
@ -283,9 +296,9 @@ class VMBuilderScript(VMBuilder):
import os
# First loop: Create the destination disks
print("Creating destination disk volumes")
self.log_info("Creating destination disk volumes")
for volume in self.vm_data["volumes"]:
print(f"Processing volume {volume['volume_name']}")
self.log_info(f"Processing volume {volume['volume_name']}")
with open_zk(config) as zkhandler:
success, message = pvc_ceph.add_volume(
zkhandler,
@ -293,16 +306,14 @@ class VMBuilderScript(VMBuilder):
f"{self.vm_name}_{volume['disk_id']}",
f"{volume['disk_size_gb']}G",
)
print(message)
self.log_info(message)
if not success:
raise ProvisioningError(
f"Failed to create volume '{volume['disk_id']}'."
)
self.fail(f"Failed to create volume '{volume['disk_id']}'.")
# Second loop: Map the destination disks
print("Mapping destination disk volumes")
self.log_info("Mapping destination disk volumes")
for volume in self.vm_data["volumes"]:
print(f"Processing volume {volume['volume_name']}")
self.log_info(f"Processing volume {volume['volume_name']}")
dst_volume_name = f"{self.vm_name}_{volume['disk_id']}"
dst_volume = f"{volume['pool']}/{dst_volume_name}"
@ -312,14 +323,14 @@ class VMBuilderScript(VMBuilder):
volume["pool"],
dst_volume_name,
)
print(message)
self.log_info(message)
if not success:
raise ProvisioningError(f"Failed to map volume '{dst_volume}'.")
self.fail(f"Failed to map volume '{dst_volume}'.")
# Third loop: Map the source disks
print("Mapping source disk volumes")
self.log_info("Mapping source disk volumes")
for volume in self.vm_data["volumes"]:
print(f"Processing volume {volume['volume_name']}")
self.log_info(f"Processing volume {volume['volume_name']}")
src_volume_name = volume["volume_name"]
src_volume = f"{volume['pool']}/{src_volume_name}"
@ -329,9 +340,9 @@ class VMBuilderScript(VMBuilder):
volume["pool"],
src_volume_name,
)
print(message)
self.log_info(message)
if not success:
raise ProvisioningError(f"Failed to map volume '{src_volume}'.")
self.fail(f"Failed to map volume '{src_volume}'.")
def install(self):
"""
@ -351,14 +362,14 @@ class VMBuilderScript(VMBuilder):
dst_volume = f"{volume['pool']}/{dst_volume_name}"
dst_devpath = f"/dev/rbd/{dst_volume}"
print(
self.log_info(
f"Converting {volume['volume_format']} {src_volume} at {src_devpath} to {dst_volume} at {dst_devpath}"
)
retcode, stdout, stderr = pvc_common.run_os_command(
f"qemu-img convert -C -f {volume['volume_format']} -O raw {src_devpath} {dst_devpath}"
)
if retcode:
raise ProvisioningError(
self.fail(
f"Failed to convert {volume['volume_format']} volume '{src_volume}' to raw volume '{dst_volume}' with qemu-img: {stderr}"
)
@ -368,7 +379,7 @@ class VMBuilderScript(VMBuilder):
This function is also called if there is ANY exception raised in the prepare()
or install() steps. While this doesn't mean you shouldn't or can't raise exceptions
here, be warned that doing so might cause loops. Do this only if you really need to.
here, be warned that doing so might cause loops. Do this only if you really need to!
"""
# Run any imports first
@ -388,7 +399,7 @@ class VMBuilderScript(VMBuilder):
src_volume_name,
)
if not success:
raise ProvisioningError(
self.log_err(
f"Failed to unmap source volume '{src_volume_name}': {message}"
)
@ -404,6 +415,6 @@ class VMBuilderScript(VMBuilder):
dst_volume_name,
)
if not success:
raise ProvisioningError(
self.log_err(
f"Failed to unmap destination volume '{dst_volume_name}': {message}"
)

View File

@ -31,6 +31,20 @@
# function is provided in context of the example; see the other examples for
# more potential uses.
# Within the VMBuilderScript class, several helper functions are exposed through
# the parent VMBuilder class:
# self.log_info(message):
# Use this function to log an "informational" message instead of "print()"
# self.log_warn(message):
# Use this function to log a "warning" message
# self.log_err(message):
# Use this function to log an "error" message outside of an exception (see below)
# self.fail(message, exception=<ExceptionClass>):
# Use this function to bail out of the script safely instead of raising a
# normal Python exception. You may pass an optional exception class keyword
# argument for posterity in the logs if you wish; otherwise, ProvisioningError
# is used. This function implicitly calls "self.log_err" with the passed message
# Within the VMBuilderScript class, several common variables are exposed through
# the parent VMBuilder class:
# self.vm_name: The name of the VM from PVC's perspective
@ -132,9 +146,8 @@
# since they could still do destructive things to /dev and the like!
# This import is always required here, as VMBuilder is used by the VMBuilderScript class
# and ProvisioningError is the primary exception that should be raised within the class.
from pvcapid.vmbuilder import VMBuilder, ProvisioningError
# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
from pvcapid.vmbuilder import VMBuilder
# The VMBuilderScript class must be named as such, and extend VMBuilder.
@ -159,7 +172,7 @@ class VMBuilderScript(VMBuilder):
if retcode:
# Raise a ProvisioningError for any exception; the provisioner will handle
# this gracefully and properly, avoiding dangling mounts, RBD maps, etc.
raise ProvisioningError("Failed to find critical dependency: debootstrap")
self.fail("Failed to find critical dependency: debootstrap")
def create(self):
"""
@ -312,9 +325,9 @@ class VMBuilderScript(VMBuilder):
volume["source_volume"],
f"{self.vm_name}_{volume['disk_id']}",
)
print(message)
self.log_info(message)
if not success:
raise ProvisioningError(
self.fail(
f"Failed to clone volume '{volume['source_volume']}' to '{volume['disk_id']}'."
)
else:
@ -325,11 +338,9 @@ class VMBuilderScript(VMBuilder):
f"{self.vm_name}_{volume['disk_id']}",
f"{volume['disk_size_gb']}G",
)
print(message)
self.log_info(message)
if not success:
raise ProvisioningError(
f"Failed to create volume '{volume['disk_id']}'."
)
self.fail(f"Failed to create volume '{volume['disk_id']}'.")
# Second loop: Map the disks to the local system
for volume in self.vm_data["volumes"]:
@ -342,9 +353,9 @@ class VMBuilderScript(VMBuilder):
volume["pool"],
dst_volume_name,
)
print(message)
self.log_info(message)
if not success:
raise ProvisioningError(f"Failed to map volume '{dst_volume}'.")
self.fail(f"Failed to map volume '{dst_volume}'.")
# Third loop: Create filesystems on the volumes
for volume in self.vm_data["volumes"]:
@ -370,19 +381,17 @@ class VMBuilderScript(VMBuilder):
f"mkswap -f /dev/rbd/{dst_volume}"
)
if retcode:
raise ProvisioningError(
f"Failed to create swap on '{dst_volume}': {stderr}"
)
self.fail(f"Failed to create swap on '{dst_volume}': {stderr}")
else:
retcode, stdout, stderr = pvc_common.run_os_command(
f"mkfs.{volume['filesystem']} {filesystem_args} /dev/rbd/{dst_volume}"
)
if retcode:
raise ProvisioningError(
self.fail(
f"Faield to create {volume['filesystem']} file on '{dst_volume}': {stderr}"
)
print(stdout)
self.log_info(stdout)
# Create a temporary directory to use during install
temp_dir = "/tmp/target"
@ -413,7 +422,7 @@ class VMBuilderScript(VMBuilder):
f"mount {mapped_dst_volume} {mount_path}"
)
if retcode:
raise ProvisioningError(
self.fail(
f"Failed to mount '{mapped_dst_volume}' on '{mount_path}': {stderr}"
)
@ -480,10 +489,10 @@ class VMBuilderScript(VMBuilder):
if volume["mountpoint"] == "/":
root_volume = volume
if not root_volume:
raise ProvisioningError("Failed to find root volume in volumes list")
self.fail("Failed to find root volume in volumes list")
# Perform a debootstrap installation
print(
self.log_info(
f"Installing system with debootstrap: debootstrap --include={','.join(deb_packages)} {deb_release} {temp_dir} {deb_mirror}"
)
os.system(
@ -735,7 +744,7 @@ GRUB_DISABLE_LINUX_UUID=false
f"umount {mount_path}"
)
if retcode:
raise ProvisioningError(
self.log_err(
f"Failed to unmount '{mapped_dst_volume}' on '{mount_path}': {stderr}"
)
@ -747,6 +756,4 @@ GRUB_DISABLE_LINUX_UUID=false
dst_volume_name,
)
if not success:
raise ProvisioningError(
f"Failed to unmap '{mapped_dst_volume}': {stderr}"
)
self.log_err(f"Failed to unmap '{mapped_dst_volume}': {stderr}")

View File

@ -31,6 +31,20 @@
# function is provided in context of the example; see the other examples for
# more potential uses.
# Within the VMBuilderScript class, several helper functions are exposed through
# the parent VMBuilder class:
# self.log_info(message):
# Use this function to log an "informational" message instead of "print()"
# self.log_warn(message):
# Use this function to log a "warning" message
# self.log_err(message):
# Use this function to log an "error" message outside of an exception (see below)
# self.fail(message, exception=<ExceptionClass>):
# Use this function to bail out of the script safely instead of raising a
# normal Python exception. You may pass an optional exception class keyword
# argument for posterity in the logs if you wish; otherwise, ProvisioningError
# is used. This function implicitly calls "self.log_err" with the passed message
# Within the VMBuilderScript class, several common variables are exposed through
# the parent VMBuilder class:
# self.vm_name: The name of the VM from PVC's perspective
@ -132,9 +146,8 @@
# since they could still do destructive things to /dev and the like!
# This import is always required here, as VMBuilder is used by the VMBuilderScript class
# and ProvisioningError is the primary exception that should be raised within the class.
from pvcapid.vmbuilder import VMBuilder, ProvisioningError
# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
from pvcapid.vmbuilder import VMBuilder
# The VMBuilderScript class must be named as such, and extend VMBuilder.
@ -159,7 +172,7 @@ class VMBuilderScript(VMBuilder):
if retcode:
# Raise a ProvisioningError for any exception; the provisioner will handle
# this gracefully and properly, avoiding dangling mounts, RBD maps, etc.
raise ProvisioningError("Failed to find critical dependency: rinse")
self.fail("Failed to find critical dependency: rinse")
def create(self):
"""
@ -312,9 +325,9 @@ class VMBuilderScript(VMBuilder):
volume["source_volume"],
f"{self.vm_name}_{volume['disk_id']}",
)
print(message)
self.log_info(message)
if not success:
raise ProvisioningError(
self.fail(
f"Failed to clone volume '{volume['source_volume']}' to '{volume['disk_id']}'."
)
else:
@ -325,11 +338,9 @@ class VMBuilderScript(VMBuilder):
f"{self.vm_name}_{volume['disk_id']}",
f"{volume['disk_size_gb']}G",
)
print(message)
self.log_info(message)
if not success:
raise ProvisioningError(
f"Failed to create volume '{volume['disk_id']}'."
)
self.fail(f"Failed to create volume '{volume['disk_id']}'.")
# Second loop: Map the disks to the local system
for volume in self.vm_data["volumes"]:
@ -342,9 +353,9 @@ class VMBuilderScript(VMBuilder):
volume["pool"],
dst_volume_name,
)
print(message)
self.log_info(message)
if not success:
raise ProvisioningError(f"Failed to map volume '{dst_volume}'.")
self.fail(f"Failed to map volume '{dst_volume}'.")
# Third loop: Create filesystems on the volumes
for volume in self.vm_data["volumes"]:
@ -370,19 +381,17 @@ class VMBuilderScript(VMBuilder):
f"mkswap -f /dev/rbd/{dst_volume}"
)
if retcode:
raise ProvisioningError(
f"Failed to create swap on '{dst_volume}': {stderr}"
)
self.fail(f"Failed to create swap on '{dst_volume}': {stderr}")
else:
retcode, stdout, stderr = pvc_common.run_os_command(
f"mkfs.{volume['filesystem']} {filesystem_args} /dev/rbd/{dst_volume}"
)
if retcode:
raise ProvisioningError(
self.fail(
f"Faield to create {volume['filesystem']} file on '{dst_volume}': {stderr}"
)
print(stdout)
self.log_info(stdout)
# Create a temporary directory to use during install
temp_dir = "/tmp/target"
@ -413,7 +422,7 @@ class VMBuilderScript(VMBuilder):
f"mount {mapped_dst_volume} {mount_path}"
)
if retcode:
raise ProvisioningError(
self.fail(
f"Failed to mount '{mapped_dst_volume}' on '{mount_path}': {stderr}"
)
@ -492,7 +501,7 @@ class VMBuilderScript(VMBuilder):
if volume["mountpoint"] == "/":
root_volume = volume
if not root_volume:
raise ProvisioningError("Failed to find root volume in volumes list")
self.fail("Failed to find root volume in volumes list")
if rinse_mirror is not None:
mirror_arg = f"--mirror {rinse_mirror}"
@ -509,7 +518,7 @@ class VMBuilderScript(VMBuilder):
fh.write(f"{pkg}\n")
# Perform a rinse installation
print(
self.log_info(
f"Installing system with rinse: rinse --arch {rinse_architecture} --directory {temporary_directory} --distribution {rinse_release} --cache-dir {rinse_cache} --add-pkg-list /tmp/addpkg --verbose {mirror_arg}"
)
os.system(
@ -711,7 +720,7 @@ GRUB_SERIAL_COMMAND="serial --speed=115200 --unit=0 --word=8 --parity=no --stop=
f"umount {mount_path}"
)
if retcode:
raise ProvisioningError(
self.log_err(
f"Failed to unmount '{mapped_dst_volume}' on '{mount_path}': {stderr}"
)
@ -723,6 +732,4 @@ GRUB_SERIAL_COMMAND="serial --speed=115200 --unit=0 --word=8 --parity=no --stop=
dst_volume_name,
)
if not success:
raise ProvisioningError(
f"Failed to unmap '{mapped_dst_volume}': {stderr}"
)
self.log_err(f"Failed to unmap '{mapped_dst_volume}': {stderr}")

View File

@ -57,6 +57,20 @@
# function is provided in context of the example; see the other examples for
# more potential uses.
# Within the VMBuilderScript class, several helper functions are exposed through
# the parent VMBuilder class:
# self.log_info(message):
# Use this function to log an "informational" message instead of "print()"
# self.log_warn(message):
# Use this function to log a "warning" message
# self.log_err(message):
# Use this function to log an "error" message outside of an exception (see below)
# self.fail(message, exception=<ExceptionClass>):
# Use this function to bail out of the script safely instead of raising a
# normal Python exception. You may pass an optional exception class keyword
# argument for posterity in the logs if you wish; otherwise, ProvisioningError
# is used. This function implicitly calls "self.log_err" with the passed message
# Within the VMBuilderScript class, several common variables are exposed through
# the parent VMBuilder class:
# self.vm_name: The name of the VM from PVC's perspective
@ -158,9 +172,8 @@
# since they could still do destructive things to /dev and the like!
# This import is always required here, as VMBuilder is used by the VMBuilderScript class
# and ProvisioningError is the primary exception that should be raised within the class.
from pvcapid.vmbuilder import VMBuilder, ProvisioningError
# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
from pvcapid.vmbuilder import VMBuilder
# Set up some variables for later; if you frequently use these tools, you might benefit from
@ -189,9 +202,7 @@ class VMBuilderScript(VMBuilder):
# Ensure that our required runtime variables are defined
if self.vm_data["script_arguments"].get("pfsense_wan_iface") is None:
raise ProvisioningError(
"Required script argument 'pfsense_wan_iface' not provided"
)
self.fail("Required script argument 'pfsense_wan_iface' not provided")
if self.vm_data["script_arguments"].get("pfsense_wan_dhcp") is None:
for argument in [
@ -199,9 +210,7 @@ class VMBuilderScript(VMBuilder):
"pfsense_wan_gateway",
]:
if self.vm_data["script_arguments"].get(argument) is None:
raise ProvisioningError(
f"Required script argument '{argument}' not provided"
)
self.fail(f"Required script argument '{argument}' not provided")
# Ensure we have all dependencies installed on the provisioner system
for dependency in "wget", "unzip", "gzip":
@ -209,9 +218,7 @@ class VMBuilderScript(VMBuilder):
if retcode:
# Raise a ProvisioningError for any exception; the provisioner will handle
# this gracefully and properly, avoiding dangling mounts, RBD maps, etc.
raise ProvisioningError(
f"Failed to find critical dependency: {dependency}"
)
self.fail(f"Failed to find critical dependency: {dependency}")
# Create a temporary directory to use for Packer binaries/scripts
packer_temp_dir = "/tmp/packer"
@ -361,41 +368,39 @@ class VMBuilderScript(VMBuilder):
packer_temp_dir = "/tmp/packer"
# Download pfSense image file to temporary target directory
print(f"Downloading pfSense ISO image from {PFSENSE_ISO_URL}")
self.log_info(f"Downloading pfSense ISO image from {PFSENSE_ISO_URL}")
retcode, stdout, stderr = pvc_common.run_os_command(
f"wget --output-document={packer_temp_dir}/dl/pfsense.iso.gz {PFSENSE_ISO_URL}"
)
if retcode:
raise ProvisioningError(
f"Failed to download pfSense image from {PFSENSE_ISO_URL}"
)
self.fail(f"Failed to download pfSense image from {PFSENSE_ISO_URL}")
# Extract pfSense image file under temporary target directory
print(f"Extracting pfSense ISO image")
self.log_info(f"Extracting pfSense ISO image")
retcode, stdout, stderr = pvc_common.run_os_command(
f"gzip --decompress {packer_temp_dir}/dl/pfsense.iso.gz"
)
if retcode:
raise ProvisioningError("Failed to extract pfSense ISO image")
self.fail("Failed to extract pfSense ISO image")
# Download Packer to temporary target directory
print(f"Downloading Packer from {PACKER_URL}")
self.log_info(f"Downloading Packer from {PACKER_URL}")
retcode, stdout, stderr = pvc_common.run_os_command(
f"wget --output-document={packer_temp_dir}/packer.zip {PACKER_URL}"
)
if retcode:
raise ProvisioningError(f"Failed to download Packer from {PACKER_URL}")
self.fail(f"Failed to download Packer from {PACKER_URL}")
# Extract Packer under temporary target directory
print(f"Extracting Packer binary")
self.log_info(f"Extracting Packer binary")
retcode, stdout, stderr = pvc_common.run_os_command(
f"unzip {packer_temp_dir}/packer.zip -d {packer_temp_dir}"
)
if retcode:
raise ProvisioningError("Failed to extract Packer binary")
self.fail("Failed to extract Packer binary")
# Output the Packer configuration
print(f"Generating Packer configurations")
self.log_info(f"Generating Packer configurations")
first_volume = self.vm_data["volumes"][0]
first_volume_size_mb = int(first_volume["disk_size_gb"]) * 1024
@ -829,7 +834,7 @@ class VMBuilderScript(VMBuilder):
fh.write(pfsense_config)
# Create the disk(s)
print(f"Creating volumes")
self.log_info(f"Creating volumes")
for volume in self.vm_data["volumes"]:
with open_zk(config) as zkhandler:
success, message = pvc_ceph.add_volume(
@ -838,14 +843,12 @@ class VMBuilderScript(VMBuilder):
f"{self.vm_name}_{volume['disk_id']}",
f"{volume['disk_size_gb']}G",
)
print(message)
self.log_info(message)
if not success:
raise ProvisioningError(
f"Failed to create volume '{volume['disk_id']}'."
)
self.fail(f"Failed to create volume '{volume['disk_id']}'.")
# Map the target RBD volumes
print(f"Mapping volumes")
self.log_info(f"Mapping volumes")
for volume in self.vm_data["volumes"]:
dst_volume_name = f"{self.vm_name}_{volume['disk_id']}"
dst_volume = f"{volume['pool']}/{dst_volume_name}"
@ -856,9 +859,9 @@ class VMBuilderScript(VMBuilder):
volume["pool"],
dst_volume_name,
)
print(message)
self.log_info(message)
if not success:
raise ProvisioningError(f"Failed to map volume '{dst_volume}'.")
self.fail(f"Failed to map volume '{dst_volume}'.")
def install(self):
"""
@ -871,7 +874,7 @@ class VMBuilderScript(VMBuilder):
packer_temp_dir = "/tmp/packer"
print(
self.log_info(
f"Running Packer: PACKER_LOG=1 PACKER_CONFIG_DIR={packer_temp_dir} PACKER_CACHE_DIR={packer_temp_dir} {packer_temp_dir}/packer build {packer_temp_dir}/build.json"
)
os.system(
@ -879,9 +882,9 @@ class VMBuilderScript(VMBuilder):
)
if not os.path.exists(f"{packer_temp_dir}/bin/{self.vm_name}"):
raise ProvisioningError("Packer failed to build output image")
self.fail("Packer failed to build output image")
print("Copying output image to first volume")
self.log_info("Copying output image to first volume")
first_volume = self.vm_data["volumes"][0]
dst_volume_name = f"{self.vm_name}_{first_volume['disk_id']}"
dst_volume = f"{first_volume['pool']}/{dst_volume_name}"

View File

@ -26,6 +26,7 @@ from flask_restful import Resource, Api, reqparse, abort
from celery import Celery
from kombu import Queue
from lxml.objectify import fromstring as lxml_fromstring
from uuid import uuid4
from daemon_lib.common import getPrimaryNode
from daemon_lib.zkhandler import ZKConnection
@ -56,12 +57,11 @@ from flask_sqlalchemy import SQLAlchemy
# Create Flask app and set config values
app = flask.Flask(__name__)
app.config["CELERY_BROKER_URL"] = "redis://{}:{}{}".format(
config["queue_host"], config["queue_port"], config["queue_path"]
)
app.config["CELERY_RESULT_BACKEND"] = "redis://{}:{}{}".format(
celery_task_uri = "redis://{}:{}{}".format(
config["queue_host"], config["queue_port"], config["queue_path"]
)
app.config["CELERY_BROKER_URL"] = celery_task_uri
app.config["CELERY_RESULT_BACKEND"] = celery_task_uri
# Set up Celery queues
@ -71,6 +71,11 @@ def get_all_nodes(zkhandler):
return [n["name"] for n in all_nodes]
@ZKConnection(config)
def get_primary_node(zkhandler):
return getPrimaryNode(zkhandler)
app.config["CELERY_QUEUES"] = tuple(
[Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
)
@ -78,16 +83,14 @@ app.config["CELERY_QUEUES"] = tuple(
# Set up Celery queue routing
def route_task(name, args, kwargs, options, task=None, **kw):
@ZKConnection(config)
def get_primary_node(zkhandler):
return getPrimaryNode(zkhandler)
print("----")
print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
# If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
run_on = kwargs[options["routing_key"]]
if run_on == "primary":
run_on = get_primary_node()
# Otherwise, use the primary node
else:
run_on = get_primary_node()
@ -100,6 +103,19 @@ def route_task(name, args, kwargs, options, task=None, **kw):
app.config["CELERY_ROUTES"] = (route_task,)
# Set up Celery task ID generator
# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
def run_celery_task(task_def, **kwargs):
task_id = str(uuid4()).split("-")[0]
task = task_def.apply_async(
(),
kwargs,
task_id=task_id,
)
return task
# Set up SQLAlchemy backend
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["SQLALCHEMY_DATABASE_URI"] = "postgresql://{}:{}@{}:{}/{}".format(
@ -129,7 +145,12 @@ api = Api(blueprint)
app.register_blueprint(blueprint)
# Create celery definition
celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
celery = Celery(
app.name,
broker=celery_task_uri,
result_backend=celery_task_uri,
result_extended=True,
)
celery.conf.update(app.config)
#
@ -198,9 +219,15 @@ def Authenticator(function):
#
# Job functions
#
@celery.task(name="provisioner.create", bind=True)
@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
def create_vm(
self, vm_name, profile_name, define_vm=True, start_vm=True, script_run_args=[]
self,
vm_name=None,
profile_name=None,
define_vm=True,
start_vm=True,
script_run_args=[],
run_on="primary",
):
return api_vmbuilder.create_vm(
self,
@ -212,13 +239,13 @@ def create_vm(
)
@celery.task(name="storage.benchmark", bind=True)
def run_benchmark(self, pool):
@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
def run_benchmark(self, pool=None, run_on="primary"):
return api_benchmark.run_benchmark(self, pool)
@celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
def vm_flush_locks(self, domain, force_unlock=False, run_on="primary"):
def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
@ZKConnection(config)
def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
@ -227,7 +254,7 @@ def vm_flush_locks(self, domain, force_unlock=False, run_on="primary"):
@celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
def vm_device_attach(self, domain, xml, run_on=None):
def vm_device_attach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_attach(zkhandler, self, domain, xml):
return vm_worker_attach_device(zkhandler, self, domain, xml)
@ -236,7 +263,7 @@ def vm_device_attach(self, domain, xml, run_on=None):
@celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
def vm_device_detach(self, domain, xml, run_on=None):
def vm_device_detach(self, domain=None, xml=None, run_on=None):
@ZKConnection(config)
def run_vm_device_detach(zkhandler, self, domain, xml):
return vm_worker_detach_device(zkhandler, self, domain, xml)
@ -247,8 +274,8 @@ def vm_device_detach(self, domain, xml, run_on=None):
@celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add(
self,
device,
weight,
device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
@ -284,8 +311,8 @@ def osd_add(
@celery.task(name="osd.replace", bind=True, routing_key="run_on")
def osd_replace(
self,
osd_id,
new_device,
osd_id=None,
new_device=None,
old_device=None,
weight=None,
ext_db_ratio=None,
@ -322,7 +349,7 @@ def osd_replace(
@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
def osd_refresh(self, osd_id, device, ext_db_flag=False, run_on=None):
def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
return osd_worker_refresh_osd(
@ -333,7 +360,7 @@ def osd_refresh(self, osd_id, device, ext_db_flag=False, run_on=None):
@celery.task(name="osd.remove", bind=True, routing_key="run_on")
def osd_remove(self, osd_id, force_flag=False, skip_zap_flag=False, run_on=None):
def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_remove(
zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
@ -346,7 +373,7 @@ def osd_remove(self, osd_id, force_flag=False, skip_zap_flag=False, run_on=None)
@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
def osd_add_db_vg(self, device, run_on=None):
def osd_add_db_vg(self, device=None, run_on=None):
@ZKConnection(config)
def run_osd_add_db_vg(zkhandler, self, run_on, device):
return osd_worker_add_db_vg(zkhandler, self, run_on, device)
@ -2428,10 +2455,10 @@ class API_VM_Locks(Resource):
else:
return vm_node_detail, retcode
task = vm_flush_locks.delay(vm, run_on=vm_node)
task = run_celery_task(vm_flush_locks, domain=vm, run_on=vm_node)
return (
{"task_id": task.id, "run_on": vm_node},
{"task_id": task.id, "task_name": "vm.flush_locks", "run_on": vm_node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@ -2563,10 +2590,10 @@ class API_VM_Device(Resource):
else:
return vm_node_detail, retcode
task = vm_device_attach.delay(vm, xml, run_on=vm_node)
task = run_celery_task(vm_device_attach, domain=vm, xml=xml, run_on=vm_node)
return (
{"task_id": task.id, "run_on": vm_node},
{"task_id": task.id, "task_name": "vm.device_attach", "run_on": vm_node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@ -2617,10 +2644,10 @@ class API_VM_Device(Resource):
else:
return vm_node_detail, retcode
task = vm_device_detach.delay(vm, xml, run_on=vm_node)
task = run_celery_task(vm_device_detach, domain=vm, xml=xml, run_on=vm_node)
return (
{"task_id": task.id, "run_on": vm_node},
{"task_id": task.id, "task_name": "vm.device_detach", "run_on": vm_node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@ -4373,11 +4400,17 @@ class API_Storage_Ceph_Benchmark(Resource):
"message": 'Pool "{}" is not valid.'.format(reqargs.get("pool"))
}, 400
task = run_benchmark.delay(reqargs.get("pool", None))
task = run_celery_task(
run_benchmark, pool=reqargs.get("pool", None), run_on="primary"
)
return (
{"task_id": task.id},
{
"task_id": task.id,
"task_name": "storage.benchmark",
"run_on": get_primary_node(),
},
202,
{"Location": Api.url_for(api, API_Storage_Ceph_Benchmark, task_id=task.id)},
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@ -4493,10 +4526,12 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
"""
node = reqargs.get("node", None)
task = osd_add_db_vg.delay(reqargs.get("device", None), run_on=node)
task = run_celery_task(
osd_add_db_vg, device=reqargs.get("device", None), run_on=node
)
return (
{"task_id": task.id, "run_on": node},
{"task_id": task.id, "task_name": "osd.add_db_vg", "run_on": node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@ -4690,17 +4725,18 @@ class API_Storage_Ceph_OSD_Root(Resource):
"""
node = reqargs.get("node", None)
task = osd_add.delay(
reqargs.get("device", None),
reqargs.get("weight", None),
reqargs.get("ext_db_ratio", None),
reqargs.get("ext_db_size", None),
reqargs.get("osd_count", None),
task = run_celery_task(
osd_add,
device=reqargs.get("device", None),
weight=reqargs.get("weight", None),
ext_db_ratio=reqargs.get("ext_db_ratio", None),
ext_db_size=reqargs.get("ext_db_size", None),
split_count=reqargs.get("osd_count", None),
run_on=node,
)
return (
{"task_id": task.id, "run_on": node},
{"task_id": task.id, "task_name": "osd.add", "run_on": node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@ -4808,18 +4844,19 @@ class API_Storage_Ceph_OSD_Element(Resource):
else:
return osd_node_detail, retcode
task = osd_replace.delay(
osdid,
reqargs.get("new_device"),
reqargs.get("old_device", None),
reqargs.get("weight", None),
reqargs.get("ext_db_ratio", None),
reqargs.get("ext_db_size", None),
task = run_celery_task(
osd_replace,
osd_id=osdid,
new_device=reqargs.get("new_device"),
old_device=reqargs.get("old_device", None),
weight=reqargs.get("weight", None),
ext_db_ratio=reqargs.get("ext_db_ratio", None),
ext_db_size=reqargs.get("ext_db_size", None),
run_on=node,
)
return (
{"task_id": task.id, "run_on": node},
{"task_id": task.id, "task_name": "osd.replace", "run_on": node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@ -4865,14 +4902,16 @@ class API_Storage_Ceph_OSD_Element(Resource):
else:
return osd_node_detail, retcode
task = osd_refresh.delay(
osdid,
reqargs.get("device", None),
task = run_celery_task(
osd_refresh,
osd_id=osdid,
device=reqargs.get("device", None),
ext_db_flag=False,
run_on=node,
)
return (
{"task_id": task.id, "run_on": node},
{"task_id": task.id, "task_name": "osd.refresh", "run_on": node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@ -4934,14 +4973,15 @@ class API_Storage_Ceph_OSD_Element(Resource):
else:
return osd_node_detail, retcode
task = osd_remove.delay(
osdid,
task = run_celery_task(
osd_remove,
osd_id=osdid,
force_flag=reqargs.get("force", False),
run_on=node,
)
return (
{"task_id": task.id, "run_on": node},
{"task_id": task.id, "task_name": "osd.remove", "run_on": node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@ -8462,122 +8502,24 @@ class API_Provisioner_Create_Root(Resource):
else:
start_vm = False
task = create_vm.delay(
reqargs.get("name", None),
reqargs.get("profile", None),
task = run_celery_task(
create_vm,
vm_name=reqargs.get("name", None),
profile_name=reqargs.get("profile", None),
define_vm=define_vm,
start_vm=start_vm,
script_run_args=reqargs.get("arg", []),
run_on="primary",
)
return (
{"task_id": task.id},
202,
{
"Location": Api.url_for(
api, API_Provisioner_Status_Element, task_id=task.id
)
"task_id": task.id,
"task_name": "provisioner.create",
"run_on": get_primary_node(),
},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
api.add_resource(API_Provisioner_Create_Root, "/provisioner/create")
# /provisioner/status
class API_Provisioner_Status_Root(Resource):
@Authenticator
def get(self):
"""
View status of provisioner Celery queue
---
tags:
- provisioner
responses:
200:
description: OK
schema:
type: object
properties:
active:
type: object
description: Celery app.control.inspect active tasks
reserved:
type: object
description: Celery app.control.inspect reserved tasks
scheduled:
type: object
description: Celery app.control.inspect scheduled tasks
"""
queue = celery.control.inspect()
response = {
"scheduled": queue.scheduled(),
"active": queue.active(),
"reserved": queue.reserved(),
}
return response
api.add_resource(API_Provisioner_Status_Root, "/provisioner/status")
# /provisioner/status/<task_id>
class API_Provisioner_Status_Element(Resource):
@Authenticator
def get(self, task_id):
"""
View status of a provisioner Celery worker job {task_id}
---
tags:
- provisioner
responses:
200:
description: OK
schema:
type: object
properties:
total:
type: integer
description: Total number of steps
current:
type: integer
description: Current steps completed
state:
type: string
description: Current job state
status:
type: string
description: Status details about job
404:
description: Not found
schema:
type: object
id: Message
"""
task = create_vm.AsyncResult(task_id)
if task.state == "PENDING":
response = {
"state": task.state,
"current": 0,
"total": 1,
"status": "Pending job start",
}
elif task.state != "FAILURE":
response = {
"state": task.state,
"current": task.info.get("current", 0),
"total": task.info.get("total", 1),
"status": task.info.get("status", ""),
}
if "result" in task.info:
response["result"] = task.info["result"]
else:
response = {
"state": task.state,
"current": 1,
"total": 1,
"status": str(task.info),
}
return response
api.add_resource(API_Provisioner_Status_Element, "/provisioner/status/<task_id>")

View File

@ -26,7 +26,6 @@ import re
import os
# import sys
import time
import importlib.util
import uuid
@ -35,6 +34,7 @@ from contextlib import contextmanager
from pvcapid.Daemon import config
from daemon_lib.zkhandler import ZKHandler
from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish
import daemon_lib.common as pvc_common
import daemon_lib.node as pvc_node
@ -87,6 +87,21 @@ class VMBuilder(object):
self.vm_profile = vm_profile
self.vm_data = vm_data
#
# Helper class functions; used by the individual scripts
#
def log_info(self, msg):
log_info(None, msg)
def log_warn(self, msg):
log_warn(None, msg)
def log_err(self, msg):
log_err(None, msg)
def fail(self, msg, exception=ProvisioningError):
fail(None, msg, exception=exception)
#
# Primary class functions; implemented by the individual scripts
#
@ -160,7 +175,11 @@ def open_db(config):
)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
except Exception:
raise ClusterError("Failed to connect to Postgres")
fail(
None,
"Failed to connect to Postgres",
exception=ClusterError,
)
try:
yield cur
@ -179,7 +198,11 @@ def open_zk(config):
zkhandler = ZKHandler(config)
zkhandler.connect()
except Exception:
raise ClusterError("Failed to connect to Zookeeper")
fail(
None,
"Failed to connect to Zookeeper",
exception=ClusterError,
)
try:
yield zkhandler
@ -194,19 +217,28 @@ def open_zk(config):
# Main VM provisioning function - executed by the Celery worker
#
def create_vm(
self, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
celery, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
):
print(f"Starting provisioning of VM '{vm_name}' with profile '{vm_profile}'")
current_stage = 0
total_stages = 10
start(
celery,
f"Starting provisioning of VM '{vm_name}' with profile '{vm_profile}'",
current=current_stage,
total=total_stages,
)
# Phase 1 - setup
# * Get the profile elements
# * Get the details from these elements
# * Assemble a VM configuration dictionary
self.update_state(
state="RUNNING",
meta={"current": 1, "total": 10, "status": "Collecting configuration"},
current_stage += 1
update(
celery,
"Collecting configuration details",
current=current_stage,
total=total_stages,
)
time.sleep(1)
vm_id = re.findall(r"/(\d+)$/", vm_name)
if not vm_id:
@ -313,13 +345,14 @@ def create_vm(
argument_name, argument_data = argument.split("=")
script_arguments[argument_name] = argument_data
print("Script arguments: {}".format(script_arguments))
log_info(celery, f"Script arguments: {script_arguments}")
vm_data["script_arguments"] = script_arguments
print(
log_info(
celery,
"VM configuration data:\n{}".format(
json.dumps(vm_data, sort_keys=True, indent=2)
)
),
)
# Phase 2 - verification
@ -327,21 +360,21 @@ def create_vm(
# * Ensure that all networks are valid
# * Ensure that there is enough disk space in the Ceph cluster for the disks
# This is the "safe fail" step when an invalid configuration will be caught
self.update_state(
state="RUNNING",
meta={
"current": 2,
"total": 10,
"status": "Verifying configuration against cluster",
},
current_stage += 1
update(
celery,
"Verifying configuration against cluster",
current=current_stage,
total=total_stages,
)
time.sleep(1)
with open_zk(config) as zkhandler:
# Verify that a VM with this name does not already exist
if pvc_vm.searchClusterByName(zkhandler, vm_name):
raise ClusterError(
"A VM with the name '{}' already exists in the cluster.".format(vm_name)
fail(
celery,
f"A VM with the name '{vm_name}' already exists in the cluster.",
exception=ClusterError,
)
# Verify that at least one host has enough free RAM to run the VM
@ -361,16 +394,15 @@ def create_vm(
target_node = node["name"]
# Raise if no node was found
if not target_node:
raise ClusterError(
"No ready cluster node contains at least {}+512 MB of free RAM.".format(
vm_data["system_details"]["vram_mb"]
)
fail(
celery,
f"No ready cluster node contains at least {vm_data['system_details']['vram_mb']}+512 MB of free RAM",
exception=ClusterError,
)
print(
'Selecting target node "{}" with "{}" MB free RAM'.format(
target_node, last_free
)
log_info(
celery,
f'Selecting target node "{target_node}" with "{last_free}" MB free RAM',
)
# Verify that all configured networks are present on the cluster
@ -382,11 +414,13 @@ def create_vm(
"cluster",
"storage",
]:
raise ClusterError(
'The network VNI "{}" is not present on the cluster.'.format(vni)
fail(
celery,
f'The network VNI "{vni}" is not present on the cluster.',
exception=ClusterError,
)
print("All configured networks for VM are valid")
log_info(celery, "All configured networks for VM are valid")
# Verify that there is enough disk space free to provision all VM disks
pools = dict()
@ -396,10 +430,10 @@ def create_vm(
zkhandler, volume["pool"], volume["source_volume"]
)
if not volume_data:
raise ClusterError(
"The source volume {}/{} could not be found.".format(
volume["pool"], volume["source_volume"]
)
fail(
celery,
f"The source volume {volume['pool']}/{volume['source_volume']} could not be found.",
exception=ClusterError,
)
if not volume["pool"] in pools:
pools[volume["pool"]] = int(
@ -427,8 +461,10 @@ def create_vm(
if not pool_information:
raise
except Exception:
raise ClusterError(
'Pool "{}" is not present on the cluster.'.format(pool)
fail(
celery,
f'Pool "{pool}" is not present on the cluster.',
exception=ClusterError,
)
pool_free_space_gb = int(
pool_information["stats"]["free_bytes"] / 1024 / 1024 / 1024
@ -436,13 +472,13 @@ def create_vm(
pool_vm_usage_gb = int(pools[pool])
if pool_vm_usage_gb >= pool_free_space_gb:
raise ClusterError(
'Pool "{}" has only {} GB free and VM requires {} GB.'.format(
pool, pool_free_space_gb, pool_vm_usage_gb
)
fail(
celery,
f'Pool "{pool}" has only {pool_free_space_gb} GB free but VM requires {pool_vm_usage_gb} GB.',
exception=ClusterError,
)
print("There is enough space on cluster to store VM volumes")
log_info(celery, "There is enough space on cluster to store VM volumes")
# Verify that every specified filesystem is valid
used_filesystems = list()
@ -461,33 +497,44 @@ def create_vm(
elif filesystem == "swap":
retcode, stdout, stderr = pvc_common.run_os_command("which mkswap")
if retcode:
raise ProvisioningError(
"Failed to find binary for mkswap: {}".format(stderr)
fail(
celery,
f"Failed to find binary for mkswap: {stderr}",
exception=ProvisioningError,
)
else:
retcode, stdout, stderr = pvc_common.run_os_command(
"which mkfs.{}".format(filesystem)
)
if retcode:
raise ProvisioningError(
"Failed to find binary for mkfs.{}: {}".format(filesystem, stderr)
fail(
celery,
f"Failed to find binary for mkfs.{filesystem}: {stderr}",
exception=ProvisioningError,
)
print("All selected filesystems are valid")
log_info(celery, "All selected filesystems are valid")
# Phase 3 - provisioning script preparation
# * Import the provisioning script as a library with importlib
# * Ensure the required function(s) are present
self.update_state(
state="RUNNING",
meta={"current": 3, "total": 10, "status": "Preparing provisioning script"},
current_stage += 1
update(
celery,
"Preparing provisioning script",
current=current_stage,
total=total_stages,
)
time.sleep(1)
# Write the script out to a temporary file
retcode, stdout, stderr = pvc_common.run_os_command("mktemp")
if retcode:
raise ProvisioningError("Failed to create a temporary file: {}".format(stderr))
fail(
celery,
f"Failed to create a temporary file: {stderr}",
exception=ProvisioningError,
)
script_file = stdout.strip()
with open(script_file, "w") as fh:
fh.write(vm_data["script"])
@ -507,12 +554,17 @@ def create_vm(
vm_data=vm_data,
)
print("Provisioning script imported successfully")
log_info(celery, "Provisioning script imported successfully")
# Create temporary directory for external chroot
retcode, stdout, stderr = pvc_common.run_os_command("mktemp -d")
if retcode:
raise ProvisioningError(f"Failed to create a temporary directory: {stderr}")
fail(
celery,
f"Failed to create a temporary directory: {stderr}",
exception=ProvisioningError,
)
temp_dir = stdout.strip()
# Bind mount / to the chroot location /
@ -520,8 +572,10 @@ def create_vm(
f"mount --bind --options ro / {temp_dir}"
)
if retcode:
raise ProvisioningError(
f"Failed to mount rootfs onto {temp_dir} for chroot: {stderr}"
fail(
celery,
f"Failed to mount rootfs into {temp_dir} for chroot: {stderr}",
exception=ProvisioningError,
)
# Mount tmpfs to the chroot location /tmp
@ -529,8 +583,10 @@ def create_vm(
f"mount --type tmpfs tmpfs {temp_dir}/tmp"
)
if retcode:
raise ProvisioningError(
f"Failed to mount tmpfs onto {temp_dir}/tmp for chroot: {stderr}"
fail(
celery,
f"Failed to mount tmpfs onto {temp_dir}/tmp for chroot: {stderr}",
exception=ProvisioningError,
)
# Bind mount /dev to the chroot location /dev
@ -538,8 +594,10 @@ def create_vm(
f"mount --bind --options ro /dev {temp_dir}/dev"
)
if retcode:
raise ProvisioningError(
f"Failed to mount devfs onto {temp_dir}/dev for chroot: {stderr}"
fail(
celery,
f"Failed to mount devfs onto {temp_dir}/dev for chroot: {stderr}",
exception=ProvisioningError,
)
# Bind mount /run to the chroot location /run
@ -547,8 +605,10 @@ def create_vm(
f"mount --bind --options rw /run {temp_dir}/run"
)
if retcode:
raise ProvisioningError(
f"Failed to mount runfs onto {temp_dir}/run for chroot: {stderr}"
fail(
celery,
f"Failed to mount runfs onto {temp_dir}/run for chroot: {stderr}",
exception=ProvisioningError,
)
# Bind mount /sys to the chroot location /sys
@ -556,8 +616,10 @@ def create_vm(
f"mount --bind --options rw /sys {temp_dir}/sys"
)
if retcode:
raise ProvisioningError(
f"Failed to mount sysfs onto {temp_dir}/sys for chroot: {stderr}"
fail(
celery,
f"Failed to mount sysfs onto {temp_dir}/sys for chroot: {stderr}",
exception=ProvisioningError,
)
# Bind mount /proc to the chroot location /proc
@ -565,14 +627,16 @@ def create_vm(
f"mount --bind --options rw /proc {temp_dir}/proc"
)
if retcode:
raise ProvisioningError(
f"Failed to mount procfs onto {temp_dir}/proc for chroot: {stderr}"
fail(
celery,
f"Failed to mount procfs onto {temp_dir}/proc for chroot: {stderr}",
exception=ProvisioningError,
)
print("Chroot environment prepared successfully")
log_info(celery, "Chroot environment prepared successfully")
def general_cleanup():
print("Running upper cleanup steps")
log_info(celery, "Running upper cleanup steps")
try:
# Unmount bind-mounted devfs on the chroot
@ -599,68 +663,73 @@ def create_vm(
retcode, stdout, stderr = pvc_common.run_os_command(f"umount {temp_dir}")
except Exception as e:
# We don't care about fails during cleanup, log and continue
print(f"Suberror during general cleanup unmounts: {e}")
log_warn(celery, f"Suberror during general cleanup unmounts: {e}")
try:
# Remove the temp_dir
os.rmdir(temp_dir)
except Exception as e:
# We don't care about fails during cleanup, log and continue
print(f"Suberror during general cleanup directory removal: {e}")
log_warn(celery, f"Suberror during general cleanup directory removal: {e}")
try:
# Remove temporary script (don't fail if not removed)
os.remove(script_file)
except Exception as e:
# We don't care about fails during cleanup, log and continue
print(f"Suberror during general cleanup script removal: {e}")
log_warn(celery, f"Suberror during general cleanup script removal: {e}")
def fail_clean(celery, msg, exception=ProvisioningError):
try:
vm_builder.cleanup()
general_cleanup()
except Exception:
pass
fail(celery, msg, exception=exception)
# Phase 4 - script: setup()
# * Run pre-setup steps
self.update_state(
state="RUNNING",
meta={
"current": 4,
"total": 10,
"status": "Running script setup() step",
},
current_stage += 1
update(
celery, "Running script setup() step", current=current_stage, total=total_stages
)
time.sleep(1)
print("Running script setup() step")
try:
with chroot(temp_dir):
vm_builder.setup()
except Exception as e:
general_cleanup()
raise ProvisioningError(f"Error in script setup() step: {e}")
fail_clean(
celery,
f"Error in script setup() step: {e}",
exception=ProvisioningError,
)
# Phase 5 - script: create()
# * Prepare the libvirt XML definition for the VM
self.update_state(
state="RUNNING",
meta={
"current": 5,
"total": 10,
"status": "Running script create() step",
},
current_stage += 1
update(
celery,
"Running script create() step",
current=current_stage,
total=total_stages,
)
time.sleep(1)
if define_vm:
print("Running script create() step")
try:
with chroot(temp_dir):
vm_schema = vm_builder.create()
except Exception as e:
general_cleanup()
raise ProvisioningError(f"Error in script create() step: {e}")
fail_clean(
celery,
f"Error in script create() step: {e}",
exception=ProvisioningError,
)
print("Generated VM schema:\n{}\n".format(vm_schema))
log_info(celery, "Generated VM schema:\n{}\n".format(vm_schema))
print("Defining VM on cluster")
log_info(celery, "Defining VM on cluster")
node_limit = vm_data["system_details"]["node_limit"]
if node_limit:
node_limit = node_limit.split(",")
@ -679,23 +748,19 @@ def create_vm(
vm_profile,
initial_state="provision",
)
print(retmsg)
log_info(celery, retmsg)
else:
print("Skipping VM definition due to define_vm=False")
log_info("Skipping VM definition due to define_vm=False")
# Phase 6 - script: prepare()
# * Run preparation steps (e.g. disk creation and mapping, filesystem creation, etc.)
self.update_state(
state="RUNNING",
meta={
"current": 6,
"total": 10,
"status": "Running script prepare() step",
},
current_stage += 1
update(
celery,
"Running script prepare() step",
current=current_stage,
total=total_stages,
)
time.sleep(1)
print("Running script prepare() step")
try:
with chroot(temp_dir):
@ -704,21 +769,21 @@ def create_vm(
with chroot(temp_dir):
vm_builder.cleanup()
general_cleanup()
raise ProvisioningError(f"Error in script prepare() step: {e}")
fail_clean(
celery,
f"Error in script prepare() step: {e}",
exception=ProvisioningError,
)
# Phase 7 - script: install()
# * Run installation with arguments
self.update_state(
state="RUNNING",
meta={
"current": 7,
"total": 10,
"status": "Running script install() step",
},
current_stage += 1
update(
celery,
"Running script install() step",
current=current_stage,
total=total_stages,
)
time.sleep(1)
print("Running script install() step")
try:
with chroot(temp_dir):
@ -727,63 +792,62 @@ def create_vm(
with chroot(temp_dir):
vm_builder.cleanup()
general_cleanup()
raise ProvisioningError(f"Error in script install() step: {e}")
fail_clean(
celery,
f"Error in script install() step: {e}",
exception=ProvisioningError,
)
# Phase 8 - script: cleanup()
# * Run cleanup steps
self.update_state(
state="RUNNING",
meta={
"current": 8,
"total": 10,
"status": "Running script cleanup() step",
},
current_stage += 1
update(
celery,
"Running script cleanup() step",
current=current_stage,
total=total_stages,
)
time.sleep(1)
print("Running script cleanup() step")
try:
with chroot(temp_dir):
vm_builder.cleanup()
except Exception as e:
general_cleanup()
raise ProvisioningError(f"Error in script cleanup() step: {e}")
fail(
celery,
f"Error in script cleanup() step: {e}",
exception=ProvisioningError,
)
# Phase 9 - general cleanup
# * Clean up the chroot from earlier
self.update_state(
state="RUNNING",
meta={
"current": 9,
"total": 10,
"status": "Running general cleanup steps",
},
current_stage += 1
update(
celery,
"Running general cleanup steps",
current=current_stage,
total=total_stages,
)
time.sleep(1)
general_cleanup()
# Phase 10 - startup
# * Start the VM in the PVC cluster
self.update_state(
state="RUNNING",
meta={
"current": 10,
"total": 10,
"status": "Starting VM",
},
)
time.sleep(1)
current_stage += 1
update(celery, "Starting VM", current=current_stage, total=total_stages)
if start_vm:
print("Starting VM")
with open_zk(config) as zkhandler:
success, message = pvc_vm.start_vm(zkhandler, vm_name)
print(message)
log_info(celery, message)
end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned and started successfully'
else:
end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned successfully'
return {"status": end_message, "current": 10, "total": 10}
current_stage += 1
return finish(
celery,
end_message,
current=current_stage,
total=total_stages,
)
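
In condensed form, the Celery step pattern that the reworked create_vm() above follows is sketched below; start(), update(), fail(), and finish() are the daemon_lib.celery helpers imported at the top of this file, and the single "work" step is a stand-in:

from daemon_lib.celery import start, update, fail, finish

def example_task(celery):
    current_stage, total_stages = 0, 2
    start(celery, "Starting example task", current=current_stage, total=total_stages)

    current_stage += 1
    update(celery, "Running the only real step", current=current_stage, total=total_stages)
    # ... do the actual work here; on error, call
    # fail(celery, "reason", exception=SomeErrorClass) to abort and mark the task failed.

    current_stage += 1
    return finish(celery, "Example task completed", current=current_stage, total=total_stages)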

View File

@ -25,10 +25,10 @@ CELERY_BIN="$( which celery )"
# app arguments work in a non-backwards-compatible way with Celery 5.
case "$( cat /etc/debian_version )" in
10.*)
CELERY_ARGS="worker --app pvcapid.flaskapi.celery --concurrency 3 --hostname $(hostname -s) --queues $(hostname -s) --loglevel INFO"
CELERY_ARGS="worker --app pvcapid.flaskapi.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
;;
*)
CELERY_ARGS="--app pvcapid.flaskapi.celery worker --concurrency 3 --hostname $(hostname -s) --queues $(hostname -s) --loglevel INFO"
CELERY_ARGS="--app pvcapid.flaskapi.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
;;
esac

View File

@ -534,6 +534,55 @@ def cli_cluster_maintenance_off():
finish(retcode, retdata)
###############################################################################
# > pvc cluster task
###############################################################################
@click.command(name="task", short_help="Show status of worker task.")
@connection_req
@click.argument("task_id", required=False, default=None)
@click.option(
"--wait/--no-wait",
"wait_flag",
is_flag=True,
default=True,
show_default=True,
help="""Wait or don't wait for task to complete, showing progress. Requires TASK_ID; overrides "-f"/"--format".""",
)
@format_opt(
{
"pretty": cli_cluster_task_format_pretty,
"raw": lambda d: "\n".join([t["id"] for t in d])
if isinstance(d, list)
else d["state"],
"json": lambda d: jdumps(d),
"json-pretty": lambda d: jdumps(d, indent=2),
}
)
def cli_cluster_task(task_id, wait_flag, format_function):
"""
Show the current status of worker task TASK_ID or a list of all active and pending tasks.
"""
if task_id is None:
wait_flag = False
if wait_flag:
# First validate that this is actually a valid task that is running
retcode, retdata = pvc.lib.common.task_status(CLI_CONFIG, None)
if task_id in [i["id"] for i in retdata]:
task = [i for i in retdata if i["id"] == task_id][0]
retmsg = wait_for_celery_task(
CLI_CONFIG,
{"task_id": task["id"], "task_name": task["name"]},
start_late=True,
)
else:
retmsg = f"No task with ID {task_id} found."
finish(retcode, retmsg)
else:
retcode, retdata = pvc.lib.common.task_status(CLI_CONFIG, task_id)
finish(retcode, retdata, format_function)
###############################################################################
# > pvc node
###############################################################################
@ -575,18 +624,6 @@ def cli_node_primary(
Set NODE in primary coordinator state, making it the primary coordinator for the cluster.
"""
# Handle active provisioner task warnings
_, tasks_retdata = pvc.lib.provisioner.task_status(CLI_CONFIG, None)
if len(tasks_retdata) > 0:
echo(
CLI_CONFIG,
f"""\
NOTE: There are currently {len(tasks_retdata)} active or queued provisioner tasks.
These jobs will continue executing, but their status visibility will be lost until
the current primary node returns to primary state.
""",
)
retcode, retdata = pvc.lib.node.node_coordinator_state(CLI_CONFIG, node, "primary")
if not retcode or "already" in retdata:
finish(retcode, retdata)
@ -625,18 +662,6 @@ def cli_node_secondary(
Set NODE in secondary coordinator state, making another active node the primary node for the cluster.
"""
# Handle active provisioner task warnings
_, tasks_retdata = pvc.lib.provisioner.task_status(CLI_CONFIG, None)
if len(tasks_retdata) > 0:
echo(
CLI_CONFIG,
f"""\
NOTE: There are currently {len(tasks_retdata)} active or queued provisioner tasks.
These jobs will continue executing, but their status visibility will be lost until
the current primary node returns to primary state.
""",
)
retcode, retdata = pvc.lib.node.node_coordinator_state(
CLI_CONFIG, node, "secondary"
)
@ -3296,15 +3321,26 @@ def cli_storage_benchmark():
@click.command(name="run", short_help="Run a storage benchmark.")
@connection_req
@click.argument("pool")
@click.option(
"--wait/--no-wait",
"wait_flag",
is_flag=True,
default=True,
show_default=True,
help="Wait or don't wait for task to complete, showing progress",
)
@confirm_opt(
"Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue"
)
def cli_storage_benchmark_run(pool):
def cli_storage_benchmark_run(pool, wait_flag):
"""
Run a storage benchmark on POOL in the background.
"""
retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(CLI_CONFIG, pool)
retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(CLI_CONFIG, pool, wait_flag)
if retcode and wait_flag:
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
finish(retcode, retmsg)
@ -5581,15 +5617,15 @@ def cli_provisioner_profile_list(limit, format_function):
help="Start the VM automatically upon completion of provisioning.",
)
@click.option(
"-w",
"--wait",
"--wait/--no-wait",
"wait_flag",
is_flag=True,
default=False,
help="Wait for provisioning to complete, showing progress",
default=True,
show_default=True,
help="Wait or don't wait for task to complete, showing progress",
)
def cli_provisioner_create(
name, profile, wait_flag, define_flag, start_flag, script_args
name, profile, define_flag, start_flag, script_args, wait_flag
):
"""
Create a new VM NAME with profile PROFILE.
@ -5610,39 +5646,13 @@ def cli_provisioner_create(
if not define_flag:
start_flag = False
retcode, retdata = pvc.lib.provisioner.vm_create(
CLI_CONFIG, name, profile, wait_flag, define_flag, start_flag, script_args
retcode, retmsg = pvc.lib.provisioner.vm_create(
CLI_CONFIG, name, profile, define_flag, start_flag, script_args, wait_flag
)
if retcode and wait_flag:
task_id = retdata
retdata = wait_for_provisioner(CLI_CONFIG, task_id)
finish(retcode, retdata)
###############################################################################
# > pvc provisioner status
###############################################################################
@click.command(name="status", short_help="Show status of provisioner job.")
@connection_req
@click.argument("job", required=False, default=None)
@format_opt(
{
"pretty": cli_provisioner_status_format_pretty,
"raw": lambda d: "\n".join([t["id"] for t in d])
if isinstance(d, list)
else d["state"],
"json": lambda d: jdumps(d),
"json-pretty": lambda d: jdumps(d, indent=2),
}
)
def cli_provisioner_status(job, format_function):
"""
Show status of provisioner job JOB or a list of jobs.
"""
retcode, retdata = pvc.lib.provisioner.task_status(CLI_CONFIG, job)
finish(retcode, retdata, format_function)
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
finish(retcode, retmsg)
###############################################################################
@ -6159,7 +6169,6 @@ cli_provisioner_profile.add_command(cli_provisioner_profile_remove)
cli_provisioner_profile.add_command(cli_provisioner_profile_list)
cli_provisioner.add_command(cli_provisioner_profile)
cli_provisioner.add_command(cli_provisioner_create)
cli_provisioner.add_command(cli_provisioner_status)
cli.add_command(cli_provisioner)
cli_cluster.add_command(cli_cluster_status)
cli_cluster.add_command(cli_cluster_init)
@ -6168,6 +6177,7 @@ cli_cluster.add_command(cli_cluster_restore)
cli_cluster_maintenance.add_command(cli_cluster_maintenance_on)
cli_cluster_maintenance.add_command(cli_cluster_maintenance_off)
cli_cluster.add_command(cli_cluster_maintenance)
cli_cluster.add_command(cli_cluster_task)
cli.add_command(cli_cluster)
cli_connection.add_command(cli_connection_add)
cli_connection.add_command(cli_connection_remove)

View File

@ -47,7 +47,6 @@ from pvc.lib.provisioner import format_list_userdata as provisioner_format_userd
from pvc.lib.provisioner import format_list_script as provisioner_format_script_list
from pvc.lib.provisioner import format_list_ova as provisioner_format_ova_list
from pvc.lib.provisioner import format_list_profile as provisioner_format_profile_list
from pvc.lib.provisioner import format_list_task as provisioner_format_task_status
# Define colour values for use in formatters
@ -262,6 +261,184 @@ def cli_cluster_status_format_short(CLI_CONFIG, data):
return "\n".join(output)
def cli_cluster_task_format_pretty(CLI_CONFIG, task_data):
"""
Pretty format the output of cli_cluster_task
"""
if not isinstance(task_data, list):
job_state = task_data["state"]
if job_state == "RUNNING":
retdata = "Job state: RUNNING\nStage: {}/{}\nStatus: {}".format(
task_data["current"], task_data["total"], task_data["status"]
)
elif job_state == "FAILED":
retdata = "Job state: FAILED\nStatus: {}".format(task_data["status"])
elif job_state == "COMPLETED":
retdata = "Job state: COMPLETED\nStatus: {}".format(task_data["status"])
else:
retdata = "Job state: {}\nStatus: {}".format(
task_data["state"], task_data["status"]
)
return retdata
task_list_output = []
# Determine optimal column widths
task_id_length = 3
task_name_length = 5
task_type_length = 7
task_worker_length = 7
task_arg_name_length = 5
task_arg_data_length = 10
tasks = list()
for task in task_data:
# task_id column
_task_id_length = len(str(task["id"])) + 1
if _task_id_length > task_id_length:
task_id_length = _task_id_length
# task_name column
_task_name_length = len(str(task["name"])) + 1
if _task_name_length > task_name_length:
task_name_length = _task_name_length
# task_worker column
_task_worker_length = len(str(task["worker"])) + 1
if _task_worker_length > task_worker_length:
task_worker_length = _task_worker_length
# task_type column
_task_type_length = len(str(task["type"])) + 1
if _task_type_length > task_type_length:
task_type_length = _task_type_length
updated_kwargs = list()
for arg_name, arg_data in task["kwargs"].items():
# Skip the "run_on" argument
if arg_name == "run_on":
continue
# task_arg_name column
_task_arg_name_length = len(str(arg_name)) + 1
if _task_arg_name_length > task_arg_name_length:
task_arg_name_length = _task_arg_name_length
if len(str(arg_data)) > 17:
arg_data = arg_data[:17] + "..."
# task_arg_data column
_task_arg_data_length = len(str(arg_data)) + 1
if _task_arg_data_length > task_arg_data_length:
task_arg_data_length = _task_arg_data_length
updated_kwargs.append({"name": arg_name, "data": arg_data})
task["kwargs"] = updated_kwargs
tasks.append(task)
# Format the string (header)
task_list_output.append(
"{bold}{task_header: <{task_header_length}} {arg_header: <{arg_header_length}}{end_bold}".format(
bold=ansii["bold"],
end_bold=ansii["end"],
task_header_length=task_id_length
+ task_name_length
+ task_type_length
+ task_worker_length
+ 3,
arg_header_length=task_arg_name_length + task_arg_data_length,
task_header="Tasks "
+ "".join(
[
"-"
for _ in range(
6,
task_id_length
+ task_name_length
+ task_type_length
+ task_worker_length
+ 2,
)
]
),
arg_header="Arguments "
+ "".join(
[
"-"
for _ in range(11, task_arg_name_length + task_arg_data_length + 1)
]
),
)
)
task_list_output.append(
"{bold}{task_id: <{task_id_length}} {task_name: <{task_name_length}} {task_type: <{task_type_length}} \
{task_worker: <{task_worker_length}} \
{task_arg_name: <{task_arg_name_length}} \
{task_arg_data: <{task_arg_data_length}}{end_bold}".format(
task_id_length=task_id_length,
task_name_length=task_name_length,
task_type_length=task_type_length,
task_worker_length=task_worker_length,
task_arg_name_length=task_arg_name_length,
task_arg_data_length=task_arg_data_length,
bold=ansii["bold"],
end_bold=ansii["end"],
task_id="ID",
task_name="Name",
task_type="Status",
task_worker="Worker",
task_arg_name="Name",
task_arg_data="Data",
)
)
# Format the string (elements)
for task in sorted(tasks, key=lambda i: i.get("type", None)):
task_list_output.append(
"{bold}{task_id: <{task_id_length}} {task_name: <{task_name_length}} {task_type: <{task_type_length}} \
{task_worker: <{task_worker_length}} \
{task_arg_name: <{task_arg_name_length}} \
{task_arg_data: <{task_arg_data_length}}{end_bold}".format(
task_id_length=task_id_length,
task_name_length=task_name_length,
task_type_length=task_type_length,
task_worker_length=task_worker_length,
task_arg_name_length=task_arg_name_length,
task_arg_data_length=task_arg_data_length,
bold="",
end_bold="",
task_id=task["id"],
task_name=task["name"],
task_type=task["type"],
task_worker=task["worker"],
task_arg_name=str(task["kwargs"][0]["name"]),
task_arg_data=str(task["kwargs"][0]["data"]),
)
)
for arg in task["kwargs"][1:]:
task_list_output.append(
"{bold}{task_id: <{task_id_length}} {task_name: <{task_name_length}} {task_type: <{task_type_length}} \
{task_worker: <{task_worker_length}} \
{task_arg_name: <{task_arg_name_length}} \
{task_arg_data: <{task_arg_data_length}}{end_bold}".format(
task_id_length=task_id_length,
task_name_length=task_name_length,
task_type_length=task_type_length,
task_worker_length=task_worker_length,
task_arg_name_length=task_arg_name_length,
task_arg_data_length=task_arg_data_length,
bold="",
end_bold="",
task_id="",
task_name="",
task_type="",
task_worker="",
task_arg_name=str(arg["name"]),
task_arg_data=str(arg["data"]),
)
)
return "\n".join(task_list_output)
def cli_connection_list_format_pretty(CLI_CONFIG, data):
"""
Pretty format the output of cli_connection_list
@ -724,11 +901,3 @@ def cli_provisioner_profile_list_format_pretty(CLI_CONFIG, data):
"""
return provisioner_format_profile_list(CLI_CONFIG, data)
def cli_provisioner_status_format_pretty(CLI_CONFIG, data):
"""
Pretty format the output of cli_provisioner_status
"""
return provisioner_format_task_status(CLI_CONFIG, data)

View File

@ -65,36 +65,46 @@ def cli_node_waiter(config, node, state_field, state_value):
echo(config, f" done. [{int(t_end - t_start)}s]")
def wait_for_celery_task(CLI_CONFIG, task_detail):
def wait_for_celery_task(CLI_CONFIG, task_detail, start_late=False):
"""
Wait for a Celery task to complete
"""
task_id = task_detail["task_id"]
run_on = task_detail["run_on"]
task_name = task_detail["task_name"]
echo(CLI_CONFIG, f"Task ID: {task_id} assigned to node {run_on}")
echo(CLI_CONFIG, "")
if not start_late:
run_on = task_detail["run_on"]
# Wait for the task to start
echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
while True:
sleep(0.5)
echo(CLI_CONFIG, f"Task ID: {task_id} ({task_name}) assigned to node {run_on}")
echo(CLI_CONFIG, "")
# Wait for the task to start
echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
while True:
sleep(0.5)
task_status = pvc.lib.common.task_status(
CLI_CONFIG, task_id=task_id, is_watching=True
)
if task_status.get("state") != "PENDING":
break
echo(CLI_CONFIG, ".", newline=False)
echo(CLI_CONFIG, " done.")
echo(CLI_CONFIG, "")
echo(
CLI_CONFIG,
task_status.get("status") + ":",
)
else:
task_status = pvc.lib.common.task_status(
CLI_CONFIG, task_id=task_id, is_watching=True
)
if task_status.get("state") != "PENDING":
break
echo(CLI_CONFIG, ".", newline=False)
echo(CLI_CONFIG, " done.")
echo(CLI_CONFIG, "")
echo(CLI_CONFIG, f"Watching existing task {task_id} ({task_name}):")
# Start following the task state, updating progress as we go
total_task = task_status.get("total")
echo(
CLI_CONFIG,
task_status.get("status") + ":",
)
with progressbar(length=total_task, show_eta=False) as bar:
last_task = 0
maxlen = 21
@ -132,60 +142,3 @@ def wait_for_celery_task(CLI_CONFIG, task_detail):
retdata = task_status.get("state") + ": " + task_status.get("status")
return retdata
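A hedged illustration of the two task_detail shapes this function now accepts; the ID, task name, and node values below are hypothetical, and CLI_CONFIG is assumed to be a valid connection configuration:

fresh_detail = {
    "task_id": "a1b2c3d4",   # short custom task ID
    "task_name": "osd.add",  # hypothetical task name
    "run_on": "hv1",         # only read when start_late is False
}
wait_for_celery_task(CLI_CONFIG, fresh_detail)

# Resuming an already-submitted task, as "pvc cluster task TASK_ID" now does:
late_detail = {"task_id": "a1b2c3d4", "task_name": "osd.add"}
wait_for_celery_task(CLI_CONFIG, late_detail, start_late=True)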
def wait_for_provisioner(CLI_CONFIG, task_id):
"""
Wait for a provisioner task to complete
"""
echo(CLI_CONFIG, f"Task ID: {task_id}")
echo(CLI_CONFIG, "")
# Wait for the task to start
echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
while True:
sleep(1)
task_status = pvc.lib.provisioner.task_status(
CLI_CONFIG, task_id, is_watching=True
)
if task_status.get("state") != "PENDING":
break
echo(CLI_CONFIG, ".", newline=False)
echo(CLI_CONFIG, " done.")
echo(CLI_CONFIG, "")
# Start following the task state, updating progress as we go
total_task = task_status.get("total")
with progressbar(length=total_task, show_eta=False) as bar:
last_task = 0
maxlen = 0
while True:
sleep(1)
if task_status.get("state") != "RUNNING":
break
if task_status.get("current") > last_task:
current_task = int(task_status.get("current"))
bar.update(current_task - last_task)
last_task = current_task
# The extensive spaces at the end cause this to overwrite longer previous messages
curlen = len(str(task_status.get("status")))
if curlen > maxlen:
maxlen = curlen
lendiff = maxlen - curlen
overwrite_whitespace = " " * lendiff
echo(
CLI_CONFIG,
" " + task_status.get("status") + overwrite_whitespace,
newline=False,
)
task_status = pvc.lib.provisioner.task_status(
CLI_CONFIG, task_id, is_watching=True
)
if task_status.get("state") == "SUCCESS":
bar.update(total_task - last_task)
echo(CLI_CONFIG, "")
retdata = task_status.get("state") + ": " + task_status.get("status")
return retdata

View File

@ -202,6 +202,24 @@ def call_api(
return response
def get_wait_retdata(response, wait_flag):
if response.status_code == 202:
retvalue = True
retjson = response.json()
if not wait_flag:
retdata = (
f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
)
else:
# Just return the task JSON without formatting
retdata = response.json()
else:
retvalue = False
retdata = response.json().get("message", "")
return retvalue, retdata
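A sketch of how this helper is consumed, assuming the imports and wiring match the other call sites in this comparison; the function name and pool value are illustrative:

from pvc.lib.common import call_api, get_wait_retdata

def example_benchmark_run(config, pool, wait_flag):
    # A 202 response carries the task JSON for the queued Celery task
    response = call_api(config, "post", "/storage/ceph/benchmark",
                        params={"pool": pool})
    return get_wait_retdata(response, wait_flag)

# CLI side, mirroring cli_storage_benchmark_run above:
# retcode, retmsg = example_benchmark_run(CLI_CONFIG, "testpool", wait_flag)
# if retcode and wait_flag:
#     retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
# finish(retcode, retmsg)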
def task_status(config, task_id=None, is_watching=False):
"""
Get information about Celery job {task_id}, or all tasks if None
@ -249,6 +267,7 @@ def task_status(config, task_id=None, is_watching=False):
task["type"] = task_type
task["worker"] = task_host
task["id"] = task_job.get("id")
task["name"] = task_job.get("name")
try:
task["args"] = literal_eval(task_job.get("args"))
except Exception:

View File

@ -25,8 +25,7 @@ from requests_toolbelt.multipart.encoder import (
)
import pvc.lib.ansiprint as ansiprint
from pvc.lib.common import UploadProgressBar, call_api
from ast import literal_eval
from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata
#
@ -700,7 +699,7 @@ def profile_remove(config, name):
return retvalue, response.json().get("message", "")
def vm_create(config, name, profile, wait_flag, define_flag, start_flag, script_args):
def vm_create(config, name, profile, define_flag, start_flag, script_args, wait_flag):
"""
Create a new VM named {name} with profile {profile}
@ -717,85 +716,7 @@ def vm_create(config, name, profile, wait_flag, define_flag, start_flag, script_
}
response = call_api(config, "post", "/provisioner/create", params=params)
if response.status_code == 202:
retvalue = True
if not wait_flag:
retdata = "Task ID: {}".format(response.json()["task_id"])
else:
# Just return the task_id raw, instead of formatting it
retdata = response.json()["task_id"]
else:
retvalue = False
retdata = response.json().get("message", "")
return retvalue, retdata
def task_status(config, task_id=None, is_watching=False):
"""
Get information about provisioner job {task_id} or all tasks if None
API endpoint: GET /api/v1/provisioner/status
API arguments:
API schema: {json_data_object}
"""
if task_id is not None:
response = call_api(
config, "get", "/provisioner/status/{task_id}".format(task_id=task_id)
)
else:
response = call_api(config, "get", "/provisioner/status")
if task_id is not None:
if response.status_code == 200:
retvalue = True
respjson = response.json()
if is_watching:
# Just return the raw JSON to the watching process instead of including value
return respjson
else:
return retvalue, respjson
else:
retvalue = False
retdata = response.json().get("message", "")
else:
retvalue = True
task_data_raw = response.json()
# Format the Celery data into a more useful data structure
task_data = list()
for task_type in ["active", "reserved", "scheduled"]:
try:
type_data = task_data_raw[task_type]
except Exception:
type_data = None
if not type_data:
type_data = dict()
for task_host in type_data:
for task_job in task_data_raw[task_type][task_host]:
task = dict()
if task_type == "reserved":
task["type"] = "pending"
else:
task["type"] = task_type
task["worker"] = task_host
task["id"] = task_job.get("id")
try:
task_args = literal_eval(task_job.get("args"))
except Exception:
task_args = task_job.get("args")
task["vm_name"] = task_args[0]
task["vm_profile"] = task_args[1]
try:
task_kwargs = literal_eval(task_job.get("kwargs"))
except Exception:
task_kwargs = task_job.get("kwargs")
task["vm_define"] = str(bool(task_kwargs["define_vm"]))
task["vm_start"] = str(bool(task_kwargs["start_vm"]))
task_data.append(task)
retdata = task_data
return retvalue, retdata
return get_wait_retdata(response, wait_flag)
#
@ -1862,158 +1783,3 @@ def format_list_profile(config, profile_data):
)
return "\n".join(profile_list_output)
def format_list_task(config, task_data):
if not isinstance(task_data, list):
job_state = task_data["state"]
if job_state == "RUNNING":
retdata = "Job state: RUNNING\nStage: {}/{}\nStatus: {}".format(
task_data["current"], task_data["total"], task_data["status"]
)
elif job_state == "FAILED":
retdata = "Job state: FAILED\nStatus: {}".format(task_data["status"])
elif job_state == "COMPLETED":
retdata = "Job state: COMPLETED\nStatus: {}".format(task_data["status"])
else:
retdata = "Job state: {}\nStatus: {}".format(
task_data["state"], task_data["status"]
)
return retdata
task_list_output = []
# Determine optimal column widths
task_id_length = 7
task_type_length = 7
task_worker_length = 7
task_vm_name_length = 5
task_vm_profile_length = 8
task_vm_define_length = 8
task_vm_start_length = 7
for task in task_data:
# task_id column
_task_id_length = len(str(task["id"])) + 1
if _task_id_length > task_id_length:
task_id_length = _task_id_length
# task_worker column
_task_worker_length = len(str(task["worker"])) + 1
if _task_worker_length > task_worker_length:
task_worker_length = _task_worker_length
# task_type column
_task_type_length = len(str(task["type"])) + 1
if _task_type_length > task_type_length:
task_type_length = _task_type_length
# task_vm_name column
_task_vm_name_length = len(str(task["vm_name"])) + 1
if _task_vm_name_length > task_vm_name_length:
task_vm_name_length = _task_vm_name_length
# task_vm_profile column
_task_vm_profile_length = len(str(task["vm_profile"])) + 1
if _task_vm_profile_length > task_vm_profile_length:
task_vm_profile_length = _task_vm_profile_length
# task_vm_define column
_task_vm_define_length = len(str(task["vm_define"])) + 1
if _task_vm_define_length > task_vm_define_length:
task_vm_define_length = _task_vm_define_length
# task_vm_start column
_task_vm_start_length = len(str(task["vm_start"])) + 1
if _task_vm_start_length > task_vm_start_length:
task_vm_start_length = _task_vm_start_length
# Format the string (header)
task_list_output.append(
"{bold}{task_header: <{task_header_length}} {vms_header: <{vms_header_length}}{end_bold}".format(
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
task_header_length=task_id_length
+ task_type_length
+ task_worker_length
+ 2,
vms_header_length=task_vm_name_length
+ task_vm_profile_length
+ task_vm_define_length
+ task_vm_start_length
+ 3,
task_header="Tasks "
+ "".join(
[
"-"
for _ in range(
6, task_id_length + task_type_length + task_worker_length + 1
)
]
),
vms_header="VM Details "
+ "".join(
[
"-"
for _ in range(
11,
task_vm_name_length
+ task_vm_profile_length
+ task_vm_define_length
+ task_vm_start_length
+ 2,
)
]
),
)
)
task_list_output.append(
"{bold}{task_id: <{task_id_length}} {task_type: <{task_type_length}} \
{task_worker: <{task_worker_length}} \
{task_vm_name: <{task_vm_name_length}} \
{task_vm_profile: <{task_vm_profile_length}} \
{task_vm_define: <{task_vm_define_length}} \
{task_vm_start: <{task_vm_start_length}}{end_bold}".format(
task_id_length=task_id_length,
task_type_length=task_type_length,
task_worker_length=task_worker_length,
task_vm_name_length=task_vm_name_length,
task_vm_profile_length=task_vm_profile_length,
task_vm_define_length=task_vm_define_length,
task_vm_start_length=task_vm_start_length,
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
task_id="Job ID",
task_type="Status",
task_worker="Worker",
task_vm_name="Name",
task_vm_profile="Profile",
task_vm_define="Define?",
task_vm_start="Start?",
)
)
# Format the string (elements)
for task in sorted(task_data, key=lambda i: i.get("type", None)):
task_list_output.append(
"{bold}{task_id: <{task_id_length}} {task_type: <{task_type_length}} \
{task_worker: <{task_worker_length}} \
{task_vm_name: <{task_vm_name_length}} \
{task_vm_profile: <{task_vm_profile_length}} \
{task_vm_define: <{task_vm_define_length}} \
{task_vm_start: <{task_vm_start_length}}{end_bold}".format(
task_id_length=task_id_length,
task_type_length=task_type_length,
task_worker_length=task_worker_length,
task_vm_name_length=task_vm_name_length,
task_vm_profile_length=task_vm_profile_length,
task_vm_define_length=task_vm_define_length,
task_vm_start_length=task_vm_start_length,
bold="",
end_bold="",
task_id=task["id"],
task_type=task["type"],
task_worker=task["worker"],
task_vm_name=task["vm_name"],
task_vm_profile=task["vm_profile"],
task_vm_define=task["vm_define"],
task_vm_start=task["vm_start"],
)
)
return "\n".join(task_list_output)

View File

@ -29,7 +29,7 @@ from requests_toolbelt.multipart.encoder import (
)
import pvc.lib.ansiprint as ansiprint
from pvc.lib.common import UploadProgressBar, call_api
from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata
#
# Supplemental functions
@ -175,21 +175,7 @@ def ceph_osd_db_vg_add(config, node, device, wait_flag):
params = {"node": node, "device": device}
response = call_api(config, "post", "/storage/ceph/osddb", params=params)
if response.status_code == 202:
retvalue = True
retjson = response.json()
if not wait_flag:
retdata = (
f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
)
else:
# Just return the task JSON without formatting
retdata = response.json()
else:
retvalue = False
retdata = response.json().get("message", "")
return retvalue, retdata
return get_wait_retdata(response, wait_flag)
#
@ -265,21 +251,7 @@ def ceph_osd_add(
response = call_api(config, "post", "/storage/ceph/osd", params=params)
if response.status_code == 202:
retvalue = True
retjson = response.json()
if not wait_flag:
retdata = (
f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
)
else:
# Just return the task JSON without formatting
retdata = response.json()
else:
retvalue = False
retdata = response.json().get("message", "")
return retvalue, retdata
return get_wait_retdata(response, wait_flag)
def ceph_osd_replace(
@ -308,21 +280,7 @@ def ceph_osd_replace(
response = call_api(config, "post", f"/storage/ceph/osd/{osdid}", params=params)
if response.status_code == 202:
retvalue = True
retjson = response.json()
if not wait_flag:
retdata = (
f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
)
else:
# Just return the task JSON without formatting
retdata = response.json()
else:
retvalue = False
retdata = response.json().get("message", "")
return retvalue, retdata
return get_wait_retdata(response, wait_flag)
def ceph_osd_refresh(config, osdid, device, wait_flag):
@ -338,21 +296,7 @@ def ceph_osd_refresh(config, osdid, device, wait_flag):
}
response = call_api(config, "put", f"/storage/ceph/osd/{osdid}", params=params)
if response.status_code == 202:
retvalue = True
retjson = response.json()
if not wait_flag:
retdata = (
f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
)
else:
# Just return the task JSON without formatting
retdata = response.json()
else:
retvalue = False
retdata = response.json().get("message", "")
return retvalue, retdata
return get_wait_retdata(response, wait_flag)
def ceph_osd_remove(config, osdid, force_flag, wait_flag):
@ -368,21 +312,7 @@ def ceph_osd_remove(config, osdid, force_flag, wait_flag):
config, "delete", "/storage/ceph/osd/{osdid}".format(osdid=osdid), params=params
)
if response.status_code == 202:
retvalue = True
retjson = response.json()
if not wait_flag:
retdata = (
f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
)
else:
# Just return the task JSON without formatting
retdata = response.json()
else:
retvalue = False
retdata = response.json().get("message", "")
return retvalue, retdata
return get_wait_retdata(response, wait_flag)
def ceph_osd_state(config, osdid, state):
@ -1765,7 +1695,7 @@ def format_list_snapshot(config, snapshot_list):
#
# Benchmark functions
#
def ceph_benchmark_run(config, pool):
def ceph_benchmark_run(config, pool, wait_flag):
"""
Run a storage benchmark against {pool}
@ -1776,14 +1706,7 @@ def ceph_benchmark_run(config, pool):
params = {"pool": pool}
response = call_api(config, "post", "/storage/ceph/benchmark", params=params)
if response.status_code == 202:
retvalue = True
retdata = "Task ID: {}".format(response.json()["task_id"])
else:
retvalue = False
retdata = response.json().get("message", "")
return retvalue, retdata
return get_wait_retdata(response, wait_flag)
def ceph_benchmark_list(config, job):

View File

@ -23,7 +23,7 @@ import time
import re
import pvc.lib.ansiprint as ansiprint
from pvc.lib.common import call_api, format_bytes, format_metric
from pvc.lib.common import call_api, format_bytes, format_metric, get_wait_retdata
#
@ -425,21 +425,7 @@ def vm_locks(config, vm, wait_flag):
"""
response = call_api(config, "post", f"/vm/{vm}/locks")
if response.status_code == 202:
retvalue = True
retjson = response.json()
if not wait_flag:
retdata = (
f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
)
else:
# Just return the task JSON without formatting
retdata = response.json()
else:
retvalue = False
retdata = response.json().get("message", "")
return retvalue, retdata
return get_wait_retdata(response, wait_flag)
def vm_backup(config, vm, backup_path, incremental_parent=None, retain_snapshot=False):

View File

@ -41,11 +41,17 @@ def start(celery, msg, current=0, total=1):
sleep(1)
def fail(celery, msg, current=1, total=1):
def fail(celery, msg, exception=None, current=1, total=1):
if exception is None:
exception = TaskFailure
msg = f"{type(exception()).__name__}: {msg}"
logger = getLogger(__name__)
logger.error(msg)
sys.tracebacklimit = 0
raise TaskFailure(msg)
raise exception(msg)
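A minimal self-contained sketch of the behaviour added here: the optional exception argument now controls both the class raised and the name prefixed to the message. The classes below are stand-ins; the real helper also logs the message and suppresses the traceback as shown above:

class TaskFailure(Exception):
    pass

class ProvisioningError(Exception):
    pass

def fail(celery, msg, exception=None, current=1, total=1):
    if exception is None:
        exception = TaskFailure
    # Prefix the message with the name of the exception class, then raise it
    msg = f"{type(exception()).__name__}: {msg}"
    raise exception(msg)

try:
    fail(None, "could not create disk image", exception=ProvisioningError)
except ProvisioningError as e:
    print(e)  # ProvisioningError: could not create disk image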
def log_info(celery, msg):

View File

@ -22,7 +22,6 @@
import gevent.pywsgi
import flask
import sys
import time
import psycopg2
from threading import Thread
@ -123,9 +122,7 @@ class MetadataAPIInstance(object):
self.logger.out("Stopping Metadata API at 169.254.169.254:80", state="i")
try:
self.md_http_server.stop()
time.sleep(0.1)
self.md_http_server.close()
time.sleep(0.1)
self.md_http_server = None
self.logger.out("Successfully stopped Metadata API", state="o")
except Exception as e:

View File

@ -610,8 +610,7 @@ class NodeInstance(object):
# 6. Start client API
if self.config["enable_api"]:
self.logger.out("Starting PVC API client service", state="i")
common.run_os_command("systemctl enable pvcapid.service")
common.run_os_command("systemctl start pvcapid.service")
common.run_os_command("systemctl enable --now pvcapid.service")
# 7. Start metadata API; just continue if we fail
self.metadata_api.start()
# 8. Start DHCP servers
@ -670,13 +669,14 @@ class NodeInstance(object):
self.zkhandler.write([("base.config.primary_node.sync_lock", "")])
lock.release()
self.logger.out("Released write lock for synchronization phase B", state="o")
# 3. Stop client API
# 3. Stop metadata API
self.metadata_api.stop()
# 4. Stop client API
if self.config["enable_api"]:
self.logger.out("Stopping PVC API client service", state="i")
common.run_os_command("systemctl stop pvcapid.service")
common.run_os_command("systemctl disable pvcapid.service")
# 4. Stop metadata API
self.metadata_api.stop()
common.run_os_command(
"systemctl disable --now pvcapid.service", background=True
)
time.sleep(0.1) # Time for the new writer to acquire the lock
# Synchronize nodes C (I am reader)

View File

@ -78,7 +78,7 @@ def start_keydb(logger, config):
def start_api_worker(logger, config):
if config["enable_api"]:
logger.out("Starting API worker daemon", state="i")
logger.out("Starting Celery Worker daemon", state="i")
# TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
common.run_os_command("systemctl start pvcworkerd.service")