Implement flush locking for nodes

Implements a locking mechanism to prevent clobbering of node
flushes. When a flush begins, a global cluster lock is placed
which is freed once the flush completes. While the lock is in place,
other flush events queue waiting for the lock to free before
proceeding.

Modifies the CLI output flow when the `--wait` option is specified.
First, if a lock exists when running the command, the message is
tweaked to indicate this, and the client will wait first for the
lock to free, and then for the flush as normal. Second, the wait
depends on the active lock rather than the domain_status for
consistency purposes.

Closes #32
This commit is contained in:
Joshua Boniface 2019-05-10 23:52:24 -04:00
parent 045ad131af
commit c19902d952
3 changed files with 34 additions and 7 deletions

View File

@ -650,6 +650,8 @@ def init_zookeeper(zk_host):
transaction.create('/ceph', ''.encode('ascii')) transaction.create('/ceph', ''.encode('ascii'))
transaction.create('/ceph/osds', ''.encode('ascii')) transaction.create('/ceph/osds', ''.encode('ascii'))
transaction.create('/ceph/pools', ''.encode('ascii')) transaction.create('/ceph/pools', ''.encode('ascii'))
transaction.create('/locks', ''.encode('ascii'))
transaction.create('/locks/flush_lock', 'False'.encode('ascii'))
transaction.commit() transaction.commit()
# Close the Zookeeper connection # Close the Zookeeper connection

View File

@ -134,24 +134,33 @@ def flush_node(zk_conn, node, wait):
if not common.verifyNode(zk_conn, node): if not common.verifyNode(zk_conn, node):
return False, 'ERROR: No node named "{}" is present in the cluster.'.format(node) return False, 'ERROR: No node named "{}" is present in the cluster.'.format(node)
if zkhandler.readdata(zk_conn, '/locks/flush_lock') == 'True':
retmsg = 'Flushing hypervisor {} of running VMs. A flush lock currently exists; flush will continue once the lock is freed.'.format(node)
lock_wait = True
else:
retmsg = 'Flushing hypervisor {} of running VMs.'.format(node) retmsg = 'Flushing hypervisor {} of running VMs.'.format(node)
lock_wait = False
# Wait cannot be triggered from the API # Wait cannot be triggered from the API
if wait: if wait:
click.echo(retmsg) click.echo(retmsg)
retmsg = "" retmsg = ""
if lock_wait:
time.sleep(1)
while zkhandler.readdata(zk_conn, '/locks/flush_lock') == 'True':
time.sleep(1)
click.echo('Previous flush completed. Proceeding with flush.')
# Add the new domain to Zookeeper # Add the new domain to Zookeeper
zkhandler.writedata(zk_conn, { zkhandler.writedata(zk_conn, {
'/nodes/{}/domainstate'.format(node): 'flush' '/nodes/{}/domainstate'.format(node): 'flush'
}) })
if wait == True: # Wait cannot be triggered from the API
while True: if wait:
time.sleep(1)
while zkhandler.readdata(zk_conn, '/locks/flush_lock') == 'True':
time.sleep(1) time.sleep(1)
node_state = zkhandler.readdata(zk_conn, '/nodes/{}/domainstate'.format(node))
if node_state == "flushed":
break
return True, retmsg return True, retmsg

View File

@ -316,6 +316,17 @@ class NodeInstance(object):
# Flush all VMs on the host # Flush all VMs on the host
def flush(self): def flush(self):
# Wait indefinitely for the flush_lock to be freed
time.sleep(0.5)
while zkhandler.readdata(self.zk_conn, '/locks/flush_lock') == 'True':
time.sleep(2)
# Acquire the flush lock
zkhandler.writedata(self.zk_conn, {
'/locks/flush_lock'.format(node): 'True'
})
# Begin flush
self.inflush = True self.inflush = True
self.logger.out('Flushing node "{}" of running VMs'.format(self.name), state='i') self.logger.out('Flushing node "{}" of running VMs'.format(self.name), state='i')
self.logger.out('Domain list: {}'.format(', '.join(self.domain_list))) self.logger.out('Domain list: {}'.format(', '.join(self.domain_list)))
@ -347,6 +358,11 @@ class NodeInstance(object):
zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'flushed' }) zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'flushed' })
self.inflush = False self.inflush = False
# Release the flush lock
zkhandler.writedata(self.zk_conn, {
'/locks/flush_lock'.format(node): 'False'
})
def unflush(self): def unflush(self):
self.inflush = True self.inflush = True
self.logger.out('Restoring node {} to active service.'.format(self.name), state='i') self.logger.out('Restoring node {} to active service.'.format(self.name), state='i')