diff --git a/node-daemon/pvcnoded/objects/VMInstance.py b/node-daemon/pvcnoded/objects/VMInstance.py index 06128ca2..9b3a55ff 100644 --- a/node-daemon/pvcnoded/objects/VMInstance.py +++ b/node-daemon/pvcnoded/objects/VMInstance.py @@ -425,42 +425,12 @@ class VMInstance(object): migrate_lock_node.acquire() migrate_lock_state.acquire() - time.sleep(0.2) # Initial delay for the first writer to grab the lock - # Don't try to migrate a node to itself, set back to start if self.node == self.lastnode or self.node == self.this_node.name: abort_migrate('Target node matches the current active node during initial check') return - # Synchronize nodes A (I am reader) - lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid)) - self.logger.out('Acquiring read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid)) - lock.acquire() - self.logger.out('Acquired read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid)) - if self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) == '': - self.logger.out('Waiting for peer', state='i', prefix='Domain {}'.format(self.domuuid)) - ticks = 0 - while self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) == '': - time.sleep(0.1) - ticks += 1 - if ticks > 300: - self.logger.out('Timed out waiting 30s for peer', state='e', prefix='Domain {}'.format(self.domuuid)) - aborted = True - break - self.logger.out('Releasing read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid)) - lock.release() - self.logger.out('Released read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid)) - - if aborted: - abort_migrate('Timed out waiting for peer') - return - - # Synchronize nodes B (I am writer) - lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid)) - self.logger.out('Acquiring write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid)) - lock.acquire() - self.logger.out('Acquired write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) - time.sleep(0.5) # Time for reader to acquire the lock + time.sleep(0.5) # Initial delay for the first writer to grab the lock def migrate_live(): self.logger.out('Setting up live migration', state='i', prefix='Domain {}'.format(self.domuuid)) @@ -500,9 +470,14 @@ class VMInstance(object): self.shutdown_vm() return True - do_migrate_shutdown = False + self.logger.out('Acquiring lock for phase B', state='i') + lock = self.zkhandler.exclusivelock(('domain.migrate.sync_lock', self.domuuid)) + try: + lock.acquire(timeout=30.0) + except Exception: + abort_migrate('Timed out waiting for peer') + return migrate_live_result = False - # Do a final verification if self.node == self.lastnode or self.node == self.this_node.name: abort_migrate('Target node matches the current active node during final check') @@ -510,7 +485,6 @@ class VMInstance(object): if self.node != target_node: abort_migrate('Target node changed during preparation') return - if not force_shutdown: # A live migrate is attemped 3 times in succession ticks = 0 @@ -525,59 +499,27 @@ class VMInstance(object): break else: migrate_live_result = False - if not migrate_live_result: if force_live: self.logger.out('Could not live migrate VM while live migration enforced', state='e', prefix='Domain {}'.format(self.domuuid)) aborted = True else: - do_migrate_shutdown = True - - self.logger.out('Releasing write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid)) + migrate_shutdown() lock.release() - self.logger.out('Released write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) if aborted: abort_migrate('Live migration failed and is required') return - # Synchronize nodes C (I am writer) - lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid)) - self.logger.out('Acquiring write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) + time.sleep(0.5) + + self.logger.out('Acquiring lock for phase D', state='i') lock.acquire() - self.logger.out('Acquired write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) - time.sleep(0.5) # Time for reader to acquire the lock - - if do_migrate_shutdown: - migrate_shutdown() - - self.logger.out('Releasing write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) - lock.release() - self.logger.out('Released write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) - - # Synchronize nodes D (I am reader) - lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid)) - self.logger.out('Acquiring read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) - lock.acquire() - self.logger.out('Acquired read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) - self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid)) self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid)) - - self.logger.out('Releasing read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) - lock.release() - self.logger.out('Released read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) - - # Wait for the receive side to complete before we declare all-done and release locks - ticks = 0 - while self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) != '': - time.sleep(0.1) - ticks += 1 - if ticks > 100: - self.logger.out('Sync lock clear exceeded 10s timeout, continuing', state='w', prefix='Domain {}'.format(self.domuuid)) - break migrate_lock_node.release() migrate_lock_state.release() + lock.release() self.inmigrate = False return @@ -592,55 +534,37 @@ class VMInstance(object): self.logger.out('Receiving VM migration from node "{}"'.format(self.last_currentnode), state='i', prefix='Domain {}'.format(self.domuuid)) - # Short delay to ensure sender is in sync - time.sleep(0.5) - # Ensure our lock key is populated self.zkhandler.write([ (('domain.migrate.sync_lock', self.domuuid), self.domuuid) ]) - # Synchronize nodes A (I am writer) - lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid)) - self.logger.out('Acquiring write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid)) - lock.acquire() - self.logger.out('Acquired write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid)) - time.sleep(1) # Time for reader to acquire the lock - self.logger.out('Releasing write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid)) + self.logger.out('Acquiring lock for phase A', state='i') + lock = self.zkhandler.exclusivelock(('domain.migrate.sync_lock', self.domuuid)) + try: + lock.acquire(timeout=30.0) + except Exception: + self.logger.out('Failed to acquire exclusive lock for VM', state='w') + return + # Exactly twice the amount of time that the other side is waiting + time.sleep(1) lock.release() - self.logger.out('Released write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid)) - time.sleep(0.1) # Time for new writer to acquire the lock - # Synchronize nodes B (I am reader) - lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid)) - self.logger.out('Acquiring read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid)) + time.sleep(0.5) + + self.logger.out('Acquiring lock for phase C', state='i') lock.acquire() - self.logger.out('Acquired read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) - self.logger.out('Releasing read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid)) + # This is strictly a synchrozing step + time.sleep(0.1) lock.release() - self.logger.out('Released read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) - # Synchronize nodes C (I am reader) - lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid)) - self.logger.out('Acquiring read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) + time.sleep(0.5) + + self.logger.out('Acquiring lock for phase E', state='i') lock.acquire() - self.logger.out('Acquired read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) - # Set the updated data self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid)) self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid)) - - self.logger.out('Releasing read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) - lock.release() - self.logger.out('Released read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) - - # Synchronize nodes D (I am writer) - lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid)) - self.logger.out('Acquiring write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) - lock.acquire() - self.logger.out('Acquired write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) - time.sleep(0.5) # Time for reader to acquire the lock - self.state = self.zkhandler.read(('domain.state', self.domuuid)) self.dom = self.lookupByUUID(self.domuuid) if self.dom: @@ -672,10 +596,7 @@ class VMInstance(object): else: # The send failed or was aborted self.logger.out('Migrate aborted or failed; VM in state {}'.format(self.state), state='w', prefix='Domain {}'.format(self.domuuid)) - - self.logger.out('Releasing write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) lock.release() - self.logger.out('Released write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) self.zkhandler.write([ (('domain.migrate.sync_lock', self.domuuid), '')