Convert NodeInstance from having an internal "daemon" to using the APScheduler library

This commit is contained in:
Joshua Boniface 2018-06-06 14:16:52 -04:00
parent 8f31f49252
commit 66fe258655
2 changed files with 89 additions and 103 deletions

View File

@ -20,17 +20,15 @@
# #
############################################################################### ###############################################################################
import os, sys, socket, time, threading, libvirt, kazoo.client import os, sys, socket, time, libvirt, kazoo.client
class NodeInstance(threading.Thread): class NodeInstance():
def __init__(self, name, t_node, s_domain, zk): def __init__(self, name, t_node, s_domain, zk):
super(NodeInstance, self).__init__()
# Passed-in variables on creation # Passed-in variables on creation
self.zkey = '/nodes/%s' % name self.zkey = '/nodes/%s' % name
self.zk = zk self.zk = zk
self.name = name self.name = name
self.state = 'stop' self.state = 'stop'
self.stop_thread = threading.Event()
self.t_node = t_node self.t_node = t_node
self.active_node_list = [] self.active_node_list = []
self.flushed_node_list = [] self.flushed_node_list = []
@ -88,10 +86,6 @@ class NodeInstance(threading.Thread):
def updatedomainlist(self, s_domain): def updatedomainlist(self, s_domain):
self.s_domain = s_domain self.s_domain = s_domain
# Shutdown the thread
def stop(self):
self.stop_thread.set()
# Flush all VMs on the host # Flush all VMs on the host
def flush(self): def flush(self):
for domain in self.domain_list: for domain in self.domain_list:
@ -143,11 +137,7 @@ class NodeInstance(threading.Thread):
self.zk.set("/nodes/" + self.name + "/state", 'start'.encode('ascii')) self.zk.set("/nodes/" + self.name + "/state", 'start'.encode('ascii'))
def run(self): def update_zookeeper(self):
if self.name == socket.gethostname():
self.setup_local_node()
def setup_local_node(self):
# Connect to libvirt # Connect to libvirt
libvirt_name = "qemu:///system" libvirt_name = "qemu:///system"
conn = libvirt.open(libvirt_name) conn = libvirt.open(libvirt_name)
@ -170,87 +160,79 @@ class NodeInstance(threading.Thread):
else: else:
self.state = 'flush' self.state = 'flush'
while True: # Toggle state management of all VMs and remove any non-running VMs
# Toggle state management of all VMs and remove any non-running VMs for domain, instance in self.s_domain.items():
for domain, instance in self.s_domain.items(): if instance.inshutdown == False and domain in self.domain_list:
if instance.inshutdown == False and domain in self.domain_list: instance.manage_vm_state()
instance.manage_vm_state() if instance.dom == None:
if instance.dom == None: try:
self.domain_list.remove(domain)
except:
pass
else:
try:
state = instance.dom.state()[0]
except:
state = libvirt.VIR_DOMAIN_NOSTATE
if state != libvirt.VIR_DOMAIN_RUNNING:
try: try:
self.domain_list.remove(domain) self.domain_list.remove(domain)
except: except:
pass pass
else:
try:
state = instance.dom.state()[0]
except:
state = libvirt.VIR_DOMAIN_NOSTATE
if state != libvirt.VIR_DOMAIN_RUNNING: # Set our information in zookeeper
try: self.memfree = conn.getFreeMemory()
self.domain_list.remove(domain) self.cpuload = os.getloadavg()[0]
except: try:
pass self.zk.set(self.zkey + '/memfree', str(self.memfree).encode('ascii'))
self.zk.set(self.zkey + '/cpuload', str(self.cpuload).encode('ascii'))
self.zk.set(self.zkey + '/runningdomains', ' '.join(self.domain_list).encode('ascii'))
except:
return
# Set our information in zookeeper print(">>> %s - Free memory: %s | Load: %s" % ( time.strftime("%d/%m/%Y %H:%M:%S"), self.memfree, self.cpuload ))
self.memfree = conn.getFreeMemory() print("Active domains: %s" % self.domain_list)
self.cpuload = os.getloadavg()[0]
# Update our local node lists
for node_name in self.t_node:
try: try:
self.zk.set(self.zkey + '/memfree', str(self.memfree).encode('ascii')) state, stat = self.zk.get('/nodes/%s/state' % node_name)
self.zk.set(self.zkey + '/cpuload', str(self.cpuload).encode('ascii')) node_state = state.decode('ascii')
self.zk.set(self.zkey + '/runningdomains', ' '.join(self.domain_list).encode('ascii'))
except: except:
if self.stop_thread.is_set(): node_state = 'stop'
return
print(">>> %s - Free memory: %s | Load: %s" % ( time.strftime("%d/%m/%Y %H:%M:%S"), self.memfree, self.cpuload )) if node_state == 'start' and node_name not in self.active_node_list:
print("Active domains: %s" % self.domain_list) self.active_node_list.append(node_name)
# Update our local node lists
for node_name in self.t_node:
try: try:
state, stat = self.zk.get('/nodes/%s/state' % node_name) self.flushed_node_list.remove(node_name)
node_state = state.decode('ascii') except ValueError:
except: pass
node_state = 'stop' try:
self.inactive_node_list.remove(node_name)
except ValueError:
pass
if node_state == 'flush' and node_name not in self.flushed_node_list:
self.flushed_node_list.append(node_name)
try:
self.active_node_list.remove(node_name)
except ValueError:
pass
try:
self.inactive_node_list.remove(node_name)
except ValueError:
pass
if node_state != 'start' and node_state != 'flush' and node_name not in self.inactive_node_list:
self.inactive_node_list.append(node_name)
try:
self.active_node_list.remove(node_name)
except ValueError:
pass
try:
self.flushed_node_list.remove(node_name)
except ValueError:
pass
if node_state == 'start' and node_name not in self.active_node_list: print('Active nodes: %s' % self.active_node_list)
self.active_node_list.append(node_name) print('Flushed nodes: %s' % self.flushed_node_list)
try: print('Inactive nodes: %s' % self.inactive_node_list)
self.flushed_node_list.remove(node_name)
except ValueError:
pass
try:
self.inactive_node_list.remove(node_name)
except ValueError:
pass
if node_state == 'flush' and node_name not in self.flushed_node_list:
self.flushed_node_list.append(node_name)
try:
self.active_node_list.remove(node_name)
except ValueError:
pass
try:
self.inactive_node_list.remove(node_name)
except ValueError:
pass
if node_state != 'start' and node_state != 'flush' and node_name not in self.inactive_node_list:
self.inactive_node_list.append(node_name)
try:
self.active_node_list.remove(node_name)
except ValueError:
pass
try:
self.flushed_node_list.remove(node_name)
except ValueError:
pass
print('Active nodes: %s' % self.active_node_list)
print('Flushed nodes: %s' % self.flushed_node_list)
print('Inactive nodes: %s' % self.inactive_node_list)
# Sleep for 9s but with quick interruptability
for x in range(0,90):
time.sleep(0.1)
if self.stop_thread.is_set():
sys.exit(0)

32
pvcd.py
View File

@ -20,8 +20,7 @@
# #
############################################################################### ###############################################################################
from kazoo.client import KazooClient import kazoo.client
from kazoo.client import KazooState
import libvirt import libvirt
import sys import sys
import socket import socket
@ -29,8 +28,8 @@ import uuid
import VMInstance import VMInstance
import NodeInstance import NodeInstance
import time import time
import threading
import atexit import atexit
import apscheduler.schedulers.background
def help(): def help():
print("pvcd - Parallel Virtual Cluster management daemon") print("pvcd - Parallel Virtual Cluster management daemon")
@ -38,8 +37,8 @@ def help():
help() help()
# Connect to zookeeper # Connect to local zookeeper
zk = KazooClient(hosts='127.0.0.1:2181') zk = kazoo.client.KazooClient(hosts='127.0.0.1:2181')
try: try:
zk.start() zk.start()
except: except:
@ -47,10 +46,10 @@ except:
exit(1) exit(1)
def zk_listener(state): def zk_listener(state):
if state == KazooState.LOST: if state == kazoo.client.KazooState.LOST:
cleanup() cleanup()
exit(2) exit(2)
elif state == KazooState.SUSPENDED: elif state == kazoo.client.KazooState.SUSPENDED:
cleanup() cleanup()
exit(2) exit(2)
else: else:
@ -63,9 +62,8 @@ myhostname = socket.gethostname()
mynodestring = '/nodes/%s' % myhostname mynodestring = '/nodes/%s' % myhostname
def cleanup(): def cleanup():
t_node[myhostname].stop()
time.sleep(0.2)
try: try:
update_timer.shutdown()
if t_node[myhostname].getstate() != 'flush': if t_node[myhostname].getstate() != 'flush':
zk.set('/nodes/' + myhostname + '/state', 'stop'.encode('ascii')) zk.set('/nodes/' + myhostname + '/state', 'stop'.encode('ascii'))
zk.stop() zk.stop()
@ -75,6 +73,7 @@ def cleanup():
atexit.register(cleanup) atexit.register(cleanup)
# Check if our node exists in Zookeeper, and create it if not # Check if our node exists in Zookeeper, and create it if not
if zk.exists('%s' % mynodestring): if zk.exists('%s' % mynodestring):
print("Node is present in Zookeeper") print("Node is present in Zookeeper")
@ -114,13 +113,18 @@ def updatedomains(new_domain_list):
if node in t_node: if node in t_node:
t_node[node].updatedomainlist(s_domain) t_node[node].updatedomainlist(s_domain)
t_node[myhostname].start() # Set up our update function
time.sleep(0.2) this_node = t_node[myhostname]
update_zookeeper = this_node.update_zookeeper
# Create timer to update this node in Zookeeper
update_timer = apscheduler.schedulers.background.BackgroundScheduler()
update_timer.add_job(update_zookeeper, 'interval', seconds=2)
update_timer.start()
# Tick loop
while True: while True:
# Tick loop
try: try:
time.sleep(0.1) time.sleep(0.1)
except: except:
cleanup() break
exit(0)