From 26a460c43889b827ae677beb8f9e61d932a6e675 Mon Sep 17 00:00:00 2001 From: Joshua Boniface Date: Thu, 31 May 2018 22:31:20 -0400 Subject: [PATCH] More significant work --- NodeInstance.py | 51 ++++++++++++++++++++++++++++++++++++++++++------- VMInstance.py | 20 +++++++++++++------ pvcd.py | 22 ++++++++++++--------- 3 files changed, 71 insertions(+), 22 deletions(-) diff --git a/NodeInstance.py b/NodeInstance.py index 416e2515..55c58ef8 100644 --- a/NodeInstance.py +++ b/NodeInstance.py @@ -3,17 +3,46 @@ import os, socket, time, uuid, threading, libvirt, kazoo.client class NodeInstance(threading.Thread): - def __init__(self, name, zk): + def __init__(self, name, node_list, zk): super(NodeInstance, self).__init__() # Passed-in variables on creation self.zkey = '/nodes/%s' % name self.zk = zk self.name = name self.stop_thread = threading.Event() + self.node_list = node_list + self.domainlist = [] + # Get value functions + def getfreemem(): + return self.memfree + + def getcpuload(): + return self.cpuload + + def getname(): + return self.name + + # Shutdown the thread def stop(self): self.stop_thread.set() + # Flush all VMs on the host + def flush(self): + for domain in self.domainlist: + # Determine the best target hypervisor + least_mem = (2^64)/8 + least_load = 999.0 + least_host = "" + for node in node_list: + node_freemem = node.getfreemem() + if node_freemem < least_mem: + least_mem = node_freemem + least_host = node.getname() + + self.zk.set('/domains/' + domain + '/state', b'migrate') + self.zk.set('/domains/' + domain + '/hypervisor', least_host.encode('ascii')) + def run(self): if self.name == socket.gethostname(): self.setup_local_node() @@ -42,10 +71,18 @@ class NodeInstance(threading.Thread): self.zk.set(self.zkey + '/memfree', str(self.memfree).encode('ascii')) self.zk.set(self.zkey + '/cpuload', str(self.cpuload).encode('ascii')) print("Free memory: %s | Load: %s" % ( self.memfree, self.cpuload )) - time.sleep(1) - if self.stop_thread.is_set(): - break + print("Active domains: %s" % self.domainlist) + for x in range(0,50): + time.sleep(0.1) + if self.stop_thread.is_set(): + return + @zk.DataWatch(self.zkey + '/state') + def watch_state(data, stat): + self.state = data.decode('ascii') + print("Version: %s, data: %s" % (stat.version, self.state)) + if self.state == 'flush': + self.flush() def setup_remote_node(self): @zk.DataWatch(self.zkey + '/state') @@ -54,17 +91,17 @@ class NodeInstance(threading.Thread): print("Version: %s, data: %s" % (stat.version, self.state)) @zk.DataWatch(self.zkey + '/cpucount') - def watch_state(data, stat): + def watch_cpucount(data, stat): self.cpucount = data.decode('ascii') print("Version: %s, data: %s" % (stat.version, self.cpucount)) @zk.DataWatch(self.zkey + '/cpuload') - def watch_state(data, stat): + def watch_cpuload(data, stat): self.cpuload = data.decode('ascii') print("Version: %s, data: %s" % (stat.version, self.cpuload)) @zk.DataWatch(self.zkey + '/memfree') - def watch_state(data, stat): + def watch_memfree(data, stat): self.memfree = data.decode('ascii') print("Version: %s, data: %s" % (stat.version, self.memfree)) diff --git a/VMInstance.py b/VMInstance.py index 1f349318..d13c1017 100644 --- a/VMInstance.py +++ b/VMInstance.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -import os, time, uuid, libvirt, kazoo.client +import os, time, uuid, threading, libvirt, kazoo.client class VMInstance: def __init__(self, domuuid, zk, thishypervisor): @@ -36,17 +36,20 @@ class VMInstance: if dom == None: print('Failed to create a domain from an XML definition.') exit(1) + self.thishypervisor.domainlist.append(self.domuuid) return dom # Stop the VM forcibly def stop_vm(self): print("Forcibly stopping VM %s" % self.domuuid) self.dom.destroy() + self.thishypervisor.domainlist.remove(self.domuuid) # Shutdown the VM gracefully def shutdown_vm(self): print("Stopping VM %s" % self.domuuid) self.dom.shutdown() + self.thishypervisor.domainlist.remove(self.domuuid) # Migrate the VM to a target host def migrate_vm(self): @@ -61,6 +64,7 @@ class VMInstance: print('Could not migrate to the new domain') exit(1) + self.thishypervisor.domainlist.remove(self.domuuid) print('Migrated successfully') dest_conn.close() @@ -71,6 +75,7 @@ class VMInstance: continue else: self.zk.set(self.zkey + '/status', b'start') + self.thishypervisor.domainlist.append(self.domuuid) break # # Main function to manage a VM (taking only self) @@ -90,23 +95,26 @@ class VMInstance: except: running = False - if running != False and self.state == "stop" and self.hypervisor == self.thishypervisor: + if running != False and self.state == "stop" and self.hypervisor == self.thishypervisor.name: self.stop_vm() - if running != False and self.state == "shutdown" and self.hypervisor == self.thishypervisor: + if running != False and self.state == "shutdown" and self.hypervisor == self.thishypervisor.name: self.shutdown_vm() - elif running == False and self.state == "migrate" and self.hypervisr == self.thishypervisor: + elif running == False and self.state == "migrate" and self.hypervisr == self.thishypervisor.name: self.receive_migrate() - elif running != False and self.state == "migrate" and self.hypervisr != self.thishypervisor: + elif running != False and self.state == "migrate" and self.hypervisr != self.thishypervisor.name: self.migrate_vm() - elif running == False and self.state == "start" and self.hypervisor == self.thishypervisor: + elif running == False and self.state == "start" and self.hypervisor == self.thishypervisor.name: # Grab the domain information from Zookeeper domxml, domxmlstat = self.zk.get(self.zkey + '/xml') domxml = str(domxml.decode('ascii')) self.dom = self.start_vm(conn, domxml) + + elif running != False and self.state == "start" and self.hypervisor == self.thishypervisor.name: + self.thishypervisor.domainlist.append(self.domuuid) # The VM should now be running so return the domain and active connection conn.close diff --git a/pvcd.py b/pvcd.py index bcb71835..f2c62d13 100755 --- a/pvcd.py +++ b/pvcd.py @@ -28,19 +28,22 @@ except: def zk_listener(state): if state == KazooState.LOST: - # Register somewhere that the session was lost - pass + cleanup() + exit(2) elif state == KazooState.SUSPENDED: - # Handle being disconnected from Zookeeper - pass + cleanup() + exit(2) else: # Handle being connected/reconnected to Zookeeper pass zk.add_listener(zk_listener) +myhostname = socket.gethostname() + def cleanup(): - t_node[socket.gethostname()].stop() + for node in node_list: + t_node[node].stop() zk.stop() atexit.register(cleanup) @@ -55,16 +58,17 @@ t_node = dict() s_domain = dict() for node in node_list: - t_node[node] = NodeInstance.NodeInstance(node, zk); - t_node[node].start() + t_node[node] = NodeInstance.NodeInstance(node, node_list, zk); + if t_node[node].name == myhostname: + t_node[node].start() for domain in domain_list: - s_domain[domain] = VMInstance.VMInstance(domain, zk, socket.gethostname()); + s_domain[domain] = VMInstance.VMInstance(domain, zk, t_node[myhostname]); while True: # Tick loop try: - time.sleep(1) + time.sleep(0.1) except: cleanup() exit(0)