From f9e7e9884f9c98c4b5e1f8a3b2a8b5526991599f Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Tue, 20 Oct 2020 12:13:26 -0400 Subject: [PATCH] Improve handling of VM migrations The VM migration code was very old, very spaghettified, and prone to strange failures. Improve this by taking cues from the node primary migration. Use synchronization between the nodes to ensure lockstep completion of the migration in discrete steps. A proper queue can be built later to integrate with this code more cleanly. References #108 --- node-daemon/pvcnoded/VMInstance.py | 308 +++++++++++++++++------------ 1 file changed, 186 insertions(+), 122 deletions(-) diff --git a/node-daemon/pvcnoded/VMInstance.py b/node-daemon/pvcnoded/VMInstance.py index 1b21396d..ee775550 100644 --- a/node-daemon/pvcnoded/VMInstance.py +++ b/node-daemon/pvcnoded/VMInstance.py @@ -199,14 +199,14 @@ class VMInstance(object): # Start the log watcher self.console_log_instance.start() - self.logger.out('Starting VM', state='i', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Starting VM', state='i', prefix='Domain {}'.format(self.domuuid)) self.instart = True # Start up a new Libvirt connection libvirt_name = "qemu:///system" lv_conn = libvirt.open(libvirt_name) if lv_conn == None: - self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid)) self.instart = False return @@ -228,11 +228,11 @@ class VMInstance(object): xmlconfig = zkhandler.readdata(self.zk_conn, '/domains/{}/xml'.format(self.domuuid)) dom = lv_conn.createXML(xmlconfig, 0) self.addDomainToList() - self.logger.out('Successfully started VM', state='o', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Successfully started VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.dom = dom zkhandler.writedata(self.zk_conn, { 
'/domains/{}/failedreason'.format(self.domuuid): '' }) except libvirt.libvirtError as e: - self.logger.out('Failed to create VM', state='e', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Failed to create VM', state='e', prefix='Domain {}'.format(self.domuuid)) zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'fail' }) zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): str(e) }) self.dom = None @@ -243,14 +243,14 @@ class VMInstance(object): # Restart the VM def restart_vm(self): - self.logger.out('Restarting VM', state='i', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Restarting VM', state='i', prefix='Domain {}'.format(self.domuuid)) self.inrestart = True # Start up a new Libvirt connection libvirt_name = "qemu:///system" lv_conn = libvirt.open(libvirt_name) if lv_conn == None: - self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid)) self.inrestart = False return @@ -265,14 +265,14 @@ class VMInstance(object): # Stop the VM forcibly without updating state def terminate_vm(self): - self.logger.out('Terminating VM', state='i', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Terminating VM', state='i', prefix='Domain {}'.format(self.domuuid)) self.instop = True try: self.dom.destroy() except AttributeError: - self.logger.out('Failed to terminate VM', state='e', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Failed to terminate VM', state='e', prefix='Domain {}'.format(self.domuuid)) self.removeDomainFromList() - self.logger.out('Successfully terminated VM', state='o', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Successfully terminated VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.dom = None self.instop = False @@ -281,18 +281,18 @@ class 
VMInstance(object): # Stop the VM forcibly def stop_vm(self): - self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}'.format(self.domuuid)) self.instop = True try: self.dom.destroy() except AttributeError: - self.logger.out('Failed to stop VM', state='e', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Failed to stop VM', state='e', prefix='Domain {}'.format(self.domuuid)) self.removeDomainFromList() if self.inrestart == False: zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' }) - self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.dom = None self.instop = False @@ -301,7 +301,7 @@ class VMInstance(object): # Shutdown the VM gracefully def shutdown_vm(self): - self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}'.format(self.domuuid)) is_aborted = False self.inshutdown = True self.dom.shutdown() @@ -313,7 +313,7 @@ class VMInstance(object): # Abort shutdown if the state changes to start current_state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) if current_state not in ['shutdown', 'restart']: - self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}'.format(self.domuuid)) is_aborted = True break @@ -325,14 +325,14 @@ class VMInstance(object): if lvdomstate != libvirt.VIR_DOMAIN_RUNNING: self.removeDomainFromList() zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' }) - self.logger.out('Successfully shutdown VM', state='o', prefix='Domain 
{}:'.format(self.domuuid)) + self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.dom = None # Stop the log watcher self.console_log_instance.stop() break if tick >= self.config['vm_shutdown_timeout']: - self.logger.out('Shutdown timeout ({}s) expired, forcing off'.format(self.config['vm_shutdown_timeout']), state='e', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Shutdown timeout ({}s) expired, forcing off'.format(self.config['vm_shutdown_timeout']), state='e', prefix='Domain {}'.format(self.domuuid)) zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' }) break @@ -346,32 +346,6 @@ class VMInstance(object): time.sleep(1) zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) - def live_migrate_vm(self): - dest_lv = 'qemu+tcp://{}.{}/system'.format(self.node, self.config['cluster_domain']) - dest_tcp = 'tcp://{}.{}'.format(self.node, self.config['cluster_domain']) - try: - # Open a connection to the destination - dest_lv_conn = libvirt.open(dest_lv) - if not dest_lv_conn: - raise - except: - self.logger.out('Failed to open connection to {}; aborting live migration.'.format(dest_lv), state='e', prefix='Domain {}:'.format(self.domuuid)) - return False - - try: - # Send the live migration; force the destination URI to ensure we transit over the cluster network - target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, dest_tcp, 0) - if not target_dom: - raise - except Exception as e: - self.logger.out('Failed to send VM to {} - aborting live migration; error: {}'.format(dest_lv, e), state='e', prefix='Domain {}:'.format(self.domuuid)) - dest_lv_conn.close() - return False - - self.logger.out('Successfully migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid)) - dest_lv_conn.close() - return True - # Migrate the VM to a target host def migrate_vm(self, force_live=False): # Don't try to migrate a node to itself, 
set back to start @@ -381,106 +355,196 @@ class VMInstance(object): return self.inmigrate = True - self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid)) - migrate_ret = self.live_migrate_vm() - if not migrate_ret: + # Acquire exclusive lock on the domain node key + migrate_lock = zkhandler.exclusivelock(self.zk_conn, '/domains/{}/node'.format(self.domuuid)) + migrate_lock.acquire() + + time.sleep(0.2) # Initial delay for the first writer to grab the lock + + # Synchronize nodes A (I am reader) + lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate') + self.logger.out('Acquiring read lock for synchronization A', state='i', prefix='Domain {}'.format(self.domuuid)) + lock.acquire() + self.logger.out('Acquired read lock for synchronization A', state='o', prefix='Domain {}'.format(self.domuuid)) + self.logger.out('Releasing read lock for synchronization A', state='i', prefix='Domain {}'.format(self.domuuid)) + lock.release() + self.logger.out('Released read lock for synchronization A', state='o', prefix='Domain {}'.format(self.domuuid)) + + # Synchronize nodes B (I am writer) + lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate') + self.logger.out('Acquiring write lock for synchronization B', state='i', prefix='Domain {}'.format(self.domuuid)) + lock.acquire() + self.logger.out('Acquired write lock for synchronization B', state='o', prefix='Domain {}'.format(self.domuuid)) + time.sleep(0.2) # Time for reader to acquire the lock + + def migrate_live(): + self.logger.out('Setting up live migration', state='i', prefix='Domain {}'.format(self.domuuid)) + # Set up destination connection + dest_lv = 'qemu+tcp://{}.{}/system'.format(self.node, self.config['cluster_domain']) + dest_tcp = 'tcp://{}.{}'.format(self.node, self.config['cluster_domain']) + try: + 
self.logger.out('Opening remote libvirt connection', state='i', prefix='Domain {}'.format(self.domuuid)) + # Open a connection to the destination + dest_lv_conn = libvirt.open(dest_lv) + if not dest_lv_conn: + raise + except: + self.logger.out('Failed to open connection to {}; aborting live migration.'.format(dest_lv), state='e', prefix='Domain {}'.format(self.domuuid)) + return False + + try: + self.logger.out('Live migrating VM', state='i', prefix='Domain {}'.format(self.domuuid)) + # Send the live migration; force the destination URI to ensure we transit over the cluster network + target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, dest_tcp, 0) + if not target_dom: + raise + except Exception as e: + self.logger.out('Failed to send VM to {} - aborting live migration; error: {}'.format(dest_lv, e), state='e', prefix='Domain {}'.format(self.domuuid)) + dest_lv_conn.close() + return False + + self.logger.out('Successfully migrated VM', state='o', prefix='Domain {}'.format(self.domuuid)) + dest_lv_conn.close() + self.console_log_instance.stop() + self.removeDomainFromList() + + return True + + def migrate_shutdown(): + self.logger.out('Shutting down VM for migration', state='i', prefix='Domain {}'.format(self.domuuid)) + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'shutdown' }) + while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) != 'stop': + time.sleep(0.5) + return True + + do_migrate_shutdown = False + migrate_live_result = migrate_live() + if not migrate_live_result: if force_live: - self.logger.out('Could not live migrate VM; live migration enforced, aborting', state='e', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Could not live migrate VM; live migration enforced, aborting', state='e', prefix='Domain {}'.format(self.domuuid)) zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start', '/domains/{}/node'.format(self.domuuid): 
self.this_node.name, '/domains/{}/lastnode'.format(self.domuuid): '' }) else: - self.logger.out('Could not live migrate VM; shutting down to migrate instead', state='e', prefix='Domain {}:'.format(self.domuuid)) - zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'shutdown' }) - else: - self.removeDomainFromList() - # Stop the log watcher - self.console_log_instance.stop() + do_migrate_shutdown = True + self.logger.out('Releasing write lock for synchronization B', state='i', prefix='Domain {}'.format(self.domuuid)) + zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': self.domuuid }) + lock.release() + self.logger.out('Released write lock for synchronization B', state='o', prefix='Domain {}'.format(self.domuuid)) + + # Synchronize nodes C (I am writer) + lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate') + self.logger.out('Acquiring write lock for synchronization C', state='i', prefix='Domain {}'.format(self.domuuid)) + lock.acquire() + self.logger.out('Acquired write lock for synchronization C', state='o', prefix='Domain {}'.format(self.domuuid)) + time.sleep(0.2) # Time for reader to acquire the lock + + if do_migrate_shutdown: + migrate_shutdown_result = migrate_shutdown() + + self.logger.out('Releasing write lock for synchronization C', state='i', prefix='Domain {}'.format(self.domuuid)) + zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': self.domuuid }) + lock.release() + self.logger.out('Released write lock for synchronization C', state='o', prefix='Domain {}'.format(self.domuuid)) + + # Synchronize nodes D (I am reader) + lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate') + self.logger.out('Acquiring read lock for synchronization D', state='i', prefix='Domain {}'.format(self.domuuid)) + lock.acquire() + self.logger.out('Acquired read lock for synchronization D', state='o', prefix='Domain {}'.format(self.domuuid)) + self.logger.out('Releasing read lock for synchronization D', state='i', prefix='Domain {}'.format(self.domuuid)) + lock.release() + self.logger.out('Released 
read lock for synchronization D', state='o', prefix='Domain {}'.format(self.domuuid)) + + # Wait 0.5 seconds for everything to stabilize before we declare all-done + time.sleep(0.5) + migrate_lock.release() self.inmigrate = False + return - # Receive the migration from another host (wait until VM is running) + # Receive the migration from another host def receive_migrate(self): self.inreceive = True live_receive = True - tick = 0 - self.logger.out('Receiving migration', state='i', prefix='Domain {}:'.format(self.domuuid)) - while True: - # Wait 1 second and increment the tick - time.sleep(1) - tick += 1 - # Get zookeeper state and look for the VM in the local libvirt database - self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) - self.dom = self.lookupByUUID(self.domuuid) + self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid)) - # If the dom is found - if self.dom: - lvdomstate = self.dom.state()[0] - if lvdomstate == libvirt.VIR_DOMAIN_RUNNING: - # VM has been received and started - self.addDomainToList() - zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) - self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid)) - break - else: - # If the state is no longer migrate - if self.state not in ['migrate', 'migrate-live']: - # The receive was aborted before it timed out or was completed - self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}:'.format(self.domuuid)) - break - # If the dom is not found + # Ensure our lock key is populated + zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': self.domuuid }) + + # Synchronize nodes A (I am writer) + lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate') + self.logger.out('Acquiring write lock for synchronization A', state='i', prefix='Domain {}'.format(self.domuuid)) + lock.acquire() + 
self.logger.out('Acquired write lock for synchronization A', state='o', prefix='Domain {}'.format(self.domuuid)) + time.sleep(0.2) # Time for reader to acquire the lock + self.logger.out('Releasing write lock for synchronization A', state='i', prefix='Domain {}'.format(self.domuuid)) + zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': self.domuuid }) + lock.release() + self.logger.out('Released write lock for synchronization A', state='o', prefix='Domain {}'.format(self.domuuid)) + time.sleep(0.1) # Time for new writer to acquire the lock + + # Synchronize nodes B (I am reader) + lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate') + self.logger.out('Acquiring read lock for synchronization B', state='i', prefix='Domain {}'.format(self.domuuid)) + lock.acquire() + self.logger.out('Acquired read lock for synchronization B', state='o', prefix='Domain {}'.format(self.domuuid)) + self.logger.out('Releasing read lock for synchronization B', state='i', prefix='Domain {}'.format(self.domuuid)) + lock.release() + self.logger.out('Released read lock for synchronization B', state='o', prefix='Domain {}'.format(self.domuuid)) + + # Synchronize nodes C (I am reader) + lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate') + self.logger.out('Acquiring read lock for synchronization C', state='i', prefix='Domain {}'.format(self.domuuid)) + lock.acquire() + self.logger.out('Acquired read lock for synchronization C', state='o', prefix='Domain {}'.format(self.domuuid)) + self.logger.out('Releasing read lock for synchronization C', state='i', prefix='Domain {}'.format(self.domuuid)) + lock.release() + self.logger.out('Released read lock for synchronization C', state='o', prefix='Domain {}'.format(self.domuuid)) + + # Synchronize nodes D (I am writer) + lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate') + self.logger.out('Acquiring write lock for synchronization D', state='i', prefix='Domain {}'.format(self.domuuid)) + lock.acquire() + 
self.logger.out('Acquired write lock for synchronization D', state='o', prefix='Domain {}'.format(self.domuuid)) + time.sleep(0.2) # Time for reader to acquire the lock + + self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) + self.dom = self.lookupByUUID(self.domuuid) + if self.dom: + lvdomstate = self.dom.state()[0] + if lvdomstate == libvirt.VIR_DOMAIN_RUNNING: + # VM has been received and started + self.addDomainToList() + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) + self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}'.format(self.domuuid)) else: - # If the state is changed to shutdown or stop - if self.state == 'shutdown' or self.state == 'stop': - # The receive failed on the remote end, and VM is being shut down instead - live_receive = False - self.logger.out('Send failed on remote end', state='w', prefix='Domain {}:'.format(self.domuuid)) - break - - # If we've already been waiting 90s for a receive - # HARDCODE: 90s should be plenty of time for even extremely large VMs on reasonable networks - if tick > 90: - # The receive timed out + # The receive somehow failed zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'fail' }) - self.logger.out('Receive timed out without state change', state='e', prefix='Domain {}:'.format(self.domuuid)) - break + else: + if self.state in ['start']: + # The receive was aborted + self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}'.format(self.domuuid)) + elif self.state in ['stop']: + # The send was shutdown-based + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) + else: + # The send failed catastrophically + self.logger.out('Send failed catastrophically, VM in undefined state', state='e', prefix='Domain {}'.format(self.domuuid)) - # We are waiting on a shutdown - if not live_receive: - tick = 0 - self.logger.out('Waiting 
for VM to shut down on remote end', state='i', prefix='Domain {}:'.format(self.domuuid)) - while True: - # Wait 1 second and increment the tick - time.sleep(1) - tick += 1 - - # Get zookeeper state and look for the VM in the local libvirt database - self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) - - # If the VM has stopped - if self.state == 'stop': - # Wait one more second to avoid race conditions - time.sleep(1) - # Start the VM up - zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) - break - - # If we've already been waiting 120s for a shutdown - # HARDCODE: The remote timeout is 90s, so an extra 30s of buffer - if tick > 120: - # The shutdown timed out; something is very amiss, so switch state to fail and abort - zkhandler.writedata(self.zk_conn, { - '/domains/{}/state'.format(self.domuuid): 'fail', - '/domains/{}/failedreason'.format(self.domuuid): 'Timeout waiting for migrate or shutdown' - }) - self.logger.out('Shutdown timed out without state change', state='e', prefix='Domain {}:'.format(self.domuuid)) - break + self.logger.out('Releasing write lock for synchronization D', state='i', prefix='Domain {}'.format(self.domuuid)) + zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': '' }) + lock.release() + self.logger.out('Released write lock for synchronization D', state='o', prefix='Domain {}'.format(self.domuuid)) + time.sleep(0.1) # Time for new writer to acquire the lock self.inreceive = False + return # # Main function to manage a VM (taking only self) @@ -608,7 +672,7 @@ class VMInstance(object): # Open a libvirt connection lv_conn = libvirt.open(libvirt_name) if lv_conn == None: - self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid)) return None # Lookup the UUID