From 790098f181697b56ea64418554b6c927cb1b348b Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Tue, 1 Jun 2021 11:46:27 -0400 Subject: [PATCH] Convert VMInstance to new zkhandler --- node-daemon/pvcnoded/Daemon.py | 4 +- .../pvcnoded/VMConsoleWatcherInstance.py | 10 +- node-daemon/pvcnoded/VMInstance.py | 211 +++++++++++------- 3 files changed, 138 insertions(+), 87 deletions(-) diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py index 12e551a8..e1efbab2 100644 --- a/node-daemon/pvcnoded/Daemon.py +++ b/node-daemon/pvcnoded/Daemon.py @@ -963,7 +963,7 @@ if enable_hypervisor: @zkhandler.zk_conn.DataWatch('/cmd/domains') def cmd_domains(data, stat, event=''): if data: - VMInstance.run_command(zkhandler.zk_conn, logger, this_node, data.decode('ascii')) + VMInstance.run_command(zkhandler, logger, this_node, data.decode('ascii')) # VM domain objects @zkhandler.zk_conn.ChildrenWatch('/domains') @@ -973,7 +973,7 @@ if enable_hypervisor: # Add any missing domains to the list for domain in new_domain_list: if domain not in domain_list: - d_domain[domain] = VMInstance.VMInstance(domain, zkhandler.zk_conn, config, logger, this_node) + d_domain[domain] = VMInstance.VMInstance(domain, zkhandler, config, logger, this_node) # Remove any deleted domains from the list for domain in domain_list: diff --git a/node-daemon/pvcnoded/VMConsoleWatcherInstance.py b/node-daemon/pvcnoded/VMConsoleWatcherInstance.py index 9561c678..3b3d2e72 100644 --- a/node-daemon/pvcnoded/VMConsoleWatcherInstance.py +++ b/node-daemon/pvcnoded/VMConsoleWatcherInstance.py @@ -25,15 +25,13 @@ import time from threading import Thread, Event from collections import deque -import pvcnoded.zkhandler as zkhandler - class VMConsoleWatcherInstance(object): # Initialization function - def __init__(self, domuuid, domname, zk_conn, config, logger, this_node): + def __init__(self, domuuid, domname, zkhandler, config, logger, this_node): self.domuuid = domuuid self.domname = domname - self.zk_conn = zk_conn + self.zkhandler = zkhandler self.config = config self.logfile = '{}/{}.log'.format(config['console_log_directory'], self.domname) self.console_log_lines = config['console_log_lines'] @@ -93,7 +91,9 @@ class VMConsoleWatcherInstance(object): self.fetch_lines() # Update Zookeeper with the new loglines if they changed if self.loglines != self.last_loglines: - zkhandler.writedata(self.zk_conn, {'/domains/{}/consolelog'.format(self.domuuid): self.loglines}) + self.zkhandler.write([ + ('/domains/{}/consolelog'.format(self.domuuid), self.loglines) + ]) self.last_loglines = self.loglines def fetch_lines(self): diff --git a/node-daemon/pvcnoded/VMInstance.py b/node-daemon/pvcnoded/VMInstance.py index e4e1c2bb..8175de9b 100644 --- a/node-daemon/pvcnoded/VMInstance.py +++ b/node-daemon/pvcnoded/VMInstance.py @@ -28,7 +28,6 @@ from threading import Thread from xml.etree import ElementTree -import pvcnoded.zkhandler as zkhandler import pvcnoded.common as common import pvcnoded.VMConsoleWatcherInstance as VMConsoleWatcherInstance @@ -36,10 +35,10 @@ import pvcnoded.VMConsoleWatcherInstance as VMConsoleWatcherInstance import daemon_lib.common as daemon_common -def flush_locks(zk_conn, logger, dom_uuid, this_node=None): +def flush_locks(zkhandler, logger, dom_uuid, this_node=None): logger.out('Flushing RBD locks for VM "{}"'.format(dom_uuid), state='i') # Get the list of RBD images - rbd_list = zkhandler.readdata(zk_conn, '/domains/{}/rbdlist'.format(dom_uuid)).split(',') + rbd_list = zkhandler.read('/domains/{}/rbdlist'.format(dom_uuid)).split(',') for rbd in rbd_list: # Check if a lock exists lock_list_retcode, lock_list_stdout, lock_list_stderr = common.run_os_command('rbd lock list --format json {}'.format(rbd)) @@ -57,17 +56,21 @@ def flush_locks(zk_conn, logger, dom_uuid, this_node=None): if lock_list: # Loop through the locks for lock in lock_list: - if this_node is not None and zkhandler.readdata(zk_conn, '/domains/{}/state'.format(dom_uuid)) != 'stop' and lock['address'].split(':')[0] != this_node.storage_ipaddr: + if this_node is not None and zkhandler.read('/domains/{}/state'.format(dom_uuid)) != 'stop' and lock['address'].split(':')[0] != this_node.storage_ipaddr: logger.out('RBD lock does not belong to this host (lock owner: {}): freeing this lock would be unsafe, aborting'.format(lock['address'].split(':')[0], state='e')) - zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'fail'}) - zkhandler.writedata(zk_conn, {'/domains/{}/failedreason'.format(dom_uuid): 'Could not safely free RBD lock {} ({}) on volume {}; stop VM and flush locks manually'.format(lock['id'], lock['address'], rbd)}) + zkhandler.write([ + ('/domains/{}/state'.format(dom_uuid), 'fail'), + ('/domains/{}/failedreason'.format(dom_uuid), 'Could not safely free RBD lock {} ({}) on volume {}; stop VM and flush locks manually'.format(lock['id'], lock['address'], rbd)) + ]) break # Free the lock lock_remove_retcode, lock_remove_stdout, lock_remove_stderr = common.run_os_command('rbd lock remove {} "{}" "{}"'.format(rbd, lock['id'], lock['locker'])) if lock_remove_retcode != 0: logger.out('Failed to free RBD lock "{}" on volume "{}": {}'.format(lock['id'], rbd, lock_remove_stderr), state='e') - zkhandler.writedata(zk_conn, {'/domains/{}/state'.format(dom_uuid): 'fail'}) - zkhandler.writedata(zk_conn, {'/domains/{}/failedreason'.format(dom_uuid): 'Could not free RBD lock {} ({}) on volume {}: {}'.format(lock['id'], lock['address'], rbd, lock_remove_stderr)}) + zkhandler.write([ + ('/domains/{}/state'.format(dom_uuid), 'fail'), + ('/domains/{}/failedreason'.format(dom_uuid), 'Could not free RBD lock {} ({}) on volume {}: {}'.format(lock['id'], lock['address'], rbd, lock_remove_stderr)) + ]) break logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock['id'], rbd), state='o') @@ -75,7 +78,7 @@ def flush_locks(zk_conn, logger, dom_uuid, this_node=None): # Primary command function -def run_command(zk_conn, logger, this_node, data): +def run_command(zkhandler, logger, this_node, data): # Get the command and args command, args = data.split() @@ -86,45 +89,49 @@ def run_command(zk_conn, logger, this_node, data): # Verify that the VM is set to run on this node if this_node.d_domain[dom_uuid].getnode() == this_node.name: # Lock the command queue - zk_lock = zkhandler.writelock(zk_conn, '/cmd/domains') + zk_lock = zkhandler.writelock('/cmd/domains') with zk_lock: # Flush the lock - result = flush_locks(zk_conn, logger, dom_uuid, this_node) + result = flush_locks(zkhandler, logger, dom_uuid, this_node) # Command succeeded if result: # Update the command queue - zkhandler.writedata(zk_conn, {'/cmd/domains': 'success-{}'.format(data)}) + zkhandler.write([ + ('/cmd/domains', 'success-{}'.format(data)) + ]) # Command failed else: # Update the command queue - zkhandler.writedata(zk_conn, {'/cmd/domains': 'failure-{}'.format(data)}) + zkhandler.write([ + ('/cmd/domains', 'failure-{}'.format(data)) + ]) # Wait 1 seconds before we free the lock, to ensure the client hits the lock time.sleep(1) class VMInstance(object): # Initialization function - def __init__(self, domuuid, zk_conn, config, logger, this_node): + def __init__(self, domuuid, zkhandler, config, logger, this_node): # Passed-in variables on creation self.domuuid = domuuid - self.zk_conn = zk_conn + self.zkhandler = zkhandler self.config = config self.logger = logger self.this_node = this_node # Get data from zookeeper - self.domname = zkhandler.readdata(zk_conn, '/domains/{}'.format(domuuid)) - self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) - self.node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) - self.lastnode = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(self.domuuid)) - self.last_currentnode = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) - self.last_lastnode = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(self.domuuid)) + self.domname = self.zkhandler.read('/domains/{}'.format(domuuid)) + self.state = self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) + self.node = self.zkhandler.read('/domains/{}/node'.format(self.domuuid)) + self.lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid)) + self.last_currentnode = self.zkhandler.read('/domains/{}/node'.format(self.domuuid)) + self.last_lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid)) try: - self.pinpolicy = zkhandler.readdata(self.zk_conn, '/domains/{}/pinpolicy'.format(self.domuuid)) + self.pinpolicy = self.zkhandler.read('/domains/{}/pinpolicy'.format(self.domuuid)) except Exception: self.pinpolicy = "none" try: - self.migration_method = zkhandler.readdata(self.zk_conn, '/domains/{}/migration_method'.format(self.domuuid)) + self.migration_method = self.zkhandler.read('/domains/{}/migration_method'.format(self.domuuid)) except Exception: self.migration_method = 'none' @@ -140,10 +147,10 @@ class VMInstance(object): self.dom = self.lookupByUUID(self.domuuid) # Log watcher instance - self.console_log_instance = VMConsoleWatcherInstance.VMConsoleWatcherInstance(self.domuuid, self.domname, self.zk_conn, self.config, self.logger, self.this_node) + self.console_log_instance = VMConsoleWatcherInstance.VMConsoleWatcherInstance(self.domuuid, self.domname, self.zkhandler, self.config, self.logger, self.this_node) # Watch for changes to the state field in Zookeeper - @self.zk_conn.DataWatch('/domains/{}/state'.format(self.domuuid)) + @self.zkhandler.zk_conn.DataWatch('/domains/{}/state'.format(self.domuuid)) def watch_state(data, stat, event=""): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -173,7 +180,7 @@ class VMInstance(object): if self.dom is not None: memory = int(self.dom.info()[2] / 1024) else: - domain_information = daemon_common.getInformationFromXML(self.zk_conn, self.domuuid) + domain_information = daemon_common.getInformationFromXML(self.zkhandler, self.domuuid) memory = int(domain_information['memory']) except Exception: memory = 0 @@ -195,7 +202,9 @@ class VMInstance(object): # Add the domain to the domain_list array self.this_node.domain_list.append(self.domuuid) # Push the change up to Zookeeper - zkhandler.writedata(self.zk_conn, {'/nodes/{}/runningdomains'.format(self.this_node.name): ' '.join(self.this_node.domain_list)}) + self.zkhandler.write([ + ('/nodes/{}/runningdomains'.format(self.this_node.name), ' '.join(self.this_node.domain_list)) + ]) except Exception as e: self.logger.out('Error adding domain to list: {}'.format(e), state='e') @@ -205,7 +214,9 @@ class VMInstance(object): # Remove the domain from the domain_list array self.this_node.domain_list.remove(self.domuuid) # Push the change up to Zookeeper - zkhandler.writedata(self.zk_conn, {'/nodes/{}/runningdomains'.format(self.this_node.name): ' '.join(self.this_node.domain_list)}) + self.zkhandler.write([ + ('/nodes/{}/runningdomains'.format(self.this_node.name), ' '.join(self.this_node.domain_list)) + ]) except Exception as e: self.logger.out('Error removing domain from list: {}'.format(e), state='e') @@ -218,11 +229,17 @@ class VMInstance(object): self.logger.out('Updating VNC data', state='i', prefix='Domain {}'.format(self.domuuid)) port = graphics.get('port', '') listen = graphics.get('listen', '') - zkhandler.writedata(self.zk_conn, {'/domains/{}/vnc'.format(self.domuuid): '{}:{}'.format(listen, port)}) + self.zkhandler.write([ + ('/domains/{}/vnc'.format(self.domuuid), '{}:{}'.format(listen, port)) + ]) else: - zkhandler.writedata(self.zk_conn, {'/domains/{}/vnc'.format(self.domuuid): ''}) + self.zkhandler.write([ + ('/domains/{}/vnc'.format(self.domuuid), '') + ]) else: - zkhandler.writedata(self.zk_conn, {'/domains/{}/vnc'.format(self.domuuid): ''}) + self.zkhandler.write([ + ('/domains/{}/vnc'.format(self.domuuid), '') + ]) # Start up the VM def start_vm(self): @@ -251,8 +268,8 @@ class VMInstance(object): if self.getdom() is None or self.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING: # Flush locks self.logger.out('Flushing RBD locks', state='i', prefix='Domain {}'.format(self.domuuid)) - flush_locks(self.zk_conn, self.logger, self.domuuid, self.this_node) - if zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) == 'fail': + flush_locks(self.zkhandler, self.logger, self.domuuid, self.this_node) + if self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) == 'fail': lv_conn.close() self.dom = None self.instart = False @@ -261,21 +278,27 @@ class VMInstance(object): if curstate == libvirt.VIR_DOMAIN_RUNNING: # If it is running just update the model self.addDomainToList() - zkhandler.writedata(self.zk_conn, {'/domains/{}/failedreason'.format(self.domuuid): ''}) + self.zkhandler.write([ + ('/domains/{}/failedreason'.format(self.domuuid), '') + ]) else: # Or try to create it try: # Grab the domain information from Zookeeper - xmlconfig = zkhandler.readdata(self.zk_conn, '/domains/{}/xml'.format(self.domuuid)) + xmlconfig = self.zkhandler.read('/domains/{}/xml'.format(self.domuuid)) dom = lv_conn.createXML(xmlconfig, 0) self.addDomainToList() self.logger.out('Successfully started VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.dom = dom - zkhandler.writedata(self.zk_conn, {'/domains/{}/failedreason'.format(self.domuuid): ''}) + self.zkhandler.write([ + ('/domains/{}/failedreason'.format(self.domuuid), '') + ]) except libvirt.libvirtError as e: self.logger.out('Failed to create VM', state='e', prefix='Domain {}'.format(self.domuuid)) - zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'fail'}) - zkhandler.writedata(self.zk_conn, {'/domains/{}/failedreason'.format(self.domuuid): str(e)}) + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'fail'), + ('/domains/{}/failedreason'.format(self.domuuid), str(e)) + ]) lv_conn.close() self.dom = None self.instart = False @@ -303,7 +326,9 @@ class VMInstance(object): self.start_vm() self.addDomainToList() - zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'start'}) + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'start') + ]) lv_conn.close() self.inrestart = False @@ -334,7 +359,9 @@ class VMInstance(object): self.removeDomainFromList() if self.inrestart is False: - zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'stop'}) + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'stop') + ]) self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.dom = None @@ -355,7 +382,7 @@ class VMInstance(object): time.sleep(1) # Abort shutdown if the state changes to start - current_state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) + current_state = self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) if current_state not in ['shutdown', 'restart']: self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}'.format(self.domuuid)) is_aborted = True @@ -368,7 +395,9 @@ class VMInstance(object): if lvdomstate != libvirt.VIR_DOMAIN_RUNNING: self.removeDomainFromList() - zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'stop'}) + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'stop') + ]) self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.dom = None # Stop the log watcher @@ -377,7 +406,9 @@ class VMInstance(object): if tick >= self.config['vm_shutdown_timeout']: self.logger.out('Shutdown timeout ({}s) expired, forcing off'.format(self.config['vm_shutdown_timeout']), state='e', prefix='Domain {}'.format(self.domuuid)) - zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'stop'}) + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'stop') + ]) break self.inshutdown = False @@ -388,7 +419,9 @@ class VMInstance(object): if self.inrestart: # Wait to prevent race conditions time.sleep(1) - zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'start'}) + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'start') + ]) # Migrate the VM to a target host def migrate_vm(self, force_live=False, force_shutdown=False): @@ -405,24 +438,24 @@ class VMInstance(object): self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid)) # Used for sanity checking later - target_node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) + target_node = self.zkhandler.read('/domains/{}/node'.format(self.domuuid)) aborted = False def abort_migrate(reason): - zkhandler.writedata(self.zk_conn, { - '/domains/{}/state'.format(self.domuuid): 'start', - '/domains/{}/node'.format(self.domuuid): self.this_node.name, - '/domains/{}/lastnode'.format(self.domuuid): self.last_lastnode - }) + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'start'), + ('/domains/{}/node'.format(self.domuuid), self.this_node.name), + ('/domains/{}/lastnode'.format(self.domuuid), self.last_lastnode) + ]) migrate_lock_node.release() migrate_lock_state.release() self.inmigrate = False self.logger.out('Aborted migration: {}'.format(reason), state='i', prefix='Domain {}'.format(self.domuuid)) # Acquire exclusive lock on the domain node key - migrate_lock_node = zkhandler.exclusivelock(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) - migrate_lock_state = zkhandler.exclusivelock(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) + migrate_lock_node = self.zkhandler.exclusivelock('/domains/{}/node'.format(self.domuuid)) + migrate_lock_state = self.zkhandler.exclusivelock('/domains/{}/state'.format(self.domuuid)) migrate_lock_node.acquire() migrate_lock_state.acquire() @@ -434,14 +467,14 @@ class VMInstance(object): return # Synchronize nodes A (I am reader) - lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.readlock('/locks/domain_migrate/{}'.format(self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid)) - if zkhandler.readdata(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) == '': + if self.zkhandler.read('/locks/domain_migrate/{}'.format(self.domuuid)) == '': self.logger.out('Waiting for peer', state='i', prefix='Domain {}'.format(self.domuuid)) ticks = 0 - while zkhandler.readdata(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) == '': + while self.zkhandler.read('/locks/domain_migrate/{}'.format(self.domuuid)) == '': time.sleep(0.1) ticks += 1 if ticks > 300: @@ -457,7 +490,7 @@ class VMInstance(object): return # Synchronize nodes B (I am writer) - lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.writelock('/locks/domain_migrate/{}'.format(self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) @@ -498,8 +531,10 @@ class VMInstance(object): def migrate_shutdown(): self.logger.out('Shutting down VM for offline migration', state='i', prefix='Domain {}'.format(self.domuuid)) - zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'shutdown'}) - while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) != 'stop': + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'shutdown') + ]) + while self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) != 'stop': time.sleep(0.5) return True @@ -545,7 +580,7 @@ class VMInstance(object): return # Synchronize nodes C (I am writer) - lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.writelock('/locks/domain_migrate/{}'.format(self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) @@ -559,20 +594,20 @@ class VMInstance(object): self.logger.out('Released write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) # Synchronize nodes D (I am reader) - lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.readlock('/locks/domain_migrate/{}'.format(self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) - self.last_currentnode = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) - self.last_lastnode = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(self.domuuid)) + self.last_currentnode = self.zkhandler.read('/domains/{}/node'.format(self.domuuid)) + self.last_lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid)) self.logger.out('Releasing read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) lock.release() self.logger.out('Released read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) # Wait for the receive side to complete before we declare all-done and release locks - while zkhandler.readdata(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) != '': + while self.zkhandler.read('/locks/domain_migrate/{}'.format(self.domuuid)) != '': time.sleep(0.5) migrate_lock_node.release() migrate_lock_state.release() @@ -591,10 +626,12 @@ class VMInstance(object): self.logger.out('Receiving VM migration from node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid)) # Ensure our lock key is populated - zkhandler.writedata(self.zk_conn, {'/locks/domain_migrate/{}'.format(self.domuuid): self.domuuid}) + self.zkhandler.write([ + ('/locks/domain_migrate/{}'.format(self.domuuid), self.domuuid) + ]) # Synchronize nodes A (I am writer) - lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.writelock('/locks/domain_migrate/{}'.format(self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid)) @@ -605,7 +642,7 @@ class VMInstance(object): time.sleep(0.1) # Time fir new writer to acquire the lock # Synchronize nodes B (I am reader) - lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.readlock('/locks/domain_migrate/{}'.format(self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) @@ -614,38 +651,44 @@ class VMInstance(object): self.logger.out('Released read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) # Synchronize nodes C (I am reader) - lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.readlock('/locks/domain_migrate/{}'.format(self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) # Set the updated data - self.last_currentnode = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) - self.last_lastnode = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(self.domuuid)) + self.last_currentnode = self.zkhandler.read('/domains/{}/node'.format(self.domuuid)) + self.last_lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid)) self.logger.out('Releasing read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) lock.release() self.logger.out('Released read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) # Synchronize nodes D (I am writer) - lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.writelock('/locks/domain_migrate/{}'.format(self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) time.sleep(0.5) # Time fir reader to acquire the lock - self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) + self.state = self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) self.dom = self.lookupByUUID(self.domuuid) if self.dom: lvdomstate = self.dom.state()[0] if lvdomstate == libvirt.VIR_DOMAIN_RUNNING: # VM has been received and started self.addDomainToList() - zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'start'}) + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'start') + ]) self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}'.format(self.domuuid)) else: # The receive somehow failed - zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'fail'}) + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'fail'), + ('/domains/{}/failed_reason'.format(self.domuuid), 'Failed to receive migration') + ]) + self.logger.out('Failed to receive migrated VM', state='e', prefix='Domain {}'.format(self.domuuid)) else: if self.node == self.this_node.name: if self.state in ['start']: @@ -653,7 +696,9 @@ class VMInstance(object): self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}'.format(self.domuuid)) elif self.state in ['stop']: # The send was shutdown-based - zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'start'}) + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'start') + ]) else: # The send failed or was aborted self.logger.out('Migrate aborted or failed; VM in state {}'.format(self.state), state='w', prefix='Domain {}'.format(self.domuuid)) @@ -662,7 +707,9 @@ class VMInstance(object): lock.release() self.logger.out('Released write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) - zkhandler.writedata(self.zk_conn, {'/locks/domain_migrate/{}'.format(self.domuuid): ''}) + self.zkhandler.write([ + ('/locks/domain_migrate/{}'.format(self.domuuid), '') + ]) self.inreceive = False return @@ -671,9 +718,9 @@ class VMInstance(object): # def manage_vm_state(self): # Update the current values from zookeeper - self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) - self.node = zkhandler.readdata(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) - self.lastnode = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(self.domuuid)) + self.state = self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) + self.node = self.zkhandler.read('/domains/{}/node'.format(self.domuuid)) + self.lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid)) # Check the current state of the VM try: @@ -721,7 +768,9 @@ class VMInstance(object): elif self.state == "migrate" or self.state == "migrate-live": # Start the log watcher self.console_log_instance.start() - zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'start'}) + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'start') + ]) # Add domain to running list self.addDomainToList() # VM should be restarted @@ -744,7 +793,9 @@ class VMInstance(object): self.receive_migrate() # VM should be restarted (i.e. started since it isn't running) if self.state == "restart": - zkhandler.writedata(self.zk_conn, {'/domains/{}/state'.format(self.domuuid): 'start'}) + self.zkhandler.write([ + ('/domains/{}/state'.format(self.domuuid), 'start') + ]) # VM should be shut down; ensure it's gone from this node's domain_list elif self.state == "shutdown": self.removeDomainFromList()