Port provisioner to ZKConnection

This commit is contained in:
Joshua Boniface 2021-05-29 00:26:15 -04:00
parent 2c0bafc313
commit 49bbad8021
1 changed files with 18 additions and 22 deletions

View File

@ -26,6 +26,8 @@ import re
from pvcapid.Daemon import config, strtobool from pvcapid.Daemon import config, strtobool
from daemon_lib.zkhandler import ZKConnection
import daemon_lib.common as pvc_common import daemon_lib.common as pvc_common
import daemon_lib.node as pvc_node import daemon_lib.node as pvc_node
import daemon_lib.vm as pvc_vm import daemon_lib.vm as pvc_vm
@ -1049,7 +1051,8 @@ def delete_profile(name):
# #
# Main VM provisioning function - executed by the Celery worker # Main VM provisioning function - executed by the Celery worker
# #
def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]): @ZKConnection(config)
def create_vm(self, zkhandler, vm_name, vm_profile, define_vm=True, start_vm=True, script_run_args=[]):
# Runtime imports # Runtime imports
import time import time
import importlib import importlib
@ -1068,12 +1071,6 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
print('FATAL - failed to connect to Postgres') print('FATAL - failed to connect to Postgres')
raise Exception raise Exception
try:
zk_conn = pvc_common.startZKConnection(config['coordinators'])
except Exception:
print('FATAL - failed to connect to Zookeeper')
raise Exception
# Phase 1 - setup # Phase 1 - setup
# * Get the profile elements # * Get the profile elements
# * Get the details from these elements # * Get the details from these elements
@ -1174,11 +1171,11 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
time.sleep(1) time.sleep(1)
# Verify that a VM with this name does not already exist # Verify that a VM with this name does not already exist
if pvc_vm.searchClusterByName(zk_conn, vm_name): if pvc_vm.searchClusterByName(zkhandler, vm_name):
raise ClusterError("A VM with the name '{}' already exists in the cluster.".format(vm_name)) raise ClusterError("A VM with the name '{}' already exists in the cluster.".format(vm_name))
# Verify that at least one host has enough free RAM to run the VM # Verify that at least one host has enough free RAM to run the VM
_discard, nodes = pvc_node.get_list(zk_conn, None) _discard, nodes = pvc_node.get_list(zkhandler, None)
target_node = None target_node = None
last_free = 0 last_free = 0
for node in nodes: for node in nodes:
@ -1199,7 +1196,7 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
print('Selecting target node "{}" with "{}" MB free RAM'.format(target_node, last_free)) print('Selecting target node "{}" with "{}" MB free RAM'.format(target_node, last_free))
# Verify that all configured networks are present on the cluster # Verify that all configured networks are present on the cluster
cluster_networks, _discard = pvc_network.getClusterNetworkList(zk_conn) cluster_networks, _discard = pvc_network.getClusterNetworkList(zkhandler)
for network in vm_data['networks']: for network in vm_data['networks']:
vni = str(network['vni']) vni = str(network['vni'])
if vni not in cluster_networks: if vni not in cluster_networks:
@ -1211,7 +1208,7 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
pools = dict() pools = dict()
for volume in vm_data['volumes']: for volume in vm_data['volumes']:
if volume.get('source_volume') is not None: if volume.get('source_volume') is not None:
volume_data = pvc_ceph.getVolumeInformation(zk_conn, volume['pool'], volume['source_volume']) volume_data = pvc_ceph.getVolumeInformation(zkhandler, volume['pool'], volume['source_volume'])
if not volume_data: if not volume_data:
raise ClusterError('The source volume {}/{} could not be found.'.format(volume['pool'], volume['source_volume'])) raise ClusterError('The source volume {}/{} could not be found.'.format(volume['pool'], volume['source_volume']))
if not volume['pool'] in pools: if not volume['pool'] in pools:
@ -1226,7 +1223,7 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
for pool in pools: for pool in pools:
try: try:
pool_information = pvc_ceph.getPoolInformation(zk_conn, pool) pool_information = pvc_ceph.getPoolInformation(zkhandler, pool)
if not pool_information: if not pool_information:
raise raise
except Exception: except Exception:
@ -1424,7 +1421,7 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
node_selector = vm_data['system_details']['node_selector'] node_selector = vm_data['system_details']['node_selector']
node_autostart = vm_data['system_details']['node_autostart'] node_autostart = vm_data['system_details']['node_autostart']
migration_method = vm_data['system_details']['migration_method'] migration_method = vm_data['system_details']['migration_method']
retcode, retmsg = pvc_vm.define_vm(zk_conn, vm_schema.strip(), target_node, node_limit, node_selector, node_autostart, migration_method, vm_profile, initial_state='provision') retcode, retmsg = pvc_vm.define_vm(zkhandler, vm_schema.strip(), target_node, node_limit, node_selector, node_autostart, migration_method, vm_profile, initial_state='provision')
print(retmsg) print(retmsg)
else: else:
print("Skipping VM definition") print("Skipping VM definition")
@ -1436,12 +1433,12 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
for volume in vm_data['volumes']: for volume in vm_data['volumes']:
if volume.get('source_volume') is not None: if volume.get('source_volume') is not None:
success, message = pvc_ceph.clone_volume(zk_conn, volume['pool'], volume['source_volume'], "{}_{}".format(vm_name, volume['disk_id'])) success, message = pvc_ceph.clone_volume(zkhandler, volume['pool'], volume['source_volume'], "{}_{}".format(vm_name, volume['disk_id']))
print(message) print(message)
if not success: if not success:
raise ProvisioningError('Failed to clone volume "{}" to "{}".'.format(volume['source_volume'], volume['disk_id'])) raise ProvisioningError('Failed to clone volume "{}" to "{}".'.format(volume['source_volume'], volume['disk_id']))
else: else:
success, message = pvc_ceph.add_volume(zk_conn, volume['pool'], "{}_{}".format(vm_name, volume['disk_id']), "{}G".format(volume['disk_size_gb'])) success, message = pvc_ceph.add_volume(zkhandler, volume['pool'], "{}_{}".format(vm_name, volume['disk_id']), "{}G".format(volume['disk_size_gb']))
print(message) print(message)
if not success: if not success:
raise ProvisioningError('Failed to create volume "{}".'.format(volume['disk_id'])) raise ProvisioningError('Failed to create volume "{}".'.format(volume['disk_id']))
@ -1465,11 +1462,11 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
print('Converting {} source volume {} to raw format on {}'.format(volume['volume_format'], src_volume, dst_volume)) print('Converting {} source volume {} to raw format on {}'.format(volume['volume_format'], src_volume, dst_volume))
# Map the target RBD device # Map the target RBD device
retcode, retmsg = pvc_ceph.map_volume(zk_conn, volume['pool'], dst_volume_name) retcode, retmsg = pvc_ceph.map_volume(zkhandler, volume['pool'], dst_volume_name)
if not retcode: if not retcode:
raise ProvisioningError('Failed to map destination volume "{}": {}'.format(dst_volume_name, retmsg)) raise ProvisioningError('Failed to map destination volume "{}": {}'.format(dst_volume_name, retmsg))
# Map the source RBD device # Map the source RBD device
retcode, retmsg = pvc_ceph.map_volume(zk_conn, volume['pool'], src_volume_name) retcode, retmsg = pvc_ceph.map_volume(zkhandler, volume['pool'], src_volume_name)
if not retcode: if not retcode:
raise ProvisioningError('Failed to map source volume "{}": {}'.format(src_volume_name, retmsg)) raise ProvisioningError('Failed to map source volume "{}": {}'.format(src_volume_name, retmsg))
# Convert from source to target # Convert from source to target
@ -1484,11 +1481,11 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
raise ProvisioningError('Failed to convert {} volume "{}" to raw volume "{}": {}'.format(volume['volume_format'], src_volume, dst_volume, stderr)) raise ProvisioningError('Failed to convert {} volume "{}" to raw volume "{}": {}'.format(volume['volume_format'], src_volume, dst_volume, stderr))
# Unmap the source RBD device (don't bother later) # Unmap the source RBD device (don't bother later)
retcode, retmsg = pvc_ceph.unmap_volume(zk_conn, volume['pool'], src_volume_name) retcode, retmsg = pvc_ceph.unmap_volume(zkhandler, volume['pool'], src_volume_name)
if not retcode: if not retcode:
raise ProvisioningError('Failed to unmap source volume "{}": {}'.format(src_volume_name, retmsg)) raise ProvisioningError('Failed to unmap source volume "{}": {}'.format(src_volume_name, retmsg))
# Unmap the target RBD device (don't bother later) # Unmap the target RBD device (don't bother later)
retcode, retmsg = pvc_ceph.unmap_volume(zk_conn, volume['pool'], dst_volume_name) retcode, retmsg = pvc_ceph.unmap_volume(zkhandler, volume['pool'], dst_volume_name)
if not retcode: if not retcode:
raise ProvisioningError('Failed to unmap destination volume "{}": {}'.format(dst_volume_name, retmsg)) raise ProvisioningError('Failed to unmap destination volume "{}": {}'.format(dst_volume_name, retmsg))
else: else:
@ -1508,7 +1505,7 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
filesystem_args = ' '.join(filesystem_args_list) filesystem_args = ' '.join(filesystem_args_list)
# Map the RBD device # Map the RBD device
retcode, retmsg = pvc_ceph.map_volume(zk_conn, volume['pool'], dst_volume_name) retcode, retmsg = pvc_ceph.map_volume(zkhandler, volume['pool'], dst_volume_name)
if not retcode: if not retcode:
raise ProvisioningError('Failed to map volume "{}": {}'.format(dst_volume, retmsg)) raise ProvisioningError('Failed to map volume "{}": {}'.format(dst_volume, retmsg))
@ -1646,8 +1643,7 @@ def create_vm(self, vm_name, vm_profile, define_vm=True, start_vm=True, script_r
if start_vm: if start_vm:
self.update_state(state='RUNNING', meta={'current': 10, 'total': 10, 'status': 'Starting VM'}) self.update_state(state='RUNNING', meta={'current': 10, 'total': 10, 'status': 'Starting VM'})
time.sleep(1) time.sleep(1)
retcode, retmsg = pvc_vm.start_vm(zk_conn, vm_name) retcode, retmsg = pvc_vm.start_vm(zkhandler, vm_name)
print(retmsg) print(retmsg)
pvc_common.stopZKConnection(zk_conn)
return {'status': 'VM "{}" with profile "{}" has been provisioned and started successfully'.format(vm_name, vm_profile), 'current': 10, 'total': 10} return {'status': 'VM "{}" with profile "{}" has been provisioned and started successfully'.format(vm_name, vm_profile), 'current': 10, 'total': 10}