Simplify locking process for VM migration
Replace the cumbersome and overly complex ping-pong of read and write locks with a much simpler process built on exclusive locks. The process is awkward to describe in ASCII art or narrative form, but in short it ping-pongs through a set of exclusive locks and wait timers, so that the two sides synchronize by blocking on the exclusive lock. The end result is a much more streamlined migration (taking about half the time, all things considered) that should also be less error-prone.
This commit is contained in:
		@@ -425,42 +425,12 @@ class VMInstance(object):
 | 
			
		||||
        migrate_lock_node.acquire()
 | 
			
		||||
        migrate_lock_state.acquire()
 | 
			
		||||
 | 
			
		||||
        time.sleep(0.2)  # Initial delay for the first writer to grab the lock
 | 
			
		||||
 | 
			
		||||
        # Don't try to migrate a node to itself, set back to start
 | 
			
		||||
        if self.node == self.lastnode or self.node == self.this_node.name:
 | 
			
		||||
            abort_migrate('Target node matches the current active node during initial check')
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # Synchronize nodes A (I am reader)
 | 
			
		||||
        lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
 | 
			
		||||
        self.logger.out('Acquiring read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        lock.acquire()
 | 
			
		||||
        self.logger.out('Acquired read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        if self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) == '':
 | 
			
		||||
            self.logger.out('Waiting for peer', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
            ticks = 0
 | 
			
		||||
            while self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) == '':
 | 
			
		||||
                time.sleep(0.1)
 | 
			
		||||
                ticks += 1
 | 
			
		||||
                if ticks > 300:
 | 
			
		||||
                    self.logger.out('Timed out waiting 30s for peer', state='e', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
                    aborted = True
 | 
			
		||||
                    break
 | 
			
		||||
        self.logger.out('Releasing read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        lock.release()
 | 
			
		||||
        self.logger.out('Released read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
 | 
			
		||||
        if aborted:
 | 
			
		||||
            abort_migrate('Timed out waiting for peer')
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # Synchronize nodes B (I am writer)
 | 
			
		||||
        lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
 | 
			
		||||
        self.logger.out('Acquiring write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        lock.acquire()
 | 
			
		||||
        self.logger.out('Acquired write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        time.sleep(0.5)  # Time for reader to acquire the lock
 | 
			
		||||
        time.sleep(0.5)  # Initial delay for the first writer to grab the lock
 | 
			
		||||
 | 
			
		||||
        def migrate_live():
 | 
			
		||||
            self.logger.out('Setting up live migration', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
@@ -500,9 +470,14 @@ class VMInstance(object):
 | 
			
		||||
            self.shutdown_vm()
 | 
			
		||||
            return True
 | 
			
		||||
 | 
			
		||||
        do_migrate_shutdown = False
 | 
			
		||||
        self.logger.out('Acquiring lock for phase B', state='i')
 | 
			
		||||
        lock = self.zkhandler.exclusivelock(('domain.migrate.sync_lock', self.domuuid))
 | 
			
		||||
        try:
 | 
			
		||||
            lock.acquire(timeout=30.0)
 | 
			
		||||
        except Exception:
 | 
			
		||||
            abort_migrate('Timed out waiting for peer')
 | 
			
		||||
            return
 | 
			
		||||
        migrate_live_result = False
 | 
			
		||||
 | 
			
		||||
        # Do a final verification
 | 
			
		||||
        if self.node == self.lastnode or self.node == self.this_node.name:
 | 
			
		||||
            abort_migrate('Target node matches the current active node during final check')
 | 
			
		||||
@@ -510,7 +485,6 @@ class VMInstance(object):
 | 
			
		||||
        if self.node != target_node:
 | 
			
		||||
            abort_migrate('Target node changed during preparation')
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        if not force_shutdown:
 | 
			
		||||
            # A live migrate is attempted 3 times in succession
 | 
			
		||||
            ticks = 0
 | 
			
		||||
@@ -525,59 +499,27 @@ class VMInstance(object):
 | 
			
		||||
                    break
 | 
			
		||||
        else:
 | 
			
		||||
            migrate_live_result = False
 | 
			
		||||
 | 
			
		||||
        if not migrate_live_result:
 | 
			
		||||
            if force_live:
 | 
			
		||||
                self.logger.out('Could not live migrate VM while live migration enforced', state='e', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
                aborted = True
 | 
			
		||||
            else:
 | 
			
		||||
                do_migrate_shutdown = True
 | 
			
		||||
 | 
			
		||||
        self.logger.out('Releasing write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
                migrate_shutdown()
 | 
			
		||||
        lock.release()
 | 
			
		||||
        self.logger.out('Released write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
 | 
			
		||||
        if aborted:
 | 
			
		||||
            abort_migrate('Live migration failed and is required')
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # Synchronize nodes C (I am writer)
 | 
			
		||||
        lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
 | 
			
		||||
        self.logger.out('Acquiring write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        time.sleep(0.5)
 | 
			
		||||
 | 
			
		||||
        self.logger.out('Acquiring lock for phase D', state='i')
 | 
			
		||||
        lock.acquire()
 | 
			
		||||
        self.logger.out('Acquired write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        time.sleep(0.5)  # Time for reader to acquire the lock
 | 
			
		||||
 | 
			
		||||
        if do_migrate_shutdown:
 | 
			
		||||
            migrate_shutdown()
 | 
			
		||||
 | 
			
		||||
        self.logger.out('Releasing write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        lock.release()
 | 
			
		||||
        self.logger.out('Released write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
 | 
			
		||||
        # Synchronize nodes D (I am reader)
 | 
			
		||||
        lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
 | 
			
		||||
        self.logger.out('Acquiring read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        lock.acquire()
 | 
			
		||||
        self.logger.out('Acquired read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
 | 
			
		||||
        self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid))
 | 
			
		||||
        self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid))
 | 
			
		||||
 | 
			
		||||
        self.logger.out('Releasing read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        lock.release()
 | 
			
		||||
        self.logger.out('Released read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
 | 
			
		||||
        # Wait for the receive side to complete before we declare all-done and release locks
 | 
			
		||||
        ticks = 0
 | 
			
		||||
        while self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) != '':
 | 
			
		||||
            time.sleep(0.1)
 | 
			
		||||
            ticks += 1
 | 
			
		||||
            if ticks > 100:
 | 
			
		||||
                self.logger.out('Sync lock clear exceeded 10s timeout, continuing', state='w', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
                break
 | 
			
		||||
        migrate_lock_node.release()
 | 
			
		||||
        migrate_lock_state.release()
 | 
			
		||||
        lock.release()
 | 
			
		||||
 | 
			
		||||
        self.inmigrate = False
 | 
			
		||||
        return
 | 
			
		||||
@@ -592,55 +534,37 @@ class VMInstance(object):
 | 
			
		||||
 | 
			
		||||
        self.logger.out('Receiving VM migration from node "{}"'.format(self.last_currentnode), state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
 | 
			
		||||
        # Short delay to ensure sender is in sync
 | 
			
		||||
        time.sleep(0.5)
 | 
			
		||||
 | 
			
		||||
        # Ensure our lock key is populated
 | 
			
		||||
        self.zkhandler.write([
 | 
			
		||||
            (('domain.migrate.sync_lock', self.domuuid), self.domuuid)
 | 
			
		||||
        ])
 | 
			
		||||
 | 
			
		||||
        # Synchronize nodes A (I am writer)
 | 
			
		||||
        lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
 | 
			
		||||
        self.logger.out('Acquiring write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        lock.acquire()
 | 
			
		||||
        self.logger.out('Acquired write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        time.sleep(1)  # Time for reader to acquire the lock
 | 
			
		||||
        self.logger.out('Releasing write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        self.logger.out('Acquiring lock for phase A', state='i')
 | 
			
		||||
        lock = self.zkhandler.exclusivelock(('domain.migrate.sync_lock', self.domuuid))
 | 
			
		||||
        try:
 | 
			
		||||
            lock.acquire(timeout=30.0)
 | 
			
		||||
        except Exception:
 | 
			
		||||
            self.logger.out('Failed to acquire exclusive lock for VM', state='w')
 | 
			
		||||
            return
 | 
			
		||||
        # Exactly twice the amount of time that the other side is waiting
 | 
			
		||||
        time.sleep(1)
 | 
			
		||||
        lock.release()
 | 
			
		||||
        self.logger.out('Released write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        time.sleep(0.1)  # Time for new writer to acquire the lock
 | 
			
		||||
 | 
			
		||||
        # Synchronize nodes B (I am reader)
 | 
			
		||||
        lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
 | 
			
		||||
        self.logger.out('Acquiring read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        time.sleep(0.5)
 | 
			
		||||
 | 
			
		||||
        self.logger.out('Acquiring lock for phase C', state='i')
 | 
			
		||||
        lock.acquire()
 | 
			
		||||
        self.logger.out('Acquired read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        self.logger.out('Releasing read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        # This is strictly a synchronizing step
 | 
			
		||||
        time.sleep(0.1)
 | 
			
		||||
        lock.release()
 | 
			
		||||
        self.logger.out('Released read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
 | 
			
		||||
        # Synchronize nodes C (I am reader)
 | 
			
		||||
        lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid))
 | 
			
		||||
        self.logger.out('Acquiring read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        time.sleep(0.5)
 | 
			
		||||
 | 
			
		||||
        self.logger.out('Acquiring lock for phase E', state='i')
 | 
			
		||||
        lock.acquire()
 | 
			
		||||
        self.logger.out('Acquired read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
 | 
			
		||||
        # Set the updated data
 | 
			
		||||
        self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid))
 | 
			
		||||
        self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid))
 | 
			
		||||
 | 
			
		||||
        self.logger.out('Releasing read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        lock.release()
 | 
			
		||||
        self.logger.out('Released read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
 | 
			
		||||
        # Synchronize nodes D (I am writer)
 | 
			
		||||
        lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid))
 | 
			
		||||
        self.logger.out('Acquiring write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        lock.acquire()
 | 
			
		||||
        self.logger.out('Acquired write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        time.sleep(0.5)  # Time for reader to acquire the lock
 | 
			
		||||
 | 
			
		||||
        self.state = self.zkhandler.read(('domain.state', self.domuuid))
 | 
			
		||||
        self.dom = self.lookupByUUID(self.domuuid)
 | 
			
		||||
        if self.dom:
 | 
			
		||||
@@ -672,10 +596,7 @@ class VMInstance(object):
 | 
			
		||||
                else:
 | 
			
		||||
                    # The send failed or was aborted
 | 
			
		||||
                    self.logger.out('Migrate aborted or failed; VM in state {}'.format(self.state), state='w', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
 | 
			
		||||
        self.logger.out('Releasing write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
        lock.release()
 | 
			
		||||
        self.logger.out('Released write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
 | 
			
		||||
 | 
			
		||||
        self.zkhandler.write([
 | 
			
		||||
            (('domain.migrate.sync_lock', self.domuuid), '')
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user