[#5] Use better dict-based format for write updates
This commit is contained in:
		
							
								
								
									
										6
									
								
								pvcd.py
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								pvcd.py
									
									
									
									
									
								
							| @@ -146,7 +146,7 @@ zk_conn.add_listener(zk_listener) | ||||
| def cleanup(signum, frame): | ||||
|     ansiiprint.echo('Terminating daemon', '', 'e') | ||||
|     # Set stop state in Zookeeper | ||||
|     zkhandler.writedata(zk_conn, '/nodes/{}/daemonstate'.format(myhostname), [ 'stop' ]) | ||||
|     zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'stop' }) | ||||
|     # Close the Zookeeper connection | ||||
|     zk_conn.close() | ||||
|     # Stop keepalive thread | ||||
| @@ -180,7 +180,7 @@ print('  {0}Kernel:{1} {2}'.format(ansiiprint.bold(), ansiiprint.end(), staticda | ||||
| if zk_conn.exists('/nodes/{}'.format(myhostname)): | ||||
|     print("Node is " + ansiiprint.green() + "present" + ansiiprint.end() + " in Zookeeper") | ||||
|     # Update static data just in case it's changed | ||||
|     zkhandler.writedata(zk_conn, '/nodes/{}/staticdata'.format(myhostname), [ ' '.join(staticdata) ]) | ||||
|     zkhandler.writedata(zk_conn, { '/nodes/{}/staticdata'.format(myhostname): ' '.join(staticdata) }) | ||||
| else: | ||||
|     print("Node is " + ansiiprint.red() + "absent" + ansiiprint.end() + " in Zookeeper; adding new node") | ||||
|     keepalive_time = int(time.time()) | ||||
| @@ -202,7 +202,7 @@ else: | ||||
|     transaction.create('/nodes/{}/ipmipassword'.format(myhostname), config['ipmi_password'].encode('ascii')) | ||||
|     transaction.commit() | ||||
|  | ||||
| zkhandler.writedata(zk_conn, '/nodes/{}/daemonstate'.format(myhostname), [ 'init' ]) | ||||
| zkhandler.writedata(zk_conn, { '/nodes/{}/daemonstate'.format(myhostname): 'init' }) | ||||
|  | ||||
| t_node = dict() | ||||
| s_domain = dict() | ||||
|   | ||||
| @@ -151,36 +151,34 @@ class NodeInstance(): | ||||
|  | ||||
|             if target_hypervisor == None: | ||||
|                 ansiiprint.echo('Failed to find migration target for VM "{}"; shutting down'.format(dom_uuid), '', 'e') | ||||
|                 transaction = self.zk_conn.transaction() | ||||
|                 transaction.set_data('/domains/{}/state'.format(dom_uuid), 'shutdown'.encode('ascii')) | ||||
|                 transaction.commit() | ||||
|                 zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(dom_uuid): 'shutdown' }) | ||||
|             else: | ||||
|                 ansiiprint.echo('Migrating VM "{}" to hypervisor "{}"'.format(dom_uuid, target_hypervisor), '', 'i') | ||||
|                 transaction = self.zk_conn.transaction() | ||||
|                 transaction.set_data('/domains/{}/state'.format(dom_uuid), 'migrate'.encode('ascii')) | ||||
|                 transaction.set_data('/domains/{}/hypervisor'.format(dom_uuid), target_hypervisor.encode('ascii')) | ||||
|                 transaction.set_data('/domains/{}/lasthypervisor'.format(dom_uuid), current_hypervisor.encode('ascii')) | ||||
|                 transaction.commit() | ||||
|                 zkhandler.writedata(self.zk_conn, { | ||||
|                     '/domains/{}/state'.format(dom_uuid): 'migrate', | ||||
|                     '/domains/{}/hypervisor'.format(dom_uuid): target_hypervisor, | ||||
|                     '/domains/{}/lasthypervisor'.format(dom_uuid): current_hypervisor | ||||
|                 }) | ||||
|  | ||||
|         zkhandler.writedata(self.zk_conn, '/nodes/{}/runningdomains'.format(self.name), [ '' ]) | ||||
|         zkhandler.writedata(self.zk_conn, '/nodes/{}/domainstate'.format(self.name), [ 'flushed' ]) | ||||
|         zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.name): '' }) | ||||
|         zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'flushed' }) | ||||
|         self.inflush = False | ||||
|  | ||||
|     def unflush(self): | ||||
|         self.inflush = True | ||||
|         ansiiprint.echo('Restoring node {} to active service.'.format(self.name), '', 'i') | ||||
|         zkhandler.writedata(self.zk_conn, '/nodes/{}/domainstate'.format(self.name), [ 'ready' ]) | ||||
|         zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(self.name): 'ready' }) | ||||
|         for dom_uuid in self.s_domain: | ||||
|             last_hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/lasthypervisor'.format(dom_uuid)) | ||||
|             if last_hypervisor != self.name: | ||||
|                 continue | ||||
|  | ||||
|             ansiiprint.echo('Setting unmigration for VM "{}"'.format(dom_uuid), '', 'i') | ||||
|             transaction = self.zk_conn.transaction() | ||||
|             transaction.set_data('/domains/{}/state'.format(dom_uuid), 'migrate'.encode('ascii')) | ||||
|             transaction.set_data('/domains/{}/hypervisor'.format(dom_uuid), self.name.encode('ascii')) | ||||
|             transaction.set_data('/domains/{}/lasthypervisor'.format(dom_uuid), ''.encode('ascii')) | ||||
|             transaction.commit() | ||||
|             zkhandler.writedata(self.zk_conn, { | ||||
|                 '/domains/{}/state'.format(dom_uuid): 'migrate', | ||||
|                 '/domains/{}/hypervisor'.format(dom_uuid): self.name, | ||||
|                 '/domains/{}/lasthypervisor'.format(dom_uuid): '' | ||||
|             }) | ||||
|  | ||||
|         self.inflush = False | ||||
|  | ||||
| @@ -196,7 +194,7 @@ class NodeInstance(): | ||||
|         past_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name)) | ||||
|         if past_state != 'run': | ||||
|             self.daemon_state = 'run' | ||||
|             zkhandler.writedata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name), [ 'run' ]) | ||||
|             zkhandler.writedata(self.zk_conn, { '/nodes/{}/daemonstate'.format(self.name): 'run' }) | ||||
|         else: | ||||
|             self.daemon_state = 'run' | ||||
|  | ||||
| @@ -210,7 +208,7 @@ class NodeInstance(): | ||||
|                                 raise | ||||
|                         except Exception as e: | ||||
|                             # Toggle a state "change" | ||||
|                             zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(domain), [ instance.getstate() ]) | ||||
|                             zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() }) | ||||
|  | ||||
|         # Set our information in zookeeper | ||||
|         self.name = lv_conn.getHostname() | ||||
| @@ -220,14 +218,14 @@ class NodeInstance(): | ||||
|         self.domains_count = len(lv_conn.listDomainsID()) | ||||
|         keepalive_time = int(time.time()) | ||||
|         try: | ||||
|             transaction = self.zk_conn.transaction() | ||||
|             transaction.set_data('/nodes/{}/memused'.format(self.name), str(self.memused).encode('ascii')) | ||||
|             transaction.set_data('/nodes/{}/memfree'.format(self.name), str(self.memfree).encode('ascii')) | ||||
|             transaction.set_data('/nodes/{}/cpuload'.format(self.name), str(self.cpuload).encode('ascii')) | ||||
|             transaction.set_data('/nodes/{}/runningdomains'.format(self.name), ' '.join(self.domain_list).encode('ascii')) | ||||
|             transaction.set_data('/nodes/{}/domainscount'.format(self.name), str(self.domains_count).encode('ascii')) | ||||
|             transaction.set_data('/nodes/{}/keepalive'.format(self.name), str(keepalive_time).encode('ascii')) | ||||
|             transaction.commit() | ||||
|             zkhandler.writedata(self.zk_conn, { | ||||
|                 '/nodes/{}/memused'.format(self.name): str(self.memused), | ||||
|                 '/nodes/{}/memfree'.format(self.name): str(self.memfree), | ||||
|                 '/nodes/{}/cpuload'.format(self.name): str(self.cpuload), | ||||
|                 '/nodes/{}/runningdomains'.format(self.name): ' '.join(self.domain_list), | ||||
|                 '/nodes/{}/domainscount'.format(self.name): str(self.domains_count), | ||||
|                 '/nodes/{}/keepalive'.format(self.name): str(keepalive_time) | ||||
|             }) | ||||
|         except: | ||||
|             return | ||||
|  | ||||
| @@ -256,7 +254,7 @@ class NodeInstance(): | ||||
|             if node_keepalive < node_deadtime and node_daemon_state == 'run': | ||||
|                 # CHECK VERSIONING HERE | ||||
|                 ansiiprint.echo('Node {} seems dead - starting monitor for fencing'.format(node_name), '', 'w') | ||||
|                 zkhandler.writedata(self.zk_conn, '/nodes/{}/daemonstate'.format(node_name), [ 'dead' ]) | ||||
|                 zkhandler.writedata(self.zk_conn, { '/nodes/{}/daemonstate'.format(node_name): 'dead' }) | ||||
|                 fence_thread = threading.Thread(target=fenceNode, args=(node_name, self.zk_conn), kwargs={}) | ||||
|                 fence_thread.start() | ||||
|  | ||||
| @@ -344,14 +342,14 @@ def fenceNode(node_name, zk_conn): | ||||
|                 target_hypervisor = hypervisor | ||||
|  | ||||
|         ansiiprint.echo('Moving VM "{}" to hypervisor "{}"'.format(dom_uuid, target_hypervisor), '', 'i') | ||||
|         transaction = zk_conn.transaction() | ||||
|         transaction.set_data('/domains/{}/state'.format(dom_uuid), 'start'.encode('ascii')) | ||||
|         transaction.set_data('/domains/{}/hypervisor'.format(dom_uuid), target_hypervisor.encode('ascii')) | ||||
|         transaction.set_data('/domains/{}/lasthypervisor'.format(dom_uuid), current_hypervisor.encode('ascii')) | ||||
|         transaction.commit() | ||||
|         zkhandler.writedata(self.zk_conn, { | ||||
|             '/domains/{}/state'.format(dom_uuid): 'start', | ||||
|             '/domains/{}/hypervisor'.format(dom_uuid): target_hypervisor, | ||||
|             '/domains/{}/lasthypervisor'.format(dom_uuid): current_hypervisor | ||||
|         }) | ||||
|  | ||||
|     # Set node in flushed state for easy remigrating when it comes back | ||||
|     zkhandler.writedata(self.zk_conn, '/nodes/{}/domainstate'.format(node_name), [ 'flushed' ]) | ||||
|     zkhandler.writedata(self.zk_conn, { '/nodes/{}/domainstate'.format(node_name): 'flushed' }) | ||||
|  | ||||
| # | ||||
| # Perform an IPMI fence | ||||
|   | ||||
| @@ -72,7 +72,7 @@ class VMInstance: | ||||
|                 # Add the domain to the domain_list array | ||||
|                 self.thishypervisor.domain_list.append(self.domuuid) | ||||
|                 # Push the change up to Zookeeper | ||||
|                 zkhandler.writedata(self.zk_conn, '/nodes/{}/runningdomains'.format(self.thishypervisor.name), [ ' '.join(self.thishypervisor.domain_list) ]) | ||||
|                 zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.thishypervisor.name): ' '.join(self.thishypervisor.domain_list) }) | ||||
|             except Exception as e: | ||||
|                 ansiiprint.echo('Error adding domain to list: {}'.format(e), '', 'c') | ||||
|  | ||||
| @@ -82,7 +82,7 @@ class VMInstance: | ||||
|                 # Remove the domain from the domain_list array | ||||
|                 self.thishypervisor.domain_list.remove(self.domuuid) | ||||
|                 # Push the change up to Zookeeper | ||||
|                 zkhandler.writedata(self.zk_conn, '/nodes/{}/runningdomains'.format(self.thishypervisor.name), [ ' '.join(self.thishypervisor.domain_list) ]) | ||||
|                 zkhandler.writedata(self.zk_conn, { '/nodes/{}/runningdomains'.format(self.thishypervisor.name): ' '.join(self.thishypervisor.domain_list) }) | ||||
|             except Exception as e: | ||||
|                 ansiiprint.echo('Error removing domain from list: {}'.format(e), '', 'c') | ||||
|  | ||||
| @@ -108,7 +108,7 @@ class VMInstance: | ||||
|             self.dom = dom | ||||
|         except libvirt.libvirtError as e: | ||||
|             ansiiprint.echo('Failed to create VM', '{}:'.format(self.domuuid), 'e') | ||||
|             zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'failed' ]) | ||||
|             zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'failed' }) | ||||
|             self.dom = None | ||||
|  | ||||
|         lv_conn.close() | ||||
| @@ -135,7 +135,7 @@ class VMInstance: | ||||
|         except libvirt.libvirtError as e: | ||||
|             ansiiprint.echo('Failed to restart VM', '{}:'.format(self.domuuid), 'e') | ||||
|  | ||||
|         zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'start' ]) | ||||
|         zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) | ||||
|         lv_conn.close() | ||||
|         self.inrestart = False | ||||
|  | ||||
| @@ -163,7 +163,7 @@ class VMInstance: | ||||
|         self.removeDomainFromList() | ||||
|  | ||||
|         if self.inrestart == False: | ||||
|             zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'stop' ]) | ||||
|             zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' }) | ||||
|  | ||||
|         ansiiprint.echo('Successfully stopped VM', '{}:'.format(self.domuuid), 'o') | ||||
|         self.dom = None | ||||
| @@ -191,7 +191,7 @@ class VMInstance: | ||||
|         self.removeDomainFromList() | ||||
|  | ||||
|         if self.inrestart == False: | ||||
|             zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'stop' ]) | ||||
|             zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'stop' }) | ||||
|  | ||||
|         ansiiprint.echo('Successfully shutdown VM', '{}:'.format(self.domuuid), 'o') | ||||
|         self.dom = None | ||||
| @@ -237,7 +237,7 @@ class VMInstance: | ||||
|             self.removeDomainFromList() | ||||
|             time.sleep(1) | ||||
|  | ||||
|         zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'start' ]) | ||||
|         zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) | ||||
|         self.inmigrate = False | ||||
|  | ||||
|     # Receive the migration from another host (wait until VM is running) | ||||
| @@ -322,7 +322,7 @@ class VMInstance: | ||||
|                         self.addDomainToList() | ||||
|                     # VM is already running and should be but stuck in migrate state | ||||
|                     elif self.state == "migrate": | ||||
|                         zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'start' ]) | ||||
|                         zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) | ||||
|                         self.addDomainToList() | ||||
|                     # VM should be restarted | ||||
|                     elif self.state == "restart": | ||||
| @@ -342,7 +342,7 @@ class VMInstance: | ||||
|                         self.receive_migrate() | ||||
|                     # VM should be restarted (i.e. started since it isn't running) | ||||
|                     if self.state == "restart": | ||||
|                         zkhandler.writedata(self.zk_conn, '/domains/{}/state'.format(self.domuuid), [ 'start' ]) | ||||
|                         zkhandler.writedata(self.zk_conn, { '/domains/{}/state'.format(self.domuuid): 'start' }) | ||||
|                     # VM should be shut down; ensure it's gone from this node's domain_list | ||||
|                     elif self.state == "shutdown": | ||||
|                         self.removeDomainFromList() | ||||
|   | ||||
| @@ -35,22 +35,24 @@ def readdata(zk_conn, key): | ||||
|     return data | ||||
|  | ||||
| # Data write function | ||||
| def writedata(zk_conn, key, data): | ||||
|     # Get the current version | ||||
|     orig_data_raw = zk_conn.get(key) | ||||
| def writedata(zk_conn, kv): | ||||
|     # Get the current version; we base this off the first key (ordering in multi-key calls is irrelevant) | ||||
|     first_key = list(kv.keys())[0] | ||||
|     orig_data_raw = zk_conn.get(first_key) | ||||
|     meta = orig_data_raw[1] | ||||
|     if meta == None: | ||||
|         ansiiprint.echo('Zookeeper key "{}" does not exist'.format(key), '', 'e') | ||||
|         ansiiprint.echo('Zookeeper key "{}" does not exist'.format(first_key), '', 'e') | ||||
|         return 1 | ||||
|  | ||||
|     version = meta.version | ||||
|     new_version = version + 1 | ||||
|     zk_transaction = zk_conn.transaction() | ||||
|     zk_transaction.set_data(key, data.encode('ascii')) | ||||
|     for key, data in kv: | ||||
|         zk_transaction.set_data(key, data.encode('ascii')) | ||||
|     try: | ||||
|         zk_transaction.check(key, new_version) | ||||
|         zk_transaction.check(first_key, new_version) | ||||
|     except TypeError: | ||||
|         ansiiprint.echo('Zookeeper key "{}" does not match expected version'.format(key), '', 'e') | ||||
|         ansiiprint.echo('Zookeeper key "{}" does not match expected version'.format(first_key), '', 'e') | ||||
|         return 1 | ||||
|     zk_transaction.commit() | ||||
|     return 0 | ||||
|   | ||||
		Reference in New Issue
	
	Block a user