From 059230d369c8e685e3a2949ae904fcb5da64263c Mon Sep 17 00:00:00 2001
From: "Joshua M. Boniface"
Date: Sun, 13 Jun 2021 13:41:21 -0400
Subject: [PATCH] Convert vm.py to new ZK schema handler

---
 daemon-common/vm.py | 185 ++++++++++++++++++++++----------------------
 1 file changed, 94 insertions(+), 91 deletions(-)

diff --git a/daemon-common/vm.py b/daemon-common/vm.py
index ef7dabfa..78342bac 100644
--- a/daemon-common/vm.py
+++ b/daemon-common/vm.py
@@ -34,11 +34,11 @@ import daemon_lib.ceph as ceph
 #
 def getClusterDomainList(zkhandler):
     # Get a list of UUIDs by listing the children of /domains
-    uuid_list = zkhandler.children('/domains')
+    uuid_list = zkhandler.children('base.domain')
     name_list = []
     # For each UUID, get the corresponding name from the data
     for uuid in uuid_list:
-        name_list.append(zkhandler.read('/domains/{}'.format(uuid)))
+        name_list.append(zkhandler.read(('domain', uuid)))
     return uuid_list, name_list


@@ -100,10 +100,10 @@ def getDomainName(zkhandler, domain):
 # Helper functions
 #
 def change_state(zkhandler, dom_uuid, new_state):
-    lock = zkhandler.exclusivelock('/domains/{}/state'.format(dom_uuid))
+    lock = zkhandler.exclusivelock(('domain.state', dom_uuid))
     with lock:
         zkhandler.write([
-            ('/domains/{}/state'.format(dom_uuid), new_state)
+            (('domain.state', dom_uuid), new_state)
         ])

         # Wait for 1/2 second to allow state to flow to all nodes
@@ -119,7 +119,7 @@ def is_migrated(zkhandler, domain):
     if not dom_uuid:
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

-    last_node = zkhandler.read('/domains/{}/lastnode'.format(dom_uuid))
+    last_node = zkhandler.read(('domain.last_node', dom_uuid))
     if last_node:
         return True
     else:
@@ -133,22 +133,22 @@ def flush_locks(zkhandler, domain):
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

     # Verify that the VM is in a stopped state; freeing locks is not safe otherwise
-    state = zkhandler.read('/domains/{}/state'.format(dom_uuid))
+    state = zkhandler.read(('domain.state', dom_uuid))
    if state != 'stop':
         return False, 'ERROR: VM "{}" is not in stopped state; flushing RBD locks on a running VM is dangerous.'.format(domain)

     # Tell the cluster to flush RBD locks for the VM
     flush_locks_string = 'flush_locks {}'.format(dom_uuid)
     zkhandler.write([
-        ('/cmd/domains', flush_locks_string)
+        ('base.cmd.domain', flush_locks_string)
     ])
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock('/cmd/domains')
+    lock = zkhandler.readlock('base.cmd.domain')
     with lock:
         try:
-            result = zkhandler.read('/cmd/domains').split()[0]
+            result = zkhandler.read('base.cmd.domain').split()[0]
             if result == 'success-flush_locks':
                 message = 'Flushed locks on VM "{}"'.format(domain)
                 success = True
@@ -160,11 +160,11 @@
             success = False

     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock('/cmd/domains')
+    lock = zkhandler.writelock('base.cmd.domain')
     with lock:
         time.sleep(0.5)
         zkhandler.write([
-            ('/cmd/domains', '')
+            ('base.cmd.domain', '')
         ])

     return success, message
@@ -212,20 +212,22 @@ def define_vm(zkhandler, config_data, target_node, node_limit, node_selector, no
     # Add the new domain to Zookeeper
     zkhandler.write([
-        ('/domains/{}'.format(dom_uuid), dom_name),
-        ('/domains/{}/state'.format(dom_uuid), initial_state),
-        ('/domains/{}/node'.format(dom_uuid), target_node),
-        ('/domains/{}/lastnode'.format(dom_uuid), ''),
-        ('/domains/{}/node_limit'.format(dom_uuid), formatted_node_limit),
-        ('/domains/{}/node_selector'.format(dom_uuid), node_selector),
-        ('/domains/{}/node_autostart'.format(dom_uuid), node_autostart),
-        ('/domains/{}/migration_method'.format(dom_uuid), migration_method),
-        ('/domains/{}/failedreason'.format(dom_uuid), ''),
-        ('/domains/{}/consolelog'.format(dom_uuid), ''),
-        ('/domains/{}/rbdlist'.format(dom_uuid), formatted_rbd_list),
-        ('/domains/{}/profile'.format(dom_uuid), profile),
-        ('/domains/{}/vnc'.format(dom_uuid), ''),
-        ('/domains/{}/xml'.format(dom_uuid), config_data)
+        (('domain', dom_uuid), dom_name),
+        (('domain.xml', dom_uuid), config_data),
+        (('domain.state', dom_uuid), initial_state),
+        (('domain.profile', dom_uuid), profile),
+        (('domain.stats', dom_uuid), ''),
+        (('domain.node', dom_uuid), target_node),
+        (('domain.last_node', dom_uuid), ''),
+        (('domain.failed_reason', dom_uuid), ''),
+        (('domain.storage.volumes', dom_uuid), formatted_rbd_list),
+        (('domain.console.log', dom_uuid), ''),
+        (('domain.console.vnc', dom_uuid), ''),
+        (('domain.meta.autostart', dom_uuid), node_autostart),
+        (('domain.meta.migrate_method', dom_uuid), migration_method),
+        (('domain.meta.node_limit', dom_uuid), formatted_node_limit),
+        (('domain.meta.node_selector', dom_uuid), node_selector),
+        (('domain.migrate.sync-lock', dom_uuid), ''),
     ])

     return True, 'Added new VM with Name "{}" and UUID "{}" to database.'.format(dom_name, dom_uuid)
@@ -236,30 +238,27 @@ def modify_vm_metadata(zkhandler, domain, node_limit, node_selector, node_autost
     if not dom_uuid:
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

+    update_list = list()
+
     if node_limit is not None:
-        zkhandler.write([
-            ('/domains/{}/node_limit'.format(dom_uuid), node_limit)
-        ])
+        update_list.append((('domain.meta.node_limit', dom_uuid), node_limit))

     if node_selector is not None:
-        zkhandler.write([
-            ('/domains/{}/node_selector'.format(dom_uuid), node_selector)
-        ])
+        update_list.append((('domain.meta.node_selector', dom_uuid), node_selector))

     if node_autostart is not None:
-        zkhandler.write([
-            ('/domains/{}/node_autostart'.format(dom_uuid), node_autostart)
-        ])
+        update_list.append((('domain.meta.autostart', dom_uuid), node_autostart))

     if provisioner_profile is not None:
-        zkhandler.write([
-            ('/domains/{}/profile'.format(dom_uuid), provisioner_profile)
-        ])
+        update_list.append((('domain.profile', dom_uuid), provisioner_profile))

     if migration_method is not None:
-        zkhandler.write([
-            ('/domains/{}/migration_method'.format(dom_uuid), migration_method)
-        ])
+        update_list.append((('domain.meta.migrate_method', dom_uuid), migration_method))
+
+    if len(update_list) < 1:
+        return False, 'ERROR: No updates to apply.'
+
+    zkhandler.write(update_list)

     return True, 'Successfully modified PVC metadata of VM "{}".'.format(domain)
@@ -291,9 +290,9 @@
     # Add the modified config to Zookeeper
     zkhandler.write([
-        ('/domains/{}'.format(dom_uuid), dom_name),
-        ('/domains/{}/rbdlist'.format(dom_uuid), formatted_rbd_list),
-        ('/domains/{}/xml'.format(dom_uuid), new_vm_config)
+        (('domain', dom_uuid), dom_name),
+        (('domain.storage.volumes', dom_uuid), formatted_rbd_list),
+        (('domain.xml', dom_uuid), new_vm_config),
     ])

     if restart:
@@ -308,7 +307,7 @@ def dump_vm(zkhandler, domain):
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

     # Grab the domain XML and dump it to stdout
-    vm_xml = zkhandler.read('/domains/{}/xml'.format(dom_uuid))
+    vm_xml = zkhandler.read(('domain.xml', dom_uuid))

     return True, vm_xml
@@ -319,7 +318,7 @@ def rename_vm(zkhandler, domain, new_domain):
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

     # Verify that the VM is in a stopped state; renaming is not supported otherwise
-    state = zkhandler.read('/domains/{}/state'.format(dom_uuid))
+    state = zkhandler.read(('domain.state', dom_uuid))
     if state != 'stop':
         return False, 'ERROR: VM "{}" is not in stopped state; VMs cannot be renamed while running.'.format(domain)
@@ -358,7 +357,7 @@
     # If the VM is migrated, store that
     if dom_info['migrated'] != 'no':
         zkhandler.write([
-            ('/domains/{}/lastnode'.format(dom_uuid), dom_info['last_node'])
+            (('domain.last_node', dom_uuid), dom_info['last_node'])
         ])

     return True, 'Successfully renamed VM "{}" to "{}".'.format(domain, new_domain)
@@ -371,7 +370,7 @@ def undefine_vm(zkhandler, domain):
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

     # Shut down the VM
-    current_vm_state = zkhandler.read('/domains/{}/state'.format(dom_uuid))
+    current_vm_state = zkhandler.read(('domain.state', dom_uuid))
     if current_vm_state != 'stop':
         change_state(zkhandler, dom_uuid, 'stop')
@@ -379,7 +378,9 @@
     change_state(zkhandler, dom_uuid, 'delete')

     # Delete the configurations
-    zkhandler.delete('/domains/{}'.format(dom_uuid))
+    zkhandler.delete([
+        ('domain', dom_uuid)
+    ])

     return True, 'Undefined VM "{}" from the cluster.'.format(domain)
@@ -393,7 +394,7 @@ def remove_vm(zkhandler, domain):
     disk_list = common.getDomainDiskList(zkhandler, dom_uuid)

     # Shut down the VM
-    current_vm_state = zkhandler.read('/domains/{}/state'.format(dom_uuid))
+    current_vm_state = zkhandler.read(('domain.state', dom_uuid))
     if current_vm_state != 'stop':
         change_state(zkhandler, dom_uuid, 'stop')
@@ -401,7 +402,9 @@
     change_state(zkhandler, dom_uuid, 'delete')

     # Delete the configurations
-    zkhandler.delete('/domains/{}'.format(dom_uuid))
+    zkhandler.delete([
+        ('domain', dom_uuid)
+    ])

     # Wait for 1 second to allow state to flow to all nodes
     time.sleep(1)
@@ -437,7 +440,7 @@ def restart_vm(zkhandler, domain, wait=False):
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

     # Get state and verify we're OK to proceed
-    current_state = zkhandler.read('/domains/{}/state'.format(dom_uuid))
+    current_state = zkhandler.read(('domain.state', dom_uuid))
     if current_state != 'start':
         return False, 'ERROR: VM "{}" is not in "start" state!'.format(domain)
@@ -447,8 +450,8 @@
     change_state(zkhandler, dom_uuid, 'restart')

     if wait:
-        while zkhandler.read('/domains/{}/state'.format(dom_uuid)) == 'restart':
-            time.sleep(1)
+        while zkhandler.read(('domain.state', dom_uuid)) == 'restart':
+            time.sleep(0.5)
         retmsg = 'Restarted VM "{}"'.format(domain)

     return True, retmsg
@@ -461,7 +464,7 @@ def shutdown_vm(zkhandler, domain, wait=False):
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

     # Get state and verify we're OK to proceed
-    current_state = zkhandler.read('/domains/{}/state'.format(dom_uuid))
+    current_state = zkhandler.read(('domain.state', dom_uuid))
     if current_state != 'start':
         return False, 'ERROR: VM "{}" is not in "start" state!'.format(domain)
@@ -471,8 +474,8 @@
     change_state(zkhandler, dom_uuid, 'shutdown')

     if wait:
-        while zkhandler.read('/domains/{}/state'.format(dom_uuid)) == 'shutdown':
-            time.sleep(1)
+        while zkhandler.read(('domain.state', dom_uuid)) == 'shutdown':
+            time.sleep(0.5)
         retmsg = 'Shut down VM "{}"'.format(domain)

     return True, retmsg
@@ -497,7 +500,7 @@ def disable_vm(zkhandler, domain):
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

     # Get state and verify we're OK to proceed
-    current_state = zkhandler.read('/domains/{}/state'.format(dom_uuid))
+    current_state = zkhandler.read(('domain.state', dom_uuid))
     if current_state != 'stop':
         return False, 'ERROR: VM "{}" must be stopped before disabling!'.format(domain)
@@ -514,7 +517,7 @@ def move_vm(zkhandler, domain, target_node, wait=False, force_live=False):
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

     # Get state and verify we're OK to proceed
-    current_state = zkhandler.read('/domains/{}/state'.format(dom_uuid))
+    current_state = zkhandler.read(('domain.state', dom_uuid))
     if current_state != 'start':
         # If the current state isn't start, preserve it; we're not doing live migration
         target_state = current_state
@@ -524,7 +527,7 @@
     else:
         target_state = 'migrate'

-    current_node = zkhandler.read('/domains/{}/node'.format(dom_uuid))
+    current_node = zkhandler.read(('domain.node', dom_uuid))

     if not target_node:
         target_node = common.findTargetNode(zkhandler, dom_uuid)
@@ -535,16 +538,16 @@
             return False, 'ERROR: Specified node "{}" is invalid.'.format(target_node)

         # Check if node is within the limit
-        node_limit = zkhandler.read('/domains/{}/node_limit'.format(dom_uuid))
+        node_limit = zkhandler.read(('domain.meta.node_limit', dom_uuid))
         if node_limit and target_node not in node_limit.split(','):
             return False, 'ERROR: Specified node "{}" is not in the allowed list of nodes for VM "{}".'.format(target_node, domain)

     # Verify if node is current node
     if target_node == current_node:
-        last_node = zkhandler.read('/domains/{}/lastnode'.format(dom_uuid))
+        last_node = zkhandler.read(('domain.last_node', dom_uuid))
         if last_node:
             zkhandler.write([
-                ('/domains/{}/lastnode'.format(dom_uuid), '')
+                (('domain.last_node', dom_uuid), '')
             ])
             return True, 'Making temporary migration permanent for VM "{}".'.format(domain)
@@ -555,20 +558,20 @@

     retmsg = 'Permanently migrating VM "{}" to node "{}".'.format(domain, target_node)

-    lock = zkhandler.exclusivelock('/domains/{}/state'.format(dom_uuid))
+    lock = zkhandler.exclusivelock(('domain.state', dom_uuid))
     with lock:
         zkhandler.write([
-            ('/domains/{}/state'.format(dom_uuid), target_state),
-            ('/domains/{}/node'.format(dom_uuid), target_node),
-            ('/domains/{}/lastnode'.format(dom_uuid), '')
+            (('domain.state', dom_uuid), target_state),
+            (('domain.node', dom_uuid), target_node),
+            (('domain.last_node', dom_uuid), '')
         ])

         # Wait for 1/2 second for migration to start
         time.sleep(0.5)

     if wait:
-        while zkhandler.read('/domains/{}/state'.format(dom_uuid)) == target_state:
-            time.sleep(1)
+        while zkhandler.read(('domain.state', dom_uuid)) == target_state:
+            time.sleep(0.5)
         retmsg = 'Permanently migrated VM "{}" to node "{}"'.format(domain, target_node)

     return True, retmsg
@@ -581,7 +584,7 @@ def migrate_vm(zkhandler, domain, target_node, force_migrate, wait=False, force_
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

     # Get state and verify we're OK to proceed
-    current_state = zkhandler.read('/domains/{}/state'.format(dom_uuid))
+    current_state = zkhandler.read(('domain.state', dom_uuid))
     if current_state != 'start':
         # If the current state isn't start, preserve it; we're not doing live migration
         target_state = current_state
@@ -591,8 +594,8 @@
     else:
         target_state = 'migrate'

-    current_node = zkhandler.read('/domains/{}/node'.format(dom_uuid))
-    last_node = zkhandler.read('/domains/{}/lastnode'.format(dom_uuid))
+    current_node = zkhandler.read(('domain.node', dom_uuid))
+    last_node = zkhandler.read(('domain.last_node', dom_uuid))

     if last_node and not force_migrate:
         return False, 'ERROR: VM "{}" has been previously migrated.'.format(domain)
@@ -606,7 +609,7 @@
             return False, 'ERROR: Specified node "{}" is invalid.'.format(target_node)

     # Check if node is within the limit
-    node_limit = zkhandler.read('/domains/{}/node_limit'.format(dom_uuid))
+    node_limit = zkhandler.read(('domain.meta.node_limit', dom_uuid))
     if node_limit and target_node not in node_limit.split(','):
         return False, 'ERROR: Specified node "{}" is not in the allowed list of nodes for VM "{}".'.format(target_node, domain)
@@ -623,20 +626,20 @@

     retmsg = 'Migrating VM "{}" to node "{}".'.format(domain, target_node)

-    lock = zkhandler.exclusivelock('/domains/{}/state'.format(dom_uuid))
+    lock = zkhandler.exclusivelock(('domain.state', dom_uuid))
     with lock:
         zkhandler.write([
-            ('/domains/{}/state'.format(dom_uuid), target_state),
-            ('/domains/{}/node'.format(dom_uuid), target_node),
-            ('/domains/{}/lastnode'.format(dom_uuid), current_node)
+            (('domain.state', dom_uuid), target_state),
+            (('domain.node', dom_uuid), target_node),
+            (('domain.last_node', dom_uuid), current_node)
         ])

         # Wait for 1/2 second for migration to start
        time.sleep(0.5)

     if wait:
-        while zkhandler.read('/domains/{}/state'.format(dom_uuid)) == target_state:
-            time.sleep(1)
+        while zkhandler.read(('domain.state', dom_uuid)) == target_state:
+            time.sleep(0.5)
         retmsg = 'Migrated VM "{}" to node "{}"'.format(domain, target_node)

     return True, retmsg
@@ -649,7 +652,7 @@ def unmigrate_vm(zkhandler, domain, wait=False, force_live=False):
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

     # Get state and verify we're OK to proceed
-    current_state = zkhandler.read('/domains/{}/state'.format(dom_uuid))
+    current_state = zkhandler.read(('domain.state', dom_uuid))
     if current_state != 'start':
         # If the current state isn't start, preserve it; we're not doing live migration
         target_state = current_state
@@ -659,27 +662,27 @@
     else:
         target_state = 'migrate'

-    target_node = zkhandler.read('/domains/{}/lastnode'.format(dom_uuid))
+    target_node = zkhandler.read(('domain.last_node', dom_uuid))

     if target_node == '':
         return False, 'ERROR: VM "{}" has not been previously migrated.'.format(domain)

     retmsg = 'Unmigrating VM "{}" back to node "{}".'.format(domain, target_node)

-    lock = zkhandler.exclusivelock('/domains/{}/state'.format(dom_uuid))
+    lock = zkhandler.exclusivelock(('domain.state', dom_uuid))
     with lock:
         zkhandler.write([
-            ('/domains/{}/state'.format(dom_uuid), target_state),
-            ('/domains/{}/node'.format(dom_uuid), target_node),
-            ('/domains/{}/lastnode'.format(dom_uuid), '')
+            (('domain.state', dom_uuid), target_state),
+            (('domain.node', dom_uuid), target_node),
+            (('domain.last_node', dom_uuid), '')
         ])

         # Wait for 1/2 second for migration to start
         time.sleep(0.5)

     if wait:
-        while zkhandler.read('/domains/{}/state'.format(dom_uuid)) == target_state:
-            time.sleep(1)
+        while zkhandler.read(('domain.state', dom_uuid)) == target_state:
+            time.sleep(0.5)
         retmsg = 'Unmigrated VM "{}" back to node "{}"'.format(domain, target_node)

     return True, retmsg
@@ -692,7 +695,7 @@ def get_console_log(zkhandler, domain, lines=1000):
         return False, 'ERROR: Could not find VM "{}" in the cluster!'.format(domain)

     # Get the data from ZK
-    console_log = zkhandler.read('/domains/{}/consolelog'.format(dom_uuid))
+    console_log = zkhandler.read(('domain.console.log', dom_uuid))

     # Shrink the log buffer to length lines
     shrunk_log = console_log.split('\n')[-lines:]
@@ -726,7 +729,7 @@ def get_list(zkhandler, node, state, limit, is_fuzzy=True):
         if state not in valid_states:
             return False, 'VM state "{}" is not valid.'.format(state)

-    full_vm_list = zkhandler.children('/domains')
+    full_vm_list = zkhandler.children('base.domain')
     vm_list = []

     # Set our limit to a sensible regex
@@ -745,9 +748,9 @@
     vm_state = {}
     for vm in full_vm_list:
         # Check we don't match the limit
-        name = zkhandler.read('/domains/{}'.format(vm))
-        vm_node[vm] = zkhandler.read('/domains/{}/node'.format(vm))
-        vm_state[vm] = zkhandler.read('/domains/{}/state'.format(vm))
+        name = zkhandler.read(('domain', vm))
+        vm_node[vm] = zkhandler.read(('domain.node', vm))
+        vm_state[vm] = zkhandler.read(('domain.state', vm))
         # Handle limiting
         if limit:
             try: