From 4320fcdf0e3dfa56566ab25e29ba8a3f4bb42a94 Mon Sep 17 00:00:00 2001 From: Joshua Boniface Date: Tue, 26 Jun 2018 23:24:33 -0400 Subject: [PATCH] [#5] Use better dict-based format for write updates --- pvcd.py | 6 ++--- pvcd/NodeInstance.py | 64 +++++++++++++++++++++----------------------- pvcd/VMInstance.py | 18 ++++++------- pvcd/zkhandler.py | 16 ++++++----- 4 files changed, 52 insertions(+), 52 deletions(-) diff --git a/pvcd.py b/pvcd.py index 10ca19c5..12788468 100755 --- a/pvcd.py +++ b/pvcd.py @@ -146,7 +146,7 @@ zk_conn.add_listener(zk_listener) def cleanup(signum, frame): ansiiprint.echo('Terminating daemon', '', 'e') # Set stop state in Zookeeper - zkhandler.writedata(zk_conn, '/nodes/{}/daemonstate'.format(myhostname), [ 'stop' ]) + zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'stop' }) # Close the Zookeeper connection zk_conn.close() # Stop keepalive thread @@ -180,7 +180,7 @@ print(' {0}Kernel:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticda if zk_conn.exists('/nodes/{}'.format(myhostname)): print("Node is " + ansiiprint.green() + "present" + ansiiprint.end() + " in Zookeeper") # Update static data just in case it's changed - zkhandler.writedata(zk_conn, '/nodes/{}/staticdata'.format(myhostname), [ ' '.join(staticdata) ]) + zkhandler.writedata(zk_conn, { '/nodes/{}/staticdata'.format(myhostname): ' '.join(staticdata) }) else: print("Node is " + ansiiprint.red() + "absent" + ansiiprint.end() + " in Zookeeper; adding new node") keepalive_time = int(time.time()) @@ -202,7 +202,7 @@ else: transaction.create('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'].encode('ascii')) transaction.commit() -zkhandler.writedata(zk_conn, '/nodes/{}/daemonstate'.format(myhostname), [ 'init' ]) +zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'init' }) t_node = dict() s_domain = dict() diff --git a/pvcd/NodeInstance.py b/pvcd/NodeInstance.py index 699402ef..2e54dfeb 100644 --- a/pvcd/NodeInstance.py +++ b/pvcd/NodeInstance.py @@ -151,36 +151,34 @@ class NodeInstance(): if target_hypervisor == None: ansiiprint.echo('Failed to find migration target for VM "{}"; shutting down'.format(dom_uuid), '', 'e') - transaction = self.zk_conn.transaction() - transaction.set_data('/domains/{}/state'.format(dom_uuid), 'shutdown'.encode('ascii')) - transaction.commit() + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(dom_uuid): 'shutdown' }) else: ansiiprint.echo('Migrating VM "{}" to hypervisor "{}"'.format(dom_uuid, target_hypervisor), '', 'i') - transaction = self.zk_conn.transaction() - transaction.set_data('/domains/{}/state'.format(dom_uuid), 'migrate'.encode('ascii')) - transaction.set_data('/domains/{}/hypervisor'.format(dom_uuid), target_hypervisor.encode('ascii')) - transaction.set_data('/domains/{}/lasthypervisor'.format(dom_uuid), current_hypervisor.encode('ascii')) - transaction.commit() + zkhandler.writedata(self.zk_conn, { + '/domains/{}/state'.format(dom_uuid): 'migrate', + '/domains/{}/hypervisor'.format(dom_uuid): target_hypervisor, + '/domains/{}/lasthypervisor'.format(dom_uuid): current_hypervisor + }) - zkhandler.writedata(self.zk_conn, '/nodes/{}/runningdomains'.format(self.name), [ '' ]) - zkhandler.writedata(self.zk_conn, '/nodes/{}/domainstate'.format(self.name), [ 'flushed' ]) + zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.name): '' }) + zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'flushed' }) self.inflush = False def unflush(self): self.inflush = True ansiiprint.echo('Restoring node {} to active service.'.format(self.name), '', 'i') - zkhandler.writedata(self.zk_conn, '/nodes/{}/domainstate'.format(self.name), [ 'ready' ]) + zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'ready' }) for dom_uuid in self.s_domain: last_hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/lasthypervisor'.format(dom_uuid)) if last_hypervisor != self.name: continue ansiiprint.echo('Setting unmigration for VM "{}"'.format(dom_uuid), '', 'i') - transaction = self.zk_conn.transaction() - transaction.set_data('/domains/{}/state'.format(dom_uuid), 'migrate'.encode('ascii')) - transaction.set_data('/domains/{}/hypervisor'.format(dom_uuid), self.name.encode('ascii')) - transaction.set_data('/domains/{}/lasthypervisor'.format(dom_uuid), ''.encode('ascii')) - transaction.commit() + zkhandler.writedata(self.zk_conn, { + '/domains/{}/state'.format(dom_uuid): 'migrate', + '/domains/{}/hypervisor'.format(dom_uuid): self.name, + '/domains/{}/lasthypervisor'.format(dom_uuid): '' + }) self.inflush = False @@ -196,7 +194,7 @@ class NodeInstance(): past_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name)) if past_state != 'run': self.daemon_state = 'run' - zkhandler.writedata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name), [ 'run' ]) + zkhandler.writedata(self.zk_conn, { '/nodes/{}/daemonstate'.format(self.name): 'run' }) else: self.daemon_state = 'run' @@ -210,7 +208,7 @@ class NodeInstance(): raise except Exception as e: # Toggle a state "change" - zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(domain), [ instance.getstate() ]) + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() }) # Set our information in zookeeper self.name = lv_conn.getHostname() @@ -220,14 +218,14 @@ class NodeInstance(): self.domains_count = len(lv_conn.listDomainsID()) keepalive_time = int(time.time()) try: - transaction = self.zk_conn.transaction() - transaction.set_data('/nodes/{}/memused'.format(self.name), str(self.memused).encode('ascii')) - transaction.set_data('/nodes/{}/memfree'.format(self.name), str(self.memfree).encode('ascii')) - transaction.set_data('/nodes/{}/cpuload'.format(self.name), str(self.cpuload).encode('ascii')) - transaction.set_data('/nodes/{}/runningdomains'.format(self.name), ' '.join(self.domain_list).encode('ascii')) - transaction.set_data('/nodes/{}/domainscount'.format(self.name), str(self.domains_count).encode('ascii')) - transaction.set_data('/nodes/{}/keepalive'.format(self.name), str(keepalive_time).encode('ascii')) - transaction.commit() + zkhandler.writedata(self.zk_conn, { + '/nodes/{}/memused'.format(self.name): str(self.memused), + '/nodes/{}/memfree'.format(self.name): str(self.memfree), + '/nodes/{}/cpuload'.format(self.name): str(self.cpuload), + '/nodes/{}/runningdomains'.format(self.name): ' '.join(self.domain_list), + '/nodes/{}/domainscount'.format(self.name): str(self.domains_count), + '/nodes/{}/keepalive'.format(self.name): str(keepalive_time) + }) except: return @@ -256,7 +254,7 @@ class NodeInstance(): if node_keepalive < node_deadtime and node_daemon_state == 'run': # CHECK VERSIONING HERE ansiiprint.echo('Node {} seems dead - starting monitor for fencing'.format(node_name), '', 'w') - zkhandler.writedata(self.zk_conn, '/nodes/{}/daemonstate'.format(node_name), [ 'dead' ]) + zkhandler.writedata(self.zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' }) fence_thread = threading.Thread(target=fenceNode, args=(node_name, self.zk_conn), kwargs={}) fence_thread.start() @@ -344,14 +342,14 @@ def fenceNode(node_name, zk_conn): target_hypervisor = hypervisor ansiiprint.echo('Moving VM "{}" to hypervisor "{}"'.format(dom_uuid, target_hypervisor), '', 'i') - transaction = zk_conn.transaction() - transaction.set_data('/domains/{}/state'.format(dom_uuid), 'start'.encode('ascii')) - transaction.set_data('/domains/{}/hypervisor'.format(dom_uuid), target_hypervisor.encode('ascii')) - transaction.set_data('/domains/{}/lasthypervisor'.format(dom_uuid), current_hypervisor.encode('ascii')) - transaction.commit() + zkhandler.writedata(self.zk_conn, { + '/domains/{}/state'.format(dom_uuid): 'start', + '/domains/{}/hypervisor'.format(dom_uuid): target_hypervisor, + '/domains/{}/lasthypervisor'.format(dom_uuid): current_hypervisor + }) # Set node in flushed state for easy remigrating when it comes back - zkhandler.writedata(self.zk_conn, '/nodes/{}/domainstate'.format(node_name), [ 'flushed' ]) + zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(node_name): 'flushed' }) # # Perform an IPMI fence diff --git a/pvcd/VMInstance.py b/pvcd/VMInstance.py index 74989eb2..ae5dc9f6 100644 --- a/pvcd/VMInstance.py +++ b/pvcd/VMInstance.py @@ -72,7 +72,7 @@ class VMInstance: # Add the domain to the domain_list array self.thishypervisor.domain_list.append(self.domuuid) # Push the change up to Zookeeper - zkhandler.writedata(self.zk_conn, '/nodes/{}/runningdomains'.format(self.thishypervisor.name), [ ' '.join(self.thishypervisor.domain_list) ]) + zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.thishypervisor.name): ' '.join(self.thishypervisor.domain_list) }) except Exception as e: ansiiprint.echo('Error adding domain to list: {}'.format(e), '', 'c') @@ -82,7 +82,7 @@ class VMInstance: # Remove the domain from the domain_list array self.thishypervisor.domain_list.remove(self.domuuid) # Push the change up to Zookeeper - zkhandler.writedata(self.zk_conn, '/nodes/{}/runningdomains'.format(self.thishypervisor.name), [ ' '.join(self.thishypervisor.domain_list) ]) + zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.thishypervisor.name): ' '.join(self.thishypervisor.domain_list) }) except Exception as e: ansiiprint.echo('Error removing domain from list: {}'.format(e), '', 'c') @@ -108,7 +108,7 @@ class VMInstance: self.dom = dom except libvirt.libvirtError as e: ansiiprint.echo('Failed to create VM', '{}:'.format(self.domuuid), 'e') - zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'failed' ]) + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'failed' }) self.dom = None lv_conn.close() @@ -135,7 +135,7 @@ class VMInstance: except libvirt.libvirtError as e: ansiiprint.echo('Failed to restart VM', '{}:'.format(self.domuuid), 'e') - zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'start' ]) + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) lv_conn.close() self.inrestart = False @@ -163,7 +163,7 @@ class VMInstance: self.removeDomainFromList() if self.inrestart == False: - zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'stop' ]) + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' }) ansiiprint.echo('Successfully stopped VM', '{}:'.format(self.domuuid), 'o') self.dom = None @@ -191,7 +191,7 @@ class VMInstance: self.removeDomainFromList() if self.inrestart == False: - zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'stop' ]) + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' }) ansiiprint.echo('Successfully shutdown VM', '{}:'.format(self.domuuid), 'o') self.dom = None @@ -237,7 +237,7 @@ class VMInstance: self.removeDomainFromList() time.sleep(1) - zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'start' ]) + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) self.inmigrate = False # Receive the migration from another host (wait until VM is running) @@ -322,7 +322,7 @@ class VMInstance: self.addDomainToList() # VM is already running and should be but stuck in migrate state elif self.state == "migrate": - zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'start' ]) + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) self.addDomainToList() # VM should be restarted elif self.state == "restart": @@ -342,7 +342,7 @@ class VMInstance: self.receive_migrate() # VM should be restarted (i.e. started since it isn't running) if self.state == "restart": - zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'start' ]) + zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) # VM should be shut down; ensure it's gone from this node's domain_list elif self.state == "shutdown": self.removeDomainFromList() diff --git a/pvcd/zkhandler.py b/pvcd/zkhandler.py index 44e76136..2b5a2688 100644 --- a/pvcd/zkhandler.py +++ b/pvcd/zkhandler.py @@ -35,22 +35,24 @@ def readdata(zk_conn, key): return data # Data write function -def writedata(zk_conn, key, data): - # Get the current version - orig_data_raw = zk_conn.get(key) +def writedata(zk_conn, kv): + # Get the current version; we base this off the first key (ordering in multi-key calls is irrelevant) + first_key = list(kv.keys())[0] + orig_data_raw = zk_conn.get(first_key) meta = orig_data_raw[1] if meta == None: - ansiiprint.echo('Zookeeper key "{}" does not exist'.format(key), '', 'e') + ansiiprint.echo('Zookeeper key "{}" does not exist'.format(first_key), '', 'e') return 1 version = meta.version new_version = version + 1 zk_transaction = zk_conn.transaction() - zk_transaction.set_data(key, data.encode('ascii')) + for key, data in kv: + zk_transaction.set_data(key, data.encode('ascii')) try: - zk_transaction.check(key, new_version) + zk_transaction.check(first_key, new_version) except TypeError: - ansiiprint.echo('Zookeeper key "{}" does not match expected version'.format(key), '', 'e') + ansiiprint.echo('Zookeeper key "{}" does not match expected version'.format(first_key), '', 'e') return 1 zk_transaction.commit() return 0