Compare commits

26 Commits: aef38639cf ... dd6a38d5ea

dd6a38d5ea
73a4795967
2a637c62e8
618a1c1c10
f50f170d4e
e92ed245d6
9ab505ec98
9958d1cfe8
0cb81f96e6
3651885954
d226e9f4e5
fa361a55d9
8915864fa9
79f7e8f82e
0d818017e8
eb1d61a8b9
262babc63d
63773a3061
289049d223
e818df5dae
c76a5afd04
0bec6abe71
18e43a9377
4555f5a20a
d727764ebc
484e6542c2
@@ -31,6 +31,20 @@
 # function is provided in context of the example; see the other examples for
 # more potential uses.
 
+# Within the VMBuilderScript class, several helper functions are exposed through
+# the parent VMBuilder class:
+#  self.log_info(message):
+#    Use this function to log an "informational" message instead of "print()"
+#  self.log_warn(message):
+#    Use this function to log a "warning" message
+#  self.log_err(message):
+#    Use this function to log an "error" message outside of an exception (see below)
+#  self.fail(message, exception=<ExceptionClass>):
+#    Use this function to bail out of the script safely instead of raising a
+#    normal Python exception. You may pass an optional exception class keyword
+#    argument for posterity in the logs if you wish; otherwise, ProvisioningError
+#    is used. This function implicitly calls a "self.log_err" with the passed message
+
 # Within the VMBuilderScript class, several common variables are exposed through
 # the parent VMBuilder class:
 #  self.vm_name: The name of the VM from PVC's perspective
@@ -132,9 +146,8 @@
 # since they could still do destructive things to /dev and the like!
 
 
-# This import is always required here, as VMBuilder is used by the VMBuilderScript class
-# and ProvisioningError is the primary exception that should be raised within the class.
-from pvcapid.vmbuilder import VMBuilder, ProvisioningError
+# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
+from pvcapid.vmbuilder import VMBuilder
 
 
 # The VMBuilderScript class must be named as such, and extend VMBuilder.
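To make the new helpers concrete, here is a minimal sketch of a script using them. The class skeleton and the dependency checked are illustrative only, not part of any shipped example; the helper signatures are those documented in the comment block above.

```python
from pvcapid.vmbuilder import VMBuilder


class VMBuilderScript(VMBuilder):
    def setup(self):
        # Run any imports first, as the examples recommend
        import daemon_lib.common as pvc_common

        # Replaces print(); messages go to the provisioner's logger
        self.log_info("Checking for required tools")

        retcode, stdout, stderr = pvc_common.run_os_command("which mkfs.ext4")
        if retcode:
            # fail() logs the message via self.log_err() and aborts the run
            # safely; with no exception= keyword, ProvisioningError is raised
            self.fail("Failed to find critical dependency: mkfs.ext4")
```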
@@ -32,6 +32,20 @@
 # function is provided in context of the example; see the other examples for
 # more potential uses.
 
+# Within the VMBuilderScript class, several helper functions are exposed through
+# the parent VMBuilder class:
+#  self.log_info(message):
+#    Use this function to log an "informational" message instead of "print()"
+#  self.log_warn(message):
+#    Use this function to log a "warning" message
+#  self.log_err(message):
+#    Use this function to log an "error" message outside of an exception (see below)
+#  self.fail(message, exception=<ExceptionClass>):
+#    Use this function to bail out of the script safely instead of raising a
+#    normal Python exception. You may pass an optional exception class keyword
+#    argument for posterity in the logs if you wish; otherwise, ProvisioningError
+#    is used. This function implicitly calls a "self.log_err" with the passed message
+
 # Within the VMBuilderScript class, several common variables are exposed through
 # the parent VMBuilder class:
 #  self.vm_name: The name of the VM from PVC's perspective
@@ -133,9 +147,8 @@
 # since they could still do destructive things to /dev and the like!
 
 
-# This import is always required here, as VMBuilder is used by the VMBuilderScript class
-# and ProvisioningError is the primary exception that should be raised within the class.
-from pvcapid.vmbuilder import VMBuilder, ProvisioningError
+# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
+from pvcapid.vmbuilder import VMBuilder
 
 
 # The VMBuilderScript class must be named as such, and extend VMBuilder.
@@ -283,9 +296,9 @@ class VMBuilderScript(VMBuilder):
         import os
 
         # First loop: Create the destination disks
-        print("Creating destination disk volumes")
+        self.log_info("Creating destination disk volumes")
         for volume in self.vm_data["volumes"]:
-            print(f"Processing volume {volume['volume_name']}")
+            self.log_info(f"Processing volume {volume['volume_name']}")
             with open_zk(config) as zkhandler:
                 success, message = pvc_ceph.add_volume(
                     zkhandler,
@@ -293,16 +306,14 @@ class VMBuilderScript(VMBuilder):
                    f"{self.vm_name}_{volume['disk_id']}",
                    f"{volume['disk_size_gb']}G",
                )
-                print(message)
+                self.log_info(message)
                if not success:
-                    raise ProvisioningError(
-                        f"Failed to create volume '{volume['disk_id']}'."
-                    )
+                    self.fail(f"Failed to create volume '{volume['disk_id']}'.")
 
         # Second loop: Map the destination disks
-        print("Mapping destination disk volumes")
+        self.log_info("Mapping destination disk volumes")
         for volume in self.vm_data["volumes"]:
-            print(f"Processing volume {volume['volume_name']}")
+            self.log_info(f"Processing volume {volume['volume_name']}")
            dst_volume_name = f"{self.vm_name}_{volume['disk_id']}"
            dst_volume = f"{volume['pool']}/{dst_volume_name}"
 
@@ -312,14 +323,14 @@ class VMBuilderScript(VMBuilder):
                    volume["pool"],
                    dst_volume_name,
                )
-                print(message)
+                self.log_info(message)
                if not success:
-                    raise ProvisioningError(f"Failed to map volume '{dst_volume}'.")
+                    self.fail(f"Failed to map volume '{dst_volume}'.")
 
         # Third loop: Map the source disks
-        print("Mapping source disk volumes")
+        self.log_info("Mapping source disk volumes")
         for volume in self.vm_data["volumes"]:
-            print(f"Processing volume {volume['volume_name']}")
+            self.log_info(f"Processing volume {volume['volume_name']}")
            src_volume_name = volume["volume_name"]
            src_volume = f"{volume['pool']}/{src_volume_name}"
 
@@ -329,9 +340,9 @@ class VMBuilderScript(VMBuilder):
                    volume["pool"],
                    src_volume_name,
                )
-                print(message)
+                self.log_info(message)
                if not success:
-                    raise ProvisioningError(f"Failed to map volume '{src_volume}'.")
+                    self.fail(f"Failed to map volume '{src_volume}'.")
 
     def install(self):
         """
@@ -351,14 +362,14 @@ class VMBuilderScript(VMBuilder):
            dst_volume = f"{volume['pool']}/{dst_volume_name}"
            dst_devpath = f"/dev/rbd/{dst_volume}"
 
-            print(
+            self.log_info(
                f"Converting {volume['volume_format']} {src_volume} at {src_devpath} to {dst_volume} at {dst_devpath}"
            )
            retcode, stdout, stderr = pvc_common.run_os_command(
                f"qemu-img convert -C -f {volume['volume_format']} -O raw {src_devpath} {dst_devpath}"
            )
            if retcode:
-                raise ProvisioningError(
+                self.fail(
                    f"Failed to convert {volume['volume_format']} volume '{src_volume}' to raw volume '{dst_volume}' with qemu-img: {stderr}"
                )
 
@@ -368,7 +379,7 @@ class VMBuilderScript(VMBuilder):
 
        This function is also called if there is ANY exception raised in the prepare()
        or install() steps. While this doesn't mean you shouldn't or can't raise exceptions
-        here, be warned that doing so might cause loops. Do this only if you really need to.
+        here, be warned that doing so might cause loops. Do this only if you really need to!
        """
 
        # Run any imports first
@@ -388,7 +399,7 @@ class VMBuilderScript(VMBuilder):
                    src_volume_name,
                )
                if not success:
-                    raise ProvisioningError(
+                    self.log_err(
                        f"Failed to unmap source volume '{src_volume_name}': {message}"
                    )
 
@@ -404,6 +415,6 @@ class VMBuilderScript(VMBuilder):
                    dst_volume_name,
                )
                if not success:
-                    raise ProvisioningError(
+                    self.log_err(
                        f"Failed to unmap destination volume '{dst_volume_name}': {message}"
                    )
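Note the pattern in the cleanup() hunks above: because cleanup() also runs when prepare() or install() raises, failures there are logged with self.log_err() rather than raised, avoiding the loop the docstring warns about. A hedged sketch of that shape; the pvc_ceph.unmap_volume name is an assumption here (the actual unmap call's function name is elided from this diff), and open_zk/config are taken from the examples' own scope:

```python
def cleanup(self):
    """
    Log failures instead of raising, so a failed prepare()/install()
    cannot re-trigger cleanup() and loop.
    """
    # Run any imports first
    import daemon_lib.ceph as pvc_ceph

    for volume in self.vm_data["volumes"]:
        with open_zk(config) as zkhandler:
            # unmap_volume is assumed/illustrative; see the real examples
            success, message = pvc_ceph.unmap_volume(
                zkhandler,
                volume["pool"],
                f"{self.vm_name}_{volume['disk_id']}",
            )
        if not success:
            # Log and continue; never raise from cleanup()
            self.log_err(f"Failed to unmap volume: {message}")
```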
@@ -31,6 +31,20 @@
 # function is provided in context of the example; see the other examples for
 # more potential uses.
 
+# Within the VMBuilderScript class, several helper functions are exposed through
+# the parent VMBuilder class:
+#  self.log_info(message):
+#    Use this function to log an "informational" message instead of "print()"
+#  self.log_warn(message):
+#    Use this function to log a "warning" message
+#  self.log_err(message):
+#    Use this function to log an "error" message outside of an exception (see below)
+#  self.fail(message, exception=<ExceptionClass>):
+#    Use this function to bail out of the script safely instead of raising a
+#    normal Python exception. You may pass an optional exception class keyword
+#    argument for posterity in the logs if you wish; otherwise, ProvisioningError
+#    is used. This function implicitly calls a "self.log_err" with the passed message
+
 # Within the VMBuilderScript class, several common variables are exposed through
 # the parent VMBuilder class:
 #  self.vm_name: The name of the VM from PVC's perspective
@@ -132,9 +146,8 @@
 # since they could still do destructive things to /dev and the like!
 
 
-# This import is always required here, as VMBuilder is used by the VMBuilderScript class
-# and ProvisioningError is the primary exception that should be raised within the class.
-from pvcapid.vmbuilder import VMBuilder, ProvisioningError
+# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
+from pvcapid.vmbuilder import VMBuilder
 
 
 # The VMBuilderScript class must be named as such, and extend VMBuilder.
@@ -159,7 +172,7 @@ class VMBuilderScript(VMBuilder):
        if retcode:
            # Raise a ProvisioningError for any exception; the provisioner will handle
            # this gracefully and properly, avoiding dangling mounts, RBD maps, etc.
-            raise ProvisioningError("Failed to find critical dependency: debootstrap")
+            self.fail("Failed to find critical dependency: debootstrap")
 
     def create(self):
         """
@@ -312,9 +325,9 @@ class VMBuilderScript(VMBuilder):
                    volume["source_volume"],
                    f"{self.vm_name}_{volume['disk_id']}",
                )
-                print(message)
+                self.log_info(message)
                if not success:
-                    raise ProvisioningError(
+                    self.fail(
                        f"Failed to clone volume '{volume['source_volume']}' to '{volume['disk_id']}'."
                    )
            else:
@@ -325,11 +338,9 @@ class VMBuilderScript(VMBuilder):
                    f"{self.vm_name}_{volume['disk_id']}",
                    f"{volume['disk_size_gb']}G",
                )
-                print(message)
+                self.log_info(message)
                if not success:
-                    raise ProvisioningError(
-                        f"Failed to create volume '{volume['disk_id']}'."
-                    )
+                    self.fail(f"Failed to create volume '{volume['disk_id']}'.")
 
        # Second loop: Map the disks to the local system
        for volume in self.vm_data["volumes"]:
@@ -342,9 +353,9 @@ class VMBuilderScript(VMBuilder):
                    volume["pool"],
                    dst_volume_name,
                )
-                print(message)
+                self.log_info(message)
                if not success:
-                    raise ProvisioningError(f"Failed to map volume '{dst_volume}'.")
+                    self.fail(f"Failed to map volume '{dst_volume}'.")
 
        # Third loop: Create filesystems on the volumes
        for volume in self.vm_data["volumes"]:
@@ -370,19 +381,17 @@ class VMBuilderScript(VMBuilder):
                    f"mkswap -f /dev/rbd/{dst_volume}"
                )
                if retcode:
-                    raise ProvisioningError(
-                        f"Failed to create swap on '{dst_volume}': {stderr}"
-                    )
+                    self.fail(f"Failed to create swap on '{dst_volume}': {stderr}")
            else:
                retcode, stdout, stderr = pvc_common.run_os_command(
                    f"mkfs.{volume['filesystem']} {filesystem_args} /dev/rbd/{dst_volume}"
                )
                if retcode:
-                    raise ProvisioningError(
+                    self.fail(
                        f"Failed to create {volume['filesystem']} file on '{dst_volume}': {stderr}"
                    )
 
-            print(stdout)
+            self.log_info(stdout)
 
        # Create a temporary directory to use during install
        temp_dir = "/tmp/target"
@@ -413,7 +422,7 @@ class VMBuilderScript(VMBuilder):
                f"mount {mapped_dst_volume} {mount_path}"
            )
            if retcode:
-                raise ProvisioningError(
+                self.fail(
                    f"Failed to mount '{mapped_dst_volume}' on '{mount_path}': {stderr}"
                )
 
@@ -480,10 +489,10 @@ class VMBuilderScript(VMBuilder):
            if volume["mountpoint"] == "/":
                root_volume = volume
        if not root_volume:
-            raise ProvisioningError("Failed to find root volume in volumes list")
+            self.fail("Failed to find root volume in volumes list")
 
        # Perform a debootstrap installation
-        print(
+        self.log_info(
            f"Installing system with debootstrap: debootstrap --include={','.join(deb_packages)} {deb_release} {temp_dir} {deb_mirror}"
        )
        os.system(
@@ -735,7 +744,7 @@ GRUB_DISABLE_LINUX_UUID=false
            f"umount {mount_path}"
        )
        if retcode:
-            raise ProvisioningError(
+            self.log_err(
                f"Failed to unmount '{mapped_dst_volume}' on '{mount_path}': {stderr}"
            )
 
@@ -747,6 +756,4 @@ GRUB_DISABLE_LINUX_UUID=false
                dst_volume_name,
            )
            if not success:
-                raise ProvisioningError(
-                    f"Failed to unmap '{mapped_dst_volume}': {stderr}"
-                )
+                self.log_err(f"Failed to unmap '{mapped_dst_volume}': {stderr}")
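The debootstrap example runs every shell step through pvc_common.run_os_command() and treats any nonzero retcode as fatal. A small convenience sketch of that repeated pattern, not part of the upstream script; it assumes it is defined as a method on a VMBuilderScript subclass:

```python
def run_or_fail(self, command, description):
    # Run any imports first, per the examples' convention
    import daemon_lib.common as pvc_common

    # run_os_command() returns (retcode, stdout, stderr)
    retcode, stdout, stderr = pvc_common.run_os_command(command)
    if retcode:
        # Bail out safely through the provisioner instead of raising directly
        self.fail(f"Failed to {description}: {stderr}")
    return stdout
```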
@@ -31,6 +31,20 @@
 # function is provided in context of the example; see the other examples for
 # more potential uses.
 
+# Within the VMBuilderScript class, several helper functions are exposed through
+# the parent VMBuilder class:
+#  self.log_info(message):
+#    Use this function to log an "informational" message instead of "print()"
+#  self.log_warn(message):
+#    Use this function to log a "warning" message
+#  self.log_err(message):
+#    Use this function to log an "error" message outside of an exception (see below)
+#  self.fail(message, exception=<ExceptionClass>):
+#    Use this function to bail out of the script safely instead of raising a
+#    normal Python exception. You may pass an optional exception class keyword
+#    argument for posterity in the logs if you wish; otherwise, ProvisioningError
+#    is used. This function implicitly calls a "self.log_err" with the passed message
+
 # Within the VMBuilderScript class, several common variables are exposed through
 # the parent VMBuilder class:
 #  self.vm_name: The name of the VM from PVC's perspective
@@ -132,9 +146,8 @@
 # since they could still do destructive things to /dev and the like!
 
 
-# This import is always required here, as VMBuilder is used by the VMBuilderScript class
-# and ProvisioningError is the primary exception that should be raised within the class.
-from pvcapid.vmbuilder import VMBuilder, ProvisioningError
+# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
+from pvcapid.vmbuilder import VMBuilder
 
 
 # The VMBuilderScript class must be named as such, and extend VMBuilder.
@@ -159,7 +172,7 @@ class VMBuilderScript(VMBuilder):
        if retcode:
            # Raise a ProvisioningError for any exception; the provisioner will handle
            # this gracefully and properly, avoiding dangling mounts, RBD maps, etc.
-            raise ProvisioningError("Failed to find critical dependency: rinse")
+            self.fail("Failed to find critical dependency: rinse")
 
     def create(self):
         """
@@ -312,9 +325,9 @@ class VMBuilderScript(VMBuilder):
                    volume["source_volume"],
                    f"{self.vm_name}_{volume['disk_id']}",
                )
-                print(message)
+                self.log_info(message)
                if not success:
-                    raise ProvisioningError(
+                    self.fail(
                        f"Failed to clone volume '{volume['source_volume']}' to '{volume['disk_id']}'."
                    )
            else:
@@ -325,11 +338,9 @@ class VMBuilderScript(VMBuilder):
                    f"{self.vm_name}_{volume['disk_id']}",
                    f"{volume['disk_size_gb']}G",
                )
-                print(message)
+                self.log_info(message)
                if not success:
-                    raise ProvisioningError(
-                        f"Failed to create volume '{volume['disk_id']}'."
-                    )
+                    self.fail(f"Failed to create volume '{volume['disk_id']}'.")
 
        # Second loop: Map the disks to the local system
        for volume in self.vm_data["volumes"]:
@@ -342,9 +353,9 @@ class VMBuilderScript(VMBuilder):
                    volume["pool"],
                    dst_volume_name,
                )
-                print(message)
+                self.log_info(message)
                if not success:
-                    raise ProvisioningError(f"Failed to map volume '{dst_volume}'.")
+                    self.fail(f"Failed to map volume '{dst_volume}'.")
 
        # Third loop: Create filesystems on the volumes
        for volume in self.vm_data["volumes"]:
@@ -370,19 +381,17 @@ class VMBuilderScript(VMBuilder):
                    f"mkswap -f /dev/rbd/{dst_volume}"
                )
                if retcode:
-                    raise ProvisioningError(
-                        f"Failed to create swap on '{dst_volume}': {stderr}"
-                    )
+                    self.fail(f"Failed to create swap on '{dst_volume}': {stderr}")
            else:
                retcode, stdout, stderr = pvc_common.run_os_command(
                    f"mkfs.{volume['filesystem']} {filesystem_args} /dev/rbd/{dst_volume}"
                )
                if retcode:
-                    raise ProvisioningError(
+                    self.fail(
                        f"Failed to create {volume['filesystem']} file on '{dst_volume}': {stderr}"
                    )
 
-            print(stdout)
+            self.log_info(stdout)
 
        # Create a temporary directory to use during install
        temp_dir = "/tmp/target"
@@ -413,7 +422,7 @@ class VMBuilderScript(VMBuilder):
                f"mount {mapped_dst_volume} {mount_path}"
            )
            if retcode:
-                raise ProvisioningError(
+                self.fail(
                    f"Failed to mount '{mapped_dst_volume}' on '{mount_path}': {stderr}"
                )
 
@@ -492,7 +501,7 @@ class VMBuilderScript(VMBuilder):
            if volume["mountpoint"] == "/":
                root_volume = volume
        if not root_volume:
-            raise ProvisioningError("Failed to find root volume in volumes list")
+            self.fail("Failed to find root volume in volumes list")
 
        if rinse_mirror is not None:
            mirror_arg = f"--mirror {rinse_mirror}"
@@ -509,7 +518,7 @@ class VMBuilderScript(VMBuilder):
                fh.write(f"{pkg}\n")
 
        # Perform a rinse installation
-        print(
+        self.log_info(
            f"Installing system with rinse: rinse --arch {rinse_architecture} --directory {temporary_directory} --distribution {rinse_release} --cache-dir {rinse_cache} --add-pkg-list /tmp/addpkg --verbose {mirror_arg}"
        )
        os.system(
@@ -711,7 +720,7 @@ GRUB_SERIAL_COMMAND="serial --speed=115200 --unit=0 --word=8 --parity=no --stop=
            f"umount {mount_path}"
        )
        if retcode:
-            raise ProvisioningError(
+            self.log_err(
                f"Failed to unmount '{mapped_dst_volume}' on '{mount_path}': {stderr}"
            )
 
@@ -723,6 +732,4 @@ GRUB_SERIAL_COMMAND="serial --speed=115200 --unit=0 --word=8 --parity=no --stop=
                dst_volume_name,
            )
            if not success:
-                raise ProvisioningError(
-                    f"Failed to unmap '{mapped_dst_volume}': {stderr}"
-                )
+                self.log_err(f"Failed to unmap '{mapped_dst_volume}': {stderr}")
@@ -57,6 +57,20 @@
 # function is provided in context of the example; see the other examples for
 # more potential uses.
 
+# Within the VMBuilderScript class, several helper functions are exposed through
+# the parent VMBuilder class:
+#  self.log_info(message):
+#    Use this function to log an "informational" message instead of "print()"
+#  self.log_warn(message):
+#    Use this function to log a "warning" message
+#  self.log_err(message):
+#    Use this function to log an "error" message outside of an exception (see below)
+#  self.fail(message, exception=<ExceptionClass>):
+#    Use this function to bail out of the script safely instead of raising a
+#    normal Python exception. You may pass an optional exception class keyword
+#    argument for posterity in the logs if you wish; otherwise, ProvisioningError
+#    is used. This function implicitly calls a "self.log_err" with the passed message
+
 # Within the VMBuilderScript class, several common variables are exposed through
 # the parent VMBuilder class:
 #  self.vm_name: The name of the VM from PVC's perspective
@@ -158,9 +172,8 @@
 # since they could still do destructive things to /dev and the like!
 
 
-# This import is always required here, as VMBuilder is used by the VMBuilderScript class
-# and ProvisioningError is the primary exception that should be raised within the class.
-from pvcapid.vmbuilder import VMBuilder, ProvisioningError
+# This import is always required here, as VMBuilder is used by the VMBuilderScript class.
+from pvcapid.vmbuilder import VMBuilder
 
 
 # Set up some variables for later; if you frequently use these tools, you might benefit from
@@ -189,9 +202,7 @@ class VMBuilderScript(VMBuilder):
        # Ensure that our required runtime variables are defined
 
        if self.vm_data["script_arguments"].get("pfsense_wan_iface") is None:
-            raise ProvisioningError(
-                "Required script argument 'pfsense_wan_iface' not provided"
-            )
+            self.fail("Required script argument 'pfsense_wan_iface' not provided")
 
        if self.vm_data["script_arguments"].get("pfsense_wan_dhcp") is None:
            for argument in [
@@ -199,9 +210,7 @@ class VMBuilderScript(VMBuilder):
                "pfsense_wan_gateway",
            ]:
                if self.vm_data["script_arguments"].get(argument) is None:
-                    raise ProvisioningError(
-                        f"Required script argument '{argument}' not provided"
-                    )
+                    self.fail(f"Required script argument '{argument}' not provided")
 
        # Ensure we have all dependencies installed on the provisioner system
        for dependency in "wget", "unzip", "gzip":
@@ -209,9 +218,7 @@ class VMBuilderScript(VMBuilder):
            if retcode:
                # Raise a ProvisioningError for any exception; the provisioner will handle
                # this gracefully and properly, avoiding dangling mounts, RBD maps, etc.
-                raise ProvisioningError(
-                    f"Failed to find critical dependency: {dependency}"
-                )
+                self.fail(f"Failed to find critical dependency: {dependency}")
 
        # Create a temporary directory to use for Packer binaries/scripts
        packer_temp_dir = "/tmp/packer"
@@ -361,41 +368,39 @@ class VMBuilderScript(VMBuilder):
        packer_temp_dir = "/tmp/packer"
 
        # Download pfSense image file to temporary target directory
-        print(f"Downloading pfSense ISO image from {PFSENSE_ISO_URL}")
+        self.log_info(f"Downloading pfSense ISO image from {PFSENSE_ISO_URL}")
        retcode, stdout, stderr = pvc_common.run_os_command(
            f"wget --output-document={packer_temp_dir}/dl/pfsense.iso.gz {PFSENSE_ISO_URL}"
        )
        if retcode:
-            raise ProvisioningError(
-                f"Failed to download pfSense image from {PFSENSE_ISO_URL}"
-            )
+            self.fail(f"Failed to download pfSense image from {PFSENSE_ISO_URL}")
 
        # Extract pfSense image file under temporary target directory
-        print(f"Extracting pfSense ISO image")
+        self.log_info(f"Extracting pfSense ISO image")
        retcode, stdout, stderr = pvc_common.run_os_command(
            f"gzip --decompress {packer_temp_dir}/dl/pfsense.iso.gz"
        )
        if retcode:
-            raise ProvisioningError("Failed to extract pfSense ISO image")
+            self.fail("Failed to extract pfSense ISO image")
 
        # Download Packer to temporary target directory
-        print(f"Downloading Packer from {PACKER_URL}")
+        self.log_info(f"Downloading Packer from {PACKER_URL}")
        retcode, stdout, stderr = pvc_common.run_os_command(
            f"wget --output-document={packer_temp_dir}/packer.zip {PACKER_URL}"
        )
        if retcode:
-            raise ProvisioningError(f"Failed to download Packer from {PACKER_URL}")
+            self.fail(f"Failed to download Packer from {PACKER_URL}")
 
        # Extract Packer under temporary target directory
-        print(f"Extracting Packer binary")
+        self.log_info(f"Extracting Packer binary")
        retcode, stdout, stderr = pvc_common.run_os_command(
            f"unzip {packer_temp_dir}/packer.zip -d {packer_temp_dir}"
        )
        if retcode:
-            raise ProvisioningError("Failed to extract Packer binary")
+            self.fail("Failed to extract Packer binary")
 
        # Output the Packer configuration
-        print(f"Generating Packer configurations")
+        self.log_info(f"Generating Packer configurations")
        first_volume = self.vm_data["volumes"][0]
        first_volume_size_mb = int(first_volume["disk_size_gb"]) * 1024
 
@@ -829,7 +834,7 @@ class VMBuilderScript(VMBuilder):
            fh.write(pfsense_config)
 
        # Create the disk(s)
-        print(f"Creating volumes")
+        self.log_info(f"Creating volumes")
        for volume in self.vm_data["volumes"]:
            with open_zk(config) as zkhandler:
                success, message = pvc_ceph.add_volume(
@@ -838,14 +843,12 @@ class VMBuilderScript(VMBuilder):
                    f"{self.vm_name}_{volume['disk_id']}",
                    f"{volume['disk_size_gb']}G",
                )
-                print(message)
+                self.log_info(message)
                if not success:
-                    raise ProvisioningError(
-                        f"Failed to create volume '{volume['disk_id']}'."
-                    )
+                    self.fail(f"Failed to create volume '{volume['disk_id']}'.")
 
        # Map the target RBD volumes
-        print(f"Mapping volumes")
+        self.log_info(f"Mapping volumes")
        for volume in self.vm_data["volumes"]:
            dst_volume_name = f"{self.vm_name}_{volume['disk_id']}"
            dst_volume = f"{volume['pool']}/{dst_volume_name}"
@@ -856,9 +859,9 @@ class VMBuilderScript(VMBuilder):
                    volume["pool"],
                    dst_volume_name,
                )
-                print(message)
+                self.log_info(message)
                if not success:
-                    raise ProvisioningError(f"Failed to map volume '{dst_volume}'.")
+                    self.fail(f"Failed to map volume '{dst_volume}'.")
 
     def install(self):
         """
@@ -871,7 +874,7 @@ class VMBuilderScript(VMBuilder):
 
        packer_temp_dir = "/tmp/packer"
 
-        print(
+        self.log_info(
            f"Running Packer: PACKER_LOG=1 PACKER_CONFIG_DIR={packer_temp_dir} PACKER_CACHE_DIR={packer_temp_dir} {packer_temp_dir}/packer build {packer_temp_dir}/build.json"
        )
        os.system(
@@ -879,9 +882,9 @@ class VMBuilderScript(VMBuilder):
        )
 
        if not os.path.exists(f"{packer_temp_dir}/bin/{self.vm_name}"):
-            raise ProvisioningError("Packer failed to build output image")
+            self.fail("Packer failed to build output image")
 
-        print("Copying output image to first volume")
+        self.log_info("Copying output image to first volume")
        first_volume = self.vm_data["volumes"][0]
        dst_volume_name = f"{self.vm_name}_{first_volume['disk_id']}"
        dst_volume = f"{first_volume['pool']}/{dst_volume_name}"
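The pfSense script's setup() validation works because script arguments arrive as a dict in self.vm_data["script_arguments"]. A hedged sketch of how they get there, following the worker's own split-on-"=" parsing shown later in this compare (the argument values are illustrative):

```python
# Each provisioner run argument is a "key=value" string; the worker splits
# them into a dict before handing them to the script.
script_arguments = {}
for argument in ["pfsense_wan_iface=vtnet0", "pfsense_wan_dhcp=true"]:
    argument_name, argument_data = argument.split("=")
    script_arguments[argument_name] = argument_data

# The script then validates required keys exactly as in the hunks above:
if script_arguments.get("pfsense_wan_iface") is None:
    raise ValueError("Required script argument 'pfsense_wan_iface' not provided")
```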
@@ -26,6 +26,7 @@ from flask_restful import Resource, Api, reqparse, abort
 from celery import Celery
 from kombu import Queue
 from lxml.objectify import fromstring as lxml_fromstring
+from uuid import uuid4
 
 from daemon_lib.common import getPrimaryNode
 from daemon_lib.zkhandler import ZKConnection
@@ -56,12 +57,11 @@ from flask_sqlalchemy import SQLAlchemy
 
 # Create Flask app and set config values
 app = flask.Flask(__name__)
-app.config["CELERY_BROKER_URL"] = "redis://{}:{}{}".format(
-    config["queue_host"], config["queue_port"], config["queue_path"]
-)
-app.config["CELERY_RESULT_BACKEND"] = "redis://{}:{}{}".format(
+celery_task_uri = "redis://{}:{}{}".format(
     config["queue_host"], config["queue_port"], config["queue_path"]
 )
+app.config["CELERY_BROKER_URL"] = celery_task_uri
+app.config["CELERY_RESULT_BACKEND"] = celery_task_uri
 
 
 # Set up Celery queues
@@ -71,6 +71,11 @@ def get_all_nodes(zkhandler):
     return [n["name"] for n in all_nodes]
 
 
+@ZKConnection(config)
+def get_primary_node(zkhandler):
+    return getPrimaryNode(zkhandler)
+
+
 app.config["CELERY_QUEUES"] = tuple(
     [Queue(h, routing_key=f"{h}.#") for h in get_all_nodes()]
 )
@@ -78,16 +83,14 @@ app.config["CELERY_QUEUES"] = tuple(
 
 # Set up Celery queue routing
 def route_task(name, args, kwargs, options, task=None, **kw):
-    @ZKConnection(config)
-    def get_primary_node(zkhandler):
-        return getPrimaryNode(zkhandler)
-
     print("----")
     print(f"Incoming Celery task: '{name}' with args {args}, kwargs {kwargs}")
 
     # If an explicit routing_key is set and it's in the kwargs of the function, use it to set the queue
     if options["routing_key"] != "default" and options["routing_key"] in kwargs.keys():
         run_on = kwargs[options["routing_key"]]
+        if run_on == "primary":
+            run_on = get_primary_node()
     # Otherwise, use the primary node
     else:
         run_on = get_primary_node()
@@ -100,6 +103,19 @@ def route_task(name, args, kwargs, options, task=None, **kw):
 
 app.config["CELERY_ROUTES"] = (route_task,)
 
 
+# Set up Celery task ID generator
+# WHY? We don't want to use UUIDs; they're too long and cumbersome. Instead, use a shorter partial UUID.
+def run_celery_task(task_def, **kwargs):
+    task_id = str(uuid4()).split("-")[0]
+    task = task_def.apply_async(
+        (),
+        kwargs,
+        task_id=task_id,
+    )
+    return task
+
+
 # Set up SQLAlchemy backend
 app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
 app.config["SQLALCHEMY_DATABASE_URI"] = "postgresql://{}:{}@{}:{}/{}".format(
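run_celery_task() becomes the single dispatch path for every task in this file, replacing direct .delay() calls. Its use, taken from the API handler changes further down (the printed ID value is illustrative):

```python
# Dispatch by keyword only; routing_key="run_on" on the task definition lets
# route_task() read kwargs["run_on"] and queue the task on that node.
task = run_celery_task(vm_flush_locks, domain=vm, run_on=vm_node)

# Task IDs are now the first segment of a UUID4, e.g. "3f9a1c2b",
# rather than a full 36-character UUID.
print(task.id)
```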
@@ -129,7 +145,12 @@ api = Api(blueprint)
 app.register_blueprint(blueprint)
 
 # Create celery definition
-celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
+celery = Celery(
+    app.name,
+    broker=celery_task_uri,
+    result_backend=celery_task_uri,
+    result_extended=True,
+)
 celery.conf.update(app.config)
 
 #
@@ -198,9 +219,15 @@ def Authenticator(function):
 #
 # Job functions
 #
-@celery.task(name="provisioner.create", bind=True)
+@celery.task(name="provisioner.create", bind=True, routing_key="run_on")
 def create_vm(
-    self, vm_name, profile_name, define_vm=True, start_vm=True, script_run_args=[]
+    self,
+    vm_name=None,
+    profile_name=None,
+    define_vm=True,
+    start_vm=True,
+    script_run_args=[],
+    run_on="primary",
 ):
     return api_vmbuilder.create_vm(
         self,
@@ -212,13 +239,13 @@ def create_vm(
     )
 
 
-@celery.task(name="storage.benchmark", bind=True)
-def run_benchmark(self, pool):
+@celery.task(name="storage.benchmark", bind=True, routing_key="run_on")
+def run_benchmark(self, pool=None, run_on="primary"):
     return api_benchmark.run_benchmark(self, pool)
 
 
 @celery.task(name="vm.flush_locks", bind=True, routing_key="run_on")
-def vm_flush_locks(self, domain, force_unlock=False, run_on="primary"):
+def vm_flush_locks(self, domain=None, force_unlock=False, run_on="primary"):
     @ZKConnection(config)
     def run_vm_flush_locks(zkhandler, self, domain, force_unlock=False):
         return vm_worker_flush_locks(zkhandler, self, domain, force_unlock=force_unlock)
@@ -227,7 +254,7 @@ def vm_flush_locks(self, domain, force_unlock=False, run_on="primary"):
 
 
 @celery.task(name="vm.device_attach", bind=True, routing_key="run_on")
-def vm_device_attach(self, domain, xml, run_on=None):
+def vm_device_attach(self, domain=None, xml=None, run_on=None):
     @ZKConnection(config)
     def run_vm_device_attach(zkhandler, self, domain, xml):
         return vm_worker_attach_device(zkhandler, self, domain, xml)
@@ -236,7 +263,7 @@ def vm_device_attach(self, domain, xml, run_on=None):
 
 
 @celery.task(name="vm.device_detach", bind=True, routing_key="run_on")
-def vm_device_detach(self, domain, xml, run_on=None):
+def vm_device_detach(self, domain=None, xml=None, run_on=None):
     @ZKConnection(config)
     def run_vm_device_detach(zkhandler, self, domain, xml):
         return vm_worker_detach_device(zkhandler, self, domain, xml)
@@ -247,8 +274,8 @@ def vm_device_detach(self, domain, xml, run_on=None):
 @celery.task(name="osd.add", bind=True, routing_key="run_on")
 def osd_add(
     self,
-    device,
-    weight,
+    device=None,
+    weight=None,
     ext_db_ratio=None,
     ext_db_size=None,
     split_count=None,
@@ -284,8 +311,8 @@ def osd_add(
 @celery.task(name="osd.replace", bind=True, routing_key="run_on")
 def osd_replace(
     self,
-    osd_id,
-    new_device,
+    osd_id=None,
+    new_device=None,
     old_device=None,
     weight=None,
     ext_db_ratio=None,
@@ -322,7 +349,7 @@ def osd_replace(
 
 
 @celery.task(name="osd.refresh", bind=True, routing_key="run_on")
-def osd_refresh(self, osd_id, device, ext_db_flag=False, run_on=None):
+def osd_refresh(self, osd_id=None, device=None, ext_db_flag=False, run_on=None):
     @ZKConnection(config)
     def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
         return osd_worker_refresh_osd(
@@ -333,7 +360,7 @@ def osd_refresh(self, osd_id, device, ext_db_flag=False, run_on=None):
 
 
 @celery.task(name="osd.remove", bind=True, routing_key="run_on")
-def osd_remove(self, osd_id, force_flag=False, skip_zap_flag=False, run_on=None):
+def osd_remove(self, osd_id=None, force_flag=False, skip_zap_flag=False, run_on=None):
     @ZKConnection(config)
     def run_osd_remove(
         zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
@@ -346,7 +373,7 @@ def osd_remove(self, osd_id, force_flag=False, skip_zap_flag=False, run_on=None)
 
 
 @celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
-def osd_add_db_vg(self, device, run_on=None):
+def osd_add_db_vg(self, device=None, run_on=None):
     @ZKConnection(config)
     def run_osd_add_db_vg(zkhandler, self, run_on, device):
         return osd_worker_add_db_vg(zkhandler, self, run_on, device)
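Every task parameter gains a default (None or "primary") in these hunks because run_celery_task() dispatches with apply_async((), kwargs): the positional args tuple is always empty, so each argument must be accepted by keyword. A sketch of what a dispatch expands to; the device and node values are illustrative:

```python
# Equivalent to run_celery_task(osd_add_db_vg, device="/dev/sdb", run_on="hv1")
task = osd_add_db_vg.apply_async(
    (),                                       # empty positional args tuple
    {"device": "/dev/sdb", "run_on": "hv1"},  # everything passed by keyword
    task_id=str(uuid4()).split("-")[0],       # short partial-UUID task ID
)
```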
@@ -2428,10 +2455,10 @@ class API_VM_Locks(Resource):
         else:
             return vm_node_detail, retcode
 
-        task = vm_flush_locks.delay(vm, run_on=vm_node)
+        task = run_celery_task(vm_flush_locks, domain=vm, run_on=vm_node)
 
         return (
-            {"task_id": task.id, "run_on": vm_node},
+            {"task_id": task.id, "task_name": "vm.flush_locks", "run_on": vm_node},
             202,
             {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
         )
@@ -2563,10 +2590,10 @@ class API_VM_Device(Resource):
         else:
             return vm_node_detail, retcode
 
-        task = vm_device_attach.delay(vm, xml, run_on=vm_node)
+        task = run_celery_task(vm_device_attach, domain=vm, xml=xml, run_on=vm_node)
 
         return (
-            {"task_id": task.id, "run_on": vm_node},
+            {"task_id": task.id, "task_name": "vm.device_attach", "run_on": vm_node},
             202,
             {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
         )
@@ -2617,10 +2644,10 @@ class API_VM_Device(Resource):
         else:
             return vm_node_detail, retcode
 
-        task = vm_device_detach.delay(vm, xml, run_on=vm_node)
+        task = run_celery_task(vm_device_detach, domain=vm, xml=xml, run_on=vm_node)
 
         return (
-            {"task_id": task.id, "run_on": vm_node},
+            {"task_id": task.id, "task_name": "vm.device_detach", "run_on": vm_node},
             202,
             {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
         )
@@ -4373,11 +4400,17 @@ class API_Storage_Ceph_Benchmark(Resource):
                 "message": 'Pool "{}" is not valid.'.format(reqargs.get("pool"))
             }, 400
 
-        task = run_benchmark.delay(reqargs.get("pool", None))
+        task = run_celery_task(
+            run_benchmark, pool=reqargs.get("pool", None), run_on="primary"
+        )
         return (
-            {"task_id": task.id},
+            {
+                "task_id": task.id,
+                "task_name": "storage.benchmark",
+                "run_on": get_primary_node(),
+            },
             202,
-            {"Location": Api.url_for(api, API_Storage_Ceph_Benchmark, task_id=task.id)},
+            {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
         )
@@ -4493,10 +4526,12 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
         """
         node = reqargs.get("node", None)
 
-        task = osd_add_db_vg.delay(reqargs.get("device", None), run_on=node)
+        task = run_celery_task(
+            osd_add_db_vg, device=reqargs.get("device", None), run_on=node
+        )
 
         return (
-            {"task_id": task.id, "run_on": node},
+            {"task_id": task.id, "task_name": "osd.add_db_vg", "run_on": node},
             202,
             {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
         )
@@ -4690,17 +4725,18 @@ class API_Storage_Ceph_OSD_Root(Resource):
         """
         node = reqargs.get("node", None)
 
-        task = osd_add.delay(
-            reqargs.get("device", None),
-            reqargs.get("weight", None),
-            reqargs.get("ext_db_ratio", None),
-            reqargs.get("ext_db_size", None),
-            reqargs.get("osd_count", None),
+        task = run_celery_task(
+            osd_add,
+            device=reqargs.get("device", None),
+            weight=reqargs.get("weight", None),
+            ext_db_ratio=reqargs.get("ext_db_ratio", None),
+            ext_db_size=reqargs.get("ext_db_size", None),
+            split_count=reqargs.get("osd_count", None),
+            run_on=node,
         )
 
         return (
-            {"task_id": task.id, "run_on": node},
+            {"task_id": task.id, "task_name": "osd.add", "run_on": node},
             202,
             {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
         )
@@ -4808,18 +4844,19 @@ class API_Storage_Ceph_OSD_Element(Resource):
         else:
             return osd_node_detail, retcode
 
-        task = osd_replace.delay(
-            osdid,
-            reqargs.get("new_device"),
-            reqargs.get("old_device", None),
-            reqargs.get("weight", None),
-            reqargs.get("ext_db_ratio", None),
-            reqargs.get("ext_db_size", None),
+        task = run_celery_task(
+            osd_replace,
+            osd_id=osdid,
+            new_device=reqargs.get("new_device"),
+            old_device=reqargs.get("old_device", None),
+            weight=reqargs.get("weight", None),
+            ext_db_ratio=reqargs.get("ext_db_ratio", None),
+            ext_db_size=reqargs.get("ext_db_size", None),
+            run_on=node,
        )
 
         return (
-            {"task_id": task.id, "run_on": node},
+            {"task_id": task.id, "task_name": "osd.replace", "run_on": node},
             202,
             {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
         )
@@ -4865,14 +4902,16 @@ class API_Storage_Ceph_OSD_Element(Resource):
         else:
             return osd_node_detail, retcode
 
-        task = osd_refresh.delay(
-            osdid,
-            reqargs.get("device", None),
+        task = run_celery_task(
+            osd_refresh,
+            osd_id=osdid,
+            device=reqargs.get("device", None),
+            ext_db_flag=False,
             run_on=node,
         )
 
         return (
-            {"task_id": task.id, "run_on": node},
+            {"task_id": task.id, "task_name": "osd.refresh", "run_on": node},
             202,
             {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
         )
@@ -4934,14 +4973,15 @@ class API_Storage_Ceph_OSD_Element(Resource):
         else:
             return osd_node_detail, retcode
 
-        task = osd_remove.delay(
-            osdid,
+        task = run_celery_task(
+            osd_remove,
+            osd_id=osdid,
             force_flag=reqargs.get("force", False),
             run_on=node,
         )
 
         return (
-            {"task_id": task.id, "run_on": node},
+            {"task_id": task.id, "task_name": "osd.remove", "run_on": node},
             202,
             {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
         )
@@ -8462,122 +8502,24 @@ class API_Provisioner_Create_Root(Resource):
         else:
             start_vm = False
 
-        task = create_vm.delay(
-            reqargs.get("name", None),
-            reqargs.get("profile", None),
+        task = run_celery_task(
+            create_vm,
+            vm_name=reqargs.get("name", None),
+            profile_name=reqargs.get("profile", None),
             define_vm=define_vm,
             start_vm=start_vm,
             script_run_args=reqargs.get("arg", []),
+            run_on="primary",
         )
         return (
-            {"task_id": task.id},
-            202,
-            {
-                "Location": Api.url_for(
-                    api, API_Provisioner_Status_Element, task_id=task.id
-                )
-            },
+            {
+                "task_id": task.id,
+                "task_name": "provisioner.create",
+                "run_on": get_primary_node(),
+            },
+            202,
+            {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
         )
 
 
 api.add_resource(API_Provisioner_Create_Root, "/provisioner/create")
 
 
-# /provisioner/status
-class API_Provisioner_Status_Root(Resource):
-    @Authenticator
-    def get(self):
-        """
-        View status of provisioner Celery queue
-        ---
-        tags:
-          - provisioner
-        responses:
-          200:
-            description: OK
-            schema:
-              type: object
-              properties:
-                active:
-                  type: object
-                  description: Celery app.control.inspect active tasks
-                reserved:
-                  type: object
-                  description: Celery app.control.inspect reserved tasks
-                scheduled:
-                  type: object
-                  description: Celery app.control.inspect scheduled tasks
-        """
-        queue = celery.control.inspect()
-        response = {
-            "scheduled": queue.scheduled(),
-            "active": queue.active(),
-            "reserved": queue.reserved(),
-        }
-        return response
-
-
-api.add_resource(API_Provisioner_Status_Root, "/provisioner/status")
-
-
-# /provisioner/status/<task_id>
-class API_Provisioner_Status_Element(Resource):
-    @Authenticator
-    def get(self, task_id):
-        """
-        View status of a provisioner Celery worker job {task_id}
-        ---
-        tags:
-          - provisioner
-        responses:
-          200:
-            description: OK
-            schema:
-              type: object
-              properties:
-                total:
-                  type: integer
-                  description: Total number of steps
-                current:
-                  type: integer
-                  description: Current steps completed
-                state:
-                  type: string
-                  description: Current job state
-                status:
-                  type: string
-                  description: Status details about job
-          404:
-            description: Not found
-            schema:
-              type: object
-              id: Message
-        """
-        task = create_vm.AsyncResult(task_id)
-        if task.state == "PENDING":
-            response = {
-                "state": task.state,
-                "current": 0,
-                "total": 1,
-                "status": "Pending job start",
-            }
-        elif task.state != "FAILURE":
-            response = {
-                "state": task.state,
-                "current": task.info.get("current", 0),
-                "total": task.info.get("total", 1),
-                "status": task.info.get("status", ""),
-            }
-            if "result" in task.info:
-                response["result"] = task.info["result"]
-        else:
-            response = {
-                "state": task.state,
-                "current": 1,
-                "total": 1,
-                "status": str(task.info),
-            }
-        return response
-
-
-api.add_resource(API_Provisioner_Status_Element, "/provisioner/status/<task_id>")
-
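With the provisioner-specific status endpoints removed, every task type now returns a Location header pointing at the unified API_Tasks_Element resource. A hedged client-side sketch of the new flow; the base URL and port are illustrative, authentication headers are omitted, and the response fields follow the removed status handler above:

```python
import requests

API = "http://pvc-api:7370/api/v1"  # illustrative host and port

# Dispatch a provisioner job; the response now carries task_name and run_on
resp = requests.post(
    f"{API}/provisioner/create",
    params={"name": "test1", "profile": "debian11"},  # illustrative values
)
task = resp.json()
print(task["task_id"], task["task_name"], task["run_on"])

# Poll the unified task endpoint given by the Location header
status = requests.get(resp.headers["Location"]).json()
print(status["state"], status.get("current"), status.get("total"))
```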
@@ -26,7 +26,6 @@ import re
 import os
 
 # import sys
 import time
 import importlib.util
-import uuid
 
@@ -35,6 +34,7 @@ from contextlib import contextmanager
 from pvcapid.Daemon import config
 
 from daemon_lib.zkhandler import ZKHandler
+from daemon_lib.celery import start, fail, log_info, log_warn, log_err, update, finish
 
 import daemon_lib.common as pvc_common
 import daemon_lib.node as pvc_node
@@ -87,6 +87,21 @@ class VMBuilder(object):
         self.vm_profile = vm_profile
         self.vm_data = vm_data
 
+    #
+    # Helper class functions; used by the individual scripts
+    #
+    def log_info(self, msg):
+        log_info(None, msg)
+
+    def log_warn(self, msg):
+        log_warn(None, msg)
+
+    def log_err(self, msg):
+        log_err(None, msg)
+
+    def fail(self, msg, exception=ProvisioningError):
+        fail(None, msg, exception=exception)
+
     #
     # Primary class functions; implemented by the individual scripts
     #
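These wrappers call the same daemon_lib.celery primitives the worker uses directly, but pass None in place of the Celery task reference, so script-level logging stays decoupled from task progress tracking. A sketch of the two call styles, with signatures taken from this diff:

```python
from daemon_lib.celery import log_info, update


def script_side(builder):
    # What a provisioning script does via the VMBuilder helpers:
    # internally just log_info(None, msg), with no task handle involved
    builder.log_info("Formatting volumes")


def worker_side(celery):
    # What the worker itself does with a live task reference, so the
    # message also carries progress metadata
    update(celery, "Running script setup() step", current=4, total=10)
```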
@@ -160,7 +175,11 @@ def open_db(config):
        )
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    except Exception:
-        raise ClusterError("Failed to connect to Postgres")
+        fail(
+            None,
+            "Failed to connect to Postgres",
+            exception=ClusterError,
+        )
 
    try:
        yield cur
@@ -179,7 +198,11 @@ def open_zk(config):
        zkhandler = ZKHandler(config)
        zkhandler.connect()
    except Exception:
-        raise ClusterError("Failed to connect to Zookeeper")
+        fail(
+            None,
+            "Failed to connect to Zookeeper",
+            exception=ClusterError,
+        )
 
    try:
        yield zkhandler
@@ -194,19 +217,28 @@
 # Main VM provisioning function - executed by the Celery worker
 #
 def create_vm(
-    self, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
+    celery, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
 ):
-    print(f"Starting provisioning of VM '{vm_name}' with profile '{vm_profile}'")
+    current_stage = 0
+    total_stages = 10
+    start(
+        celery,
+        f"Starting provisioning of VM '{vm_name}' with profile '{vm_profile}'",
+        current=current_stage,
+        total=total_stages,
+    )
 
     # Phase 1 - setup
     #  * Get the profile elements
     #  * Get the details from these elements
     #  * Assemble a VM configuration dictionary
-    self.update_state(
-        state="RUNNING",
-        meta={"current": 1, "total": 10, "status": "Collecting configuration"},
+    current_stage += 1
+    update(
+        celery,
+        "Collecting configuration details",
+        current=current_stage,
+        total=total_stages,
     )
     time.sleep(1)
 
     vm_id = re.findall(r"/(\d+)$/", vm_name)
     if not vm_id:
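The old per-phase self.update_state() calls carried hard-coded "current"/"total" metadata; the new pattern tracks a counter and funnels everything through daemon_lib.celery's start() and update(). A condensed sketch of the progression used throughout create_vm(), with the call signatures as shown in this diff:

```python
from daemon_lib.celery import start, update


def provision(celery, vm_name):
    current_stage = 0
    total_stages = 10

    # start() announces the task and seeds the progress counters
    start(celery, f"Starting provisioning of VM '{vm_name}'",
          current=current_stage, total=total_stages)

    # Each phase bumps the counter instead of hard-coding "current": N
    current_stage += 1
    update(celery, "Collecting configuration details",
           current=current_stage, total=total_stages)

    current_stage += 1
    update(celery, "Verifying configuration against cluster",
           current=current_stage, total=total_stages)
```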
@ -313,13 +345,14 @@ def create_vm(
|
||||
argument_name, argument_data = argument.split("=")
|
||||
script_arguments[argument_name] = argument_data
|
||||
|
||||
print("Script arguments: {}".format(script_arguments))
|
||||
log_info(celery, f"Script arguments: {script_arguments}")
|
||||
vm_data["script_arguments"] = script_arguments
|
||||
|
||||
print(
|
||||
log_info(
|
||||
celery,
|
||||
"VM configuration data:\n{}".format(
|
||||
json.dumps(vm_data, sort_keys=True, indent=2)
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
# Phase 2 - verification
|
||||
@ -327,21 +360,21 @@ def create_vm(
|
||||
# * Ensure that all networks are valid
|
||||
# * Ensure that there is enough disk space in the Ceph cluster for the disks
|
||||
# This is the "safe fail" step when an invalid configuration will be caught
|
||||
self.update_state(
|
||||
state="RUNNING",
|
||||
meta={
|
||||
"current": 2,
|
||||
"total": 10,
|
||||
"status": "Verifying configuration against cluster",
|
||||
},
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
"Verifying configuration against cluster",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
time.sleep(1)
|
||||
|
||||
with open_zk(config) as zkhandler:
|
||||
# Verify that a VM with this name does not already exist
|
||||
if pvc_vm.searchClusterByName(zkhandler, vm_name):
|
||||
raise ClusterError(
|
||||
"A VM with the name '{}' already exists in the cluster.".format(vm_name)
|
||||
fail(
|
||||
celery,
|
||||
f"A VM with the name '{vm_name}' already exists in the cluster.",
|
||||
exception=ClusterError,
|
||||
)
|
||||
|
||||
# Verify that at least one host has enough free RAM to run the VM
|
||||
@ -361,16 +394,15 @@ def create_vm(
|
||||
target_node = node["name"]
|
||||
# Raise if no node was found
|
||||
if not target_node:
|
||||
raise ClusterError(
|
||||
"No ready cluster node contains at least {}+512 MB of free RAM.".format(
|
||||
vm_data["system_details"]["vram_mb"]
|
||||
)
|
||||
fail(
|
||||
celery,
|
||||
f"No ready cluster node contains at least {vm_data['system_details']['vram_mb']}+512 MB of free RAM",
|
||||
exception=ClusterError,
|
||||
)
|
||||
|
||||
print(
|
||||
'Selecting target node "{}" with "{}" MB free RAM'.format(
|
||||
target_node, last_free
|
||||
)
|
||||
log_info(
|
||||
celery,
|
||||
f'Selecting target node "{target_node}" with "{last_free}" MB free RAM',
|
||||
)
|
||||
|
||||
# Verify that all configured networks are present on the cluster
|
||||
@ -382,11 +414,13 @@ def create_vm(
|
||||
"cluster",
|
||||
"storage",
|
||||
]:
|
||||
raise ClusterError(
|
||||
'The network VNI "{}" is not present on the cluster.'.format(vni)
|
||||
fail(
|
||||
celery,
|
||||
f'The network VNI "{vni}" is not present on the cluster.',
|
||||
exception=ClusterError,
|
||||
)
|
||||
|
||||
print("All configured networks for VM are valid")
|
||||
log_info(celery, "All configured networks for VM are valid")
|
||||
|
||||
# Verify that there is enough disk space free to provision all VM disks
|
||||
pools = dict()
|
||||
@ -396,10 +430,10 @@ def create_vm(
|
||||
zkhandler, volume["pool"], volume["source_volume"]
|
||||
)
|
||||
if not volume_data:
|
||||
raise ClusterError(
|
||||
"The source volume {}/{} could not be found.".format(
|
||||
volume["pool"], volume["source_volume"]
|
||||
)
|
||||
fail(
|
||||
celery,
|
||||
f"The source volume {volume['pool']}/{volume['source_volume']} could not be found.",
|
||||
exception=ClusterError,
|
||||
)
|
||||
if not volume["pool"] in pools:
|
||||
pools[volume["pool"]] = int(
|
||||
@ -427,8 +461,10 @@ def create_vm(
|
||||
if not pool_information:
|
||||
raise
|
||||
except Exception:
|
||||
raise ClusterError(
|
||||
'Pool "{}" is not present on the cluster.'.format(pool)
|
||||
fail(
|
||||
celery,
|
||||
f'Pool "{pool}" is not present on the cluster.',
|
||||
exception=ClusterError,
|
||||
)
|
||||
pool_free_space_gb = int(
|
||||
pool_information["stats"]["free_bytes"] / 1024 / 1024 / 1024
|
||||
@ -436,13 +472,13 @@ def create_vm(
|
||||
pool_vm_usage_gb = int(pools[pool])
|
||||
|
||||
if pool_vm_usage_gb >= pool_free_space_gb:
|
||||
raise ClusterError(
|
||||
'Pool "{}" has only {} GB free and VM requires {} GB.'.format(
|
||||
pool, pool_free_space_gb, pool_vm_usage_gb
|
||||
)
|
||||
fail(
|
||||
celery,
|
||||
f'Pool "{pool}" has only {pool_free_space_gb} GB free but VM requires {pool_vm_usage_gb} GB.',
|
||||
exception=ClusterError,
|
||||
)
|
||||
|
||||
print("There is enough space on cluster to store VM volumes")
|
||||
log_info(celery, "There is enough space on cluster to store VM volumes")
|
||||
|
||||
# Verify that every specified filesystem is valid
|
||||
used_filesystems = list()
|
||||
@ -461,33 +497,44 @@ def create_vm(
|
||||
elif filesystem == "swap":
|
||||
retcode, stdout, stderr = pvc_common.run_os_command("which mkswap")
|
||||
if retcode:
|
||||
raise ProvisioningError(
|
||||
"Failed to find binary for mkswap: {}".format(stderr)
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to find binary for mkswap: {stderr}",
|
||||
exception=ProvisioningError,
|
||||
)
|
||||
else:
|
||||
retcode, stdout, stderr = pvc_common.run_os_command(
|
||||
"which mkfs.{}".format(filesystem)
|
||||
)
|
||||
if retcode:
|
||||
raise ProvisioningError(
|
||||
"Failed to find binary for mkfs.{}: {}".format(filesystem, stderr)
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to find binary for mkfs.{filesystem}: {stderr}",
|
||||
exception=ProvisioningError,
|
||||
)
|
||||
|
||||
print("All selected filesystems are valid")
|
||||
log_info(celery, "All selected filesystems are valid")
|
||||
|
||||
# Phase 3 - provisioning script preparation
|
||||
# * Import the provisioning script as a library with importlib
|
||||
# * Ensure the required function(s) are present
|
||||
self.update_state(
|
||||
state="RUNNING",
|
||||
meta={"current": 3, "total": 10, "status": "Preparing provisioning script"},
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
"Preparing provisioning script",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
time.sleep(1)
|
||||
|
||||
# Write the script out to a temporary file
|
||||
retcode, stdout, stderr = pvc_common.run_os_command("mktemp")
|
||||
if retcode:
|
||||
raise ProvisioningError("Failed to create a temporary file: {}".format(stderr))
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to create a temporary file: {stderr}",
|
||||
exception=ProvisioningError,
|
||||
)
|
||||
|
||||
script_file = stdout.strip()
|
||||
with open(script_file, "w") as fh:
|
||||
fh.write(vm_data["script"])
|
||||
@@ -507,12 +554,17 @@ def create_vm(
        vm_data=vm_data,
    )

    print("Provisioning script imported successfully")
    log_info(celery, "Provisioning script imported successfully")

    # Create temporary directory for external chroot
    retcode, stdout, stderr = pvc_common.run_os_command("mktemp -d")
    if retcode:
        raise ProvisioningError(f"Failed to create a temporary directory: {stderr}")
        fail(
            celery,
            f"Failed to create a temporary directory: {stderr}",
            exception=ProvisioningError,
        )

    temp_dir = stdout.strip()

    # Bind mount / to the chroot location /
@@ -520,8 +572,10 @@ def create_vm(
        f"mount --bind --options ro / {temp_dir}"
    )
    if retcode:
        raise ProvisioningError(
            f"Failed to mount rootfs onto {temp_dir} for chroot: {stderr}"
        fail(
            celery,
            f"Failed to mount rootfs into {temp_dir} for chroot: {stderr}",
            exception=ProvisioningError,
        )

    # Mount tmpfs to the chroot location /tmp
@@ -529,8 +583,10 @@ def create_vm(
        f"mount --type tmpfs tmpfs {temp_dir}/tmp"
    )
    if retcode:
        raise ProvisioningError(
            f"Failed to mount tmpfs onto {temp_dir}/tmp for chroot: {stderr}"
        fail(
            celery,
            f"Failed to mount tmpfs onto {temp_dir}/tmp for chroot: {stderr}",
            exception=ProvisioningError,
        )

    # Bind mount /dev to the chroot location /dev
@@ -538,8 +594,10 @@ def create_vm(
        f"mount --bind --options ro /dev {temp_dir}/dev"
    )
    if retcode:
        raise ProvisioningError(
            f"Failed to mount devfs onto {temp_dir}/dev for chroot: {stderr}"
        fail(
            celery,
            f"Failed to mount devfs onto {temp_dir}/dev for chroot: {stderr}",
            exception=ProvisioningError,
        )

    # Bind mount /run to the chroot location /run
@@ -547,8 +605,10 @@ def create_vm(
        f"mount --bind --options rw /run {temp_dir}/run"
    )
    if retcode:
        raise ProvisioningError(
            f"Failed to mount runfs onto {temp_dir}/run for chroot: {stderr}"
        fail(
            celery,
            f"Failed to mount runfs onto {temp_dir}/run for chroot: {stderr}",
            exception=ProvisioningError,
        )

    # Bind mount /sys to the chroot location /sys
@@ -556,8 +616,10 @@ def create_vm(
        f"mount --bind --options rw /sys {temp_dir}/sys"
    )
    if retcode:
        raise ProvisioningError(
            f"Failed to mount sysfs onto {temp_dir}/sys for chroot: {stderr}"
        fail(
            celery,
            f"Failed to mount sysfs onto {temp_dir}/sys for chroot: {stderr}",
            exception=ProvisioningError,
        )

    # Bind mount /proc to the chroot location /proc
@@ -565,14 +627,16 @@ def create_vm(
        f"mount --bind --options rw /proc {temp_dir}/proc"
    )
    if retcode:
        raise ProvisioningError(
            f"Failed to mount procfs onto {temp_dir}/proc for chroot: {stderr}"
        fail(
            celery,
            f"Failed to mount procfs onto {temp_dir}/proc for chroot: {stderr}",
            exception=ProvisioningError,
        )

    print("Chroot environment prepared successfully")
    log_info(celery, "Chroot environment prepared successfully")
    def general_cleanup():
        print("Running upper cleanup steps")
        log_info(celery, "Running upper cleanup steps")

        try:
            # Unmount bind-mounted devfs on the chroot
@@ -599,68 +663,73 @@ def create_vm(
            retcode, stdout, stderr = pvc_common.run_os_command(f"umount {temp_dir}")
        except Exception as e:
            # We don't care about failures during cleanup; log and continue
            print(f"Suberror during general cleanup unmounts: {e}")
            log_warn(celery, f"Suberror during general cleanup unmounts: {e}")

        try:
            # Remove the temp_dir
            os.rmdir(temp_dir)
        except Exception as e:
            # We don't care about failures during cleanup; log and continue
            print(f"Suberror during general cleanup directory removal: {e}")
            log_warn(celery, f"Suberror during general cleanup directory removal: {e}")

        try:
            # Remove temporary script (don't fail if not removed)
            os.remove(script_file)
        except Exception as e:
            # We don't care about failures during cleanup; log and continue
            print(f"Suberror during general cleanup script removal: {e}")
            log_warn(celery, f"Suberror during general cleanup script removal: {e}")

    def fail_clean(celery, msg, exception=ProvisioningError):
        try:
            vm_builder.cleanup()
            general_cleanup()
        except Exception:
            pass
        fail(celery, msg, exception=exception)

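The new fail_clean() wraps the two cleanup paths and then delegates to fail(); a minimal usage sketch under the same names (the bare except is intentional so a cleanup error cannot mask the original failure):

    # Early phases, before any mounts or builder state exist: plain fail() is enough
    fail(celery, f"Failed to create a temporary file: {stderr}", exception=ProvisioningError)

    # Later phases, once the chroot and vm_builder exist: fail_clean() tears them down first
    fail_clean(celery, f"Error in script setup() step: {e}", exception=ProvisioningError)
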
    # Phase 4 - script: setup()
    # * Run pre-setup steps
    self.update_state(
        state="RUNNING",
        meta={
            "current": 4,
            "total": 10,
            "status": "Running script setup() step",
        },
    current_stage += 1
    update(
        celery, "Running script setup() step", current=current_stage, total=total_stages
    )
    time.sleep(1)

    print("Running script setup() step")

    try:
        with chroot(temp_dir):
            vm_builder.setup()
    except Exception as e:
        general_cleanup()
        raise ProvisioningError(f"Error in script setup() step: {e}")
        fail_clean(
            celery,
            f"Error in script setup() step: {e}",
            exception=ProvisioningError,
        )

    # Phase 5 - script: create()
    # * Prepare the libvirt XML definition for the VM
    self.update_state(
        state="RUNNING",
        meta={
            "current": 5,
            "total": 10,
            "status": "Running script create() step",
        },
    current_stage += 1
    update(
        celery,
        "Running script create() step",
        current=current_stage,
        total=total_stages,
    )
    time.sleep(1)

    if define_vm:
        print("Running script create() step")

        try:
            with chroot(temp_dir):
                vm_schema = vm_builder.create()
        except Exception as e:
            general_cleanup()
            raise ProvisioningError(f"Error in script create() step: {e}")
            fail_clean(
                celery,
                f"Error in script create() step: {e}",
                exception=ProvisioningError,
            )

        print("Generated VM schema:\n{}\n".format(vm_schema))
        log_info(celery, "Generated VM schema:\n{}\n".format(vm_schema))

        print("Defining VM on cluster")
        log_info(celery, "Defining VM on cluster")
        node_limit = vm_data["system_details"]["node_limit"]
        if node_limit:
            node_limit = node_limit.split(",")
@@ -679,23 +748,19 @@ def create_vm(
            vm_profile,
            initial_state="provision",
        )
        print(retmsg)
        log_info(celery, retmsg)
    else:
        print("Skipping VM definition due to define_vm=False")
        log_info(celery, "Skipping VM definition due to define_vm=False")
    # Phase 6 - script: prepare()
    # * Run preparation steps (e.g. disk creation and mapping, filesystem creation, etc.)
    self.update_state(
        state="RUNNING",
        meta={
            "current": 6,
            "total": 10,
            "status": "Running script prepare() step",
        },
    current_stage += 1
    update(
        celery,
        "Running script prepare() step",
        current=current_stage,
        total=total_stages,
    )
    time.sleep(1)

    print("Running script prepare() step")

    try:
        with chroot(temp_dir):
@@ -704,21 +769,21 @@ def create_vm(
            with chroot(temp_dir):
                vm_builder.cleanup()
            general_cleanup()
            raise ProvisioningError(f"Error in script prepare() step: {e}")
        fail_clean(
            celery,
            f"Error in script prepare() step: {e}",
            exception=ProvisioningError,
        )

    # Phase 7 - script: install()
    # * Run installation with arguments
    self.update_state(
        state="RUNNING",
        meta={
            "current": 7,
            "total": 10,
            "status": "Running script install() step",
        },
    current_stage += 1
    update(
        celery,
        "Running script install() step",
        current=current_stage,
        total=total_stages,
    )
    time.sleep(1)

    print("Running script install() step")

    try:
        with chroot(temp_dir):
@@ -727,63 +792,62 @@ def create_vm(
            with chroot(temp_dir):
                vm_builder.cleanup()
            general_cleanup()
            raise ProvisioningError(f"Error in script install() step: {e}")
        fail_clean(
            celery,
            f"Error in script install() step: {e}",
            exception=ProvisioningError,
        )

    # Phase 8 - script: cleanup()
    # * Run cleanup steps
    self.update_state(
        state="RUNNING",
        meta={
            "current": 8,
            "total": 10,
            "status": "Running script cleanup() step",
        },
    current_stage += 1
    update(
        celery,
        "Running script cleanup() step",
        current=current_stage,
        total=total_stages,
    )
    time.sleep(1)

    print("Running script cleanup() step")

    try:
        with chroot(temp_dir):
            vm_builder.cleanup()
    except Exception as e:
        general_cleanup()
        raise ProvisioningError(f"Error in script cleanup() step: {e}")
        fail(
            celery,
            f"Error in script cleanup() step: {e}",
            exception=ProvisioningError,
        )
    # Phase 9 - general cleanup
    # * Clean up the chroot from earlier
    self.update_state(
        state="RUNNING",
        meta={
            "current": 9,
            "total": 10,
            "status": "Running general cleanup steps",
        },
    current_stage += 1
    update(
        celery,
        "Running general cleanup steps",
        current=current_stage,
        total=total_stages,
    )
    time.sleep(1)

    general_cleanup()

    # Phase 10 - startup
    # * Start the VM in the PVC cluster
    self.update_state(
        state="RUNNING",
        meta={
            "current": 10,
            "total": 10,
            "status": "Starting VM",
        },
    )
    time.sleep(1)
    current_stage += 1
    update(celery, "Starting VM", current=current_stage, total=total_stages)

    if start_vm:
        print("Starting VM")
        with open_zk(config) as zkhandler:
            success, message = pvc_vm.start_vm(zkhandler, vm_name)
        print(message)
        log_info(celery, message)

        end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned and started successfully'
    else:
        end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned successfully'

    return {"status": end_message, "current": 10, "total": 10}
    current_stage += 1
    return finish(
        celery,
        end_message,
        current=current_stage,
        total=total_stages,
    )

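The recurring change in this function swaps the old raw self.update_state() bookkeeping for the shared start/update/finish helpers; a condensed sketch of the pattern (total_stages is assumed to be computed near the top of create_vm, which is not shown in this hunk):

    current_stage = 0
    total_stages = 10  # assumed here; the real count is set earlier in create_vm

    start(celery, "Starting VM provision", current=current_stage, total=total_stages)

    current_stage += 1
    update(celery, "Preparing provisioning script", current=current_stage, total=total_stages)

    # ... each subsequent phase increments current_stage and calls update() ...

    current_stage += 1
    return finish(celery, end_message, current=current_stage, total=total_stages)
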
@@ -25,10 +25,10 @@ CELERY_BIN="$( which celery )"
# app arguments work in a non-backwards-compatible way with Celery 5.
case "$( cat /etc/debian_version )" in
    10.*)
        CELERY_ARGS="worker --app pvcapid.flaskapi.celery --concurrency 3 --hostname $(hostname -s) --queues $(hostname -s) --loglevel INFO"
        CELERY_ARGS="worker --app pvcapid.flaskapi.celery --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
        ;;
    *)
        CELERY_ARGS="--app pvcapid.flaskapi.celery worker --concurrency 3 --hostname $(hostname -s) --queues $(hostname -s) --loglevel INFO"
        CELERY_ARGS="--app pvcapid.flaskapi.celery worker --events --concurrency 3 --hostname pvcworkerd@$(hostname -s) --queues $(hostname -s) --loglevel INFO"
        ;;
esac

@@ -534,6 +534,55 @@ def cli_cluster_maintenance_off():
    finish(retcode, retdata)


###############################################################################
# > pvc cluster task
###############################################################################
@click.command(name="task", short_help="Show status of worker task.")
@connection_req
@click.argument("task_id", required=False, default=None)
@click.option(
    "--wait/--no-wait",
    "wait_flag",
    is_flag=True,
    default=True,
    show_default=True,
    help="""Wait or don't wait for task to complete, showing progress. Requires TASK_ID; overrides "-f"/"--format".""",
)
@format_opt(
    {
        "pretty": cli_cluster_task_format_pretty,
        "raw": lambda d: "\n".join([t["id"] for t in d])
        if isinstance(d, list)
        else d["state"],
        "json": lambda d: jdumps(d),
        "json-pretty": lambda d: jdumps(d, indent=2),
    }
)
def cli_cluster_task(task_id, wait_flag, format_function):
    """
    Show the current status of worker task TASK_ID or a list of all active and pending tasks.
    """
    if task_id is None:
        wait_flag = False

    if wait_flag:
        # First validate that this is actually a valid task that is running
        retcode, retdata = pvc.lib.common.task_status(CLI_CONFIG, None)
        if task_id in [i["id"] for i in retdata]:
            task = [i for i in retdata if i["id"] == task_id][0]
            retmsg = wait_for_celery_task(
                CLI_CONFIG,
                {"task_id": task["id"], "task_name": task["name"]},
                start_late=True,
            )
        else:
            retmsg = f"No task with ID {task_id} found."
        finish(retcode, retmsg)
    else:
        retcode, retdata = pvc.lib.common.task_status(CLI_CONFIG, task_id)
        finish(retcode, retdata, format_function)

###############################################################################
# > pvc node
###############################################################################
@@ -575,18 +624,6 @@ def cli_node_primary(
    Set NODE in primary coordinator state, making it the primary coordinator for the cluster.
    """

    # Handle active provisioner task warnings
    _, tasks_retdata = pvc.lib.provisioner.task_status(CLI_CONFIG, None)
    if len(tasks_retdata) > 0:
        echo(
            CLI_CONFIG,
            f"""\
NOTE: There are currently {len(tasks_retdata)} active or queued provisioner tasks.
These jobs will continue executing, but their status visibility will be lost until
the current primary node returns to primary state.
""",
        )

    retcode, retdata = pvc.lib.node.node_coordinator_state(CLI_CONFIG, node, "primary")
    if not retcode or "already" in retdata:
        finish(retcode, retdata)
@@ -625,18 +662,6 @@ def cli_node_secondary(
    Set NODE in secondary coordinator state, making another active node the primary node for the cluster.
    """

    # Handle active provisioner task warnings
    _, tasks_retdata = pvc.lib.provisioner.task_status(CLI_CONFIG, None)
    if len(tasks_retdata) > 0:
        echo(
            CLI_CONFIG,
            f"""\
NOTE: There are currently {len(tasks_retdata)} active or queued provisioner tasks.
These jobs will continue executing, but their status visibility will be lost until
the current primary node returns to primary state.
""",
        )

    retcode, retdata = pvc.lib.node.node_coordinator_state(
        CLI_CONFIG, node, "secondary"
    )
@@ -3296,15 +3321,26 @@ def cli_storage_benchmark():
@click.command(name="run", short_help="Run a storage benchmark.")
@connection_req
@click.argument("pool")
@click.option(
    "--wait/--no-wait",
    "wait_flag",
    is_flag=True,
    default=True,
    show_default=True,
    help="Wait or don't wait for task to complete, showing progress",
)
@confirm_opt(
    "Storage benchmarks take approximately 10 minutes to run and generate significant load on the cluster; they should be run sparingly. Continue"
)
def cli_storage_benchmark_run(pool):
def cli_storage_benchmark_run(pool, wait_flag):
    """
    Run a storage benchmark on POOL in the background.
    """

    retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(CLI_CONFIG, pool)
    retcode, retmsg = pvc.lib.storage.ceph_benchmark_run(CLI_CONFIG, pool, wait_flag)

    if retcode and wait_flag:
        retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
    finish(retcode, retmsg)

@@ -5581,15 +5617,15 @@ def cli_provisioner_profile_list(limit, format_function):
    help="Start the VM automatically upon completion of provisioning.",
)
@click.option(
    "-w",
    "--wait",
    "--wait/--no-wait",
    "wait_flag",
    is_flag=True,
    default=False,
    help="Wait for provisioning to complete, showing progress",
    default=True,
    show_default=True,
    help="Wait or don't wait for task to complete, showing progress",
)
def cli_provisioner_create(
    name, profile, wait_flag, define_flag, start_flag, script_args
    name, profile, define_flag, start_flag, script_args, wait_flag
):
    """
    Create a new VM NAME with profile PROFILE.
@@ -5610,39 +5646,13 @@ def cli_provisioner_create(
    if not define_flag:
        start_flag = False

    retcode, retdata = pvc.lib.provisioner.vm_create(
        CLI_CONFIG, name, profile, wait_flag, define_flag, start_flag, script_args
    retcode, retmsg = pvc.lib.provisioner.vm_create(
        CLI_CONFIG, name, profile, define_flag, start_flag, script_args, wait_flag
    )

    if retcode and wait_flag:
        task_id = retdata
        retdata = wait_for_provisioner(CLI_CONFIG, task_id)

    finish(retcode, retdata)


###############################################################################
# > pvc provisioner status
###############################################################################
@click.command(name="status", short_help="Show status of provisioner job.")
@connection_req
@click.argument("job", required=False, default=None)
@format_opt(
    {
        "pretty": cli_provisioner_status_format_pretty,
        "raw": lambda d: "\n".join([t["id"] for t in d])
        if isinstance(d, list)
        else d["state"],
        "json": lambda d: jdumps(d),
        "json-pretty": lambda d: jdumps(d, indent=2),
    }
)
def cli_provisioner_status(job, format_function):
    """
    Show status of provisioner job JOB or a list of jobs.
    """
    retcode, retdata = pvc.lib.provisioner.task_status(CLI_CONFIG, job)
    finish(retcode, retdata, format_function)
        retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
    finish(retcode, retmsg)


###############################################################################
@@ -6159,7 +6169,6 @@ cli_provisioner_profile.add_command(cli_provisioner_profile_remove)
cli_provisioner_profile.add_command(cli_provisioner_profile_list)
cli_provisioner.add_command(cli_provisioner_profile)
cli_provisioner.add_command(cli_provisioner_create)
cli_provisioner.add_command(cli_provisioner_status)
cli.add_command(cli_provisioner)
cli_cluster.add_command(cli_cluster_status)
cli_cluster.add_command(cli_cluster_init)
@@ -6168,6 +6177,7 @@ cli_cluster.add_command(cli_cluster_restore)
cli_cluster_maintenance.add_command(cli_cluster_maintenance_on)
cli_cluster_maintenance.add_command(cli_cluster_maintenance_off)
cli_cluster.add_command(cli_cluster_maintenance)
cli_cluster.add_command(cli_cluster_task)
cli.add_command(cli_cluster)
cli_connection.add_command(cli_connection_add)
cli_connection.add_command(cli_connection_remove)

@@ -47,7 +47,6 @@ from pvc.lib.provisioner import format_list_userdata as provisioner_format_userd
from pvc.lib.provisioner import format_list_script as provisioner_format_script_list
from pvc.lib.provisioner import format_list_ova as provisioner_format_ova_list
from pvc.lib.provisioner import format_list_profile as provisioner_format_profile_list
from pvc.lib.provisioner import format_list_task as provisioner_format_task_status


# Define colour values for use in formatters
@@ -262,6 +261,184 @@ def cli_cluster_status_format_short(CLI_CONFIG, data):
    return "\n".join(output)


def cli_cluster_task_format_pretty(CLI_CONFIG, task_data):
    """
    Pretty format the output of cli_cluster_task
    """
    if not isinstance(task_data, list):
        job_state = task_data["state"]
        if job_state == "RUNNING":
            retdata = "Job state: RUNNING\nStage: {}/{}\nStatus: {}".format(
                task_data["current"], task_data["total"], task_data["status"]
            )
        elif job_state == "FAILED":
            retdata = "Job state: FAILED\nStatus: {}".format(task_data["status"])
        elif job_state == "COMPLETED":
            retdata = "Job state: COMPLETED\nStatus: {}".format(task_data["status"])
        else:
            retdata = "Job state: {}\nStatus: {}".format(
                task_data["state"], task_data["status"]
            )
        return retdata

    task_list_output = []

    # Determine optimal column widths
    task_id_length = 3
    task_name_length = 5
    task_type_length = 7
    task_worker_length = 7
    task_arg_name_length = 5
    task_arg_data_length = 10

    tasks = list()
    for task in task_data:
        # task_id column
        _task_id_length = len(str(task["id"])) + 1
        if _task_id_length > task_id_length:
            task_id_length = _task_id_length
        # task_name column
        _task_name_length = len(str(task["name"])) + 1
        if _task_name_length > task_name_length:
            task_name_length = _task_name_length
        # task_worker column
        _task_worker_length = len(str(task["worker"])) + 1
        if _task_worker_length > task_worker_length:
            task_worker_length = _task_worker_length
        # task_type column
        _task_type_length = len(str(task["type"])) + 1
        if _task_type_length > task_type_length:
            task_type_length = _task_type_length

        updated_kwargs = list()
        for arg_name, arg_data in task["kwargs"].items():
            # Skip the "run_on" argument
            if arg_name == "run_on":
                continue

            # task_arg_name column
            _task_arg_name_length = len(str(arg_name)) + 1
            if _task_arg_name_length > task_arg_name_length:
                task_arg_name_length = _task_arg_name_length

            if len(str(arg_data)) > 17:
                arg_data = arg_data[:17] + "..."

            # task_arg_data column
            _task_arg_data_length = len(str(arg_data)) + 1
            if _task_arg_data_length > task_arg_data_length:
                task_arg_data_length = _task_arg_data_length

            updated_kwargs.append({"name": arg_name, "data": arg_data})
        task["kwargs"] = updated_kwargs
        tasks.append(task)

    # Format the string (header)
    task_list_output.append(
        "{bold}{task_header: <{task_header_length}} {arg_header: <{arg_header_length}}{end_bold}".format(
            bold=ansii["bold"],
            end_bold=ansii["end"],
            task_header_length=task_id_length
            + task_name_length
            + task_type_length
            + task_worker_length
            + 3,
            arg_header_length=task_arg_name_length + task_arg_data_length,
            task_header="Tasks "
            + "".join(
                [
                    "-"
                    for _ in range(
                        6,
                        task_id_length
                        + task_name_length
                        + task_type_length
                        + task_worker_length
                        + 2,
                    )
                ]
            ),
            arg_header="Arguments "
            + "".join(
                [
                    "-"
                    for _ in range(11, task_arg_name_length + task_arg_data_length + 1)
                ]
            ),
        )
    )

    task_list_output.append(
        "{bold}{task_id: <{task_id_length}} {task_name: <{task_name_length}} {task_type: <{task_type_length}} \
{task_worker: <{task_worker_length}} \
{task_arg_name: <{task_arg_name_length}} \
{task_arg_data: <{task_arg_data_length}}{end_bold}".format(
            task_id_length=task_id_length,
            task_name_length=task_name_length,
            task_type_length=task_type_length,
            task_worker_length=task_worker_length,
            task_arg_name_length=task_arg_name_length,
            task_arg_data_length=task_arg_data_length,
            bold=ansii["bold"],
            end_bold=ansii["end"],
            task_id="ID",
            task_name="Name",
            task_type="Status",
            task_worker="Worker",
            task_arg_name="Name",
            task_arg_data="Data",
        )
    )

    # Format the string (elements)
    for task in sorted(tasks, key=lambda i: i.get("type", None)):
        task_list_output.append(
            "{bold}{task_id: <{task_id_length}} {task_name: <{task_name_length}} {task_type: <{task_type_length}} \
{task_worker: <{task_worker_length}} \
{task_arg_name: <{task_arg_name_length}} \
{task_arg_data: <{task_arg_data_length}}{end_bold}".format(
                task_id_length=task_id_length,
                task_name_length=task_name_length,
                task_type_length=task_type_length,
                task_worker_length=task_worker_length,
                task_arg_name_length=task_arg_name_length,
                task_arg_data_length=task_arg_data_length,
                bold="",
                end_bold="",
                task_id=task["id"],
                task_name=task["name"],
                task_type=task["type"],
                task_worker=task["worker"],
                task_arg_name=str(task["kwargs"][0]["name"]),
                task_arg_data=str(task["kwargs"][0]["data"]),
            )
        )
        for arg in task["kwargs"][1:]:
            task_list_output.append(
                "{bold}{task_id: <{task_id_length}} {task_name: <{task_name_length}} {task_type: <{task_type_length}} \
{task_worker: <{task_worker_length}} \
{task_arg_name: <{task_arg_name_length}} \
{task_arg_data: <{task_arg_data_length}}{end_bold}".format(
                    task_id_length=task_id_length,
                    task_name_length=task_name_length,
                    task_type_length=task_type_length,
                    task_worker_length=task_worker_length,
                    task_arg_name_length=task_arg_name_length,
                    task_arg_data_length=task_arg_data_length,
                    bold="",
                    end_bold="",
                    task_id="",
                    task_name="",
                    task_type="",
                    task_worker="",
                    task_arg_name=str(arg["name"]),
                    task_arg_data=str(arg["data"]),
                )
            )

    return "\n".join(task_list_output)

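For reference, a sketch of the task_data this formatter consumes once task_status() has flattened the Celery inspect payload (all values illustrative):

    task_data = [
        {
            "id": "f1a2b3c4-...",          # Celery task UUID (illustrative)
            "name": "provisioner.create",  # illustrative task name
            "type": "active",              # or "pending"/"scheduled" per task_status()
            "worker": "pvcworkerd@hv1",
            # kwargs arrives as a dict; the formatter rewrites it into a list of
            # {"name": ..., "data": ...} pairs, dropping "run_on" and truncating
            # long values to 17 characters plus "..."
            "kwargs": {"vm_name": "test1", "run_on": "hv1"},
        },
    ]
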
def cli_connection_list_format_pretty(CLI_CONFIG, data):
    """
    Pretty format the output of cli_connection_list
@@ -724,11 +901,3 @@ def cli_provisioner_profile_list_format_pretty(CLI_CONFIG, data):
    """

    return provisioner_format_profile_list(CLI_CONFIG, data)


def cli_provisioner_status_format_pretty(CLI_CONFIG, data):
    """
    Pretty format the output of cli_provisioner_status
    """

    return provisioner_format_task_status(CLI_CONFIG, data)

@@ -65,36 +65,46 @@ def cli_node_waiter(config, node, state_field, state_value):
    echo(config, f" done. [{int(t_end - t_start)}s]")


def wait_for_celery_task(CLI_CONFIG, task_detail):
def wait_for_celery_task(CLI_CONFIG, task_detail, start_late=False):
    """
    Wait for a Celery task to complete
    """

    task_id = task_detail["task_id"]
    run_on = task_detail["run_on"]
    task_name = task_detail["task_name"]

    echo(CLI_CONFIG, f"Task ID: {task_id} assigned to node {run_on}")
    echo(CLI_CONFIG, "")
    if not start_late:
        run_on = task_detail["run_on"]

    # Wait for the task to start
    echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
    while True:
        sleep(0.5)
        echo(CLI_CONFIG, f"Task ID: {task_id} ({task_name}) assigned to node {run_on}")
        echo(CLI_CONFIG, "")

        # Wait for the task to start
        echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
        while True:
            sleep(0.5)
            task_status = pvc.lib.common.task_status(
                CLI_CONFIG, task_id=task_id, is_watching=True
            )
            if task_status.get("state") != "PENDING":
                break
            echo(CLI_CONFIG, ".", newline=False)
        echo(CLI_CONFIG, " done.")
        echo(CLI_CONFIG, "")

        echo(
            CLI_CONFIG,
            task_status.get("status") + ":",
        )
    else:
        task_status = pvc.lib.common.task_status(
            CLI_CONFIG, task_id=task_id, is_watching=True
        )
        if task_status.get("state") != "PENDING":
            break
        echo(CLI_CONFIG, ".", newline=False)
    echo(CLI_CONFIG, " done.")
    echo(CLI_CONFIG, "")

        echo(CLI_CONFIG, f"Watching existing task {task_id} ({task_name}):")

    # Start following the task state, updating progress as we go
    total_task = task_status.get("total")
    echo(
        CLI_CONFIG,
        task_status.get("status") + ":",
    )
    with progressbar(length=total_task, show_eta=False) as bar:
        last_task = 0
        maxlen = 21
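A sketch of the two call patterns the new start_late flag distinguishes (task details as constructed by the callers above; IDs illustrative):

    # Normal case: the CLI just submitted the task and has its assignment details
    retmsg = wait_for_celery_task(
        CLI_CONFIG,
        {"task_id": task_id, "task_name": "provisioner.create", "run_on": "hv1"},
    )

    # Late-attach case (pvc cluster task --wait): the task is already running,
    # so no "run_on" key is available and the start-wait loop is skipped
    retmsg = wait_for_celery_task(
        CLI_CONFIG,
        {"task_id": task_id, "task_name": "provisioner.create"},
        start_late=True,
    )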
@@ -132,60 +142,3 @@ def wait_for_celery_task(CLI_CONFIG, task_detail):
    retdata = task_status.get("state") + ": " + task_status.get("status")

    return retdata


def wait_for_provisioner(CLI_CONFIG, task_id):
    """
    Wait for a provisioner task to complete
    """

    echo(CLI_CONFIG, f"Task ID: {task_id}")
    echo(CLI_CONFIG, "")

    # Wait for the task to start
    echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
    while True:
        sleep(1)
        task_status = pvc.lib.provisioner.task_status(
            CLI_CONFIG, task_id, is_watching=True
        )
        if task_status.get("state") != "PENDING":
            break
        echo(CLI_CONFIG, ".", newline=False)
    echo(CLI_CONFIG, " done.")
    echo(CLI_CONFIG, "")

    # Start following the task state, updating progress as we go
    total_task = task_status.get("total")
    with progressbar(length=total_task, show_eta=False) as bar:
        last_task = 0
        maxlen = 0
        while True:
            sleep(1)
            if task_status.get("state") != "RUNNING":
                break
            if task_status.get("current") > last_task:
                current_task = int(task_status.get("current"))
                bar.update(current_task - last_task)
                last_task = current_task
                # The extensive spaces at the end cause this to overwrite longer previous messages
                curlen = len(str(task_status.get("status")))
                if curlen > maxlen:
                    maxlen = curlen
                lendiff = maxlen - curlen
                overwrite_whitespace = " " * lendiff
                echo(
                    CLI_CONFIG,
                    " " + task_status.get("status") + overwrite_whitespace,
                    newline=False,
                )
            task_status = pvc.lib.provisioner.task_status(
                CLI_CONFIG, task_id, is_watching=True
            )
        if task_status.get("state") == "SUCCESS":
            bar.update(total_task - last_task)

    echo(CLI_CONFIG, "")
    retdata = task_status.get("state") + ": " + task_status.get("status")

    return retdata

@@ -202,6 +202,24 @@ def call_api(
    return response


def get_wait_retdata(response, wait_flag):
    if response.status_code == 202:
        retvalue = True
        retjson = response.json()
        if not wait_flag:
            retdata = (
                f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
            )
        else:
            # Just return the task JSON without formatting
            retdata = response.json()
    else:
        retvalue = False
        retdata = response.json().get("message", "")

    return retvalue, retdata


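A sketch of the two shapes callers receive from this helper on a 202 response (values illustrative):

    # wait_flag=False: a preformatted one-line message for direct display
    retvalue, retdata = get_wait_retdata(response, wait_flag=False)
    # -> True, "Task ID: f1a2b3c4-... assigned to node hv1"

    # wait_flag=True: the raw task JSON, suitable for wait_for_celery_task()
    retvalue, retdata = get_wait_retdata(response, wait_flag=True)
    # -> True, {"task_id": "f1a2b3c4-...", "run_on": "hv1"}

    # Any non-202 status collapses to (False, "<error message>")
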
def task_status(config, task_id=None, is_watching=False):
    """
    Get information about Celery job {task_id}, or all tasks if None
@@ -249,6 +267,7 @@ def task_status(config, task_id=None, is_watching=False):
                task["type"] = task_type
                task["worker"] = task_host
                task["id"] = task_job.get("id")
                task["name"] = task_job.get("name")
                try:
                    task["args"] = literal_eval(task_job.get("args"))
                except Exception:
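For context, a hedged sketch of the raw Celery inspect payload that task_status() flattens here, inferred from the iteration in the removed provisioner variant below (the exact field set depends on the Celery version; args/kwargs arrive as repr strings, hence the literal_eval calls):

    task_data_raw = {
        "active": {
            "pvcworkerd@hv1": [
                {"id": "f1a2b3c4-...", "name": "provisioner.create",
                 "args": "[]", "kwargs": "{'vm_name': 'test1', 'run_on': 'hv1'}"},
            ],
        },
        "reserved": {"pvcworkerd@hv1": []},   # reported as "pending" by task_status()
        "scheduled": {"pvcworkerd@hv1": []},
    }
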
@@ -25,8 +25,7 @@ from requests_toolbelt.multipart.encoder import (
)

import pvc.lib.ansiprint as ansiprint
from pvc.lib.common import UploadProgressBar, call_api
from ast import literal_eval
from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata


#
@@ -700,7 +699,7 @@ def profile_remove(config, name):
    return retvalue, response.json().get("message", "")


def vm_create(config, name, profile, wait_flag, define_flag, start_flag, script_args):
def vm_create(config, name, profile, define_flag, start_flag, script_args, wait_flag):
    """
    Create a new VM named {name} with profile {profile}

@@ -717,85 +716,7 @@ def vm_create(config, name, profile, wait_flag, define_flag, start_flag, script_
    }
    response = call_api(config, "post", "/provisioner/create", params=params)

    if response.status_code == 202:
        retvalue = True
        if not wait_flag:
            retdata = "Task ID: {}".format(response.json()["task_id"])
        else:
            # Just return the task_id raw, instead of formatting it
            retdata = response.json()["task_id"]
    else:
        retvalue = False
        retdata = response.json().get("message", "")

    return retvalue, retdata


def task_status(config, task_id=None, is_watching=False):
    """
    Get information about provisioner job {task_id} or all tasks if None

    API endpoint: GET /api/v1/provisioner/status
    API arguments:
    API schema: {json_data_object}
    """
    if task_id is not None:
        response = call_api(
            config, "get", "/provisioner/status/{task_id}".format(task_id=task_id)
        )
    else:
        response = call_api(config, "get", "/provisioner/status")

    if task_id is not None:
        if response.status_code == 200:
            retvalue = True
            respjson = response.json()
            if is_watching:
                # Just return the raw JSON to the watching process instead of including value
                return respjson
            else:
                return retvalue, respjson
        else:
            retvalue = False
            retdata = response.json().get("message", "")
    else:
        retvalue = True
        task_data_raw = response.json()
        # Format the Celery data into a more useful data structure
        task_data = list()
        for task_type in ["active", "reserved", "scheduled"]:
            try:
                type_data = task_data_raw[task_type]
            except Exception:
                type_data = None

            if not type_data:
                type_data = dict()
            for task_host in type_data:
                for task_job in task_data_raw[task_type][task_host]:
                    task = dict()
                    if task_type == "reserved":
                        task["type"] = "pending"
                    else:
                        task["type"] = task_type
                    task["worker"] = task_host
                    task["id"] = task_job.get("id")
                    try:
                        task_args = literal_eval(task_job.get("args"))
                    except Exception:
                        task_args = task_job.get("args")
                    task["vm_name"] = task_args[0]
                    task["vm_profile"] = task_args[1]
                    try:
                        task_kwargs = literal_eval(task_job.get("kwargs"))
                    except Exception:
                        task_kwargs = task_job.get("kwargs")
                    task["vm_define"] = str(bool(task_kwargs["define_vm"]))
                    task["vm_start"] = str(bool(task_kwargs["start_vm"]))
                    task_data.append(task)
        retdata = task_data

    return retvalue, retdata
    return get_wait_retdata(response, wait_flag)


#
@@ -1862,158 +1783,3 @@ def format_list_profile(config, profile_data):
    )

    return "\n".join(profile_list_output)


def format_list_task(config, task_data):
    if not isinstance(task_data, list):
        job_state = task_data["state"]
        if job_state == "RUNNING":
            retdata = "Job state: RUNNING\nStage: {}/{}\nStatus: {}".format(
                task_data["current"], task_data["total"], task_data["status"]
            )
        elif job_state == "FAILED":
            retdata = "Job state: FAILED\nStatus: {}".format(task_data["status"])
        elif job_state == "COMPLETED":
            retdata = "Job state: COMPLETED\nStatus: {}".format(task_data["status"])
        else:
            retdata = "Job state: {}\nStatus: {}".format(
                task_data["state"], task_data["status"]
            )
        return retdata

    task_list_output = []

    # Determine optimal column widths
    task_id_length = 7
    task_type_length = 7
    task_worker_length = 7
    task_vm_name_length = 5
    task_vm_profile_length = 8
    task_vm_define_length = 8
    task_vm_start_length = 7

    for task in task_data:
        # task_id column
        _task_id_length = len(str(task["id"])) + 1
        if _task_id_length > task_id_length:
            task_id_length = _task_id_length
        # task_worker column
        _task_worker_length = len(str(task["worker"])) + 1
        if _task_worker_length > task_worker_length:
            task_worker_length = _task_worker_length
        # task_type column
        _task_type_length = len(str(task["type"])) + 1
        if _task_type_length > task_type_length:
            task_type_length = _task_type_length
        # task_vm_name column
        _task_vm_name_length = len(str(task["vm_name"])) + 1
        if _task_vm_name_length > task_vm_name_length:
            task_vm_name_length = _task_vm_name_length
        # task_vm_profile column
        _task_vm_profile_length = len(str(task["vm_profile"])) + 1
        if _task_vm_profile_length > task_vm_profile_length:
            task_vm_profile_length = _task_vm_profile_length
        # task_vm_define column
        _task_vm_define_length = len(str(task["vm_define"])) + 1
        if _task_vm_define_length > task_vm_define_length:
            task_vm_define_length = _task_vm_define_length
        # task_vm_start column
        _task_vm_start_length = len(str(task["vm_start"])) + 1
        if _task_vm_start_length > task_vm_start_length:
            task_vm_start_length = _task_vm_start_length

    # Format the string (header)
    task_list_output.append(
        "{bold}{task_header: <{task_header_length}} {vms_header: <{vms_header_length}}{end_bold}".format(
            bold=ansiprint.bold(),
            end_bold=ansiprint.end(),
            task_header_length=task_id_length
            + task_type_length
            + task_worker_length
            + 2,
            vms_header_length=task_vm_name_length
            + task_vm_profile_length
            + task_vm_define_length
            + task_vm_start_length
            + 3,
            task_header="Tasks "
            + "".join(
                [
                    "-"
                    for _ in range(
                        6, task_id_length + task_type_length + task_worker_length + 1
                    )
                ]
            ),
            vms_header="VM Details "
            + "".join(
                [
                    "-"
                    for _ in range(
                        11,
                        task_vm_name_length
                        + task_vm_profile_length
                        + task_vm_define_length
                        + task_vm_start_length
                        + 2,
                    )
                ]
            ),
        )
    )

    task_list_output.append(
        "{bold}{task_id: <{task_id_length}} {task_type: <{task_type_length}} \
{task_worker: <{task_worker_length}} \
{task_vm_name: <{task_vm_name_length}} \
{task_vm_profile: <{task_vm_profile_length}} \
{task_vm_define: <{task_vm_define_length}} \
{task_vm_start: <{task_vm_start_length}}{end_bold}".format(
            task_id_length=task_id_length,
            task_type_length=task_type_length,
            task_worker_length=task_worker_length,
            task_vm_name_length=task_vm_name_length,
            task_vm_profile_length=task_vm_profile_length,
            task_vm_define_length=task_vm_define_length,
            task_vm_start_length=task_vm_start_length,
            bold=ansiprint.bold(),
            end_bold=ansiprint.end(),
            task_id="Job ID",
            task_type="Status",
            task_worker="Worker",
            task_vm_name="Name",
            task_vm_profile="Profile",
            task_vm_define="Define?",
            task_vm_start="Start?",
        )
    )

    # Format the string (elements)
    for task in sorted(task_data, key=lambda i: i.get("type", None)):
        task_list_output.append(
            "{bold}{task_id: <{task_id_length}} {task_type: <{task_type_length}} \
{task_worker: <{task_worker_length}} \
{task_vm_name: <{task_vm_name_length}} \
{task_vm_profile: <{task_vm_profile_length}} \
{task_vm_define: <{task_vm_define_length}} \
{task_vm_start: <{task_vm_start_length}}{end_bold}".format(
                task_id_length=task_id_length,
                task_type_length=task_type_length,
                task_worker_length=task_worker_length,
                task_vm_name_length=task_vm_name_length,
                task_vm_profile_length=task_vm_profile_length,
                task_vm_define_length=task_vm_define_length,
                task_vm_start_length=task_vm_start_length,
                bold="",
                end_bold="",
                task_id=task["id"],
                task_type=task["type"],
                task_worker=task["worker"],
                task_vm_name=task["vm_name"],
                task_vm_profile=task["vm_profile"],
                task_vm_define=task["vm_define"],
                task_vm_start=task["vm_start"],
            )
        )

    return "\n".join(task_list_output)

@@ -29,7 +29,7 @@ from requests_toolbelt.multipart.encoder import (
)

import pvc.lib.ansiprint as ansiprint
from pvc.lib.common import UploadProgressBar, call_api
from pvc.lib.common import UploadProgressBar, call_api, get_wait_retdata

#
# Supplemental functions
@@ -175,21 +175,7 @@ def ceph_osd_db_vg_add(config, node, device, wait_flag):
    params = {"node": node, "device": device}
    response = call_api(config, "post", "/storage/ceph/osddb", params=params)

    if response.status_code == 202:
        retvalue = True
        retjson = response.json()
        if not wait_flag:
            retdata = (
                f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
            )
        else:
            # Just return the task JSON without formatting
            retdata = response.json()
    else:
        retvalue = False
        retdata = response.json().get("message", "")

    return retvalue, retdata
    return get_wait_retdata(response, wait_flag)


#
@@ -265,21 +251,7 @@ def ceph_osd_add(

    response = call_api(config, "post", "/storage/ceph/osd", params=params)

    if response.status_code == 202:
        retvalue = True
        retjson = response.json()
        if not wait_flag:
            retdata = (
                f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
            )
        else:
            # Just return the task JSON without formatting
            retdata = response.json()
    else:
        retvalue = False
        retdata = response.json().get("message", "")

    return retvalue, retdata
    return get_wait_retdata(response, wait_flag)


def ceph_osd_replace(
@@ -308,21 +280,7 @@ def ceph_osd_replace(

    response = call_api(config, "post", f"/storage/ceph/osd/{osdid}", params=params)

    if response.status_code == 202:
        retvalue = True
        retjson = response.json()
        if not wait_flag:
            retdata = (
                f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
            )
        else:
            # Just return the task JSON without formatting
            retdata = response.json()
    else:
        retvalue = False
        retdata = response.json().get("message", "")

    return retvalue, retdata
    return get_wait_retdata(response, wait_flag)


def ceph_osd_refresh(config, osdid, device, wait_flag):
@@ -338,21 +296,7 @@ def ceph_osd_refresh(config, osdid, device, wait_flag):
    }
    response = call_api(config, "put", f"/storage/ceph/osd/{osdid}", params=params)

    if response.status_code == 202:
        retvalue = True
        retjson = response.json()
        if not wait_flag:
            retdata = (
                f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
            )
        else:
            # Just return the task JSON without formatting
            retdata = response.json()
    else:
        retvalue = False
        retdata = response.json().get("message", "")

    return retvalue, retdata
    return get_wait_retdata(response, wait_flag)


def ceph_osd_remove(config, osdid, force_flag, wait_flag):
@@ -368,21 +312,7 @@ def ceph_osd_remove(config, osdid, force_flag, wait_flag):
        config, "delete", "/storage/ceph/osd/{osdid}".format(osdid=osdid), params=params
    )

    if response.status_code == 202:
        retvalue = True
        retjson = response.json()
        if not wait_flag:
            retdata = (
                f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
            )
        else:
            # Just return the task JSON without formatting
            retdata = response.json()
    else:
        retvalue = False
        retdata = response.json().get("message", "")

    return retvalue, retdata
    return get_wait_retdata(response, wait_flag)


def ceph_osd_state(config, osdid, state):
@@ -1765,7 +1695,7 @@ def format_list_snapshot(config, snapshot_list):
#
# Benchmark functions
#
def ceph_benchmark_run(config, pool):
def ceph_benchmark_run(config, pool, wait_flag):
    """
    Run a storage benchmark against {pool}

@@ -1776,14 +1706,7 @@ def ceph_benchmark_run(config, pool):
    params = {"pool": pool}
    response = call_api(config, "post", "/storage/ceph/benchmark", params=params)

    if response.status_code == 202:
        retvalue = True
        retdata = "Task ID: {}".format(response.json()["task_id"])
    else:
        retvalue = False
        retdata = response.json().get("message", "")

    return retvalue, retdata
    return get_wait_retdata(response, wait_flag)


def ceph_benchmark_list(config, job):

@@ -23,7 +23,7 @@ import time
import re

import pvc.lib.ansiprint as ansiprint
from pvc.lib.common import call_api, format_bytes, format_metric
from pvc.lib.common import call_api, format_bytes, format_metric, get_wait_retdata


#
@@ -425,21 +425,7 @@ def vm_locks(config, vm, wait_flag):
    """
    response = call_api(config, "post", f"/vm/{vm}/locks")

    if response.status_code == 202:
        retvalue = True
        retjson = response.json()
        if not wait_flag:
            retdata = (
                f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
            )
        else:
            # Just return the task JSON without formatting
            retdata = response.json()
    else:
        retvalue = False
        retdata = response.json().get("message", "")

    return retvalue, retdata
    return get_wait_retdata(response, wait_flag)


def vm_backup(config, vm, backup_path, incremental_parent=None, retain_snapshot=False):

@@ -41,11 +41,17 @@ def start(celery, msg, current=0, total=1):
    sleep(1)


def fail(celery, msg, current=1, total=1):
def fail(celery, msg, exception=None, current=1, total=1):
    if exception is None:
        exception = TaskFailure

    msg = f"{type(exception()).__name__}: {msg}"

    logger = getLogger(__name__)
    logger.error(msg)

    sys.tracebacklimit = 0
    raise TaskFailure(msg)
    raise exception(msg)


def log_info(celery, msg):

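A sketch of the resulting behavior (ProvisioningError stands in for any exception class a caller passes; the class-name prefix comes from the f-string above):

    fail(celery, "Failed to mount rootfs", exception=ProvisioningError)
    # logs and raises: ProvisioningError("ProvisioningError: Failed to mount rootfs")

    fail(celery, "Benchmark failed")
    # falls back to the default: TaskFailure("TaskFailure: Benchmark failed")
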
@@ -22,7 +22,6 @@
import gevent.pywsgi
import flask
import sys
import time
import psycopg2

from threading import Thread
@@ -123,9 +122,7 @@ class MetadataAPIInstance(object):
        self.logger.out("Stopping Metadata API at 169.254.169.254:80", state="i")
        try:
            self.md_http_server.stop()
            time.sleep(0.1)
            self.md_http_server.close()
            time.sleep(0.1)
            self.md_http_server = None
            self.logger.out("Successfully stopped Metadata API", state="o")
        except Exception as e:
@@ -610,8 +610,7 @@ class NodeInstance(object):
        # 6. Start client API
        if self.config["enable_api"]:
            self.logger.out("Starting PVC API client service", state="i")
            common.run_os_command("systemctl enable pvcapid.service")
            common.run_os_command("systemctl start pvcapid.service")
            common.run_os_command("systemctl enable --now pvcapid.service")
        # 7. Start metadata API; just continue if we fail
        self.metadata_api.start()
        # 8. Start DHCP servers
@@ -670,13 +669,14 @@ class NodeInstance(object):
        self.zkhandler.write([("base.config.primary_node.sync_lock", "")])
        lock.release()
        self.logger.out("Released write lock for synchronization phase B", state="o")
        # 3. Stop client API
        # 3. Stop metadata API
        self.metadata_api.stop()
        # 4. Stop client API
        if self.config["enable_api"]:
            self.logger.out("Stopping PVC API client service", state="i")
            common.run_os_command("systemctl stop pvcapid.service")
            common.run_os_command("systemctl disable pvcapid.service")
        # 4. Stop metadata API
        self.metadata_api.stop()
            common.run_os_command(
                "systemctl disable --now pvcapid.service", background=True
            )
        time.sleep(0.1)  # Time for new writer to acquire the lock

        # Synchronize nodes C (I am reader)

@@ -78,7 +78,7 @@ def start_keydb(logger, config):

def start_api_worker(logger, config):
    if config["enable_api"]:
        logger.out("Starting API worker daemon", state="i")
        logger.out("Starting Celery Worker daemon", state="i")
        # TODO: Move our handling out of Systemd and integrate it directly as a subprocess?
        common.run_os_command("systemctl start pvcworkerd.service")