From 92ddec311b703f869a268d570ce042fe0f40b656 Mon Sep 17 00:00:00 2001 From: Joshua Boniface Date: Thu, 31 May 2018 21:49:23 -0400 Subject: [PATCH] Add node instances --- NodeInstance.py | 70 +++++++++++++++++++++++++++++++++++++++++++++++++ VMInstance.py | 2 +- pvcd.py | 62 +++++++++++++++++-------------------------- 3 files changed, 95 insertions(+), 39 deletions(-) create mode 100644 NodeInstance.py diff --git a/NodeInstance.py b/NodeInstance.py new file mode 100644 index 00000000..416e2515 --- /dev/null +++ b/NodeInstance.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 + +import os, socket, time, uuid, threading, libvirt, kazoo.client + +class NodeInstance(threading.Thread): + def __init__(self, name, zk): + super(NodeInstance, self).__init__() + # Passed-in variables on creation + self.zkey = '/nodes/%s' % name + self.zk = zk + self.name = name + self.stop_thread = threading.Event() + + def stop(self): + self.stop_thread.set() + + def run(self): + if self.name == socket.gethostname(): + self.setup_local_node() + else: + self.setup_remote_node() + + def setup_local_node(self): + # Connect to libvirt + libvirt_name = "qemu:///system" + conn = libvirt.open(libvirt_name) + if conn == None: + print('Failed to open connection to %s' % libvirt_name) + exit(1) + + # Gather data about hypervisor + self.name = conn.getHostname() + self.cpucount = conn.getCPUMap()[0] + self.zk.set(self.zkey + '/state', self.name.encode('ascii')) + self.zk.set(self.zkey + '/cpucount', str(self.cpucount).encode('ascii')) + print("Node hostname: %s" % self.name) + print("CPUs: %s" % self.cpucount) + + while True: + self.memfree = conn.getFreeMemory() + self.cpuload = os.getloadavg()[0] + self.zk.set(self.zkey + '/memfree', str(self.memfree).encode('ascii')) + self.zk.set(self.zkey + '/cpuload', str(self.cpuload).encode('ascii')) + print("Free memory: %s | Load: %s" % ( self.memfree, self.cpuload )) + time.sleep(1) + if self.stop_thread.is_set(): + break + + + def setup_remote_node(self): + @zk.DataWatch(self.zkey + '/state') + def watch_state(data, stat): + self.state = data.decode('ascii') + print("Version: %s, data: %s" % (stat.version, self.state)) + + @zk.DataWatch(self.zkey + '/cpucount') + def watch_state(data, stat): + self.cpucount = data.decode('ascii') + print("Version: %s, data: %s" % (stat.version, self.cpucount)) + + @zk.DataWatch(self.zkey + '/cpuload') + def watch_state(data, stat): + self.cpuload = data.decode('ascii') + print("Version: %s, data: %s" % (stat.version, self.cpuload)) + + @zk.DataWatch(self.zkey + '/memfree') + def watch_state(data, stat): + self.memfree = data.decode('ascii') + print("Version: %s, data: %s" % (stat.version, self.memfree)) + diff --git a/VMInstance.py b/VMInstance.py index f0ac7d8a..1f349318 100644 --- a/VMInstance.py +++ b/VMInstance.py @@ -19,7 +19,7 @@ class VMInstance: @zk.DataWatch(self.zkey + '/hypervisor') def watch_hypervisor(data, stat): self.hypervisor = data.decode('ascii') - print("Version: %s, data: %s" % (stat.version, data.decode('ascii'))) + print("Version: %s, data: %s" % (stat.version, self.hypervisor)) self.manage_vm_state() # Watch for changes to the state field in Zookeeper diff --git a/pvcd.py b/pvcd.py index f10e4dec..bcb71835 100755 --- a/pvcd.py +++ b/pvcd.py @@ -4,9 +4,13 @@ from kazoo.client import KazooClient from kazoo.client import KazooState import libvirt import sys +import socket import uuid import VMInstance +import NodeInstance import time +import threading +import atexit def help(): print("pvcd - Parallel Virtual Cluster management daemon") @@ -35,50 +39,32 @@ def zk_listener(state): zk.add_listener(zk_listener) -# Connect to libvirt -libvirt_name = "qemu:///system" -conn = libvirt.open(libvirt_name) -if conn == None: - print('Failed to open connection to %s' % libvirt_name) - exit(1) +def cleanup(): + t_node[socket.gethostname()].stop() + zk.stop() -# Gather data about hypervisor -hostname = conn.getHostname() -nodeinfo = conn.getInfo() -numnodes = nodeinfo[4] -memlistNUMA = conn.getCellsFreeMemory(0, numnodes) -memlistTOTAL = conn.getFreeMemory() +atexit.register(cleanup) -print("Node hostname: %s" % hostname) -print("Free memory: %s" % memlistTOTAL) -cell = 0 -for cellfreemem in memlistNUMA: - print('NUMA Node '+str(cell)+': '+str(cellfreemem)+' bytes free memory') - cell += 1 +node_list = zk.get_children('/nodes') +print(node_list) -print('Virtualization type: '+conn.getType()) -uri = conn.getURI() -print('Canonical URI: '+uri) +domain_list = zk.get_children('/domains') +print(domain_list) -print() +t_node = dict() +s_domain = dict() -map = conn.getCPUMap() +for node in node_list: + t_node[node] = NodeInstance.NodeInstance(node, zk); + t_node[node].start() -print("CPUs: " + str(map[0])) -print("Available: " + str(map[1])) - - -print() - -def start_vm(vmname): - print("Starting VM %s" % vmname) - -vm = VMInstance.VMInstance('b1dc4e21-544f-47aa-9bb7-8af0bc443b78', zk, hostname); +for domain in domain_list: + s_domain[domain] = VMInstance.VMInstance(domain, zk, socket.gethostname()); while True: # Tick loop - time.sleep(1) - pass - -conn.close() -zk.stop() + try: + time.sleep(1) + except: + cleanup() + exit(0)