From 85aba7cc18b6349de3429beea8a12d96925fc32a Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Wed, 9 Jun 2021 23:15:08 -0400 Subject: [PATCH] Convert VMInstance to new ZK schema handler --- node-daemon/pvcnoded/VMInstance.py | 153 +++++++++++++++-------------- 1 file changed, 79 insertions(+), 74 deletions(-) diff --git a/node-daemon/pvcnoded/VMInstance.py b/node-daemon/pvcnoded/VMInstance.py index c286b48d..6eeda7ba 100644 --- a/node-daemon/pvcnoded/VMInstance.py +++ b/node-daemon/pvcnoded/VMInstance.py @@ -38,7 +38,8 @@ import daemon_lib.common as daemon_common def flush_locks(zkhandler, logger, dom_uuid, this_node=None): logger.out('Flushing RBD locks for VM "{}"'.format(dom_uuid), state='i') # Get the list of RBD images - rbd_list = zkhandler.read('/domains/{}/rbdlist'.format(dom_uuid)).split(',') + rbd_list = zkhandler.read(('domain.storage.volumes', dom_uuid)).split(',') + for rbd in rbd_list: # Check if a lock exists lock_list_retcode, lock_list_stdout, lock_list_stderr = common.run_os_command('rbd lock list --format json {}'.format(rbd)) @@ -56,11 +57,11 @@ def flush_locks(zkhandler, logger, dom_uuid, this_node=None): if lock_list: # Loop through the locks for lock in lock_list: - if this_node is not None and zkhandler.read('/domains/{}/state'.format(dom_uuid)) != 'stop' and lock['address'].split(':')[0] != this_node.storage_ipaddr: + if this_node is not None and zkhandler.read(('domain.state', dom_uuid)) != 'stop' and lock['address'].split(':')[0] != this_node.storage_ipaddr: logger.out('RBD lock does not belong to this host (lock owner: {}): freeing this lock would be unsafe, aborting'.format(lock['address'].split(':')[0], state='e')) zkhandler.write([ - ('/domains/{}/state'.format(dom_uuid), 'fail'), - ('/domains/{}/failedreason'.format(dom_uuid), 'Could not safely free RBD lock {} ({}) on volume {}; stop VM and flush locks manually'.format(lock['id'], lock['address'], rbd)) + (('domain.state', dom_uuid), 'fail'), + (('domain.failed_reason', 
dom_uuid), 'Could not safely free RBD lock {} ({}) on volume {}; stop VM and flush locks manually'.format(lock['id'], lock['address'], rbd)), ]) break # Free the lock @@ -68,8 +69,8 @@ def flush_locks(zkhandler, logger, dom_uuid, this_node=None): if lock_remove_retcode != 0: logger.out('Failed to free RBD lock "{}" on volume "{}": {}'.format(lock['id'], rbd, lock_remove_stderr), state='e') zkhandler.write([ - ('/domains/{}/state'.format(dom_uuid), 'fail'), - ('/domains/{}/failedreason'.format(dom_uuid), 'Could not free RBD lock {} ({}) on volume {}: {}'.format(lock['id'], lock['address'], rbd, lock_remove_stderr)) + (('domain.state', dom_uuid), 'fail'), + (('domain.failed_reason', dom_uuid), 'Could not free RBD lock {} ({}) on volume {}: {}'.format(lock['id'], lock['address'], rbd, lock_remove_stderr)), ]) break logger.out('Freed RBD lock "{}" on volume "{}"'.format(lock['id'], rbd), state='o') @@ -89,7 +90,7 @@ def run_command(zkhandler, logger, this_node, data): # Verify that the VM is set to run on this node if this_node.d_domain[dom_uuid].getnode() == this_node.name: # Lock the command queue - zk_lock = zkhandler.writelock('/cmd/domains') + zk_lock = zkhandler.writelock('base.cmd.domain') with zk_lock: # Flush the lock result = flush_locks(zkhandler, logger, dom_uuid, this_node) @@ -97,13 +98,13 @@ def run_command(zkhandler, logger, this_node, data): if result: # Update the command queue zkhandler.write([ - ('/cmd/domains', 'success-{}'.format(data)) + ('base.cmd.domain', 'success-{}'.format(data)) ]) # Command failed else: # Update the command queue zkhandler.write([ - ('/cmd/domains', 'failure-{}'.format(data)) + ('base.cmd.domain', 'failure-{}'.format(data)) ]) # Wait 1 seconds before we free the lock, to ensure the client hits the lock time.sleep(1) @@ -120,18 +121,14 @@ class VMInstance(object): self.this_node = this_node # Get data from zookeeper - self.domname = self.zkhandler.read('/domains/{}'.format(domuuid)) - self.state = 
self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) - self.node = self.zkhandler.read('/domains/{}/node'.format(self.domuuid)) - self.lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid)) - self.last_currentnode = self.zkhandler.read('/domains/{}/node'.format(self.domuuid)) - self.last_lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid)) + self.domname = self.zkhandler.read(('domain', domuuid)) + self.state = self.zkhandler.read(('domain.state', domuuid)) + self.node = self.zkhandler.read(('domain.node', domuuid)) + self.lastnode = self.zkhandler.read(('domain.last_node', domuuid)) + self.last_currentnode = self.zkhandler.read(('domain.node', domuuid)) + self.last_lastnode = self.zkhandler.read(('domain.last_node', domuuid)) try: - self.pinpolicy = self.zkhandler.read('/domains/{}/pinpolicy'.format(self.domuuid)) - except Exception: - self.pinpolicy = "none" - try: - self.migration_method = self.zkhandler.read('/domains/{}/migration_method'.format(self.domuuid)) + self.migration_method = self.zkhandler.read(('domain.meta.migrate_method', self.domuuid)) except Exception: self.migration_method = 'none' @@ -150,7 +147,7 @@ class VMInstance(object): self.console_log_instance = VMConsoleWatcherInstance.VMConsoleWatcherInstance(self.domuuid, self.domname, self.zkhandler, self.config, self.logger, self.this_node) # Watch for changes to the state field in Zookeeper - @self.zkhandler.zk_conn.DataWatch('/domains/{}/state'.format(self.domuuid)) + @self.zkhandler.zk_conn.DataWatch(self.zkhandler.schema.path('domain.state', self.domuuid)) def watch_state(data, stat, event=""): if event and event.type == 'DELETED': # The key has been deleted after existing before; terminate this watcher @@ -203,7 +200,7 @@ class VMInstance(object): self.this_node.domain_list.append(self.domuuid) # Push the change up to Zookeeper self.zkhandler.write([ - ('/nodes/{}/runningdomains'.format(self.this_node.name), ' 
'.join(self.this_node.domain_list)) + (('node.running_domains', self.this_node.name), ' '.join(self.this_node.domain_list)) ]) except Exception as e: self.logger.out('Error adding domain to list: {}'.format(e), state='e') @@ -215,7 +212,7 @@ class VMInstance(object): self.this_node.domain_list.remove(self.domuuid) # Push the change up to Zookeeper self.zkhandler.write([ - ('/nodes/{}/runningdomains'.format(self.this_node.name), ' '.join(self.this_node.domain_list)) + (('node.running_domains', self.this_node.name), ' '.join(self.this_node.domain_list)) ]) except Exception as e: self.logger.out('Error removing domain from list: {}'.format(e), state='e') @@ -230,15 +227,15 @@ class VMInstance(object): port = graphics.get('port', '') listen = graphics.get('listen', '') self.zkhandler.write([ - ('/domains/{}/vnc'.format(self.domuuid), '{}:{}'.format(listen, port)) + (('domain.console.vnc', self.domuuid), '{}:{}'.format(listen, port)) ]) else: self.zkhandler.write([ - ('/domains/{}/vnc'.format(self.domuuid), '') + (('domain.console.vnc', self.domuuid), '') ]) else: self.zkhandler.write([ - ('/domains/{}/vnc'.format(self.domuuid), '') + (('domain.console.vnc', self.domuuid), '') ]) # Start up the VM @@ -269,7 +266,7 @@ class VMInstance(object): # Flush locks self.logger.out('Flushing RBD locks', state='i', prefix='Domain {}'.format(self.domuuid)) flush_locks(self.zkhandler, self.logger, self.domuuid, self.this_node) - if self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) == 'fail': + if self.zkhandler.read(('domain.state', self.domuuid)) == 'fail': lv_conn.close() self.dom = None self.instart = False @@ -279,25 +276,25 @@ class VMInstance(object): # If it is running just update the model self.addDomainToList() self.zkhandler.write([ - ('/domains/{}/failedreason'.format(self.domuuid), '') + (('domain.failed_reason', self.domuuid), '') ]) else: # Or try to create it try: # Grab the domain information from Zookeeper - xmlconfig = 
self.zkhandler.read('/domains/{}/xml'.format(self.domuuid)) + xmlconfig = self.zkhandler.read(('domain.xml', self.domuuid)) dom = lv_conn.createXML(xmlconfig, 0) self.addDomainToList() self.logger.out('Successfully started VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.dom = dom self.zkhandler.write([ - ('/domains/{}/failedreason'.format(self.domuuid), '') + (('domain.failed_reason', self.domuuid), '') ]) except libvirt.libvirtError as e: self.logger.out('Failed to create VM', state='e', prefix='Domain {}'.format(self.domuuid)) self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'fail'), - ('/domains/{}/failedreason'.format(self.domuuid), str(e)) + (('domain.state', self.domuuid), 'fail'), + (('domain.failed_reason', self.domuuid), str(e)) ]) lv_conn.close() self.dom = None @@ -327,7 +324,7 @@ class VMInstance(object): self.addDomainToList() self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'start') + (('domain.state', self.domuuid), 'start') ]) lv_conn.close() self.inrestart = False @@ -360,7 +357,7 @@ class VMInstance(object): if self.inrestart is False: self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'stop') + (('domain.state', self.domuuid), 'stop') ]) self.logger.out('Successfully stopped VM', state='o', prefix='Domain {}'.format(self.domuuid)) @@ -382,7 +379,7 @@ class VMInstance(object): time.sleep(1) # Abort shutdown if the state changes to start - current_state = self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) + current_state = self.zkhandler.read(('domain.state', self.domuuid)) if current_state not in ['shutdown', 'restart']: self.logger.out('Aborting VM shutdown due to state change', state='i', prefix='Domain {}'.format(self.domuuid)) is_aborted = True @@ -396,7 +393,7 @@ class VMInstance(object): if lvdomstate != libvirt.VIR_DOMAIN_RUNNING: self.removeDomainFromList() self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'stop') + (('domain.state', 
self.domuuid), 'stop') ]) self.logger.out('Successfully shutdown VM', state='o', prefix='Domain {}'.format(self.domuuid)) self.dom = None @@ -407,7 +404,7 @@ class VMInstance(object): if tick >= self.config['vm_shutdown_timeout']: self.logger.out('Shutdown timeout ({}s) expired, forcing off'.format(self.config['vm_shutdown_timeout']), state='e', prefix='Domain {}'.format(self.domuuid)) self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'stop') + (('domain.state', self.domuuid), 'stop') ]) break @@ -420,7 +417,7 @@ class VMInstance(object): # Wait to prevent race conditions time.sleep(1) self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'start') + (('domain.state', self.domuuid), 'start') ]) # Migrate the VM to a target host @@ -438,15 +435,15 @@ class VMInstance(object): self.logger.out('Migrating VM to node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid)) # Used for sanity checking later - target_node = self.zkhandler.read('/domains/{}/node'.format(self.domuuid)) + target_node = self.zkhandler.read(('domain.node', self.domuuid)) aborted = False def abort_migrate(reason): self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'start'), - ('/domains/{}/node'.format(self.domuuid), self.this_node.name), - ('/domains/{}/lastnode'.format(self.domuuid), self.last_lastnode) + (('domain.state', self.domuuid), 'start'), + (('domain.node', self.domuuid), self.this_node.name), + (('domain.last_node', self.domuuid), self.last_lastnode) ]) migrate_lock_node.release() migrate_lock_state.release() @@ -454,8 +451,8 @@ class VMInstance(object): self.logger.out('Aborted migration: {}'.format(reason), state='i', prefix='Domain {}'.format(self.domuuid)) # Acquire exclusive lock on the domain node key - migrate_lock_node = self.zkhandler.exclusivelock('/domains/{}/node'.format(self.domuuid)) - migrate_lock_state = self.zkhandler.exclusivelock('/domains/{}/state'.format(self.domuuid)) + migrate_lock_node = 
self.zkhandler.exclusivelock(('domain.node', self.domuuid)) + migrate_lock_state = self.zkhandler.exclusivelock(('domain.state', self.domuuid)) migrate_lock_node.acquire() migrate_lock_state.acquire() @@ -467,14 +464,14 @@ class VMInstance(object): return # Synchronize nodes A (I am reader) - lock = self.zkhandler.readlock('/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired read lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid)) - if self.zkhandler.read('/locks/domain_migrate/{}'.format(self.domuuid)) == '': + if self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) == '': self.logger.out('Waiting for peer', state='i', prefix='Domain {}'.format(self.domuuid)) ticks = 0 - while self.zkhandler.read('/locks/domain_migrate/{}'.format(self.domuuid)) == '': + while self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) == '': time.sleep(0.1) ticks += 1 if ticks > 300: @@ -490,7 +487,7 @@ class VMInstance(object): return # Synchronize nodes B (I am writer) - lock = self.zkhandler.writelock('/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired write lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) @@ -532,9 +529,9 @@ class VMInstance(object): def migrate_shutdown(): self.logger.out('Shutting down VM for offline migration', state='i', prefix='Domain {}'.format(self.domuuid)) self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'shutdown') + (('domain.state', self.domuuid), 'shutdown') ]) - while 
self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) != 'stop': + while self.zkhandler.read(('domain.state', self.domuuid)) != 'stop': time.sleep(0.5) return True @@ -580,7 +577,7 @@ class VMInstance(object): return # Synchronize nodes C (I am writer) - lock = self.zkhandler.writelock('/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) @@ -594,21 +591,26 @@ class VMInstance(object): self.logger.out('Released write lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) # Synchronize nodes D (I am reader) - lock = self.zkhandler.readlock('/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) - self.last_currentnode = self.zkhandler.read('/domains/{}/node'.format(self.domuuid)) - self.last_lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid)) + self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid)) + self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid)) self.logger.out('Releasing read lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) lock.release() self.logger.out('Released read lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) # Wait for the receive side to complete before we declare all-done and release locks - while 
self.zkhandler.read('/locks/domain_migrate/{}'.format(self.domuuid)) != '': - time.sleep(0.5) + ticks = 0 + while self.zkhandler.read(('domain.migrate.sync_lock', self.domuuid)) != '': + time.sleep(0.1) + ticks += 1 + if ticks > 100: + self.logger.out('Sync lock clear exceeded 10s timeout, continuing', state='w', prefix='Domain {}'.format(self.domuuid)) + break migrate_lock_node.release() migrate_lock_state.release() @@ -625,13 +627,16 @@ class VMInstance(object): self.logger.out('Receiving VM migration from node "{}"'.format(self.node), state='i', prefix='Domain {}'.format(self.domuuid)) + # Short delay to ensure sender is in sync + time.sleep(0.5) + # Ensure our lock key is populated self.zkhandler.write([ - ('/locks/domain_migrate/{}'.format(self.domuuid), self.domuuid) + (('domain.migrate.sync_lock', self.domuuid), self.domuuid) ]) # Synchronize nodes A (I am writer) - lock = self.zkhandler.writelock('/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase A', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired write lock for synchronization phase A', state='o', prefix='Domain {}'.format(self.domuuid)) @@ -642,7 +647,7 @@ class VMInstance(object): time.sleep(0.1) # Time for new writer to acquire the lock # Synchronize nodes B (I am reader) - lock = self.zkhandler.readlock('/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase B', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired read lock for synchronization phase B', state='o', prefix='Domain {}'.format(self.domuuid)) @@ -651,27 +656,27 @@ class VMInstance(object): self.logger.out('Released read lock for synchronization phase B', state='o', prefix='Domain 
{}'.format(self.domuuid)) # Synchronize nodes C (I am reader) - lock = self.zkhandler.readlock('/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.readlock(('domain.migrate.sync_lock', self.domuuid)) self.logger.out('Acquiring read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) # Set the updated data - self.last_currentnode = self.zkhandler.read('/domains/{}/node'.format(self.domuuid)) - self.last_lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid)) + self.last_currentnode = self.zkhandler.read(('domain.node', self.domuuid)) + self.last_lastnode = self.zkhandler.read(('domain.last_node', self.domuuid)) self.logger.out('Releasing read lock for synchronization phase C', state='i', prefix='Domain {}'.format(self.domuuid)) lock.release() self.logger.out('Released read lock for synchronization phase C', state='o', prefix='Domain {}'.format(self.domuuid)) # Synchronize nodes D (I am writer) - lock = self.zkhandler.writelock('/locks/domain_migrate/{}'.format(self.domuuid)) + lock = self.zkhandler.writelock(('domain.migrate.sync_lock', self.domuuid)) self.logger.out('Acquiring write lock for synchronization phase D', state='i', prefix='Domain {}'.format(self.domuuid)) lock.acquire() self.logger.out('Acquired write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) time.sleep(0.5) # Time for reader to acquire the lock - self.state = self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) + self.state = self.zkhandler.read(('domain.state', self.domuuid)) self.dom = self.lookupByUUID(self.domuuid) if self.dom: lvdomstate = self.dom.state()[0] @@ -679,14 +684,14 @@ class VMInstance(object): # VM has been received and started self.addDomainToList() self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'start') 
+ (('domain.state', self.domuuid), 'start') ]) self.logger.out('Successfully received migrated VM', state='o', prefix='Domain {}'.format(self.domuuid)) else: # The receive somehow failed self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'fail'), - ('/domains/{}/failed_reason'.format(self.domuuid), 'Failed to receive migration') + (('domain.state', self.domuuid), 'fail'), + (('domain.failed_reason', self.domuuid), 'Failed to receive migration') ]) self.logger.out('Failed to receive migrated VM', state='e', prefix='Domain {}'.format(self.domuuid)) else: @@ -697,7 +702,7 @@ class VMInstance(object): elif self.state in ['stop']: # The send was shutdown-based self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'start') + (('domain.state', self.domuuid), 'start') ]) else: # The send failed or was aborted @@ -708,7 +713,7 @@ class VMInstance(object): self.logger.out('Released write lock for synchronization phase D', state='o', prefix='Domain {}'.format(self.domuuid)) self.zkhandler.write([ - ('/locks/domain_migrate/{}'.format(self.domuuid), '') + (('domain.migrate.sync_lock', self.domuuid), '') ]) self.inreceive = False return @@ -718,9 +723,9 @@ class VMInstance(object): # def manage_vm_state(self): # Update the current values from zookeeper - self.state = self.zkhandler.read('/domains/{}/state'.format(self.domuuid)) - self.node = self.zkhandler.read('/domains/{}/node'.format(self.domuuid)) - self.lastnode = self.zkhandler.read('/domains/{}/lastnode'.format(self.domuuid)) + self.state = self.zkhandler.read(('domain.state', self.domuuid)) + self.node = self.zkhandler.read(('domain.node', self.domuuid)) + self.lastnode = self.zkhandler.read(('domain.last_node', self.domuuid)) # Check the current state of the VM try: @@ -769,7 +774,7 @@ class VMInstance(object): # Start the log watcher self.console_log_instance.start() self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'start') + (('domain.state', self.domuuid), 'start') ]) 
# Add domain to running list self.addDomainToList() @@ -794,7 +799,7 @@ class VMInstance(object): # VM should be restarted (i.e. started since it isn't running) if self.state == "restart": self.zkhandler.write([ - ('/domains/{}/state'.format(self.domuuid), 'start') + (('domain.state', self.domuuid), 'start') ]) # VM should be shut down; ensure it's gone from this node's domain_list elif self.state == "shutdown":