diff --git a/node-daemon/pvcnoded/VMInstance.py b/node-daemon/pvcnoded/VMInstance.py
index 1b21396d..ee775550 100644
--- a/node-daemon/pvcnoded/VMInstance.py
+++ b/node-daemon/pvcnoded/VMInstance.py
@@ -199,14 +199,14 @@ class VMInstance(object):
         # Start the log watcher
         self.console_log_instance.start()
-        self.logger.out('Starting VM', state='i', prefix='Domain {}:'.format(self.domuuid))
+        self.logger.out('Starting VM', state='i', prefix='Domain {}'.format(self.domuuid))
         self.instart = True
         # Start up a new Libvirt connection
         libvirt_name = "qemu:///system"
         lv_conn = libvirt.open(libvirt_name)
         if lv_conn == None:
-            self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid))
+            self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid))
             self.instart = False
             return
@@ -228,11 +228,11 @@ class VMInstance(object):
             xmlconfig = zkhandler.readdata(self.zk_conn, '/domains/{}/xml'.format(self.domuuid))
             dom = lv_conn.createXML(xmlconfig, 0)
             self.addDomainToList()
-            self.logger.out('Successfully started VM', state='o', prefix='Domain {}:'.format(self.domuuid))
+            self.logger.out('Successfully started VM', state='o', prefix='Domain {}'.format(self.domuuid))
             self.dom = dom
             zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): '' })
         except libvirt.libvirtError as e:
-            self.logger.out('Failed to create VM', state='e', prefix='Domain {}:'.format(self.domuuid))
+            self.logger.out('Failed to create VM', state='e', prefix='Domain {}'.format(self.domuuid))
             zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'fail' })
             zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): str(e) })
             self.dom = None
@@ -243,14 +243,14 @@ class VMInstance(object):
     # Restart the VM
     def restart_vm(self):
-        self.logger.out('Restarting VM', state='i', prefix='Domain {}:'.format(self.domuuid))
+        self.logger.out('Restarting VM', state='i', prefix='Domain {}'.format(self.domuuid))
         self.inrestart = True
         # Start up a new Libvirt connection
         libvirt_name = "qemu:///system"
         lv_conn = libvirt.open(libvirt_name)
         if lv_conn == None:
-            self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid))
+            self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid))
             self.inrestart = False
             return
@@ -265,14 +265,14 @@ class VMInstance(object):
     # Stop the VM forcibly without updating state
     def terminate_vm(self):
-        self.logger.out('Terminating VM', state='i', prefix='Domain {}:'.format(self.domuuid))
+        self.logger.out('Terminating VM', state='i', prefix='Domain {}'.format(self.domuuid))
         self.instop = True
         try:
             self.dom.destroy()
         except AttributeError:
-            self.logger.out('Failed to terminate VM', state='e', prefix='Domain {}:'.format(self.domuuid))
+            self.logger.out('Failed to terminate VM', state='e', prefix='Domain {}'.format(self.domuuid))
         self.removeDomainFromList()
-        self.logger.out('Successfully terminated VM', state='o', prefix='Domain {}:'.format(self.domuuid))
+        self.logger.out('Successfully terminated VM', state='o', prefix='Domain {}'.format(self.domuuid))
         self.dom = None
         self.instop = False
@@ -281,18 +281,18 @@ class VMInstance(object):
     # Stop the VM forcibly
     def stop_vm(self):
-        self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
+        self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}'.format(self.domuuid))
         self.instop = True
         try:
             self.dom.destroy()
         except AttributeError:
-            self.logger.out('Failed to stop VM', state='e', prefix='Domain {}:'.format(self.domuuid))
+            self.logger.out('Failed to stop VM', state='e', prefix='Domain {}'.format(self.domuuid))
         self.removeDomainFromList()
         if self.inrestart == False:
             zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
-        self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}:'.format(self.domuuid))
+        self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}'.format(self.domuuid))
         self.dom = None
         self.instop = False
@@ -301,7 +301,7 @@ class VMInstance(object):
     # Shutdown the VM gracefully
     def shutdown_vm(self):
-        self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
+        self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}'.format(self.domuuid))
         is_aborted = False
         self.inshutdown = True
         self.dom.shutdown()
@@ -313,7 +313,7 @@ class VMInstance(object):
             # Abort shutdown if the state changes to start
             current_state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
             if current_state not in ['shutdown', 'restart']:
-                self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}:'.format(self.domuuid))
+                self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}'.format(self.domuuid))
                 is_aborted = True
                 break
@@ -325,14 +325,14 @@ class VMInstance(object):
             if lvdomstate != libvirt.VIR_DOMAIN_RUNNING:
                 self.removeDomainFromList()
                 zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
-                self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}:'.format(self.domuuid))
+                self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}'.format(self.domuuid))
                 self.dom = None
                 # Stop the log watcher
                 self.console_log_instance.stop()
                 break
             if tick >= self.config['vm_shutdown_timeout']:
-                self.logger.out('Shutdown timeout ({}s) expired, forcing off'.format(self.config['vm_shutdown_timeout']), state='e', prefix='Domain {}:'.format(self.domuuid))
+                self.logger.out('Shutdown timeout ({}s) expired, forcing off'.format(self.config['vm_shutdown_timeout']), state='e', prefix='Domain {}'.format(self.domuuid))
                 zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
                 break
@@ -346,32 +346,6 @@ class VMInstance(object):
             time.sleep(1)
             zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
-    def live_migrate_vm(self):
-        dest_lv = 'qemu+tcp://{}.{}/system'.format(self.node, self.config['cluster_domain'])
-        dest_tcp = 'tcp://{}.{}'.format(self.node, self.config['cluster_domain'])
-        try:
-            # Open a connection to the destination
-            dest_lv_conn = libvirt.open(dest_lv)
-            if not dest_lv_conn:
-                raise
-        except:
-            self.logger.out('Failed to open connection to {}; aborting live migration.'.format(dest_lv), state='e', prefix='Domain {}:'.format(self.domuuid))
-            return False
-
-        try:
-            # Send the live migration; force the destination URI to ensure we transit over the cluster network
-            target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, dest_tcp, 0)
-            if not target_dom:
-                raise
-        except Exception as e:
-            self.logger.out('Failed to send VM to {} - aborting live migration; error: {}'.format(dest_lv, e), state='e', prefix='Domain {}:'.format(self.domuuid))
-            dest_lv_conn.close()
-            return False
-
-        self.logger.out('Successfully migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid))
-        dest_lv_conn.close()
-        return True
-
     # Migrate the VM to a target host
     def migrate_vm(self, force_live=False):
         # Don't try to migrate a node to itself, set back to start
@@ -381,106 +355,196 @@ class VMInstance(object):
             return
         self.inmigrate = True
-        self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}:'.format(self.domuuid))
+        self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid))
-        migrate_ret = self.live_migrate_vm()
-        if not migrate_ret:
+        # Acquire exclusive lock on the domain node key
+        migrate_lock = zkhandler.exclusivelock(self.zk_conn, '/domains/{}/node'.format(self.domuuid))
+        migrate_lock.acquire()
+
+        time.sleep(0.2) # Initial delay for the first writer to grab the lock
+
+        # Synchronize nodes A (I am reader)
+        lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate')
+        self.logger.out('Acquiring read lock for synchronization A', state='i', prefix='Domain {}'.format(self.domuuid))
+        lock.acquire()
+        self.logger.out('Acquired read lock for synchronization A', state='o', prefix='Domain {}'.format(self.domuuid))
+        self.logger.out('Releasing read lock for synchronization A', state='i', prefix='Domain {}'.format(self.domuuid))
+        lock.release()
+        self.logger.out('Released read lock for synchronization A', state='o', prefix='Domain {}'.format(self.domuuid))
+
+        # Synchronize nodes B (I am writer)
+        lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate')
+        self.logger.out('Acquiring write lock for synchronization B', state='i', prefix='Domain {}'.format(self.domuuid))
+        lock.acquire()
+        self.logger.out('Acquired write lock for synchronization B', state='o', prefix='Domain {}'.format(self.domuuid))
+        time.sleep(0.2) # Time for reader to acquire the lock
+
+        def migrate_live():
+            self.logger.out('Setting up live migration', state='i', prefix='Domain {}'.format(self.domuuid))
+            # Set up destination connection
+            dest_lv = 'qemu+tcp://{}.{}/system'.format(self.node, self.config['cluster_domain'])
+            dest_tcp = 'tcp://{}.{}'.format(self.node, self.config['cluster_domain'])
+            try:
+                self.logger.out('Opening remote libvirt connection', state='i', prefix='Domain {}'.format(self.domuuid))
+                # Open a connection to the destination
+                dest_lv_conn = libvirt.open(dest_lv)
+                if not dest_lv_conn:
+                    raise
+            except:
+                self.logger.out('Failed to open connection to {}; aborting live migration.'.format(dest_lv), state='e', prefix='Domain {}'.format(self.domuuid))
+                return False
+
+            try:
+                self.logger.out('Live migrating VM', state='i', prefix='Domain {}'.format(self.domuuid))
+                # Send the live migration; force the destination URI to ensure we transit over the cluster network
+                target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, dest_tcp, 0)
+                if not target_dom:
+                    raise
+            except Exception as e:
+                self.logger.out('Failed to send VM to {} - aborting live migration; error: {}'.format(dest_lv, e), state='e', prefix='Domain {}'.format(self.domuuid))
+                dest_lv_conn.close()
+                return False
+
+            self.logger.out('Successfully migrated VM', state='o', prefix='Domain {}'.format(self.domuuid))
+            dest_lv_conn.close()
+            self.console_log_instance.stop()
+            self.removeDomainFromList()
+
+            return True
+
+        def migrate_shutdown():
+            self.logger.out('Shutting down VM for migration', state='i', prefix='Domain {}'.format(self.domuuid))
+            zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'shutdown' })
+            while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) != 'stop':
+                time.sleep(0.5)
+            return True
+
+        do_migrate_shutdown = False
+        migrate_live_result = migrate_live()
+        if not migrate_live_result:
             if force_live:
-                self.logger.out('Could not live migrate VM; live migration enforced, aborting', state='e', prefix='Domain {}:'.format(self.domuuid))
+                self.logger.out('Could not live migrate VM; live migration enforced, aborting', state='e', prefix='Domain {}'.format(self.domuuid))
                 zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start', '/domains/{}/node'.format(self.domuuid): self.this_node.name, '/domains/{}/lastnode'.format(self.domuuid): '' })
             else:
-                self.logger.out('Could not live migrate VM; shutting down to migrate instead', state='e', prefix='Domain {}:'.format(self.domuuid))
-                zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'shutdown' })
-        else:
-            self.removeDomainFromList()
-            # Stop the log watcher
-            self.console_log_instance.stop()
+                do_migrate_shutdown = True
+        self.logger.out('Releasing write lock for synchronization B', state='i', prefix='Domain {}'.format(self.domuuid))
+        zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': self.domuuid })
+        lock.release()
+        self.logger.out('Released write lock for synchronization B', state='o')
+
+        # Synchronize nodes C (I am writer)
+        lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate')
+        self.logger.out('Acquiring write lock for synchronization C', state='i', prefix='Domain {}'.format(self.domuuid))
+        lock.acquire()
+        self.logger.out('Acquired write lock for synchronization C', state='o', prefix='Domain {}'.format(self.domuuid))
+        time.sleep(0.2) # Time for reader to acquire the lock
+
+        if do_migrate_shutdown:
+            migrate_shutdown_result = migrate_shutdown()
+
+        self.logger.out('Releasing write lock for synchronization C', state='i', prefix='Domain {}'.format(self.domuuid))
+        zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': self.domuuid })
+        lock.release()
+        self.logger.out('Released write lock for synchronization C', state='o')
+
+        # Synchronize nodes D (I am reader)
+        lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate')
+        self.logger.out('Acquiring read lock for synchronization D', state='i', prefix='Domain {}'.format(self.domuuid))
+        lock.acquire()
+        self.logger.out('Acquired read lock for synchronization D', state='o', prefix='Domain {}'.format(self.domuuid))
+        self.logger.out('Releasing read lock for synchronization D', state='i', prefix='Domain {}'.format(self.domuuid))
+        lock.release()
+        self.logger.out('Released read lock for synchronization D', state='o', prefix='Domain {}'.format(self.domuuid))
+
+        # Wait 0.5 seconds for everything to stabilize before we declare all-done
+        time.sleep(0.5)
+        migrate_lock.release()
         self.inmigrate = False
+        return
-    # Receive the migration from another host (wait until VM is running)
+    # Receive the migration from another host
     def receive_migrate(self):
         self.inreceive = True
         live_receive = True
-        tick = 0
-        self.logger.out('Receiving migration', state='i', prefix='Domain {}:'.format(self.domuuid))
-        while True:
-            # Wait 1 second and increment the tick
-            time.sleep(1)
-            tick += 1
-            # Get zookeeper state and look for the VM in the local libvirt database
-            self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
-            self.dom = self.lookupByUUID(self.domuuid)
+        self.logger.out('Receiving VM migration', state='i', prefix='Domain {}'.format(self.domuuid))
-            # If the dom is found
-            if self.dom:
-                lvdomstate = self.dom.state()[0]
-                if lvdomstate == libvirt.VIR_DOMAIN_RUNNING:
-                    # VM has been received and started
-                    self.addDomainToList()
-                    zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
-                    self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid))
-                    break
-                else:
-                    # If the state is no longer migrate
-                    if self.state not in ['migrate', 'migrate-live']:
-                        # The receive was aborted before it timed out or was completed
-                        self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}:'.format(self.domuuid))
-                        break
-            # If the dom is not found
+        # Ensure our lock key is populated
+        zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': self.domuuid })
+
+        # Synchronize nodes A (I am writer)
+        lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate')
+        self.logger.out('Acquiring write lock for synchronization A', state='i', prefix='Domain {}'.format(self.domuuid))
+        lock.acquire()
+        self.logger.out('Acquired write lock for synchronization A', state='o', prefix='Domain {}'.format(self.domuuid))
+        time.sleep(0.2) # Time for reader to acquire the lock
+        self.logger.out('Releasing write lock for synchronization A', state='i', prefix='Domain {}'.format(self.domuuid))
+        zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': self.domuuid })
+        lock.release()
+        self.logger.out('Released write lock for synchronization A', state='o', prefix='Domain {}'.format(self.domuuid))
+        time.sleep(0.1) # Time for new writer to acquire the lock
+
+        # Synchronize nodes B (I am reader)
+        lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate')
+        self.logger.out('Acquiring read lock for synchronization B', state='i', prefix='Domain {}'.format(self.domuuid))
+        lock.acquire()
+        self.logger.out('Acquired read lock for synchronization B', state='o', prefix='Domain {}'.format(self.domuuid))
+        self.logger.out('Releasing read lock for synchronization B', state='i', prefix='Domain {}'.format(self.domuuid))
+        lock.release()
+        self.logger.out('Released read lock for synchronization B', state='o', prefix='Domain {}'.format(self.domuuid))
+
+        # Synchronize nodes C (I am reader)
+        lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate')
+        self.logger.out('Acquiring read lock for synchronization C', state='i', prefix='Domain {}'.format(self.domuuid))
+        lock.acquire()
+        self.logger.out('Acquired read lock for synchronization C', state='o', prefix='Domain {}'.format(self.domuuid))
+        self.logger.out('Releasing read lock for synchronization C', state='i', prefix='Domain {}'.format(self.domuuid))
+        lock.release()
+        self.logger.out('Released read lock for synchronization C', state='o', prefix='Domain {}'.format(self.domuuid))
+
+        # Synchronize nodes D (I am writer)
+        lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate')
+        self.logger.out('Acquiring write lock for synchronization D', state='i', prefix='Domain {}'.format(self.domuuid))
+        lock.acquire()
+        self.logger.out('Acquired write lock for synchronization D', state='o', prefix='Domain {}'.format(self.domuuid))
+        time.sleep(0.2) # Time for reader to acquire the lock
+
+        self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
+        self.dom = self.lookupByUUID(self.domuuid)
+        if self.dom:
+            lvdomstate = self.dom.state()[0]
+            if lvdomstate == libvirt.VIR_DOMAIN_RUNNING:
+                # VM has been received and started
+                self.addDomainToList()
+                zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
+                self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}'.format(self.domuuid))
             else:
-                # If the state is changed to shutdown or stop
-                if self.state == 'shutdown' or self.state == 'stop':
-                    # The receive failed on the remote end, and VM is being shut down instead
-                    live_receive = False
-                    self.logger.out('Send failed on remote end', state='w', prefix='Domain {}:'.format(self.domuuid))
-                    break
-
-                # If we've already been waiting 90s for a receive
-                # HARDCODE: 90s should be plenty of time for even extremely large VMs on reasonable networks
-                if tick > 90:
-                    # The receive timed out
+                # The receive somehow failed
                 zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'fail' })
-                    self.logger.out('Receive timed out without state change', state='e', prefix='Domain {}:'.format(self.domuuid))
-                    break
+        else:
+            if self.state in ['start']:
+                # The receive was aborted
+                self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}'.format(self.domuuid))
+            elif self.state in ['stop']:
+                # The send was shutdown-based
+                zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
+            else:
+                # The send failed catastrophically
+                self.logger.out('Send failed catastrophically, VM in undefined state', state='e', prefix='Domain {}'.format(self.domuuid))
-        # We are waiting on a shutdown
-        if not live_receive:
-            tick = 0
-            self.logger.out('Waiting for VM to shut down on remote end', state='i', prefix='Domain {}:'.format(self.domuuid))
-            while True:
-                # Wait 1 second and increment the tick
-                time.sleep(1)
-                tick += 1
-
-                # Get zookeeper state and look for the VM in the local libvirt database
-                self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
-
-                # If the VM has stopped
-                if self.state == 'stop':
-                    # Wait one more second to avoid race conditions
-                    time.sleep(1)
-                    # Start the VM up
-                    zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
-                    break
-
-                # If we've already been waiting 120s for a shutdown
-                # HARDCODE: The remote timeout is 90s, so an extra 30s of buffer
-                if tick > 120:
-                    # The shutdown timed out; something is very amiss, so switch state to fail and abort
-                    zkhandler.writedata(self.zk_conn, {
-                        '/domains/{}/state'.format(self.domuuid): 'fail',
-                        '/domains/{}/failedreason'.format(self.domuuid): 'Timeout waiting for migrate or shutdown'
-                    })
-                    self.logger.out('Shutdown timed out without state change', state='e', prefix='Domain {}:'.format(self.domuuid))
-                    break
+        self.logger.out('Releasing write lock for synchronization D', state='i', prefix='Domain {}'.format(self.domuuid))
+        zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': '' })
+        lock.release()
+        self.logger.out('Released write lock for synchronization D', state='o', prefix='Domain {}'.format(self.domuuid))
+        time.sleep(0.1) # Time for new writer to acquire the lock
         self.inreceive = False
+        return
     #
     # Main function to manage a VM (taking only self)
@@ -608,7 +672,7 @@ class VMInstance(object):
         # Open a libvirt connection
        lv_conn = libvirt.open(libvirt_name)
         if lv_conn == None:
-            self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid))
+            self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid))
             return None
         # Lookup the UUID