pvc/api-daemon/pvcapid/vmbuilder.py
Joshua M. Boniface b58fa06f67 Add OVA script support
1. Ensure that system_template and script are not nullable in the DB.
2. Ensure that the CLI and API enforce the above and clean up CLI
arguments for profile add.
3. Ensure that, before uploading OVAs, a 'default_ova' provisioning
script is present.
4. Use the 'default_ova' script for new OVA uploads.
5. Ensure that OVA details are properly added to the vm_data dict in the
provisioner vmbuilder.
2022-10-06 10:48:12 -04:00

772 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
# vmbuilder.py - pvc api vm builder (provisioner) functions
# part of the parallel virtual cluster (pvc) system
#
# copyright (c) 2018-2022 joshua m. boniface <joshua@boniface.me>
#
# this program is free software: you can redistribute it and/or modify
# it under the terms of the gnu general public license as published by
# the free software foundation, version 3.
#
# this program is distributed in the hope that it will be useful,
# but without any warranty; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import json
import psycopg2
import psycopg2.extras
import re
import os
# import sys
import time
import importlib.util
import uuid
from contextlib import contextmanager
from pvcapid.Daemon import config
from daemon_lib.zkhandler import ZKHandler
import daemon_lib.common as pvc_common
import daemon_lib.node as pvc_node
import daemon_lib.vm as pvc_vm
import daemon_lib.network as pvc_network
import daemon_lib.ceph as pvc_ceph
#
# Exceptions (used by Celery tasks)
#
class ValidationError(Exception):
"""
An exception that results from some value being un- or mis-defined.
"""
pass
class ClusterError(Exception):
"""
An exception that results from the PVC cluster being out of alignment with the action.
"""
pass
class ProvisioningError(Exception):
"""
An exception that results from a failure of a provisioning command.
"""
pass
#
# VMBuilder class - subclassed by install scripts
#
class VMBuilder(object):
def __init__(
self,
vm_name,
vm_id,
vm_profile,
vm_data,
):
self.vm_name = vm_name
self.vm_id = vm_id
self.vm_uuid = uuid.uuid4()
self.vm_profile = vm_profile
self.vm_data = vm_data
#
# Primary class functions; implemented by the individual scripts
#
def setup(self):
"""
setup(): Perform special setup steps before proceeding
OPTIONAL
"""
pass
def create(self):
"""
create(): Create the VM libvirt schema definition which is defined afterwards
"""
pass
def prepare(self):
"""
prepare(): Prepare any disks/volumes for the install step
"""
pass
def install(self):
"""
install(): Perform the installation
"""
pass
def cleanup(self):
"""
cleanup(): Perform any cleanup required after the prepare() step or on failure of the install() step
"""
pass
#
# Helper functions (as context managers)
#
@contextmanager
def chroot(destination):
"""
Change root directory to a given destination
"""
try:
real_root = os.open("/", os.O_RDONLY)
os.chroot(destination)
fake_root = os.open("/", os.O_RDONLY)
os.fchdir(fake_root)
yield
except Exception:
raise
finally:
os.fchdir(real_root)
os.chroot(".")
os.fchdir(real_root)
os.close(fake_root)
os.close(real_root)
del fake_root
del real_root
@contextmanager
def open_db(config):
try:
conn = psycopg2.connect(
host=config["database_host"],
port=config["database_port"],
dbname=config["database_name"],
user=config["database_user"],
password=config["database_password"],
)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
except Exception:
raise ClusterError("Failed to connect to Postgres")
try:
yield cur
except Exception:
raise
finally:
conn.commit()
cur.close()
conn.close()
del conn
@contextmanager
def open_zk(config):
try:
zkhandler = ZKHandler(config)
zkhandler.connect()
except Exception:
raise ClusterError("Failed to connect to Zookeeper")
try:
yield zkhandler
except Exception:
raise
finally:
zkhandler.disconnect()
del zkhandler
#
# Main VM provisioning function - executed by the Celery worker
#
def create_vm(
self, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]
):
print(f"Starting provisioning of VM '{vm_name}' with profile '{vm_profile}'")
# Phase 1 - setup
# * Get the profile elements
# * Get the details from these elements
# * Assemble a VM configuration dictionary
self.update_state(
state="RUNNING",
meta={"current": 1, "total": 10, "status": "Collecting configuration"},
)
time.sleep(1)
vm_id = re.findall(r"/(\d+)$/", vm_name)
if not vm_id:
vm_id = 0
else:
vm_id = vm_id[0]
vm_data = dict()
with open_db(config) as db_cur:
# Get the profile information
query = "SELECT * FROM profile WHERE name = %s"
args = (vm_profile,)
db_cur.execute(query, args)
profile_data = db_cur.fetchone()
if profile_data.get("arguments"):
vm_data["script_arguments"] = profile_data.get("arguments").split("|")
else:
vm_data["script_arguments"] = []
# Get the system details
query = "SELECT * FROM system_template WHERE id = %s"
args = (profile_data["system_template"],)
db_cur.execute(query, args)
vm_data["system_details"] = db_cur.fetchone()
# Get the MAC template
query = "SELECT mac_template FROM network_template WHERE id = %s"
args = (profile_data["network_template"],)
db_cur.execute(query, args)
db_row = db_cur.fetchone()
if db_row:
vm_data["mac_template"] = db_row.get("mac_template")
else:
vm_data["mac_template"] = None
# Get the networks
query = "SELECT * FROM network WHERE network_template = %s"
args = (profile_data["network_template"],)
db_cur.execute(query, args)
_vm_networks = db_cur.fetchall()
vm_networks = list()
# Set the eth_bridge for each network
for network in _vm_networks:
vni = network["vni"]
if vni in ["upstream", "cluster", "storage"]:
eth_bridge = "br{}".format(vni)
else:
eth_bridge = "vmbr{}".format(vni)
network["eth_bridge"] = eth_bridge
vm_networks.append(network)
vm_data["networks"] = vm_networks
# Get the storage volumes
# ORDER BY ensures disks are always in the sdX/vdX order, regardless of add order
query = "SELECT * FROM storage WHERE storage_template = %s ORDER BY disk_id"
args = (profile_data["storage_template"],)
db_cur.execute(query, args)
vm_data["volumes"] = db_cur.fetchall()
# Get the script
query = "SELECT script FROM script WHERE id = %s"
args = (profile_data["script"],)
db_cur.execute(query, args)
db_row = db_cur.fetchone()
if db_row:
vm_data["script"] = db_row.get("script")
else:
vm_data["script"] = None
if profile_data.get("profile_type") == "ova":
query = "SELECT * FROM ova WHERE id = %s"
args = (profile_data["ova"],)
db_cur.execute(query, args)
vm_data["ova_details"] = db_cur.fetchone()
query = "SELECT * FROM ova_volume WHERE ova = %s"
args = (profile_data["ova"],)
db_cur.execute(query, args)
# Replace the existing volumes list with our OVA volume list
vm_data["volumes"] = db_cur.fetchall()
retcode, stdout, stderr = pvc_common.run_os_command("uname -m")
vm_data["system_architecture"] = stdout.strip()
monitor_list = list()
coordinator_names = config["storage_hosts"]
for coordinator in coordinator_names:
monitor_list.append("{}.{}".format(coordinator, config["storage_domain"]))
vm_data["ceph_monitor_list"] = monitor_list
vm_data["ceph_monitor_port"] = config["ceph_monitor_port"]
vm_data["ceph_monitor_secret"] = config["ceph_storage_secret_uuid"]
# Parse the script arguments
script_arguments = dict()
for argument in vm_data["script_arguments"]:
argument_name, argument_data = argument.split("=")
script_arguments[argument_name] = argument_data
# Parse the runtime arguments
if script_run_args is not None:
for argument in script_run_args:
argument_name, argument_data = argument.split("=")
script_arguments[argument_name] = argument_data
print("Script arguments: {}".format(script_arguments))
vm_data["script_arguments"] = script_arguments
print(
"VM configuration data:\n{}".format(
json.dumps(vm_data, sort_keys=True, indent=2)
)
)
# Phase 2 - verification
# * Ensure that at least one node has enough free RAM to hold the VM (becomes main host)
# * Ensure that all networks are valid
# * Ensure that there is enough disk space in the Ceph cluster for the disks
# This is the "safe fail" step when an invalid configuration will be caught
self.update_state(
state="RUNNING",
meta={
"current": 2,
"total": 10,
"status": "Verifying configuration against cluster",
},
)
time.sleep(1)
with open_zk(config) as zkhandler:
# Verify that a VM with this name does not already exist
if pvc_vm.searchClusterByName(zkhandler, vm_name):
raise ClusterError(
"A VM with the name '{}' already exists in the cluster.".format(vm_name)
)
# Verify that at least one host has enough free RAM to run the VM
_discard, nodes = pvc_node.get_list(zkhandler, None)
target_node = None
last_free = 0
for node in nodes:
# Skip the node if it is not ready to run VMs
if node["daemon_state"] != "run" or node["domain_state"] != "ready":
continue
# Skip the node if its free memory is less than the new VM's size, plus a 512MB buffer
if node["memory"]["free"] < (vm_data["system_details"]["vram_mb"] + 512):
continue
# If this node has the most free, use it
if node["memory"]["free"] > last_free:
last_free = node["memory"]["free"]
target_node = node["name"]
# Raise if no node was found
if not target_node:
raise ClusterError(
"No ready cluster node contains at least {}+512 MB of free RAM.".format(
vm_data["system_details"]["vram_mb"]
)
)
print(
'Selecting target node "{}" with "{}" MB free RAM'.format(
target_node, last_free
)
)
# Verify that all configured networks are present on the cluster
cluster_networks, _discard = pvc_network.getClusterNetworkList(zkhandler)
for network in vm_data["networks"]:
vni = str(network["vni"])
if vni not in cluster_networks and vni not in [
"upstream",
"cluster",
"storage",
]:
raise ClusterError(
'The network VNI "{}" is not present on the cluster.'.format(vni)
)
print("All configured networks for VM are valid")
# Verify that there is enough disk space free to provision all VM disks
pools = dict()
for volume in vm_data["volumes"]:
if volume.get("source_volume") is not None:
volume_data = pvc_ceph.getVolumeInformation(
zkhandler, volume["pool"], volume["source_volume"]
)
if not volume_data:
raise ClusterError(
"The source volume {}/{} could not be found.".format(
volume["pool"], volume["source_volume"]
)
)
if not volume["pool"] in pools:
pools[volume["pool"]] = int(
pvc_ceph.format_bytes_fromhuman(volume_data["stats"]["size"])
/ 1024
/ 1024
/ 1024
)
else:
pools[volume["pool"]] += int(
pvc_ceph.format_bytes_fromhuman(volume_data["stats"]["size"])
/ 1024
/ 1024
/ 1024
)
else:
if not volume["pool"] in pools:
pools[volume["pool"]] = volume["disk_size_gb"]
else:
pools[volume["pool"]] += volume["disk_size_gb"]
for pool in pools:
try:
pool_information = pvc_ceph.getPoolInformation(zkhandler, pool)
if not pool_information:
raise
except Exception:
raise ClusterError(
'Pool "{}" is not present on the cluster.'.format(pool)
)
pool_free_space_gb = int(
pool_information["stats"]["free_bytes"] / 1024 / 1024 / 1024
)
pool_vm_usage_gb = int(pools[pool])
if pool_vm_usage_gb >= pool_free_space_gb:
raise ClusterError(
'Pool "{}" has only {} GB free and VM requires {} GB.'.format(
pool, pool_free_space_gb, pool_vm_usage_gb
)
)
print("There is enough space on cluster to store VM volumes")
# Verify that every specified filesystem is valid
used_filesystems = list()
for volume in vm_data["volumes"]:
if volume["source_volume"] is not None:
continue
if volume["filesystem"] and volume["filesystem"] not in used_filesystems:
used_filesystems.append(volume["filesystem"])
for filesystem in used_filesystems:
if filesystem == "swap":
retcode, stdout, stderr = pvc_common.run_os_command("which mkswap")
if retcode:
raise ProvisioningError(
"Failed to find binary for mkswap: {}".format(stderr)
)
else:
retcode, stdout, stderr = pvc_common.run_os_command(
"which mkfs.{}".format(filesystem)
)
if retcode:
raise ProvisioningError(
"Failed to find binary for mkfs.{}: {}".format(filesystem, stderr)
)
print("All selected filesystems are valid")
# Phase 3 - provisioning script preparation
# * Import the provisioning script as a library with importlib
# * Ensure the required function(s) are present
self.update_state(
state="RUNNING",
meta={"current": 3, "total": 10, "status": "Preparing provisioning script"},
)
time.sleep(1)
# Write the script out to a temporary file
retcode, stdout, stderr = pvc_common.run_os_command("mktemp")
if retcode:
raise ProvisioningError("Failed to create a temporary file: {}".format(stderr))
script_file = stdout.strip()
with open(script_file, "w") as fh:
fh.write(vm_data["script"])
fh.write("\n")
# Import the script file
loader = importlib.machinery.SourceFileLoader("installer_script", script_file)
spec = importlib.util.spec_from_loader(loader.name, loader)
installer_script = importlib.util.module_from_spec(spec)
spec.loader.exec_module(installer_script)
# Set up the VMBuilderScript object
vm_builder = installer_script.VMBuilderScript(
vm_name=vm_name,
vm_id=vm_id,
vm_profile=vm_profile,
vm_data=vm_data,
)
print("Provisioning script imported successfully")
# Create temporary directory for external chroot
retcode, stdout, stderr = pvc_common.run_os_command("mktemp -d")
if retcode:
raise ProvisioningError(f"Failed to create a temporary directory: {stderr}")
temp_dir = stdout.strip()
# Bind mount / to the chroot location /
retcode, stdout, stderr = pvc_common.run_os_command(
f"mount --bind --options ro / {temp_dir}"
)
if retcode:
raise ProvisioningError(
f"Failed to mount rootfs onto {temp_dir} for chroot: {stderr}"
)
# Mount tmpfs to the chroot location /tmp
retcode, stdout, stderr = pvc_common.run_os_command(
f"mount --type tmpfs tmpfs {temp_dir}/tmp"
)
if retcode:
raise ProvisioningError(
f"Failed to mount tmpfs onto {temp_dir}/tmp for chroot: {stderr}"
)
# Bind mount /dev to the chroot location /dev
retcode, stdout, stderr = pvc_common.run_os_command(
f"mount --bind --options ro /dev {temp_dir}/dev"
)
if retcode:
raise ProvisioningError(
f"Failed to mount devfs onto {temp_dir}/dev for chroot: {stderr}"
)
# Bind mount /run to the chroot location /run
retcode, stdout, stderr = pvc_common.run_os_command(
f"mount --bind --options rw /run {temp_dir}/run"
)
if retcode:
raise ProvisioningError(
f"Failed to mount runfs onto {temp_dir}/run for chroot: {stderr}"
)
# Bind mount /sys to the chroot location /sys
retcode, stdout, stderr = pvc_common.run_os_command(
f"mount --bind --options rw /sys {temp_dir}/sys"
)
if retcode:
raise ProvisioningError(
f"Failed to mount sysfs onto {temp_dir}/sys for chroot: {stderr}"
)
print("Chroot environment prepared successfully")
def general_cleanup():
print("Running upper cleanup steps")
try:
# Unmount bind-mounted devfs on the chroot
retcode, stdout, stderr = pvc_common.run_os_command(
f"umount {temp_dir}/dev"
)
# Unmount bind-mounted runfs on the chroot
retcode, stdout, stderr = pvc_common.run_os_command(
f"umount {temp_dir}/run"
)
# Unmount bind-mounted sysfs on the chroot
retcode, stdout, stderr = pvc_common.run_os_command(
f"umount {temp_dir}/sys"
)
# Unmount bind-mounted tmpfs on the chroot
retcode, stdout, stderr = pvc_common.run_os_command(
f"umount {temp_dir}/tmp"
)
# Unmount bind-mounted rootfs on the chroot
retcode, stdout, stderr = pvc_common.run_os_command(f"umount {temp_dir}")
except Exception as e:
# We don't care about fails during cleanup, log and continue
print(f"Suberror during general cleanup unmounts: {e}")
try:
# Remove the temp_dir
os.rmdir(temp_dir)
except Exception as e:
# We don't care about fails during cleanup, log and continue
print(f"Suberror during general cleanup directory removal: {e}")
try:
# Remote temporary script (don't fail if not removed)
os.remove(script_file)
except Exception as e:
# We don't care about fails during cleanup, log and continue
print(f"Suberror during general cleanup script removal: {e}")
# Phase 4 - script: setup()
# * Run pre-setup steps
self.update_state(
state="RUNNING",
meta={
"current": 4,
"total": 10,
"status": "Running script setup() step",
},
)
time.sleep(1)
print("Running script setup() step")
try:
with chroot(temp_dir):
vm_builder.setup()
except Exception as e:
general_cleanup()
raise ProvisioningError(f"Error in script setup() step: {e}")
# Phase 5 - script: create()
# * Prepare the libvirt XML defintion for the VM
self.update_state(
state="RUNNING",
meta={
"current": 5,
"total": 10,
"status": "Running script create() step",
},
)
time.sleep(1)
if define_vm:
print("Running script create() step")
try:
with chroot(temp_dir):
vm_schema = vm_builder.create()
except Exception as e:
general_cleanup()
raise ProvisioningError(f"Error in script create() step: {e}")
print("Generated VM schema:\n{}\n".format(vm_schema))
print("Defining VM on cluster")
node_limit = vm_data["system_details"]["node_limit"]
if node_limit:
node_limit = node_limit.split(",")
node_selector = vm_data["system_details"]["node_selector"]
node_autostart = vm_data["system_details"]["node_autostart"]
migration_method = vm_data["system_details"]["migration_method"]
with open_zk(config) as zkhandler:
retcode, retmsg = pvc_vm.define_vm(
zkhandler,
vm_schema.strip(),
target_node,
node_limit,
node_selector,
node_autostart,
migration_method,
vm_profile,
initial_state="provision",
)
print(retmsg)
else:
print("Skipping VM definition due to define_vm=False")
# Phase 6 - script: prepare()
# * Run preparation steps (e.g. disk creation and mapping, filesystem creation, etc.)
self.update_state(
state="RUNNING",
meta={
"current": 6,
"total": 10,
"status": "Running script prepare() step",
},
)
time.sleep(1)
print("Running script prepare() step")
try:
with chroot(temp_dir):
vm_builder.prepare()
except Exception as e:
with chroot(temp_dir):
vm_builder.cleanup()
general_cleanup()
raise ProvisioningError(f"Error in script prepare() step: {e}")
# Phase 7 - script: install()
# * Run installation with arguments
self.update_state(
state="RUNNING",
meta={
"current": 7,
"total": 10,
"status": "Running script install() step",
},
)
time.sleep(1)
print("Running script install() step")
try:
with chroot(temp_dir):
vm_builder.install()
except Exception as e:
with chroot(temp_dir):
vm_builder.cleanup()
general_cleanup()
raise ProvisioningError(f"Error in script install() step: {e}")
# Phase 8 - script: cleanup()
# * Run cleanup steps
self.update_state(
state="RUNNING",
meta={
"current": 8,
"total": 10,
"status": "Running script cleanup() step",
},
)
time.sleep(1)
print("Running script cleanup() step")
try:
with chroot(temp_dir):
vm_builder.cleanup()
except Exception as e:
general_cleanup()
raise ProvisioningError(f"Error in script cleanup() step: {e}")
# Phase 9 - general cleanup
# * Clean up the chroot from earlier
self.update_state(
state="RUNNING",
meta={
"current": 9,
"total": 10,
"status": "Running general cleanup steps",
},
)
time.sleep(1)
general_cleanup()
# Phase 10 - startup
# * Start the VM in the PVC cluster
self.update_state(
state="RUNNING",
meta={
"current": 10,
"total": 10,
"status": "Starting VM",
},
)
time.sleep(1)
if start_vm:
print("Starting VM")
with open_zk(config) as zkhandler:
success, message = pvc_vm.start_vm(zkhandler, vm_name)
print(message)
end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned and started successfully'
else:
end_message = f'VM "{vm_name}" with profile "{vm_profile}" has been provisioned successfully'
return {"status": end_message, "current": 10, "total": 10}