Add node instances

This commit is contained in:
Joshua Boniface 2018-05-31 21:49:23 -04:00
parent a81a6dd979
commit 92ddec311b
3 changed files with 95 additions and 39 deletions

70
NodeInstance.py Normal file
View File

@ -0,0 +1,70 @@
#!/usr/bin/env python3
import os, socket, time, uuid, threading, libvirt, kazoo.client
class NodeInstance(threading.Thread):
def __init__(self, name, zk):
super(NodeInstance, self).__init__()
# Passed-in variables on creation
self.zkey = '/nodes/%s' % name
self.zk = zk
self.name = name
self.stop_thread = threading.Event()
def stop(self):
self.stop_thread.set()
def run(self):
if self.name == socket.gethostname():
self.setup_local_node()
else:
self.setup_remote_node()
def setup_local_node(self):
# Connect to libvirt
libvirt_name = "qemu:///system"
conn = libvirt.open(libvirt_name)
if conn == None:
print('Failed to open connection to %s' % libvirt_name)
exit(1)
# Gather data about hypervisor
self.name = conn.getHostname()
self.cpucount = conn.getCPUMap()[0]
self.zk.set(self.zkey + '/state', self.name.encode('ascii'))
self.zk.set(self.zkey + '/cpucount', str(self.cpucount).encode('ascii'))
print("Node hostname: %s" % self.name)
print("CPUs: %s" % self.cpucount)
while True:
self.memfree = conn.getFreeMemory()
self.cpuload = os.getloadavg()[0]
self.zk.set(self.zkey + '/memfree', str(self.memfree).encode('ascii'))
self.zk.set(self.zkey + '/cpuload', str(self.cpuload).encode('ascii'))
print("Free memory: %s | Load: %s" % ( self.memfree, self.cpuload ))
time.sleep(1)
if self.stop_thread.is_set():
break
def setup_remote_node(self):
@zk.DataWatch(self.zkey + '/state')
def watch_state(data, stat):
self.state = data.decode('ascii')
print("Version: %s, data: %s" % (stat.version, self.state))
@zk.DataWatch(self.zkey + '/cpucount')
def watch_state(data, stat):
self.cpucount = data.decode('ascii')
print("Version: %s, data: %s" % (stat.version, self.cpucount))
@zk.DataWatch(self.zkey + '/cpuload')
def watch_state(data, stat):
self.cpuload = data.decode('ascii')
print("Version: %s, data: %s" % (stat.version, self.cpuload))
@zk.DataWatch(self.zkey + '/memfree')
def watch_state(data, stat):
self.memfree = data.decode('ascii')
print("Version: %s, data: %s" % (stat.version, self.memfree))

View File

@ -19,7 +19,7 @@ class VMInstance:
@zk.DataWatch(self.zkey + '/hypervisor') @zk.DataWatch(self.zkey + '/hypervisor')
def watch_hypervisor(data, stat): def watch_hypervisor(data, stat):
self.hypervisor = data.decode('ascii') self.hypervisor = data.decode('ascii')
print("Version: %s, data: %s" % (stat.version, data.decode('ascii'))) print("Version: %s, data: %s" % (stat.version, self.hypervisor))
self.manage_vm_state() self.manage_vm_state()
# Watch for changes to the state field in Zookeeper # Watch for changes to the state field in Zookeeper

62
pvcd.py
View File

@ -4,9 +4,13 @@ from kazoo.client import KazooClient
from kazoo.client import KazooState from kazoo.client import KazooState
import libvirt import libvirt
import sys import sys
import socket
import uuid import uuid
import VMInstance import VMInstance
import NodeInstance
import time import time
import threading
import atexit
def help(): def help():
print("pvcd - Parallel Virtual Cluster management daemon") print("pvcd - Parallel Virtual Cluster management daemon")
@ -35,50 +39,32 @@ def zk_listener(state):
zk.add_listener(zk_listener) zk.add_listener(zk_listener)
# Connect to libvirt def cleanup():
libvirt_name = "qemu:///system" t_node[socket.gethostname()].stop()
conn = libvirt.open(libvirt_name) zk.stop()
if conn == None:
print('Failed to open connection to %s' % libvirt_name)
exit(1)
# Gather data about hypervisor atexit.register(cleanup)
hostname = conn.getHostname()
nodeinfo = conn.getInfo()
numnodes = nodeinfo[4]
memlistNUMA = conn.getCellsFreeMemory(0, numnodes)
memlistTOTAL = conn.getFreeMemory()
print("Node hostname: %s" % hostname) node_list = zk.get_children('/nodes')
print("Free memory: %s" % memlistTOTAL) print(node_list)
cell = 0
for cellfreemem in memlistNUMA:
print('NUMA Node '+str(cell)+': '+str(cellfreemem)+' bytes free memory')
cell += 1
print('Virtualization type: '+conn.getType()) domain_list = zk.get_children('/domains')
uri = conn.getURI() print(domain_list)
print('Canonical URI: '+uri)
print() t_node = dict()
s_domain = dict()
map = conn.getCPUMap() for node in node_list:
t_node[node] = NodeInstance.NodeInstance(node, zk);
t_node[node].start()
print("CPUs: " + str(map[0])) for domain in domain_list:
print("Available: " + str(map[1])) s_domain[domain] = VMInstance.VMInstance(domain, zk, socket.gethostname());
print()
def start_vm(vmname):
print("Starting VM %s" % vmname)
vm = VMInstance.VMInstance('b1dc4e21-544f-47aa-9bb7-8af0bc443b78', zk, hostname);
while True: while True:
# Tick loop # Tick loop
time.sleep(1) try:
pass time.sleep(1)
except:
conn.close() cleanup()
zk.stop() exit(0)