Improve handling of VM migrations

The VM migration code was very old, very spaghettified, and prone to
strange failures.

Improve this by taking cues from the node primary migration. Use
synchronization between the nodes to ensure lockstep completion of the
migration in discrete steps.

A proper queue can be built later to integrate with this code more
cleanly.

References #108
This commit is contained in:
Joshua Boniface 2020-10-20 12:13:26 -04:00
parent 726501f4d4
commit f9e7e9884f
1 changed files with 186 additions and 122 deletions

View File

@ -199,14 +199,14 @@ class VMInstance(object):
# Start the log watcher
self.console_log_instance.start()
self.logger.out('Starting VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Starting VM', state='i', prefix='Domain {}'.format(self.domuuid))
self.instart = True
# Start up a new Libvirt connection
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn == None:
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid))
self.instart = False
return
@ -228,11 +228,11 @@ class VMInstance(object):
xmlconfig = zkhandler.readdata(self.zk_conn, '/domains/{}/xml'.format(self.domuuid))
dom = lv_conn.createXML(xmlconfig, 0)
self.addDomainToList()
self.logger.out('Successfully started VM', state='o', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Successfully started VM', state='o', prefix='Domain {}'.format(self.domuuid))
self.dom = dom
zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): '' })
except libvirt.libvirtError as e:
self.logger.out('Failed to create VM', state='e', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Failed to create VM', state='e', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'fail' })
zkhandler.writedata(self.zk_conn, { '/domains/{}/failedreason'.format(self.domuuid): str(e) })
self.dom = None
@ -243,14 +243,14 @@ class VMInstance(object):
# Restart the VM
def restart_vm(self):
self.logger.out('Restarting VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Restarting VM', state='i', prefix='Domain {}'.format(self.domuuid))
self.inrestart = True
# Start up a new Libvirt connection
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn == None:
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid))
self.inrestart = False
return
@ -265,14 +265,14 @@ class VMInstance(object):
# Stop the VM forcibly without updating state
def terminate_vm(self):
self.logger.out('Terminating VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Terminating VM', state='i', prefix='Domain {}'.format(self.domuuid))
self.instop = True
try:
self.dom.destroy()
except AttributeError:
self.logger.out('Failed to terminate VM', state='e', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Failed to terminate VM', state='e', prefix='Domain {}'.format(self.domuuid))
self.removeDomainFromList()
self.logger.out('Successfully terminated VM', state='o', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Successfully terminated VM', state='o', prefix='Domain {}'.format(self.domuuid))
self.dom = None
self.instop = False
@ -281,18 +281,18 @@ class VMInstance(object):
# Stop the VM forcibly
def stop_vm(self):
self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Forcibly stopping VM', state='i', prefix='Domain {}'.format(self.domuuid))
self.instop = True
try:
self.dom.destroy()
except AttributeError:
self.logger.out('Failed to stop VM', state='e', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Failed to stop VM', state='e', prefix='Domain {}'.format(self.domuuid))
self.removeDomainFromList()
if self.inrestart == False:
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}'.format(self.domuuid))
self.dom = None
self.instop = False
@ -301,7 +301,7 @@ class VMInstance(object):
# Shutdown the VM gracefully
def shutdown_vm(self):
self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Gracefully stopping VM', state='i', prefix='Domain {}'.format(self.domuuid))
is_aborted = False
self.inshutdown = True
self.dom.shutdown()
@ -313,7 +313,7 @@ class VMInstance(object):
# Abort shutdown if the state changes to start
current_state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
if current_state not in ['shutdown', 'restart']:
self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}'.format(self.domuuid))
is_aborted = True
break
@ -325,14 +325,14 @@ class VMInstance(object):
if lvdomstate != libvirt.VIR_DOMAIN_RUNNING:
self.removeDomainFromList()
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}'.format(self.domuuid))
self.dom = None
# Stop the log watcher
self.console_log_instance.stop()
break
if tick >= self.config['vm_shutdown_timeout']:
self.logger.out('Shutdown timeout ({}s) expired, forcing off'.format(self.config['vm_shutdown_timeout']), state='e', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Shutdown timeout ({}s) expired, forcing off'.format(self.config['vm_shutdown_timeout']), state='e', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' })
break
@ -346,32 +346,6 @@ class VMInstance(object):
time.sleep(1)
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
def live_migrate_vm(self):
dest_lv = 'qemu+tcp://{}.{}/system'.format(self.node, self.config['cluster_domain'])
dest_tcp = 'tcp://{}.{}'.format(self.node, self.config['cluster_domain'])
try:
# Open a connection to the destination
dest_lv_conn = libvirt.open(dest_lv)
if not dest_lv_conn:
raise
except:
self.logger.out('Failed to open connection to {}; aborting live migration.'.format(dest_lv), state='e', prefix='Domain {}:'.format(self.domuuid))
return False
try:
# Send the live migration; force the destination URI to ensure we transit over the cluster network
target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, dest_tcp, 0)
if not target_dom:
raise
except Exception as e:
self.logger.out('Failed to send VM to {} - aborting live migration; error: {}'.format(dest_lv, e), state='e', prefix='Domain {}:'.format(self.domuuid))
dest_lv_conn.close()
return False
self.logger.out('Successfully migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid))
dest_lv_conn.close()
return True
# Migrate the VM to a target host
def migrate_vm(self, force_live=False):
# Don't try to migrate a node to itself, set back to start
@ -381,106 +355,196 @@ class VMInstance(object):
return
self.inmigrate = True
self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid))
migrate_ret = self.live_migrate_vm()
if not migrate_ret:
# Acquire exclusive lock on the domain node key
migrate_lock = zkhandler.exclusivelock(self.zk_conn, '/domains/{}/node'.format(self.domuuid))
migrate_lock.acquire()
time.sleep(0.2) # Initial delay for the first writer to grab the lock
# Synchronize nodes A (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate')
self.logger.out('Acquiring read lock for synchronization A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired read lock for synchronization A', state='o', prefix='Domain {}'.format(self.domuuid))
self.logger.out('Releasing read lock for synchronization A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released read lock for synchronization A', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes B (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate')
self.logger.out('Acquiring write lock for synchronization B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired write lock for synchronization B', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.2) # Time for reader to acquire the lock
def migrate_live():
self.logger.out('Setting up live migration', state='i', prefix='Domain {}'.format(self.domuuid))
# Set up destination connection
dest_lv = 'qemu+tcp://{}.{}/system'.format(self.node, self.config['cluster_domain'])
dest_tcp = 'tcp://{}.{}'.format(self.node, self.config['cluster_domain'])
try:
self.logger.out('Opening remote libvirt connection', state='i', prefix='Domain {}'.format(self.domuuid))
# Open a connection to the destination
dest_lv_conn = libvirt.open(dest_lv)
if not dest_lv_conn:
raise
except:
self.logger.out('Failed to open connection to {}; aborting live migration.'.format(dest_lv), state='e', prefix='Domain {}'.format(self.domuuid))
return False
try:
self.logger.out('Live migrating VM', state='i', prefix='Domain {}'.format(self.domuuid))
# Send the live migration; force the destination URI to ensure we transit over the cluster network
target_dom = self.dom.migrate(dest_lv_conn, libvirt.VIR_MIGRATE_LIVE, None, dest_tcp, 0)
if not target_dom:
raise
except Exception as e:
self.logger.out('Failed to send VM to {} - aborting live migration; error: {}'.format(dest_lv, e), state='e', prefix='Domain {}'.format(self.domuuid))
dest_lv_conn.close()
return False
self.logger.out('Successfully migrated VM', state='o', prefix='Domain {}'.format(self.domuuid))
dest_lv_conn.close()
self.console_log_instance.stop()
self.removeDomainFromList()
return True
def migrate_shutdown():
self.logger.out('Shutting down VM for migration', state='i', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'shutdown' })
while zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) != 'stop':
time.sleep(0.5)
return True
do_migrate_shutdown = False
migrate_live_result = migrate_live()
if not migrate_live_result:
if force_live:
self.logger.out('Could not live migrate VM; live migration enforced, aborting', state='e', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Could not live migrate VM; live migration enforced, aborting', state='e', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, {
'/domains/{}/state'.format(self.domuuid): 'start',
'/domains/{}/node'.format(self.domuuid): self.this_node.name,
'/domains/{}/lastnode'.format(self.domuuid): ''
})
else:
self.logger.out('Could not live migrate VM; shutting down to migrate instead', state='e', prefix='Domain {}:'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'shutdown' })
else:
self.removeDomainFromList()
# Stop the log watcher
self.console_log_instance.stop()
do_migrate_shutdown = True
self.logger.out('Releasing write lock for synchronization B', state='i', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/locks/primary_node': self.domuuid })
lock.release()
self.logger.out('Released write lock for synchronization B', state='o')
# Synchronize nodes C (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate')
self.logger.out('Acquiring write lock for synchronization C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired write lock for synchronization C', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.2) # Time for reader to acquire the lock
if do_migrate_shutdown:
migrate_shutdown_result = migrate_live()
self.logger.out('Releasing write lock for synchronization C', state='i', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/locks/primary_node': self.domuuid })
lock.release()
self.logger.out('Released write lock for synchronization C', state='o')
# Synchronize nodes D (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate')
self.logger.out('Acquiring read lock for synchronization D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired read lock for synchronization D', state='o', prefix='Domain {}'.format(self.domuuid))
self.logger.out('Releasing read lock for synchronization D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released read lock for synchronization D', state='o', prefix='Domain {}'.format(self.domuuid))
# Wait 0.5 seconds for everything to stabilize before we declare all-done
time.sleep(0.5)
migrate_lock.release()
self.inmigrate = False
return
# Receive the migration from another host (wait until VM is running)
# Receive the migration from another host
def receive_migrate(self):
self.inreceive = True
live_receive = True
tick = 0
self.logger.out('Receiving migration', state='i', prefix='Domain {}:'.format(self.domuuid))
while True:
# Wait 1 second and increment the tick
time.sleep(1)
tick += 1
# Get zookeeper state and look for the VM in the local libvirt database
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
self.dom = self.lookupByUUID(self.domuuid)
self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid))
# If the dom is found
if self.dom:
lvdomstate = self.dom.state()[0]
if lvdomstate == libvirt.VIR_DOMAIN_RUNNING:
# VM has been received and started
self.addDomainToList()
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}:'.format(self.domuuid))
break
else:
# If the state is no longer migrate
if self.state not in ['migrate', 'migrate-live']:
# The receive was aborted before it timed out or was completed
self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}:'.format(self.domuuid))
break
# If the dom is not found
# Ensure our lock key is populated
zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': self.domuuid })
# Synchronize nodes A (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate')
self.logger.out('Acquiring write lock for synchronization A', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired write lock for synchronization A', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.2) # Time for reader to acquire the lock
self.logger.out('Releasing write lock for synchronization A', state='i', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': self.domuuid })
lock.release()
self.logger.out('Released write lock for synchronization A', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.1) # Time for new writer to acquire the lock
# Synchronize nodes B (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate')
self.logger.out('Acquiring read lock for synchronization B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired read lock for synchronization B', state='o', prefix='Domain {}'.format(self.domuuid))
self.logger.out('Releasing read lock for synchronization B', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released read lock for synchronization B', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes C (I am reader)
lock = zkhandler.readlock(self.zk_conn, '/locks/domain_migrate')
self.logger.out('Acquiring read lock for synchronization C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired read lock for synchronization C', state='o', prefix='Domain {}'.format(self.domuuid))
self.logger.out('Releasing read lock for synchronization C', state='i', prefix='Domain {}'.format(self.domuuid))
lock.release()
self.logger.out('Released read lock for synchronization C', state='o', prefix='Domain {}'.format(self.domuuid))
# Synchronize nodes D (I am writer)
lock = zkhandler.writelock(self.zk_conn, '/locks/domain_migrate')
self.logger.out('Acquiring write lock for synchronization D', state='i', prefix='Domain {}'.format(self.domuuid))
lock.acquire()
self.logger.out('Acquired write lock for synchronization D', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.2) # Time for reader to acquire the lock
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
self.dom = self.lookupByUUID(self.domuuid)
if self.dom:
lvdomstate = self.dom.state()[0]
if lvdomstate == libvirt.VIR_DOMAIN_RUNNING:
# VM has been received and started
self.addDomainToList()
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}'.format(self.domuuid))
else:
# If the state is changed to shutdown or stop
if self.state == 'shutdown' or self.state == 'stop':
# The receive failed on the remote end, and VM is being shut down instead
live_receive = False
self.logger.out('Send failed on remote end', state='w', prefix='Domain {}:'.format(self.domuuid))
break
# If we've already been waiting 90s for a receive
# HARDCODE: 90s should be plenty of time for even extremely large VMs on reasonable networks
if tick > 90:
# The receive timed out
# The receive somehow failed
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'fail' })
self.logger.out('Receive timed out without state change', state='e', prefix='Domain {}:'.format(self.domuuid))
break
else:
if self.state in ['start']:
# The receive was aborted
self.logger.out('Receive aborted via state change', state='w', prefix='Domain {}'.format(self.domuuid))
elif self.state in ['stop']:
# The send was shutdown-based
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
else:
# The send failed catastrophically
self.logger.out('Send failed catastrophically, VM in undefined state', state='e', prefix='Domain {}'.format(self.domuuid))
# We are waiting on a shutdown
if not live_receive:
tick = 0
self.logger.out('Waiting for VM to shut down on remote end', state='i', prefix='Domain {}:'.format(self.domuuid))
while True:
# Wait 1 second and increment the tick
time.sleep(1)
tick += 1
# Get zookeeper state and look for the VM in the local libvirt database
self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid))
# If the VM has stopped
if self.state == 'stop':
# Wait one more second to avoid race conditions
time.sleep(1)
# Start the VM up
zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' })
break
# If we've already been waiting 120s for a shutdown
# HARDCODE: The remote timeout is 90s, so an extra 30s of buffer
if tick > 120:
# The shutdown timed out; something is very amiss, so switch state to fail and abort
zkhandler.writedata(self.zk_conn, {
'/domains/{}/state'.format(self.domuuid): 'fail',
'/domains/{}/failedreason'.format(self.domuuid): 'Timeout waiting for migrate or shutdown'
})
self.logger.out('Shutdown timed out without state change', state='e', prefix='Domain {}:'.format(self.domuuid))
break
self.logger.out('Releasing write lock for synchronization D', state='i', prefix='Domain {}'.format(self.domuuid))
zkhandler.writedata(self.zk_conn, { '/locks/domain_migrate': '' })
lock.release()
self.logger.out('Released write lock for synchronization D', state='o', prefix='Domain {}'.format(self.domuuid))
time.sleep(0.1) # Time for new writer to acquire the lock
self.inreceive = False
return
#
# Main function to manage a VM (taking only self)
@ -608,7 +672,7 @@ class VMInstance(object):
# Open a libvirt connection
lv_conn = libvirt.open(libvirt_name)
if lv_conn == None:
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}:'.format(self.domuuid))
self.logger.out('Failed to open local libvirt connection', state='e', prefix='Domain {}'.format(self.domuuid))
return None
# Lookup the UUID