Avoid stopping duplicates, just lock our own key

This commit is contained in:
Joshua Boniface 2020-10-20 16:10:39 -04:00
parent a6d492ed9f
commit 398d33778f
1 changed files with 12 additions and 24 deletions

View File

@ -356,12 +356,6 @@ class VMInstance(object):
zkhandler.writedata(self.zk_conn, { '/domains/{}/lastnode'.format(self.domuuid): '' }) zkhandler.writedata(self.zk_conn, { '/domains/{}/lastnode'.format(self.domuuid): '' })
return return
# Wait for any in-progress migrations
if zkhandler.readdata(self.zk_conn, '/locks/domain_migrate') != '':
self.logger.out('Queueing for completion of existing migration', state='i', prefix='Domain {}'.format(self.domuuid))
while zkhandler.readdata(self.zk_conn, '/locks/domain_migrate') != '':
time.sleep(0.1)
self.inmigrate = True self.inmigrate = True
self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid))
@ -381,14 +375,14 @@ class VMInstance(object):
time.sleep(0.2) # Initial delay for the first writer to grab the lock time.sleep(0.2) # Initial delay for the first writer to grab the lock
# Synchronize nodes A (I am reader) # Synchronize nodes A (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate') lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
if zkhandler.readdata(self.zk_conn, '/locks/domain_migrate') == '': if zkhandler.readdata(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) == '':
self.logger.out('Waiting for peer', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Waiting for peer', state='i', prefix='Domain {}'.format(self.domuuid))
ticks = 0 ticks = 0
while zkhandler.readdata(self.zk_conn, '/locks/domain_migrate') == '': while zkhandler.readdata(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid)) == '':
time.sleep(0.1) time.sleep(0.1)
ticks += 1 ticks += 1
if ticks > 300: if ticks > 300:
@ -404,7 +398,7 @@ class VMInstance(object):
return return
# Synchronize nodes B (I am writer) # Synchronize nodes B (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate') lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
@ -483,7 +477,7 @@ class VMInstance(object):
return return
# Synchronize nodes C (I am writer) # Synchronize nodes C (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate') lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
@ -497,7 +491,7 @@ class VMInstance(object):
self.logger.out('Released write lock for synchronization phase C', state='o') self.logger.out('Released write lock for synchronization phase C', state='o')
# Synchronize nodes D (I am reader) # Synchronize nodes D (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate') lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
@ -517,22 +511,16 @@ class VMInstance(object):
if self.node == self.lastnode: if self.node == self.lastnode:
return return
# Wait for any in-progress migrations
if zkhandler.readdata(self.zk_conn, '/locks/domain_migrate') != '':
self.logger.out('Queueing for completion of existing migration', state='i', prefix='Domain {}'.format(self.domuuid))
while zkhandler.readdata(self.zk_conn, '/locks/domain_migrate') != '':
time.sleep(0.2)
self.inreceive = True self.inreceive = True
live_receive = True live_receive = True
self.logger.out('Receiving VM migration from node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Receiving VM migration from node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid))
# Ensure our lock key is populated # Ensure our lock key is populated
zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': self.domuuid }) zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate/{}'.format(self.domuuid): self.domuuid })
# Synchronize nodes A (I am writer) # Synchronize nodes A (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate') lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid))
@ -543,7 +531,7 @@ class VMInstance(object):
time.sleep(0.1) # Time for new writer to acquire the lock time.sleep(0.1) # Time for new writer to acquire the lock
# Synchronize nodes B (I am reader) # Synchronize nodes B (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate') lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
@ -552,7 +540,7 @@ class VMInstance(object):
self.logger.out('Released read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Released read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes C (I am reader) # Synchronize nodes C (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate') lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
@ -561,7 +549,7 @@ class VMInstance(object):
self.logger.out('Released read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Released read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes D (I am writer) # Synchronize nodes D (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate') lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate/{}'.format(self.domuuid))
self.logger.out('Acquiring write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire() lock.acquire()
self.logger.out('Acquired write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Acquired write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
@ -591,7 +579,7 @@ class VMInstance(object):
self.logger.out('Migrate aborted or failed; VM in state {}'.format(self.state), state='w', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Migrate aborted or failed; VM in state {}'.format(self.state), state='w', prefix='Domain {}'.format(self.domuuid))
self.logger.out('Releasing write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Releasing write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': '' }) zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate/{}'.format(self.domuuid): '' })
lock.release() lock.release()
self.logger.out('Released write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) self.logger.out('Released write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.1) # Time for new writer to acquire the lock time.sleep(0.1) # Time for new writer to acquire the lock