[#5] Implement zkhandler for readdata commands
This commit is contained in:
		| @@ -130,21 +130,21 @@ class NodeInstance(): | |||||||
|         for dom_uuid in self.domain_list: |         for dom_uuid in self.domain_list: | ||||||
|             most_memfree = 0 |             most_memfree = 0 | ||||||
|             target_hypervisor = None |             target_hypervisor = None | ||||||
|             hypervisor_list = self.zk_conn.get_children('/nodes') |             hypervisor_list = zkhander.listchildren(self.zk_conn, '/nodes') | ||||||
|             current_hypervisor = self.zk_conn.get('/domains/{}/hypervisor'.format(dom_uuid))[0].decode('ascii') |             current_hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/hypervisor'.format(dom_uuid)) | ||||||
|             if current_hypervisor != self.this_node: |             if current_hypervisor != self.this_node: | ||||||
|                 continue |                 continue | ||||||
|  |  | ||||||
|             for hypervisor in hypervisor_list: |             for hypervisor in hypervisor_list: | ||||||
|                 daemon_state = self.zk_conn.get('/nodes/{}/daemonstate'.format(hypervisor))[0].decode('ascii') |                 daemon_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(hypervisor)) | ||||||
|                 domain_state = self.zk_conn.get('/nodes/{}/domainstate'.format(hypervisor))[0].decode('ascii') |                 domain_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/domainstate'.format(hypervisor)) | ||||||
|                 if hypervisor == current_hypervisor: |                 if hypervisor == current_hypervisor: | ||||||
|                     continue |                     continue | ||||||
|  |  | ||||||
|                 if daemon_state != 'run' or domain_state != 'ready': |                 if daemon_state != 'run' or domain_state != 'ready': | ||||||
|                     continue |                     continue | ||||||
|      |      | ||||||
|                 memfree = int(self.zk_conn.get('/nodes/{}/memfree'.format(hypervisor))[0].decode('ascii')) |                 memfree = int(zkhandler.readdata(self.zk_conn, '/nodes/{}/memfree'.format(hypervisor))) | ||||||
|                 if memfree > most_memfree: |                 if memfree > most_memfree: | ||||||
|                     most_memfree = memfree |                     most_memfree = memfree | ||||||
|                     target_hypervisor = hypervisor |                     target_hypervisor = hypervisor | ||||||
| @@ -171,7 +171,7 @@ class NodeInstance(): | |||||||
|         ansiiprint.echo('Restoring node {} to active service.'.format(self.name), '', 'i') |         ansiiprint.echo('Restoring node {} to active service.'.format(self.name), '', 'i') | ||||||
|         self.zk_conn.set('/nodes/{}/domainstate'.format(self.name), 'ready'.encode('ascii')) |         self.zk_conn.set('/nodes/{}/domainstate'.format(self.name), 'ready'.encode('ascii')) | ||||||
|         for dom_uuid in self.s_domain: |         for dom_uuid in self.s_domain: | ||||||
|             last_hypervisor = self.zk_conn.get('/domains/{}/lasthypervisor'.format(dom_uuid))[0].decode('ascii') |             last_hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/lasthypervisor'.format(dom_uuid)) | ||||||
|             if last_hypervisor != self.name: |             if last_hypervisor != self.name: | ||||||
|                 continue |                 continue | ||||||
|  |  | ||||||
| @@ -193,7 +193,7 @@ class NodeInstance(): | |||||||
|             return |             return | ||||||
|  |  | ||||||
|         # Get past state and update if needed |         # Get past state and update if needed | ||||||
|         past_state = self.zk_conn.get('/nodes/{}/daemonstate'.format(self.name))[0].decode('ascii') |         past_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(self.name)) | ||||||
|         if past_state != 'run': |         if past_state != 'run': | ||||||
|             self.daemon_state = 'run' |             self.daemon_state = 'run' | ||||||
|             self.zk_conn.set('/nodes/{}/daemonstate'.format(self.name), 'run'.encode('ascii')) |             self.zk_conn.set('/nodes/{}/daemonstate'.format(self.name), 'run'.encode('ascii')) | ||||||
| @@ -241,9 +241,9 @@ class NodeInstance(): | |||||||
|         # Update our local node lists |         # Update our local node lists | ||||||
|         for node_name in self.t_node: |         for node_name in self.t_node: | ||||||
|             try: |             try: | ||||||
|                 node_daemon_state = self.zk_conn.get('/nodes/{}/daemonstate'.format(node_name))[0].decode('ascii') |                 node_daemon_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/daemonstate'.format(node_name)) | ||||||
|                 node_domain_state = self.zk_conn.get('/nodes/{}/domainstate'.format(node_name))[0].decode('ascii') |                 node_domain_state = zkhandler.readdata(self.zk_conn, '/nodes/{}/domainstate'.format(node_name)) | ||||||
|                 node_keepalive = int(self.zk_conn.get('/nodes/{}/keepalive'.format(node_name))[0].decode('ascii')) |                 node_keepalive = int(zkhandler.readdata(self.zk_conn, '/nodes/{}/keepalive'.format(node_name))) | ||||||
|             except: |             except: | ||||||
|                 node_daemon_state = 'unknown' |                 node_daemon_state = 'unknown' | ||||||
|                 node_domain_state = 'unknown' |                 node_domain_state = 'unknown' | ||||||
| @@ -307,7 +307,7 @@ def fenceNode(node_name, zk_conn): | |||||||
|         # Wait 5 seconds |         # Wait 5 seconds | ||||||
|         time.sleep(5) |         time.sleep(5) | ||||||
|         # Get the state |         # Get the state | ||||||
|         node_daemon_state = zk_conn.get('/nodes/{}/daemonstate'.format(node_name))[0].decode('ascii') |         node_daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(node_name)) | ||||||
|         # Is it still 'dead' |         # Is it still 'dead' | ||||||
|         if node_daemon_state == 'dead': |         if node_daemon_state == 'dead': | ||||||
|             failcount += 1 |             failcount += 1 | ||||||
| @@ -319,26 +319,26 @@ def fenceNode(node_name, zk_conn): | |||||||
|  |  | ||||||
|     ansiiprint.echo('Fencing node "{}" via IPMI reboot signal'.format(node_name), '', 'e') |     ansiiprint.echo('Fencing node "{}" via IPMI reboot signal'.format(node_name), '', 'e') | ||||||
|  |  | ||||||
|     ipmi_hostname = zk_conn.get('/nodes/{}/ipmihostname'.format(node_name))[0].decode('ascii') |     ipmi_hostname = zkhandler.readdata(zk_conn, '/nodes/{}/ipmihostname'.format(node_name)) | ||||||
|     ipmi_username = zk_conn.get('/nodes/{}/ipmiusername'.format(node_name))[0].decode('ascii') |     ipmi_username = zkhandler.readdata(zk_conn, '/nodes/{}/ipmiusername'.format(node_name)) | ||||||
|     ipmi_password = zk_conn.get('/nodes/{}/ipmipassword'.format(node_name))[0].decode('ascii') |     ipmi_password = zkhandler.readdata(zk_conn, '/nodes/{}/ipmipassword'.format(node_name)) | ||||||
|     rebootViaIPMI(ipmi_hostname, ipmi_username, ipmi_password) |     rebootViaIPMI(ipmi_hostname, ipmi_username, ipmi_password) | ||||||
|     time.sleep(5) |     time.sleep(5) | ||||||
|  |  | ||||||
|     ansiiprint.echo('Moving VMs from dead hypervisor "{}" to new hosts'.format(node_name), '', 'i') |     ansiiprint.echo('Moving VMs from dead hypervisor "{}" to new hosts'.format(node_name), '', 'i') | ||||||
|     dead_node_running_domains = zk_conn.get('/nodes/{}/runningdomains'.format(node_name))[0].decode('ascii').split() |     dead_node_running_domains = zkhandler.readdata(zk_conn, '/nodes/{}/runningdomains'.format(node_name)).split() | ||||||
|     for dom_uuid in dead_node_running_domains: |     for dom_uuid in dead_node_running_domains: | ||||||
|         most_memfree = 0 |         most_memfree = 0 | ||||||
|         hypervisor_list = zk_conn.get_children('/nodes') |         hypervisor_list = zkhandler.listchildren(zk_conn, '/nodes') | ||||||
|         current_hypervisor = zk_conn.get('/domains/{}/hypervisor'.format(dom_uuid))[0].decode('ascii') |         current_hypervisor = zkhandler.readdata(zk_conn, '/domains/{}/hypervisor'.format(dom_uuid)) | ||||||
|         for hypervisor in hypervisor_list: |         for hypervisor in hypervisor_list: | ||||||
|             print(hypervisor) |             print(hypervisor) | ||||||
|             daemon_state = zk_conn.get('/nodes/{}/daemonstate'.format(hypervisor))[0].decode('ascii') |             daemon_state = zkhandler.readdata(zk_conn, '/nodes/{}/daemonstate'.format(hypervisor)) | ||||||
|             domain_state = zk_conn.get('/nodes/{}/domainstate'.format(hypervisor))[0].decode('ascii') |             domain_state = zkhandler.readdata(zk_conn, '/nodes/{}/domainstate'.format(hypervisor)) | ||||||
|             if daemon_state != 'run' or domain_state != 'ready': |             if daemon_state != 'run' or domain_state != 'ready': | ||||||
|                 continue |                 continue | ||||||
|  |  | ||||||
|             memfree = int(zk_conn.get('/nodes/{}/memfree'.format(hypervisor))[0].decode('ascii')) |             memfree = int(zkhandler.readdata(zk_conn, '/nodes/{}/memfree'.format(hypervisor))) | ||||||
|             if memfree > most_memfree: |             if memfree > most_memfree: | ||||||
|                 most_memfree = memfree |                 most_memfree = memfree | ||||||
|                 target_hypervisor = hypervisor |                 target_hypervisor = hypervisor | ||||||
|   | |||||||
| @@ -22,6 +22,7 @@ | |||||||
|  |  | ||||||
| import os, sys, uuid, socket, time, threading, libvirt, kazoo.client | import os, sys, uuid, socket, time, threading, libvirt, kazoo.client | ||||||
| import pvcd.ansiiprint as ansiiprint | import pvcd.ansiiprint as ansiiprint | ||||||
|  | import pvcd.zkhandler as zkhandler | ||||||
|  |  | ||||||
| class VMInstance: | class VMInstance: | ||||||
|     # Initialization function |     # Initialization function | ||||||
| @@ -100,7 +101,7 @@ class VMInstance: | |||||||
|      |      | ||||||
|         try: |         try: | ||||||
|             # Grab the domain information from Zookeeper |             # Grab the domain information from Zookeeper | ||||||
|             xmlconfig = self.zk_conn.get('/domains/{}/xml'.format(self.domuuid))[0].decode('ascii') |             xmlconfig = zkhandler.readdata(self.zk_conn, '/domains/{}/xml'.format(self.domuuid)) | ||||||
|             dom = lv_conn.createXML(xmlconfig, 0) |             dom = lv_conn.createXML(xmlconfig, 0) | ||||||
|             self.addDomainToList() |             self.addDomainToList() | ||||||
|             ansiiprint.echo('Successfully started VM', '{}:'.format(self.domuuid), 'o') |             ansiiprint.echo('Successfully started VM', '{}:'.format(self.domuuid), 'o') | ||||||
| @@ -245,7 +246,7 @@ class VMInstance: | |||||||
|         ansiiprint.echo('Receiving migration', '{}:'.format(self.domuuid), 'i') |         ansiiprint.echo('Receiving migration', '{}:'.format(self.domuuid), 'i') | ||||||
|         while True: |         while True: | ||||||
|             time.sleep(0.5) |             time.sleep(0.5) | ||||||
|             self.state = self.zk_conn.get('/domains/{}/state'.format(self.domuuid))[0].decode('ascii') |             self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) | ||||||
|             self.dom = self.lookupByUUID(self.domuuid) |             self.dom = self.lookupByUUID(self.domuuid) | ||||||
|  |  | ||||||
|             if self.dom == None and self.state == 'migrate': |             if self.dom == None and self.state == 'migrate': | ||||||
| @@ -281,8 +282,8 @@ class VMInstance: | |||||||
|         time.sleep(0.2) |         time.sleep(0.2) | ||||||
|  |  | ||||||
|         # Get the current values from zookeeper (don't rely on the watch) |         # Get the current values from zookeeper (don't rely on the watch) | ||||||
|         self.state = self.zk_conn.get('/domains/{}/state'.format(self.domuuid))[0].decode('ascii') |         self.state = zkhandler.readdata(self.zk_conn, '/domains/{}/state'.format(self.domuuid)) | ||||||
|         self.hypervisor = self.zk_conn.get('/domains/{}/hypervisor'.format(self.domuuid))[0].decode('ascii') |         self.hypervisor = zkhandler.readdata(self.zk_conn, '/domains/{}/hypervisor'.format(self.domuuid)) | ||||||
|  |  | ||||||
|         # Check the current state of the VM |         # Check the current state of the VM | ||||||
|         try: |         try: | ||||||
|   | |||||||
							
								
								
									
										65
									
								
								pvcd/zkhandler.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										65
									
								
								pvcd/zkhandler.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,65 @@ | |||||||
|  | #!/usr/bin/env python3 | ||||||
|  |  | ||||||
|  | # zkhandler.py - Secure versioned ZooKeeper updates | ||||||
|  | # Part of the Parallel Virtual Cluster (PVC) system | ||||||
|  | # | ||||||
|  | #    Copyright (C) 2018  Joshua M. Boniface <joshua@boniface.me> | ||||||
|  | # | ||||||
|  | #    This program is free software: you can redistribute it and/or modify | ||||||
|  | #    it under the terms of the GNU General Public License as published by | ||||||
|  | #    the Free Software Foundation, either version 3 of the License, or | ||||||
|  | #    (at your option) any later version. | ||||||
|  | # | ||||||
|  | #    This program is distributed in the hope that it will be useful, | ||||||
|  | #    but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | #    GNU General Public License for more details. | ||||||
|  | # | ||||||
|  | #    You should have received a copy of the GNU General Public License | ||||||
|  | #    along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||||
|  | # | ||||||
|  | ############################################################################### | ||||||
|  |  | ||||||
|  | import kazoo.client, ansiiprint | ||||||
|  |  | ||||||
|  | # Child list function | ||||||
|  | def listchildren(zk_conn, key): | ||||||
|  |     children = zk_conn.get_children(key) | ||||||
|  |     return children | ||||||
|  |  | ||||||
|  | # Data read function | ||||||
|  | def readdata(zk_conn, key): | ||||||
|  |     data_raw = zk_conn.get(key) | ||||||
|  |     data = data_raw[0].decode('ascii') | ||||||
|  |     meta = data_raw[1] | ||||||
|  |     return data | ||||||
|  |  | ||||||
|  | # Data write function | ||||||
|  | def writedata(zk_conn, key, data): | ||||||
|  |     # Get the current version | ||||||
|  |     orig_data_raw = zk_conn.get(key) | ||||||
|  |     meta = orig_data_raw[1] | ||||||
|  |     if meta == None: | ||||||
|  |         ansiiprint.echo('Zookeeper key "{}" does not exist'.format(key), '', 'e') | ||||||
|  |         return 1 | ||||||
|  |  | ||||||
|  |     version = meta.version | ||||||
|  |     new_version = version + 1 | ||||||
|  |     zk_transaction = zk_conn.transaction() | ||||||
|  |     for line in data: | ||||||
|  |         zk_transaction.set_data(key, line.encode('ascii')) | ||||||
|  |     try: | ||||||
|  |         zk_transaction.check(key, new_version) | ||||||
|  |     except TypeError: | ||||||
|  |         ansiiprint.echo('Zookeeper key "{}" does not match expected version'.format(key), '', 'e') | ||||||
|  |         return 1 | ||||||
|  |     zk_transaction.commit() | ||||||
|  |     return 0 | ||||||
|  |  | ||||||
|  | # Key create function | ||||||
|  | def createkey(zk_conn, key, data): | ||||||
|  |     zk_transaction = zk_conn.transaction() | ||||||
|  |     for line in data: | ||||||
|  |         zk_transaction.create(key, line.encode('ascii')) | ||||||
|  |     zk_transaction.commit() | ||||||
|  |  | ||||||
		Reference in New Issue
	
	Block a user