From c6986aa5b88f39ec1a4abf3541dcf3a1f51e64e7 Mon Sep 17 00:00:00 2001 From: Joshua Boniface Date: Tue, 3 Dec 2019 23:39:13 -0500 Subject: [PATCH] Add Celery queueing for VM creation function Also define this function and provide the planned workflow. --- .../examples/provisioning_script.py | 2 +- .../provisioner_lib/provisioner.py | 93 ++++++++++- client-provisioner/pvc-provisioner.py | 158 +++++++++++++++--- .../pvc-provisioner.sample.yaml | 17 +- client-provisioner/schema.sql | 4 +- 5 files changed, 236 insertions(+), 38 deletions(-) diff --git a/client-provisioner/examples/provisioning_script.py b/client-provisioner/examples/provisioning_script.py index 1cd85ef2..81c69a04 100644 --- a/client-provisioner/examples/provisioning_script.py +++ b/client-provisioner/examples/provisioning_script.py @@ -58,7 +58,7 @@ def install(**kwargs): except: deb_mirror = "http://ftp.debian.org/debian" try: - deb_packages = kwargs['deb_packages'] + deb_packages = kwargs['deb_packages'].split(',') except: deb_packages = ["linux-image-amd64", "grub-pc", "cloud-init", "python3-cffi-backend"] diff --git a/client-provisioner/provisioner_lib/provisioner.py b/client-provisioner/provisioner_lib/provisioner.py index 5fc90961..de2e6d12 100755 --- a/client-provisioner/provisioner_lib/provisioner.py +++ b/client-provisioner/provisioner_lib/provisioner.py @@ -26,8 +26,10 @@ import psycopg2 import psycopg2.extras import os import re +import time import client_lib.common as pvc_common +import client_lib.node as pvc_node import client_lib.vm as pvc_vm import client_lib.network as pvc_network import client_lib.ceph as pvc_ceph @@ -239,7 +241,7 @@ def create_template_storage(name): close_database(conn, cur) return flask.jsonify(retmsg), retcode -def create_template_storage_element(name, disk_id, disk_size_gb, mountpoint=None, filesystem=None): +def create_template_storage_element(name, pool, disk_id, disk_size_gb, mountpoint=None, filesystem=None): if not list_template_storage(name, is_fuzzy=False): retmsg = { "message": "The storage template {} does not exist".format(name) } retcode = 400 @@ -266,8 +268,8 @@ def create_template_storage_element(name, disk_id, disk_size_gb, mountpoint=None args = (name,) cur.execute(query, args) template_id = cur.fetchone()['id'] - query = "INSERT INTO storage (storage_template, disk_id, disk_size_gb, mountpoint, filesystem) VALUES (%s, %s, %s, %s, %s);" - args = (template_id, disk_id, disk_size_gb, mountpoint, filesystem) + query = "INSERT INTO storage (storage_template, pool, disk_id, disk_size_gb, mountpoint, filesystem) VALUES (%s, %s, %s, %s, %s, %s);" + args = (template_id, pool, disk_id, disk_size_gb, mountpoint, filesystem) cur.execute(query, args) retmsg = { "name": name, "disk_id": disk_id } retcode = 200 @@ -509,17 +511,21 @@ def list_profile(limit, is_fuzzy=True): for profile in orig_data: profile_data = dict() profile_data['name'] = profile['name'] + # Parse the name of each subelement for etype in 'system_template', 'network_template', 'storage_template', 'script': query = 'SELECT name from {} WHERE id = %s'.format(etype) args = (profile[etype],) cur.execute(query, args) name = cur.fetchone()['name'] profile_data[etype] = name + # Split the arguments back into a list + profile_data['arguments'] = profile['arguments'].split('|') + # Append the new data to our actual output structure data.append(profile_data) close_database(conn, cur) return data -def create_profile(name, system_template, network_template, storage_template, script): +def create_profile(name, system_template, network_template, storage_template, script, arguments=[]): if list_profile(name, is_fuzzy=False): retmsg = { "message": "The profile {} already exists".format(name) } retcode = 400 @@ -565,10 +571,12 @@ def create_profile(name, system_template, network_template, storage_template, sc retcode = 400 return flask.jsonify(retmsg), retcode + arguments_formatted = '|'.join(arguments) + conn, cur = open_database(config) try: - query = "INSERT INTO profile (name, system_template, network_template, storage_template, script) VALUES (%s, %s, %s, %s, %s);" - args = (name, system_template_id, network_template_id, storage_template_id, script_id) + query = "INSERT INTO profile (name, system_template, network_template, storage_template, script, arguments) VALUES (%s, %s, %s, %s, %s, %s);" + args = (name, system_template_id, network_template_id, storage_template_id, script_id, arguments_formatted) cur.execute(query, args) retmsg = { "name": name } retcode = 200 @@ -598,9 +606,76 @@ def delete_profile(name): return flask.jsonify(retmsg), retcode # -# Job functions +# VM provisioning helper functions # -def create_vm(vm_name, profile_name): - pass +# +# Main VM provisioning function - executed by the Celery worker +# +def create_vm(self, vm_name, vm_profile): + # Runtime imports + import time + import importlib + + # Phase 1 - setup + # * Get the profile elements + # * Get the details from these elements + # * Assemble a VM configuration dictionary + self.update_state(state='RUNNING', meta={'current': 1, 'total': 10, 'status': 'Collecting configuration'}) + time.sleep(5) + + zk_conn = pvc_common.startZKConnection(config['coordinators']) + print(pvc_node.get_list(zk_conn, None)) + pvc_common.stopZKConnection(zk_conn) + + # Phase 2 - verification + # * Ensure that at least one node has enough free RAM to hold the VM (becomes main host) + # * Ensure that all networks are valid + # * Ensure that there is enough disk space in the Ceph cluster for the disks + # This is the "safe fail" step when an invalid configuration will be caught + self.update_state(state='RUNNING', meta={'current': 2, 'total': 10, 'status': 'Verifying configuration against cluster'}) + time.sleep(5) + + # Phase 3 - disk creation + # * Create each Ceph storage volume for the disks + self.update_state(state='RUNNING', meta={'current': 3, 'total': 10, 'status': 'Creating storage volumes'}) + time.sleep(5) + + # Phase 4 - disk mapping + # * Map each volume to the local host in order + # * Format each volume with any specified filesystems + # * If any mountpoints are specified, create a temporary mount directory + # * Mount any volumes to their respective mountpoints + self.update_state(state='RUNNING', meta={'current': 4, 'total': 10, 'status': 'Mapping, formatting, and mounting storage volumes locally'}) + time.sleep(5) + + # Phase 5 - provisioning script preparation + # * Import the provisioning script as a library with importlib + # * Ensure the required function(s) are present + self.update_state(state='RUNNING', meta={'current': 5, 'total': 10, 'status': 'Preparing provisioning script'}) + time.sleep(5) + + # Phase 6 - provisioning script execution + # * Execute the provisioning script main function ("install") passing any custom arguments + self.update_state(state='RUNNING', meta={'current': 6, 'total': 10, 'status': 'Executing provisioning script'}) + time.sleep(5) + + # Phase 7 - install cleanup + # * Unmount any mounted volumes + # * Remove any temporary directories + self.update_state(state='RUNNING', meta={'current': 7, 'total': 10, 'status': 'Cleaning up local mounts and directories'}) + time.sleep(5) + + # Phase 8 - configuration creation + # * Create the libvirt XML configuration + self.update_state(state='RUNNING', meta={'current': 8, 'total': 10, 'status': 'Preparing Libvirt XML configuration'}) + time.sleep(5) + + # Phase 9 - definition + # * Create the VM in the PVC cluster + # * Start the VM in the PVC cluster + self.update_state(state='RUNNING', meta={'current': 9, 'total': 10, 'status': 'Defining and starting VM on the cluster'}) + time.sleep(5) + + return {"status": "VM '{}' with profile '{}' has been provisioned and started successfully".format(vm_name, vm_profile), "current": 10, "total": 10} diff --git a/client-provisioner/pvc-provisioner.py b/client-provisioner/pvc-provisioner.py index 086e6543..2889e3fa 100755 --- a/client-provisioner/pvc-provisioner.py +++ b/client-provisioner/pvc-provisioner.py @@ -28,6 +28,8 @@ import uu import gevent.pywsgi +import celery as Celery + import provisioner_lib.provisioner as pvcprovisioner # Parse the configuration file @@ -51,6 +53,7 @@ try: # Create the config object config = { 'debug': o_config['pvc']['debug'], + 'coordinators': o_config['pvc']['coordinators'], 'listen_address': o_config['pvc']['provisioner']['listen_address'], 'listen_port': int(o_config['pvc']['provisioner']['listen_port']), 'auth_enabled': o_config['pvc']['provisioner']['authentication']['enabled'], @@ -63,7 +66,10 @@ try: 'database_port': int(o_config['pvc']['provisioner']['database']['port']), 'database_name': o_config['pvc']['provisioner']['database']['name'], 'database_user': o_config['pvc']['provisioner']['database']['user'], - 'database_password': o_config['pvc']['provisioner']['database']['pass'] + 'database_password': o_config['pvc']['provisioner']['database']['pass'], + 'queue_host': o_config['pvc']['provisioner']['queue']['host'], + 'queue_port': o_config['pvc']['provisioner']['queue']['port'], + 'queue_path': o_config['pvc']['provisioner']['queue']['path'], } # Set the config object in the pvcapi namespace @@ -82,6 +88,8 @@ except Exception as e: exit(1) api = flask.Flask(__name__) +api.config['CELERY_BROKER_URL'] = 'redis://{}:{}{}'.format(config['queue_host'], config['queue_port'], config['queue_path']) +api.config['CELERY_RESULT_BACKEND'] = 'redis://{}:{}{}'.format(config['queue_host'], config['queue_port'], config['queue_path']) if config['debug']: api.config['DEBUG'] = True @@ -89,6 +97,17 @@ if config['debug']: if config['auth_enabled']: api.config["SECRET_KEY"] = config['auth_secret_key'] +print(api.name) +celery = Celery.Celery(api.name, broker=api.config['CELERY_BROKER_URL']) +celery.conf.update(api.config) + +# +# Job functions +# +@celery.task(bind=True) +def create_vm(self, vm_name, profile_name): + return pvcprovisioner.create_vm(self, vm_name, profile_name) + # Authentication decorator function def authenticator(function): def authenticate(*args, **kwargs): @@ -561,6 +580,10 @@ def api_template_storage_disk_root(template): * type: Disk identifier in 'sdX' or 'vdX' format, unique within template * optional: false * requires: N/A + ?pool: The storage pool in which to store the disk. + * type: Storage Pool name + * optional: false + * requires: N/A ?disk_size: The disk size in GB. * type: integer, Gigabytes (GB) * optional: false @@ -591,6 +614,11 @@ def api_template_storage_disk_root(template): else: return flask.jsonify({"message": "A disk ID in sdX/vdX format must be specified."}), 400 + if 'pool' in flask.request.values: + pool = flask.request.values['pool'] + else: + return flask.jsonify({"message": "A pool name must be specified."}), 400 + if 'disk_size' in flask.request.values: disk_size = flask.request.values['disk_size'] else: @@ -606,7 +634,7 @@ def api_template_storage_disk_root(template): else: mountpoint = None - return pvcprovisioner.create_template_storage_element(template, disk_id, disk_size, filesystem, mountpoint) + return pvcprovisioner.create_template_storage_element(template, pool, disk_id, disk_size, filesystem, mountpoint) if flask.request.method == 'DELETE': if 'disk_id' in flask.request.values: @@ -625,6 +653,10 @@ def api_template_storage_disk_element(template, disk_id): GET: Show details of disk storage template