Handle cancelling flushes when new ones run

Store the flush_thread of a node as a class object. Before starting a
new flush thread (either flush or unflush), stop the existing one if it
exists to prevent further migrations, then start the new thread. Set the
object to None on init and again once the task actually finishes. Remove
the inflush flag as this is not required when using these threads and
functionally does nothing any longer, but add the flush_stopper flag to
trigger cancellation of the current job.
This commit is contained in:
Joshua Boniface 2019-07-10 01:07:56 -04:00
parent c7c8c8bcbb
commit 8f160abf90
1 changed files with 36 additions and 14 deletions

View File

@ -78,8 +78,10 @@ class NodeInstance(object):
self.upstream_dev = None self.upstream_dev = None
self.upstream_ipaddr = None self.upstream_ipaddr = None
self.upstream_cidrnetmask = None self.upstream_cidrnetmask = None
# Threads
self.flush_thread = None
# Flags # Flags
self.inflush = False self.flush_stopper = False
# Zookeeper handlers for changed states # Zookeeper handlers for changed states
@self.zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name)) @self.zk_conn.DataWatch('/nodes/{}/daemonstate'.format(self.name))
@ -135,14 +137,21 @@ class NodeInstance(object):
# toggle state management of this node # toggle state management of this node
if self.name == self.this_node: if self.name == self.this_node:
if self.domain_state == 'flush' and self.inflush == False: # Stop any existing flush jobs
# Do flushing in a thread so it doesn't block the migrates out if self.flush_thread:
flush_thread = threading.Thread(target=self.flush, args=(), kwargs={}) self.flush_stopper = True
flush_thread.start() self.logger.out('Waiting for previous migration to complete'.format(self.name), state='i')
if self.domain_state == 'unflush' and self.inflush == False: while self.flush_stopper:
# Do unflushing in a thread so it doesn't block the migrates in time.sleep(1)
flush_thread = threading.Thread(target=self.unflush, args=(), kwargs={}) self.flush_stopper = False
flush_thread.start() # Do flushing in a thread so it doesn't block the migrates out
if self.domain_state == 'flush':
self.flush_thread = threading.Thread(target=self.flush, args=(), kwargs={})
self.flush_thread.start()
# Do unflushing in a thread so it doesn't block the migrates in
if self.domain_state == 'unflush':
self.flush_thread = threading.Thread(target=self.unflush, args=(), kwargs={})
self.flush_thread.start()
@self.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name)) @self.zk_conn.DataWatch('/nodes/{}/memfree'.format(self.name))
def watch_node_memfree(data, stat, event=''): def watch_node_memfree(data, stat, event=''):
@ -344,11 +353,17 @@ class NodeInstance(object):
# Flush all VMs on the host # Flush all VMs on the host
def flush(self): def flush(self):
# Begin flush # Begin flush
self.inflush = True
self.logger.out('Flushing node "{}" of running VMs'.format(self.name), state='i') self.logger.out('Flushing node "{}" of running VMs'.format(self.name), state='i')
self.logger.out('Domain list: {}'.format(', '.join(self.domain_list))) self.logger.out('Domain list: {}'.format(', '.join(self.domain_list)))
fixed_domain_list = self.domain_list.copy() fixed_domain_list = self.domain_list.copy()
for dom_uuid in fixed_domain_list: for dom_uuid in fixed_domain_list:
# Allow us to cancel the operation
if self.flush_stopper:
self.logger.out('Aborting node flush'.format(self.name), state='i')
self.flush_thread = None
self.flush_stopper = False
return
self.logger.out('Selecting target to migrate VM "{}"'.format(dom_uuid), state='i') self.logger.out('Selecting target to migrate VM "{}"'.format(dom_uuid), state='i')
target_node = common.findTargetHypervisor(self.zk_conn, 'mem', dom_uuid) target_node = common.findTargetHypervisor(self.zk_conn, 'mem', dom_uuid)
@ -382,13 +397,20 @@ class NodeInstance(object):
zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.name): '' }) zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.name): '' })
zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'flushed' }) zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'flushed' })
self.inflush = False self.flush_thread = None
self.flush_stopper = False
def unflush(self): def unflush(self):
self.inflush = True
self.logger.out('Restoring node {} to active service.'.format(self.name), state='i') self.logger.out('Restoring node {} to active service.'.format(self.name), state='i')
fixed_domain_list = self.d_domain.copy() fixed_domain_list = self.d_domain.copy()
for dom_uuid in fixed_domain_list: for dom_uuid in fixed_domain_list:
# Allow us to cancel the operation
if self.flush_stopper:
self.logger.out('Aborting node unflush'.format(self.name), state='i')
self.flush_thread = None
self.flush_stopper = False
return
try: try:
last_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid)) last_node = zkhandler.readdata(self.zk_conn, '/domains/{}/lastnode'.format(dom_uuid))
except: except:
@ -409,5 +431,5 @@ class NodeInstance(object):
time.sleep(1) time.sleep(1)
zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'ready' }) zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'ready' })
self.inflush = False self.flush_thread = None
self.flush_stopper = False