diff --git a/api-daemon/pvcapid/provisioner.py b/api-daemon/pvcapid/provisioner.py
index 06918b0b..dcebdbfb 100755
--- a/api-daemon/pvcapid/provisioner.py
+++ b/api-daemon/pvcapid/provisioner.py
@@ -26,6 +26,8 @@ import re
 
 from pvcapid.Daemon import config, strtobool
 
+from daemon_lib.zkhandler import ZKConnection
+
 import daemon_lib.common as pvc_common
 import daemon_lib.node as pvc_node
 import daemon_lib.vm as pvc_vm
@@ -1049,7 +1051,8 @@ def delete_profile(name):
 #
 # Main VM provisioning function - executed by the Celery worker
 #
-def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]):
+@ZKConnection(config)
+def create_vm(self, zkhandler, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]):
     # Runtime imports
     import time
     import importlib
@@ -1068,12 +1071,6 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
         print('FATAL - failed to connect to Postgres')
         raise Exception
 
-    try:
-        zk_conn = pvc_common.startZKConnection(config['coordinators'])
-    except Exception:
-        print('FATAL - failed to connect to Zookeeper')
-        raise Exception
-
     # Phase 1 - setup
     # * Get the profile elements
     # * Get the details from these elements
@@ -1174,11 +1171,11 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
     time.sleep(1)
 
     # Verify that a VM with this name does not already exist
-    if pvc_vm.searchClusterByName(zk_conn, vm_name):
+    if pvc_vm.searchClusterByName(zkhandler, vm_name):
         raise ClusterError("A VM with the name '{}' already exists in the cluster.".format(vm_name))
 
     # Verify that at least one host has enough free RAM to run the VM
-    _discard, nodes = pvc_node.get_list(zk_conn, None)
+    _discard, nodes = pvc_node.get_list(zkhandler, None)
     target_node = None
     last_free = 0
     for node in nodes:
@@ -1199,7 +1196,7 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
     print('Selecting target node "{}" with "{}" MB free RAM'.format(target_node, last_free))
 
     # Verify that all configured networks are present on the cluster
-    cluster_networks, _discard = pvc_network.getClusterNetworkList(zk_conn)
+    cluster_networks, _discard = pvc_network.getClusterNetworkList(zkhandler)
     for network in vm_data['networks']:
         vni = str(network['vni'])
         if vni not in cluster_networks:
@@ -1211,7 +1208,7 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
     pools = dict()
     for volume in vm_data['volumes']:
         if volume.get('source_volume') is not None:
-            volume_data = pvc_ceph.getVolumeInformation(zk_conn, volume['pool'], volume['source_volume'])
+            volume_data = pvc_ceph.getVolumeInformation(zkhandler, volume['pool'], volume['source_volume'])
             if not volume_data:
                 raise ClusterError('The source volume {}/{} could not be found.'.format(volume['pool'], volume['source_volume']))
             if not volume['pool'] in pools:
@@ -1226,7 +1223,7 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
 
     for pool in pools:
         try:
-            pool_information = pvc_ceph.getPoolInformation(zk_conn, pool)
+            pool_information = pvc_ceph.getPoolInformation(zkhandler, pool)
             if not pool_information:
                 raise
         except Exception:
@@ -1424,7 +1421,7 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
         node_selector = vm_data['system_details']['node_selector']
         node_autostart = vm_data['system_details']['node_autostart']
         migration_method = vm_data['system_details']['migration_method']
-        retcode, retmsg = pvc_vm.define_vm(zk_conn, vm_schema.strip(), target_node, node_limit, node_selector, node_autostart, migration_method, vm_profile, initial_state='provision')
+        retcode, retmsg = pvc_vm.define_vm(zkhandler, vm_schema.strip(), target_node, node_limit, node_selector, node_autostart, migration_method, vm_profile, initial_state='provision')
         print(retmsg)
     else:
         print("Skipping VM definition")
@@ -1436,12 +1433,12 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
 
     for volume in vm_data['volumes']:
         if volume.get('source_volume') is not None:
-            success, message = pvc_ceph.clone_volume(zk_conn, volume['pool'], volume['source_volume'], "{}_{}".format(vm_name, volume['disk_id']))
+            success, message = pvc_ceph.clone_volume(zkhandler, volume['pool'], volume['source_volume'], "{}_{}".format(vm_name, volume['disk_id']))
             print(message)
             if not success:
                 raise ProvisioningError('Failed to clone volume "{}" to "{}".'.format(volume['source_volume'], volume['disk_id']))
         else:
-            success, message = pvc_ceph.add_volume(zk_conn, volume['pool'], "{}_{}".format(vm_name, volume['disk_id']), "{}G".format(volume['disk_size_gb']))
+            success, message = pvc_ceph.add_volume(zkhandler, volume['pool'], "{}_{}".format(vm_name, volume['disk_id']), "{}G".format(volume['disk_size_gb']))
             print(message)
             if not success:
                 raise ProvisioningError('Failed to create volume "{}".'.format(volume['disk_id']))
@@ -1465,11 +1462,11 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
             print('Converting {} source volume {} to raw format on {}'.format(volume['volume_format'], src_volume, dst_volume))
 
             # Map the target RBD device
-            retcode, retmsg = pvc_ceph.map_volume(zk_conn, volume['pool'], dst_volume_name)
+            retcode, retmsg = pvc_ceph.map_volume(zkhandler, volume['pool'], dst_volume_name)
             if not retcode:
                 raise ProvisioningError('Failed to map destination volume "{}": {}'.format(dst_volume_name, retmsg))
             # Map the source RBD device
-            retcode, retmsg = pvc_ceph.map_volume(zk_conn, volume['pool'], src_volume_name)
+            retcode, retmsg = pvc_ceph.map_volume(zkhandler, volume['pool'], src_volume_name)
             if not retcode:
                 raise ProvisioningError('Failed to map source volume "{}": {}'.format(src_volume_name, retmsg))
             # Convert from source to target
@@ -1484,11 +1481,11 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
                 raise ProvisioningError('Failed to convert {} volume "{}" to raw volume "{}": {}'.format(volume['volume_format'], src_volume, dst_volume, stderr))
 
             # Unmap the source RBD device (don't bother later)
-            retcode, retmsg = pvc_ceph.unmap_volume(zk_conn, volume['pool'], src_volume_name)
+            retcode, retmsg = pvc_ceph.unmap_volume(zkhandler, volume['pool'], src_volume_name)
             if not retcode:
                 raise ProvisioningError('Failed to unmap source volume "{}": {}'.format(src_volume_name, retmsg))
             # Unmap the target RBD device (don't bother later)
-            retcode, retmsg = pvc_ceph.unmap_volume(zk_conn, volume['pool'], dst_volume_name)
+            retcode, retmsg = pvc_ceph.unmap_volume(zkhandler, volume['pool'], dst_volume_name)
             if not retcode:
                 raise ProvisioningError('Failed to unmap destination volume "{}": {}'.format(dst_volume_name, retmsg))
         else:
@@ -1508,7 +1505,7 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
             filesystem_args = ' '.join(filesystem_args_list)
 
             # Map the RBD device
-            retcode, retmsg = pvc_ceph.map_volume(zk_conn, volume['pool'], dst_volume_name)
+            retcode, retmsg = pvc_ceph.map_volume(zkhandler, volume['pool'], dst_volume_name)
             if not retcode:
                 raise ProvisioningError('Failed to map volume "{}": {}'.format(dst_volume, retmsg))
 
@@ -1646,8 +1643,7 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
     if start_vm:
         self.update_state(state='RUNNING', meta={'current': 10, 'total': 10, 'status': 'Starting VM'})
         time.sleep(1)
-        retcode, retmsg = pvc_vm.start_vm(zk_conn, vm_name)
+        retcode, retmsg = pvc_vm.start_vm(zkhandler, vm_name)
         print(retmsg)
 
-    pvc_common.stopZKConnection(zk_conn)
     return {'status': 'VM "{}" with profile "{}" has been provisioned and started successfully'.format(vm_name, vm_profile), 'current': 10, 'total': 10}
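Note on the pattern this diff relies on (not part of the change itself): the ZKConnection decorator is imported from daemon_lib.zkhandler, whose implementation is not shown here. The sketch below is only a rough, hypothetical illustration of the idea the diff assumes: the decorator opens a Zookeeper connection for the duration of each call, passes the handler into the wrapped function, and tears the connection down afterwards, replacing the manual startZKConnection/stopZKConnection pair removed above. The FakeZKHandler class and the exact position at which zkhandler is injected (in create_vm it lands after the Celery task's self argument) are assumptions, not the real API.

# Hypothetical sketch only - the real ZKConnection and handler live in
# daemon_lib/zkhandler.py and may differ in API and argument handling.
from functools import wraps


class FakeZKHandler:
    # Stand-in for the real Zookeeper handler object (assumption).
    def __init__(self, config):
        self.coordinators = config.get('coordinators')

    def connect(self):
        pass  # a real handler would open the Zookeeper connection here

    def disconnect(self):
        pass  # ... and close it here


class ZKConnection:
    # Decorator: open a connection per call, inject the handler, always clean up.
    def __init__(self, config):
        self.config = config

    def __call__(self, function):
        @wraps(function)
        def connection(*args, **kwargs):
            zkhandler = FakeZKHandler(self.config)
            zkhandler.connect()
            try:
                # Simplification: injects the handler as the first argument; the
                # real decorator must also account for the Celery task's self.
                return function(zkhandler, *args, **kwargs)
            finally:
                zkhandler.disconnect()
        return connection

In a sketch like this, the finally block ensures the connection is closed even when provisioning raises, which is the behavioural difference from the removed code path, where stopZKConnection was only reached on success.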