diff --git a/daemon-common/vm.py b/daemon-common/vm.py index fec7f5a6..19d08ab4 100644 --- a/daemon-common/vm.py +++ b/daemon-common/vm.py @@ -24,7 +24,6 @@ import re import lxml.objectify import lxml.etree -import daemon_lib.zkhandler as zkhandler import daemon_lib.common as common import daemon_lib.ceph as ceph @@ -33,20 +32,20 @@ import daemon_lib.ceph as ceph # # Cluster search functions # -def getClusterDomainList(zk_conn): +def getClusterDomainList(zkhandler): # Get a list of UUIDs by listing the children of /domains - uuid_list = zkhandler.listchildren(zk_conn, '/domains') + uuid_list = zkhandler.children('/domains') name_list = [] # For each UUID, get the corresponding name from the data for uuid in uuid_list: - name_list.append(zkhandler.readdata(zk_conn, '/domains/%s' % uuid)) + name_list.append(zkhandler.read('/domains/{}'.format(uuid))) return uuid_list, name_list -def searchClusterByUUID(zk_conn, uuid): +def searchClusterByUUID(zkhandler, uuid): try: # Get the lists - uuid_list, name_list = getClusterDomainList(zk_conn) + uuid_list, name_list = getClusterDomainList(zkhandler) # We're looking for UUID, so find that element ID index = uuid_list.index(uuid) # Get the name_list element at that index @@ -58,10 +57,10 @@ def searchClusterByUUID(zk_conn, uuid): return name -def searchClusterByName(zk_conn, name): +def searchClusterByName(zkhandler, name): try: # Get the lists - uuid_list, name_list = getClusterDomainList(zk_conn) + uuid_list, name_list = getClusterDomainList(zkhandler) # We're looking for name, so find that element ID index = name_list.index(name) # Get the uuid_list element at that index @@ -73,67 +72,83 @@ def searchClusterByName(zk_conn, name): return uuid -def getDomainUUID(zk_conn, domain): +def getDomainUUID(zkhandler, domain): # Validate that VM exists in cluster if common.validateUUID(domain): - dom_name = searchClusterByUUID(zk_conn, domain) - dom_uuid = searchClusterByName(zk_conn, dom_name) + dom_name = searchClusterByUUID(zkhandler, domain) + dom_uuid = searchClusterByName(zkhandler, dom_name) else: - dom_uuid = searchClusterByName(zk_conn, domain) - dom_name = searchClusterByUUID(zk_conn, dom_uuid) + dom_uuid = searchClusterByName(zkhandler, domain) + dom_name = searchClusterByUUID(zkhandler, dom_uuid) return dom_uuid -def getDomainName(zk_conn, domain): +def getDomainName(zkhandler, domain): # Validate that VM exists in cluster if common.validateUUID(domain): - dom_name = searchClusterByUUID(zk_conn, domain) - dom_uuid = searchClusterByName(zk_conn, dom_name) + dom_name = searchClusterByUUID(zkhandler, domain) + dom_uuid = searchClusterByName(zkhandler, dom_name) else: - dom_uuid = searchClusterByName(zk_conn, domain) - dom_name = searchClusterByUUID(zk_conn, dom_uuid) + dom_uuid = searchClusterByName(zkhandler, domain) + dom_name = searchClusterByUUID(zkhandler, dom_uuid) return dom_name +# +# Helper functions +# +def change_state(zkhandler, dom_uuid, new_state): + lock = zkhandler.exclusivelock('/domains/{}/state'.format(dom_uuid)) + with lock: + zkhandler.write([ + ('/domains/{}/state'.format(dom_uuid), new_state) + ]) + + # Wait for 1/2 second to allow state to flow to all nodes + time.sleep(0.5) + + # # Direct functions # -def is_migrated(zk_conn, domain): +def is_migrated(zkhandler, domain): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) - last_node = zkhandler.readdata(zk_conn, '/domains/{}/lastnode'.format(dom_uuid)) + last_node = zkhandler.read('/domains/{}/lastnode'.format(dom_uuid)) if last_node: return True else: return False -def flush_locks(zk_conn, domain): +def flush_locks(zkhandler, domain): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Verify that the VM is in a stopped state; freeing locks is not safe otherwise - state = zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) + state = zkhandler.read('/domains/{}/state'.format(dom_uuid)) if state != 'stop': return False, 'ERROR: VM "{}" is not in stopped state; flushing RBD locks on a running VM is dangerous.'.format(domain) # Tell the cluster to create a new OSD for the host flush_locks_string = 'flush_locks {}'.format(dom_uuid) - zkhandler.writedata(zk_conn, {'/cmd/domains': flush_locks_string}) + zkhandler.write([ + ('/cmd/domains', flush_locks_string) + ]) # Wait 1/2 second for the cluster to get the message and start working time.sleep(0.5) # Acquire a read lock, so we get the return exclusively - lock = zkhandler.readlock(zk_conn, '/cmd/domains') + lock = zkhandler.readlock('/cmd/domains') with lock: try: - result = zkhandler.readdata(zk_conn, '/cmd/domains').split()[0] + result = zkhandler.read('/cmd/domains').split()[0] if result == 'success-flush_locks': message = 'Flushed locks on VM "{}"'.format(domain) success = True @@ -145,15 +160,17 @@ def flush_locks(zk_conn, domain): success = False # Acquire a write lock to ensure things go smoothly - lock = zkhandler.writelock(zk_conn, '/cmd/domains') + lock = zkhandler.writelock('/cmd/domains') with lock: time.sleep(0.5) - zkhandler.writedata(zk_conn, {'/cmd/domains': ''}) + zkhandler.write([ + ('/cmd/domains', '') + ]) return success, message -def define_vm(zk_conn, config_data, target_node, node_limit, node_selector, node_autostart, migration_method=None, profile=None, initial_state='stop'): +def define_vm(zkhandler, config_data, target_node, node_limit, node_selector, node_autostart, migration_method=None, profile=None, initial_state='stop'): # Parse the XML data try: parsed_xml = lxml.objectify.fromstring(config_data) @@ -163,14 +180,14 @@ def define_vm(zk_conn, config_data, target_node, node_limit, node_selector, node dom_name = parsed_xml.name.text # Ensure that the UUID and name are unique - if searchClusterByUUID(zk_conn, dom_uuid) or searchClusterByName(zk_conn, dom_name): + if searchClusterByUUID(zkhandler, dom_uuid) or searchClusterByName(zkhandler, dom_name): return False, 'ERROR: Specified VM "{}" or UUID "{}" matches an existing VM on the cluster'.format(dom_name, dom_uuid) if not target_node: - target_node = common.findTargetNode(zk_conn, dom_uuid) + target_node = common.findTargetNode(zkhandler, dom_uuid) else: # Verify node is valid - valid_node = common.verifyNode(zk_conn, target_node) + valid_node = common.verifyNode(zkhandler, target_node) if not valid_node: return False, 'ERROR: Specified node "{}" is invalid.'.format(target_node) @@ -194,64 +211,64 @@ def define_vm(zk_conn, config_data, target_node, node_limit, node_selector, node formatted_rbd_list = '' # Add the new domain to Zookeeper - zkhandler.writedata(zk_conn, { - '/domains/{}'.format(dom_uuid): dom_name, - '/domains/{}/state'.format(dom_uuid): initial_state, - '/domains/{}/node'.format(dom_uuid): target_node, - '/domains/{}/lastnode'.format(dom_uuid): '', - '/domains/{}/node_limit'.format(dom_uuid): formatted_node_limit, - '/domains/{}/node_selector'.format(dom_uuid): node_selector, - '/domains/{}/node_autostart'.format(dom_uuid): node_autostart, - '/domains/{}/migration_method'.format(dom_uuid): migration_method, - '/domains/{}/failedreason'.format(dom_uuid): '', - '/domains/{}/consolelog'.format(dom_uuid): '', - '/domains/{}/rbdlist'.format(dom_uuid): formatted_rbd_list, - '/domains/{}/profile'.format(dom_uuid): profile, - '/domains/{}/vnc'.format(dom_uuid): '', - '/domains/{}/xml'.format(dom_uuid): config_data - }) + zkhandler.write([ + ('/domains/{}'.format(dom_uuid), dom_name), + ('/domains/{}/state'.format(dom_uuid), initial_state), + ('/domains/{}/node'.format(dom_uuid), target_node), + ('/domains/{}/lastnode'.format(dom_uuid), ''), + ('/domains/{}/node_limit'.format(dom_uuid), formatted_node_limit), + ('/domains/{}/node_selector'.format(dom_uuid), node_selector), + ('/domains/{}/node_autostart'.format(dom_uuid), node_autostart), + ('/domains/{}/migration_method'.format(dom_uuid), migration_method), + ('/domains/{}/failedreason'.format(dom_uuid), ''), + ('/domains/{}/consolelog'.format(dom_uuid), ''), + ('/domains/{}/rbdlist'.format(dom_uuid), formatted_rbd_list), + ('/domains/{}/profile'.format(dom_uuid), profile), + ('/domains/{}/vnc'.format(dom_uuid), ''), + ('/domains/{}/xml'.format(dom_uuid), config_data) + ]) return True, 'Added new VM with Name "{}" and UUID "{}" to database.'.format(dom_name, dom_uuid) -def modify_vm_metadata(zk_conn, domain, node_limit, node_selector, node_autostart, provisioner_profile, migration_method): - dom_uuid = getDomainUUID(zk_conn, domain) +def modify_vm_metadata(zkhandler, domain, node_limit, node_selector, node_autostart, provisioner_profile, migration_method): + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) if node_limit is not None: - zkhandler.writedata(zk_conn, { - '/domains/{}/node_limit'.format(dom_uuid): node_limit - }) + zkhandler.write([ + ('/domains/{}/node_limit'.format(dom_uuid), node_limit) + ]) if node_selector is not None: - zkhandler.writedata(zk_conn, { - '/domains/{}/node_selector'.format(dom_uuid): node_selector - }) + zkhandler.write([ + ('/domains/{}/node_selector'.format(dom_uuid), node_selector) + ]) if node_autostart is not None: - zkhandler.writedata(zk_conn, { - '/domains/{}/node_autostart'.format(dom_uuid): node_autostart - }) + zkhandler.write([ + ('/domains/{}/node_autostart'.format(dom_uuid), node_autostart) + ]) if provisioner_profile is not None: - zkhandler.writedata(zk_conn, { - '/domains/{}/profile'.format(dom_uuid): provisioner_profile - }) + zkhandler.write([ + ('/domains/{}/profile'.format(dom_uuid), provisioner_profile) + ]) if migration_method is not None: - zkhandler.writedata(zk_conn, { - '/domains/{}/migration_method'.format(dom_uuid): migration_method - }) + zkhandler.write([ + ('/domains/{}/migration_method'.format(dom_uuid), migration_method) + ]) return True, 'Successfully modified PVC metadata of VM "{}".'.format(domain) -def modify_vm(zk_conn, domain, restart, new_vm_config): - dom_uuid = getDomainUUID(zk_conn, domain) +def modify_vm(zkhandler, domain, restart, new_vm_config): + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) - dom_name = getDomainName(zk_conn, domain) + dom_name = getDomainName(zkhandler, domain) # Parse and valiate the XML try: @@ -273,45 +290,41 @@ def modify_vm(zk_conn, domain, restart, new_vm_config): formatted_rbd_list = '' # Add the modified config to Zookeeper - zk_data = { - '/domains/{}'.format(dom_uuid): dom_name, - '/domains/{}/rbdlist'.format(dom_uuid): formatted_rbd_list, - '/domains/{}/xml'.format(dom_uuid): new_vm_config - } - zkhandler.writedata(zk_conn, zk_data) + zkhandler.write([ + ('/domains/{}'.format(dom_uuid), dom_name), + ('/domains/{}/rbdlist'.format(dom_uuid), formatted_rbd_list), + ('/domains/{}/xml'.format(dom_uuid), new_vm_config) + ]) if restart: - lock = zkhandler.exclusivelock(zk_conn, '/domains/{}/state'.format(dom_uuid)) - lock.acquire() - zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'restart'}) - lock.release() + change_state(zkhandler, dom_uuid, 'restart') return True, 'Successfully modified configuration of VM "{}".'.format(domain) -def dump_vm(zk_conn, domain): - dom_uuid = getDomainUUID(zk_conn, domain) +def dump_vm(zkhandler, domain): + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Gram the domain XML and dump it to stdout - vm_xml = zkhandler.readdata(zk_conn, '/domains/{}/xml'.format(dom_uuid)) + vm_xml = zkhandler.read('/domains/{}/xml'.format(dom_uuid)) return True, vm_xml -def rename_vm(zk_conn, domain, new_domain): - dom_uuid = getDomainUUID(zk_conn, domain) +def rename_vm(zkhandler, domain, new_domain): + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Verify that the VM is in a stopped state; renaming is not supported otherwise - state = zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) + state = zkhandler.read('/domains/{}/state'.format(dom_uuid)) if state != 'stop': return False, 'ERROR: VM "{}" is not in stopped state; VMs cannot be renamed while running.'.format(domain) # Parse and valiate the XML - vm_config = common.getDomainXML(zk_conn, dom_uuid) + vm_config = common.getDomainXML(zkhandler, dom_uuid) # Obtain the RBD disk list using the common functions ddisks = common.getDomainDisks(vm_config, {}) @@ -328,207 +341,180 @@ def rename_vm(zk_conn, domain, new_domain): # Skip renaming if nothing changed if rbd_new == rbd: continue - ceph.rename_volume(zk_conn, pool_list[idx], rbd, rbd_new) + ceph.rename_volume(zkhandler, pool_list[idx], rbd, rbd_new) # Replace the name in the config vm_config_new = lxml.etree.tostring(vm_config, encoding='ascii', method='xml').decode().replace(domain, new_domain) # Get VM information - _b, dom_info = get_info(zk_conn, dom_uuid) + _b, dom_info = get_info(zkhandler, dom_uuid) # Undefine the old VM - undefine_vm(zk_conn, dom_uuid) + undefine_vm(zkhandler, dom_uuid) # Define the new VM - define_vm(zk_conn, vm_config_new, dom_info['node'], dom_info['node_limit'], dom_info['node_selector'], dom_info['node_autostart'], migration_method=dom_info['migration_method'], profile=dom_info['profile'], initial_state='stop') + define_vm(zkhandler, vm_config_new, dom_info['node'], dom_info['node_limit'], dom_info['node_selector'], dom_info['node_autostart'], migration_method=dom_info['migration_method'], profile=dom_info['profile'], initial_state='stop') # If the VM is migrated, store that if dom_info['migrated'] != 'no': - zkhandler.writedata(zk_conn, {'/domains/{}/lastnode'.format(dom_uuid): dom_info['last_node']}) + zkhandler.write([ + ('/domains/{}/lastnode'.format(dom_uuid), dom_info['last_node']) + ]) return True, 'Successfully renamed VM "{}" to "{}".'.format(domain, new_domain) -def undefine_vm(zk_conn, domain): +def undefine_vm(zkhandler, domain): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Shut down the VM - current_vm_state = zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) + current_vm_state = zkhandler.read('/domains/{}/state'.format(dom_uuid)) if current_vm_state != 'stop': - # Set the domain into stop mode - lock = zkhandler.exclusivelock(zk_conn, '/domains/{}/state'.format(dom_uuid)) - lock.acquire() - zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'stop'}) - lock.release() - - # Wait for 2 seconds to allow state to flow to all nodes - time.sleep(2) + change_state(zkhandler, dom_uuid, 'stop') # Gracefully terminate the class instances - zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'delete'}) - time.sleep(2) + change_state(zkhandler, dom_uuid, 'delete') # Delete the configurations - zkhandler.deletekey(zk_conn, '/domains/{}'.format(dom_uuid)) + zkhandler.delete('/domains/{}'.format(dom_uuid)) return True, 'Undefined VM "{}" from the cluster.'.format(domain) -def remove_vm(zk_conn, domain): +def remove_vm(zkhandler, domain): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) - disk_list = common.getDomainDiskList(zk_conn, dom_uuid) + disk_list = common.getDomainDiskList(zkhandler, dom_uuid) # Shut down the VM - current_vm_state = zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) + current_vm_state = zkhandler.read('/domains/{}/state'.format(dom_uuid)) if current_vm_state != 'stop': - # Set the domain into stop mode - lock = zkhandler.exclusivelock(zk_conn, '/domains/{}/state'.format(dom_uuid)) - lock.acquire() - zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'stop'}) - lock.release() - - # Wait for 2 seconds to allow state to flow to all nodes - time.sleep(2) + change_state(zkhandler, dom_uuid, 'stop') # Gracefully terminate the class instances - zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'delete'}) - time.sleep(2) + change_state(zkhandler, dom_uuid, 'delete') # Delete the configurations - zkhandler.deletekey(zk_conn, '/domains/{}'.format(dom_uuid)) - time.sleep(2) + zkhandler.delete('/domains/{}'.format(dom_uuid)) + + # Wait for 1 second to allow state to flow to all nodes + time.sleep(1) # Remove disks for disk in disk_list: # vmpool/vmname_volume try: disk_pool, disk_name = disk.split('/') - retcode, message = ceph.remove_volume(zk_conn, disk_pool, disk_name) + retcode, message = ceph.remove_volume(zkhandler, disk_pool, disk_name) except ValueError: continue return True, 'Removed VM "{}" and disks from the cluster.'.format(domain) -def start_vm(zk_conn, domain): +def start_vm(zkhandler, domain): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Set the VM to start - lock = zkhandler.exclusivelock(zk_conn, '/domains/{}/state'.format(dom_uuid)) - lock.acquire() - zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'start'}) - lock.release() + change_state(zkhandler, dom_uuid, 'start') return True, 'Starting VM "{}".'.format(domain) -def restart_vm(zk_conn, domain, wait=False): +def restart_vm(zkhandler, domain, wait=False): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Get state and verify we're OK to proceed - current_state = zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) + current_state = zkhandler.read('/domains/{}/state'.format(dom_uuid)) if current_state != 'start': return False, 'ERROR: VM "{}" is not in "start" state!'.format(domain) retmsg = 'Restarting VM "{}".'.format(domain) # Set the VM to restart - lock = zkhandler.exclusivelock(zk_conn, '/domains/{}/state'.format(dom_uuid)) - lock.acquire() - zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'restart'}) - lock.release() + change_state(zkhandler, dom_uuid, 'restart') if wait: - while zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) == 'restart': + while zkhandler.read('/domains/{}/state'.format(dom_uuid)) == 'restart': time.sleep(1) retmsg = 'Restarted VM "{}"'.format(domain) return True, retmsg -def shutdown_vm(zk_conn, domain, wait=False): +def shutdown_vm(zkhandler, domain, wait=False): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Get state and verify we're OK to proceed - current_state = zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) + current_state = zkhandler.read('/domains/{}/state'.format(dom_uuid)) if current_state != 'start': return False, 'ERROR: VM "{}" is not in "start" state!'.format(domain) retmsg = 'Shutting down VM "{}"'.format(domain) # Set the VM to shutdown - lock = zkhandler.exclusivelock(zk_conn, '/domains/{}/state'.format(dom_uuid)) - lock.acquire() - zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'shutdown'}) - lock.release() + change_state(zkhandler, dom_uuid, 'shutdown') if wait: - while zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) == 'shutdown': + while zkhandler.read('/domains/{}/state'.format(dom_uuid)) == 'shutdown': time.sleep(1) retmsg = 'Shut down VM "{}"'.format(domain) return True, retmsg -def stop_vm(zk_conn, domain): +def stop_vm(zkhandler, domain): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) - # Set the VM to start - lock = zkhandler.exclusivelock(zk_conn, '/domains/{}/state'.format(dom_uuid)) - lock.acquire() - zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'stop'}) - lock.release() + # Set the VM to stop + change_state(zkhandler, dom_uuid, 'stop') return True, 'Forcibly stopping VM "{}".'.format(domain) -def disable_vm(zk_conn, domain): +def disable_vm(zkhandler, domain): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Get state and verify we're OK to proceed - current_state = zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) + current_state = zkhandler.read('/domains/{}/state'.format(dom_uuid)) if current_state != 'stop': return False, 'ERROR: VM "{}" must be stopped before disabling!'.format(domain) - # Set the VM to start - lock = zkhandler.exclusivelock(zk_conn, '/domains/{}/state'.format(dom_uuid)) - lock.acquire() - zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'disable'}) - lock.release() + # Set the VM to disable + change_state(zkhandler, dom_uuid, 'disable') return True, 'Marked VM "{}" as disable.'.format(domain) -def move_vm(zk_conn, domain, target_node, wait=False, force_live=False): +def move_vm(zkhandler, domain, target_node, wait=False, force_live=False): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Get state and verify we're OK to proceed - current_state = zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) + current_state = zkhandler.read('/domains/{}/state'.format(dom_uuid)) if current_state != 'start': # If the current state isn't start, preserve it; we're not doing live migration target_state = current_state @@ -538,26 +524,28 @@ def move_vm(zk_conn, domain, target_node, wait=False, force_live=False): else: target_state = 'migrate' - current_node = zkhandler.readdata(zk_conn, '/domains/{}/node'.format(dom_uuid)) + current_node = zkhandler.read('/domains/{}/node'.format(dom_uuid)) if not target_node: - target_node = common.findTargetNode(zk_conn, dom_uuid) + target_node = common.findTargetNode(zkhandler, dom_uuid) else: # Verify node is valid - valid_node = common.verifyNode(zk_conn, target_node) + valid_node = common.verifyNode(zkhandler, target_node) if not valid_node: return False, 'ERROR: Specified node "{}" is invalid.'.format(target_node) # Check if node is within the limit - node_limit = zkhandler.readdata(zk_conn, '/domains/{}/node_limit'.format(dom_uuid)) + node_limit = zkhandler.read('/domains/{}/node_limit'.format(dom_uuid)) if node_limit and target_node not in node_limit.split(','): return False, 'ERROR: Specified node "{}" is not in the allowed list of nodes for VM "{}".'.format(target_node, domain) # Verify if node is current node if target_node == current_node: - last_node = zkhandler.readdata(zk_conn, '/domains/{}/lastnode'.format(dom_uuid)) + last_node = zkhandler.read('/domains/{}/lastnode'.format(dom_uuid)) if last_node: - zkhandler.writedata(zk_conn, {'/domains/{}/lastnode'.format(dom_uuid): ''}) + zkhandler.write([ + ('/domains/{}/lastnode'.format(dom_uuid), '') + ]) return True, 'Making temporary migration permanent for VM "{}".'.format(domain) return False, 'ERROR: VM "{}" is already running on node "{}".'.format(domain, current_node) @@ -567,31 +555,33 @@ def move_vm(zk_conn, domain, target_node, wait=False, force_live=False): retmsg = 'Permanently migrating VM "{}" to node "{}".'.format(domain, target_node) - lock = zkhandler.exclusivelock(zk_conn, '/domains/{}/state'.format(dom_uuid)) - lock.acquire() - zkhandler.writedata(zk_conn, { - '/domains/{}/state'.format(dom_uuid): target_state, - '/domains/{}/node'.format(dom_uuid): target_node, - '/domains/{}/lastnode'.format(dom_uuid): '' - }) - lock.release() + lock = zkhandler.exclusivelock('/domains/{}/state'.format(dom_uuid)) + with lock: + zkhandler.write([ + ('/domains/{}/state'.format(dom_uuid), target_state), + ('/domains/{}/node'.format(dom_uuid), target_node), + ('/domains/{}/lastnode'.format(dom_uuid), '') + ]) + + # Wait for 1/2 second for migration to start + time.sleep(0.5) if wait: - while zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) == target_state: + while zkhandler.read('/domains/{}/state'.format(dom_uuid)) == target_state: time.sleep(1) retmsg = 'Permanently migrated VM "{}" to node "{}"'.format(domain, target_node) return True, retmsg -def migrate_vm(zk_conn, domain, target_node, force_migrate, wait=False, force_live=False): +def migrate_vm(zkhandler, domain, target_node, force_migrate, wait=False, force_live=False): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Get state and verify we're OK to proceed - current_state = zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) + current_state = zkhandler.read('/domains/{}/state'.format(dom_uuid)) if current_state != 'start': # If the current state isn't start, preserve it; we're not doing live migration target_state = current_state @@ -601,22 +591,22 @@ def migrate_vm(zk_conn, domain, target_node, force_migrate, wait=False, force_li else: target_state = 'migrate' - current_node = zkhandler.readdata(zk_conn, '/domains/{}/node'.format(dom_uuid)) - last_node = zkhandler.readdata(zk_conn, '/domains/{}/lastnode'.format(dom_uuid)) + current_node = zkhandler.read('/domains/{}/node'.format(dom_uuid)) + last_node = zkhandler.read('/domains/{}/lastnode'.format(dom_uuid)) if last_node and not force_migrate: return False, 'ERROR: VM "{}" has been previously migrated.'.format(domain) if not target_node: - target_node = common.findTargetNode(zk_conn, dom_uuid) + target_node = common.findTargetNode(zkhandler, dom_uuid) else: # Verify node is valid - valid_node = common.verifyNode(zk_conn, target_node) + valid_node = common.verifyNode(zkhandler, target_node) if not valid_node: return False, 'ERROR: Specified node "{}" is invalid.'.format(target_node) # Check if node is within the limit - node_limit = zkhandler.readdata(zk_conn, '/domains/{}/node_limit'.format(dom_uuid)) + node_limit = zkhandler.read('/domains/{}/node_limit'.format(dom_uuid)) if node_limit and target_node not in node_limit.split(','): return False, 'ERROR: Specified node "{}" is not in the allowed list of nodes for VM "{}".'.format(target_node, domain) @@ -633,31 +623,33 @@ def migrate_vm(zk_conn, domain, target_node, force_migrate, wait=False, force_li retmsg = 'Migrating VM "{}" to node "{}".'.format(domain, target_node) - lock = zkhandler.exclusivelock(zk_conn, '/domains/{}/state'.format(dom_uuid)) - lock.acquire() - zkhandler.writedata(zk_conn, { - '/domains/{}/state'.format(dom_uuid): target_state, - '/domains/{}/node'.format(dom_uuid): target_node, - '/domains/{}/lastnode'.format(dom_uuid): current_node - }) - lock.release() + lock = zkhandler.exclusivelock(zkhandler, '/domains/{}/state'.format(dom_uuid)) + with lock: + zkhandler.write([ + ('/domains/{}/state'.format(dom_uuid), target_state), + ('/domains/{}/node'.format(dom_uuid), target_node), + ('/domains/{}/lastnode'.format(dom_uuid), current_node) + ]) + + # Wait for 1/2 second for migration to start + time.sleep(0.5) if wait: - while zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) == target_state: + while zkhandler.read('/domains/{}/state'.format(dom_uuid)) == target_state: time.sleep(1) retmsg = 'Migrated VM "{}" to node "{}"'.format(domain, target_node) return True, retmsg -def unmigrate_vm(zk_conn, domain, wait=False, force_live=False): +def unmigrate_vm(zkhandler, domain, wait=False, force_live=False): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Get state and verify we're OK to proceed - current_state = zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) + current_state = zkhandler.read('/domains/{}/state'.format(dom_uuid)) if current_state != 'start': # If the current state isn't start, preserve it; we're not doing live migration target_state = current_state @@ -667,38 +659,40 @@ def unmigrate_vm(zk_conn, domain, wait=False, force_live=False): else: target_state = 'migrate' - target_node = zkhandler.readdata(zk_conn, '/domains/{}/lastnode'.format(dom_uuid)) + target_node = zkhandler.read('/domains/{}/lastnode'.format(dom_uuid)) if target_node == '': return False, 'ERROR: VM "{}" has not been previously migrated.'.format(domain) retmsg = 'Unmigrating VM "{}" back to node "{}".'.format(domain, target_node) - lock = zkhandler.exclusivelock(zk_conn, '/domains/{}/state'.format(dom_uuid)) - lock.acquire() - zkhandler.writedata(zk_conn, { - '/domains/{}/state'.format(dom_uuid): target_state, - '/domains/{}/node'.format(dom_uuid): target_node, - '/domains/{}/lastnode'.format(dom_uuid): '' - }) - lock.release() + lock = zkhandler.exclusivelock(zkhandler, '/domains/{}/state'.format(dom_uuid)) + with lock: + zkhandler.write([ + ('/domains/{}/state'.format(dom_uuid), target_state), + ('/domains/{}/node'.format(dom_uuid), target_node), + ('/domains/{}/lastnode'.format(dom_uuid), '') + ]) + + # Wait for 1/2 second for migration to start + time.sleep(0.5) if wait: - while zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) == target_state: + while zkhandler.read('/domains/{}/state'.format(dom_uuid)) == target_state: time.sleep(1) retmsg = 'Unmigrated VM "{}" back to node "{}"'.format(domain, target_node) return True, retmsg -def get_console_log(zk_conn, domain, lines=1000): +def get_console_log(zkhandler, domain, lines=1000): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain) # Get the data from ZK - console_log = zkhandler.readdata(zk_conn, '/domains/{}/consolelog'.format(dom_uuid)) + console_log = zkhandler.read('/domains/{}/consolelog'.format(dom_uuid)) # Shrink the log buffer to length lines shrunk_log = console_log.split('\n')[-lines:] @@ -707,24 +701,24 @@ def get_console_log(zk_conn, domain, lines=1000): return True, loglines -def get_info(zk_conn, domain): +def get_info(zkhandler, domain): # Validate that VM exists in cluster - dom_uuid = getDomainUUID(zk_conn, domain) + dom_uuid = getDomainUUID(zkhandler, domain) if not dom_uuid: return False, 'ERROR: No VM named "{}" is present in the cluster.'.format(domain) # Gather information from XML config and print it - domain_information = common.getInformationFromXML(zk_conn, dom_uuid) + domain_information = common.getInformationFromXML(zkhandler, dom_uuid) if not domain_information: return False, 'ERROR: Could not get information about VM "{}".'.format(domain) return True, domain_information -def get_list(zk_conn, node, state, limit, is_fuzzy=True): +def get_list(zkhandler, node, state, limit, is_fuzzy=True): if node: # Verify node is valid - if not common.verifyNode(zk_conn, node): + if not common.verifyNode(zkhandler, node): return False, 'Specified node "{}" is invalid.'.format(node) if state: @@ -732,7 +726,7 @@ def get_list(zk_conn, node, state, limit, is_fuzzy=True): if state not in valid_states: return False, 'VM state "{}" is not valid.'.format(state) - full_vm_list = zkhandler.listchildren(zk_conn, '/domains') + full_vm_list = zkhandler.children('/domains') vm_list = [] # Set our limit to a sensible regex @@ -751,33 +745,33 @@ def get_list(zk_conn, node, state, limit, is_fuzzy=True): vm_state = {} for vm in full_vm_list: # Check we don't match the limit - name = zkhandler.readdata(zk_conn, '/domains/{}'.format(vm)) - vm_node[vm] = zkhandler.readdata(zk_conn, '/domains/{}/node'.format(vm)) - vm_state[vm] = zkhandler.readdata(zk_conn, '/domains/{}/state'.format(vm)) + name = zkhandler.read('/domains/{}'.format(vm)) + vm_node[vm] = zkhandler.read('/domains/{}/node'.format(vm)) + vm_state[vm] = zkhandler.read('/domains/{}/state'.format(vm)) # Handle limiting if limit: try: if re.match(limit, vm): if not node and not state: - vm_list.append(common.getInformationFromXML(zk_conn, vm)) + vm_list.append(common.getInformationFromXML(zkhandler, vm)) else: if vm_node[vm] == node or vm_state[vm] == state: - vm_list.append(common.getInformationFromXML(zk_conn, vm)) + vm_list.append(common.getInformationFromXML(zkhandler, vm)) if re.match(limit, name): if not node and not state: - vm_list.append(common.getInformationFromXML(zk_conn, vm)) + vm_list.append(common.getInformationFromXML(zkhandler, vm)) else: if vm_node[vm] == node or vm_state[vm] == state: - vm_list.append(common.getInformationFromXML(zk_conn, vm)) + vm_list.append(common.getInformationFromXML(zkhandler, vm)) except Exception as e: return False, 'Regex Error: {}'.format(e) else: # Check node to avoid unneeded ZK calls if not node and not state: - vm_list.append(common.getInformationFromXML(zk_conn, vm)) + vm_list.append(common.getInformationFromXML(zkhandler, vm)) else: if vm_node[vm] == node or vm_state[vm] == state: - vm_list.append(common.getInformationFromXML(zk_conn, vm)) + vm_list.append(common.getInformationFromXML(zkhandler, vm)) return True, vm_list