diff --git a/node-daemon/pvcd/DomainInstance.py b/node-daemon/pvcd/DomainInstance.py index c3550905..58bc505c 100644 --- a/node-daemon/pvcd/DomainInstance.py +++ b/node-daemon/pvcd/DomainInstance.py @@ -236,51 +236,61 @@ class DomainInstance(object): self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid)) self.inshutdown = True self.dom.shutdown() - try: - tick = 0 - while self.dom.state()[0] == libvirt.VIR_DOMAIN_RUNNING and tick < 60: - tick += 1 - time.sleep(0.5) + tick = 0 + while True: + tick += 1 + time.sleep(1) - if tick >= 60: + try: + lvdomstate = self.dom.state()[0] + except: + lvdomstate = None + + if lvdomstate != libvirt.VIR_DOMAIN_RUNNING: + self.removeDomainFromList() + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' }) + self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}:'.format(self.domuuid)) + self.dom = None + # Stop the log watcher + self.console_log_instance.stop() + break + + # HARDCODE: 90s is a reasonable amount of time for any operating system to shut down cleanly + if tick >= 90: self.logger.out('Shutdown timeout expired', state='e', prefix='Domain {}:'.format(self.domuuid)) - self.stop_vm() - self.inshutdown = False - return - except: - pass + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' }) + break - self.removeDomainFromList() - - if self.inrestart == False: - zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' }) - - self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}:'.format(self.domuuid)) - self.dom = None self.inshutdown = False - # Stop the log watcher - self.console_log_instance.stop() + if self.inrestart: + # Wait to prevent race conditions + time.sleep(1) + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) - def live_migrate_vm(self, dest_node): + def live_migrate_vm(self): + dest_lv = 'qemu+tcp://{}.{}/system'.format(self.node, self.config['cluster_domain']) + dest_tcp = 'tcp://{}.{}'.format(self.node, self.config['cluster_domain']) try: - dest_lv_conn = libvirt.open('qemu+tcp://{}/system'.format(self.node)) - if dest_lv_conn == None: + # Open a connection to the destination + dest_lv_conn = libvirt.open(dest_lv) + if not dest_lv_conn: raise except: - self.logger.out('Failed to open connection to qemu+tcp://{}/system; aborting migration.'.format(self.node), state='e', prefix='Domain {}:'.format(self.domuuid)) + self.logger.out('Failed to open connection to {}; aborting live migration.'.format(dest_lv), state='e', prefix='Domain {}:'.format(self.domuuid)) return False try: - target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, None, 0) - if target_dom == None: + # Send the live migration; force the destination URI to ensure we transit over the cluster network + target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, dest_tcp, 0) + if not target_dom: raise - self.logger.out('Successfully migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid)) - except: + self.logger.out('Failed to send VM to {}; aborting live migration.'.format(dest_lv), state='e', prefix='Domain {}:'.format(self.domuuid)) dest_lv_conn.close() return False + self.logger.out('Successfully migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid)) dest_lv_conn.close() return True @@ -289,54 +299,95 @@ class DomainInstance(object): self.inmigrate = True self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}:'.format(self.domuuid)) - migrate_ret = self.live_migrate_vm(self.node) + migrate_ret = self.live_migrate_vm() if not migrate_ret: self.logger.out('Could not live migrate VM; shutting down to migrate instead', state='e', prefix='Domain {}:'.format(self.domuuid)) - self.shutdown_vm() + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'shutdown' }) else: self.removeDomainFromList() + # Stop the log watcher + self.console_log_instance.stop() - zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) self.inmigrate = False - # Stop the log watcher - self.console_log_instance.stop() - # Receive the migration from another host (wait until VM is running) def receive_migrate(self): - # Start the log watcher - self.console_log_instance.start() - self.inreceive = True + live_receive = True + tick = 0 self.logger.out('Receiving migration', state='i', prefix='Domain {}:'.format(self.domuuid)) while True: + # Wait 1 second and increment the tick time.sleep(1) + tick += 1 + + # Get zookeeper state and look for the VM in the local libvirt database self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) self.dom = self.lookupByUUID(self.domuuid) - if self.dom == None and self.state == 'migrate': - continue - - if self.state != 'migrate': + # If the dom is found + if self.dom: + lvdomstate = self.dom.state()[0] + if lvdomstate == libvirt.VIR_DOMAIN_RUNNING: + # VM has been received and started + self.addDomainToList() + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) + self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid)) + break + else: + # If the state is no longer migrate + if self.state != 'migrate': + # The receive was aborted before it timed out or was completed + self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}:'.format(self.domuuid)) + break + # If the dom is not found + else: + # If the state is changed to shutdown or stop + if self.state == 'shutdown' or self.state == 'stop': + # The receive failed on the remote end, and VM is being shut down instead + live_receive = False + self.logger.out('Send failed on remote end', state='w', prefix='Domain {}:'.format(self.domuuid)) + break + + # If we've already been waiting 90s for a receive + # HARDCODE: 90s should be plenty of time for even extremely large VMs on reasonable networks + if tick > 90: + # The receive timed out + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'failed' }) + self.logger.out('Receive timed out without state change', state='e', prefix='Domain {}:'.format(self.domuuid)) break - try: - if self.dom.state()[0] == libvirt.VIR_DOMAIN_RUNNING: + # We are waiting on a shutdown + if not live_receive: + tick = 0 + self.logger.out('Waiting for VM to shut down on remote end', state='i', prefix='Domain {}:'.format(self.domuuid)) + while True: + # Wait 1 second and increment the tick + time.sleep(1) + tick += 1 + + # Get zookeeper state and look for the VM in the local libvirt database + self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) + + # If the VM has stopped + if self.state == 'stop': + # Wait one more second to avoid race conditions + time.sleep(1) + # Start the VM up + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) break - except: - continue - - try: - dom_state = self.dom.state()[0] - except AttributeError: - dom_state = None - - if dom_state == libvirt.VIR_DOMAIN_RUNNING: - self.addDomainToList() - self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid)) - else: - self.logger.out('Failed to receive migrated VM', state='e', prefix='Domain {}:'.format(self.domuuid)) + # If we've already been waiting 120s for a shutdown + # HARDCODE: The remote timeout is 90s, so an extra 30s of buffer + if tick > 120: + # The shutdown timed out; something is very amiss, so switch state to failed and abort + zkhandler.writedata(self.zk_conn, { + '/domains/{}/state'.format(self.domuuid): 'failed', + '/domains/{}/failedreason'.format(self.domuuid): 'Timeout waiting for migrate or shutdown' + }) + self.logger.out('Shutdown timed out without state change', state='e', prefix='Domain {}:'.format(self.domuuid)) + break + self.inreceive = False # @@ -430,7 +481,10 @@ class DomainInstance(object): # VM should be migrated away from this node if self.state == "migrate": self.migrate_vm() - # VM should be terminated + # VM should be shutdown gracefully + elif self.state == 'shutdown': + self.shutdown_vm() + # VM should be forcibly terminated else: self.terminate_vm()