Add Celery queueing for VM creation function
Also define this function and provide the planned workflow.
This commit is contained in:
@ -58,7 +58,7 @@ def install(**kwargs):
deb_mirror = ""
deb_packages = kwargs['deb_packages']
deb_packages = kwargs['deb_packages'].split(',')
deb_packages = ["linux-image-amd64", "grub-pc", "cloud-init", "python3-cffi-backend"]
@ -26,8 +26,10 @@ import psycopg2
import psycopg2.extras
import os
import re
import time
import client_lib.common as pvc_common
import client_lib.node as pvc_node
import client_lib.vm as pvc_vm
import as pvc_network
import client_lib.ceph as pvc_ceph
@ -239,7 +241,7 @@ def create_template_storage(name):
close_database(conn, cur)
return flask.jsonify(retmsg), retcode
def create_template_storage_element(name, disk_id, disk_size_gb, mountpoint=None, filesystem=None):
def create_template_storage_element(name, pool, disk_id, disk_size_gb, mountpoint=None, filesystem=None):
if not list_template_storage(name, is_fuzzy=False):
retmsg = { "message": "The storage template {} does not exist".format(name) }
retcode = 400
@ -266,8 +268,8 @@ def create_template_storage_element(name, disk_id, disk_size_gb, mountpoint=None
args = (name,)
cur.execute(query, args)
template_id = cur.fetchone()['id']
query = "INSERT INTO storage (storage_template, disk_id, disk_size_gb, mountpoint, filesystem) VALUES (%s, %s, %s, %s, %s);"
args = (template_id, disk_id, disk_size_gb, mountpoint, filesystem)
query = "INSERT INTO storage (storage_template, pool, disk_id, disk_size_gb, mountpoint, filesystem) VALUES (%s, %s, %s, %s, %s, %s);"
args = (template_id, pool, disk_id, disk_size_gb, mountpoint, filesystem)
cur.execute(query, args)
retmsg = { "name": name, "disk_id": disk_id }
retcode = 200
@ -509,17 +511,21 @@ def list_profile(limit, is_fuzzy=True):
for profile in orig_data:
profile_data = dict()
profile_data['name'] = profile['name']
# Parse the name of each subelement
for etype in 'system_template', 'network_template', 'storage_template', 'script':
query = 'SELECT name from {} WHERE id = %s'.format(etype)
args = (profile[etype],)
cur.execute(query, args)
name = cur.fetchone()['name']
profile_data[etype] = name
# Split the arguments back into a list
profile_data['arguments'] = profile['arguments'].split('|')
# Append the new data to our actual output structure
close_database(conn, cur)
return data
def create_profile(name, system_template, network_template, storage_template, script):
def create_profile(name, system_template, network_template, storage_template, script, arguments=[]):
if list_profile(name, is_fuzzy=False):
retmsg = { "message": "The profile {} already exists".format(name) }
retcode = 400
@ -565,10 +571,12 @@ def create_profile(name, system_template, network_template, storage_template, sc
retcode = 400
return flask.jsonify(retmsg), retcode
arguments_formatted = '|'.join(arguments)
conn, cur = open_database(config)
query = "INSERT INTO profile (name, system_template, network_template, storage_template, script) VALUES (%s, %s, %s, %s, %s);"
args = (name, system_template_id, network_template_id, storage_template_id, script_id)
query = "INSERT INTO profile (name, system_template, network_template, storage_template, script, arguments) VALUES (%s, %s, %s, %s, %s, %s);"
args = (name, system_template_id, network_template_id, storage_template_id, script_id, arguments_formatted)
cur.execute(query, args)
retmsg = { "name": name }
retcode = 200
@ -598,9 +606,76 @@ def delete_profile(name):
return flask.jsonify(retmsg), retcode
# Job functions
# VM provisioning helper functions
def create_vm(vm_name, profile_name):
# Main VM provisioning function - executed by the Celery worker
def create_vm(self, vm_name, vm_profile):
# Runtime imports
import time
import importlib
# Phase 1 - setup
# * Get the profile elements
# * Get the details from these elements
# * Assemble a VM configuration dictionary
self.update_state(state='RUNNING', meta={'current': 1, 'total': 10, 'status': 'Collecting configuration'})
zk_conn = pvc_common.startZKConnection(config['coordinators'])
print(pvc_node.get_list(zk_conn, None))
# Phase 2 - verification
# * Ensure that at least one node has enough free RAM to hold the VM (becomes main host)
# * Ensure that all networks are valid
# * Ensure that there is enough disk space in the Ceph cluster for the disks
# This is the "safe fail" step when an invalid configuration will be caught
self.update_state(state='RUNNING', meta={'current': 2, 'total': 10, 'status': 'Verifying configuration against cluster'})
# Phase 3 - disk creation
# * Create each Ceph storage volume for the disks
self.update_state(state='RUNNING', meta={'current': 3, 'total': 10, 'status': 'Creating storage volumes'})
# Phase 4 - disk mapping
# * Map each volume to the local host in order
# * Format each volume with any specified filesystems
# * If any mountpoints are specified, create a temporary mount directory
# * Mount any volumes to their respective mountpoints
self.update_state(state='RUNNING', meta={'current': 4, 'total': 10, 'status': 'Mapping, formatting, and mounting storage volumes locally'})
# Phase 5 - provisioning script preparation
# * Import the provisioning script as a library with importlib
# * Ensure the required function(s) are present
self.update_state(state='RUNNING', meta={'current': 5, 'total': 10, 'status': 'Preparing provisioning script'})
# Phase 6 - provisioning script execution
# * Execute the provisioning script main function ("install") passing any custom arguments
self.update_state(state='RUNNING', meta={'current': 6, 'total': 10, 'status': 'Executing provisioning script'})
# Phase 7 - install cleanup
# * Unmount any mounted volumes
# * Remove any temporary directories
self.update_state(state='RUNNING', meta={'current': 7, 'total': 10, 'status': 'Cleaning up local mounts and directories'})
# Phase 8 - configuration creation
# * Create the libvirt XML configuration
self.update_state(state='RUNNING', meta={'current': 8, 'total': 10, 'status': 'Preparing Libvirt XML configuration'})
# Phase 9 - definition
# * Create the VM in the PVC cluster
# * Start the VM in the PVC cluster
self.update_state(state='RUNNING', meta={'current': 9, 'total': 10, 'status': 'Defining and starting VM on the cluster'})
return {"status": "VM '{}' with profile '{}' has been provisioned and started successfully".format(vm_name, vm_profile), "current": 10, "total": 10}
@ -28,6 +28,8 @@ import uu
import gevent.pywsgi
import celery as Celery
import provisioner_lib.provisioner as pvcprovisioner
# Parse the configuration file
@ -51,6 +53,7 @@ try:
# Create the config object
config = {
'debug': o_config['pvc']['debug'],
'coordinators': o_config['pvc']['coordinators'],
'listen_address': o_config['pvc']['provisioner']['listen_address'],
'listen_port': int(o_config['pvc']['provisioner']['listen_port']),
'auth_enabled': o_config['pvc']['provisioner']['authentication']['enabled'],
@ -63,7 +66,10 @@ try:
'database_port': int(o_config['pvc']['provisioner']['database']['port']),
'database_name': o_config['pvc']['provisioner']['database']['name'],
'database_user': o_config['pvc']['provisioner']['database']['user'],
'database_password': o_config['pvc']['provisioner']['database']['pass']
'database_password': o_config['pvc']['provisioner']['database']['pass'],
'queue_host': o_config['pvc']['provisioner']['queue']['host'],
'queue_port': o_config['pvc']['provisioner']['queue']['port'],
'queue_path': o_config['pvc']['provisioner']['queue']['path'],
# Set the config object in the pvcapi namespace
@ -82,6 +88,8 @@ except Exception as e:
api = flask.Flask(__name__)
api.config['CELERY_BROKER_URL'] = 'redis://{}:{}{}'.format(config['queue_host'], config['queue_port'], config['queue_path'])
api.config['CELERY_RESULT_BACKEND'] = 'redis://{}:{}{}'.format(config['queue_host'], config['queue_port'], config['queue_path'])
if config['debug']:
api.config['DEBUG'] = True
@ -89,6 +97,17 @@ if config['debug']:
if config['auth_enabled']:
api.config["SECRET_KEY"] = config['auth_secret_key']
celery = Celery.Celery(, broker=api.config['CELERY_BROKER_URL'])
# Job functions
def create_vm(self, vm_name, profile_name):
return pvcprovisioner.create_vm(self, vm_name, profile_name)
# Authentication decorator function
def authenticator(function):
def authenticate(*args, **kwargs):
@ -561,6 +580,10 @@ def api_template_storage_disk_root(template):
* type: Disk identifier in 'sdX' or 'vdX' format, unique within template
* optional: false
* requires: N/A
?pool: The storage pool in which to store the disk.
* type: Storage Pool name
* optional: false
* requires: N/A
?disk_size: The disk size in GB.
* type: integer, Gigabytes (GB)
* optional: false
@ -591,6 +614,11 @@ def api_template_storage_disk_root(template):
return flask.jsonify({"message": "A disk ID in sdX/vdX format must be specified."}), 400
if 'pool' in flask.request.values:
pool = flask.request.values['pool']
return flask.jsonify({"message": "A pool name must be specified."}), 400
if 'disk_size' in flask.request.values:
disk_size = flask.request.values['disk_size']
@ -606,7 +634,7 @@ def api_template_storage_disk_root(template):
mountpoint = None
return pvcprovisioner.create_template_storage_element(template, disk_id, disk_size, filesystem, mountpoint)
return pvcprovisioner.create_template_storage_element(template, pool, disk_id, disk_size, filesystem, mountpoint)
if flask.request.method == 'DELETE':
if 'disk_id' in flask.request.values:
@ -625,6 +653,10 @@ def api_template_storage_disk_element(template, disk_id):
GET: Show details of disk <disk_id> storage template <template>.
POST: Add new storage VNI <vni> to storage template <template>.
?pool: The storage pool in which to store the disk.
* type: Storage Pool name
* optional: false
* requires: N/A
?disk_size: The disk size in GB.
* type: integer, Gigabytes (GB)
* optional: false
@ -650,6 +682,11 @@ def api_template_storage_disk_element(template, disk_id):
return flask.jsonify({"message": "Found no disk with ID {} in storage template {}".format(disk_id, template)}), 404
if flask.request.method == 'POST':
if 'pool' in flask.request.values:
pool = flask.request.values['pool']
return flask.jsonify({"message": "A pool name must be specified."}), 400
if 'disk_size' in flask.request.values:
disk_size = flask.request.values['disk_size']
@ -665,7 +702,7 @@ def api_template_storage_disk_element(template, disk_id):
mountpoint = None
return pvcprovisioner.create_template_storage_element(template, disk_id, disk_size, mountpoint, filesystem)
return pvcprovisioner.create_template_storage_element(template, pool, disk_id, disk_size, mountpoint, filesystem)
if flask.request.method == 'DELETE':
return pvcprovisioner.delete_template_storage_element(template, disk_id)
@ -787,6 +824,10 @@ def api_profile_root():
* type: text
* optional: false
* requires: N/A
?arg: An arbitrary key=value argument for use by the provisioning script.
* type: key-value pair, multiple
* optional: true
* requires: N/A
if flask.request.method == 'GET':
# Get name limit
@ -828,7 +869,12 @@ def api_profile_root():
return flask.jsonify({"message": "A script must be specified."}), 400
return pvcprovisioner.create_profile(name, system_template, network_template, storage_template, script)
if 'arg' in flask.request.values:
arguments = flask.request.values.getlist('arg')
arguments = None
return pvcprovisioner.create_profile(name, system_template, network_template, storage_template, script, arguments)
@api.route('/api/v1/profile/<profile>', methods=['GET', 'POST', 'DELETE'])
@ -901,32 +947,96 @@ def api_create_root():
/create - Create new VM on the cluster.
POST: Create new VM.
?name: The name of the VM.
* type: text
* optional: false
* requires: N/A
?profile: The profile name of the VM.
* type: text
* optional: flase
* requires: N/A
if 'name' in flask.request.values:
name = flask.request.values['name']
return flask.jsonify({"message": "A VM name must be specified."}), 400
if 'profile' in flask.request.values:
profile = flask.request.values['profile']
return flask.jsonify({"message": "A VM profile must be specified."}), 400
print("starting task")
task = create_vm.delay(name, profile)
return flask.jsonify({"task_id":}), 202, {'Location': flask.url_for('api_status_root',}
@api.route('/api/v1/status/<task_id>', methods=['GET'])
def api_status_root(task_id):
/status - Report on VM creation status.
GET: Get status of the VM provisioning.
?task: The task ID returned from the '/create' endpoint.
* type: text
* optional: flase
* requires: N/A
task = create_vm.AsyncResult(task_id)
if task.state == 'PENDING':
# job did not start yet
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
elif task.state != 'FAILURE':
# job is still running
response = {
'state': task.state,
'current':'current', 0),
'total':'total', 1),
'status':'status', '')
if 'result' in
response['result'] =['result']
# something went wrong in the background job
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(, # this is the exception raised
return flask.jsonify(response)
# Entrypoint
if config['debug']:
# Run in Flask standard mode
||||['listen_address'], config['listen_port'])
if config['ssl_enabled']:
# Run the WSGI server with SSL
http_server = gevent.pywsgi.WSGIServer(
(config['listen_address'], config['listen_port']),
if __name__ == '__main__':
if config['debug']:
# Run in Flask standard mode
||||['listen_address'], config['listen_port'])
# Run the ?WSGI server without SSL
http_server = gevent.pywsgi.WSGIServer(
(config['listen_address'], config['listen_port']),
if config['ssl_enabled']:
# Run the WSGI server with SSL
http_server = gevent.pywsgi.WSGIServer(
(config['listen_address'], config['listen_port']),
# Run the ?WSGI server without SSL
http_server = gevent.pywsgi.WSGIServer(
(config['listen_address'], config['listen_port']),
print('Starting PyWSGI server at {}:{} with SSL={}, Authentication={}'.format(config['listen_address'], config['listen_port'], config['ssl_enabled'], config['auth_enabled']))
print('Starting PyWSGI server at {}:{} with SSL={}, Authentication={}'.format(config['listen_address'], config['listen_port'], config['ssl_enabled'], config['auth_enabled']))
@ -15,6 +15,11 @@
# debug: Enable/disable API debug mode
debug: True
# coordinators: The list of cluster coordinator hostnames
- hv1
- hv2
- hv3
# provisioner: Configuration of the Provisioner API listener
# listen_address: IP address(es) to listen on; use for all interfaces
@ -43,9 +48,9 @@ pvc:
key_file: ""
# database: Backend database configuration
# host: PostgreSQL hostname, invariably 'localhost
# host: PostgreSQL hostname, usually 'localhost'
# port: PostgreSQL port, invariably 'localhost'
# port: PostgreSQL port, invariably '5432'
port: 5432
# name: PostgreSQL database name, invariably 'pvcprov'
name: pvcprov
@ -53,3 +58,11 @@ pvc:
user: pvcprov
# pass: PostgreSQL user password, randomly generated
pass: pvcprov
# queue: Celery backend queue using the PVC Zookeeper cluster
# host: Redis hostname, usually 'localhost'
host: localhost
# port: Redis port, invariably '6279'
port: 6379
# path: Redis queue path, invariably '/0'
path: /0
@ -4,9 +4,9 @@ create table system_template (id SERIAL PRIMARY KEY, name TEXT NOT NULL UNIQUE,
create table network_template (id SERIAL PRIMARY KEY, name TEXT NOT NULL UNIQUE, mac_template TEXT);
create table network (id SERIAL PRIMARY KEY, network_template INT REFERENCES network_template(id), vni INT NOT NULL);
create table storage_template (id SERIAL PRIMARY KEY, name TEXT NOT NULL UNIQUE);
create table storage (id SERIAL PRIMARY KEY, storage_template INT REFERENCES storage_template(id), disk_id TEXT NOT NULL, disk_size_gb INT NOT NULL, mountpoint TEXT, filesystem TEXT);
create table storage (id SERIAL PRIMARY KEY, storage_template INT REFERENCES storage_template(id), pool TEXT NOT NULL, disk_id TEXT NOT NULL, disk_size_gb INT NOT NULL, mountpoint TEXT, filesystem TEXT);
create table script (id SERIAL PRIMARY KEY, name TEXT NOT NULL UNIQUE, script TEXT NOT NULL);
create table profile (id SERIAL PRIMARY KEY, name TEXT NOT NULL UNIQUE, system_template INT REFERENCES system_template(id), network_template INT REFERENCES network_template(id), storage_template INT REFERENCES storage_template(id), script INT REFERENCES script(id));
create table profile (id SERIAL PRIMARY KEY, name TEXT NOT NULL UNIQUE, system_template INT REFERENCES system_template(id), network_template INT REFERENCES network_template(id), storage_template INT REFERENCES storage_template(id), script INT REFERENCES script(id), arguments text);
grant all privileges on database pvcprov to pvcprov;
grant all privileges on all tables in schema public to pvcprov;
grant all privileges on all sequences in schema public to pvcprov;
Reference in New Issue
Block a user