Move all VM functions into thread

This commit is contained in:
Joshua Boniface 2020-06-06 15:44:05 -04:00
parent e1310a05f2
commit 70b787d1fd
1 changed files with 51 additions and 64 deletions

View File

@ -1271,14 +1271,40 @@ def collect_vm_stats(queue):
logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e') logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e')
return return
memalloc = 0
vcpualloc = 0
# Toggle state management of dead VMs to restart them
if debug:
print("Toggle state management of dead VMs to restart them")
for domain, instance in this_node.d_domain.items():
if domain in this_node.domain_list:
# Add the allocated memory to our memalloc value
memalloc += instance.getmemory()
vcpualloc += instance.getvcpus()
if instance.getstate() == 'start' and instance.getnode() == this_node.name:
if instance.getdom() != None:
try:
if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
raise
except Exception as e:
# Toggle a state "change"
zkhandler.writedata(zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() })
# Get list of running domains from Libvirt # Get list of running domains from Libvirt
running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE) running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
# Loop through running domains
# Get statistics from any running VMs
for domain in running_domains: for domain in running_domains:
# Get all the raw information about the VM # Get basic information about the VM
tree = ElementTree.fromstring(domain.XMLDesc()) tree = ElementTree.fromstring(domain.XMLDesc())
domain_uuid = domain.UUIDString() domain_uuid = domain.UUIDString()
domain_name = domain.name() domain_name = domain.name()
# Ensure VM is present in the domain_list
if domain_uuid not in this_node.domain_list:
this_node.domain_list.append(domain_uuid)
# Get all the raw information about the VM
if debug: if debug:
print("Getting statistics for VM {}".format(domain_name)) print("Getting statistics for VM {}".format(domain_name))
domain_state, domain_maxmem, domain_mem, domain_vcpus, domain_cputime = domain.info() domain_state, domain_maxmem, domain_mem, domain_vcpus, domain_cputime = domain.info()
@ -1344,7 +1370,9 @@ def collect_vm_stats(queue):
# Close the Libvirt connection # Close the Libvirt connection
lv_conn.close() lv_conn.close()
queue.put('') queue.put(len(running_domains))
queue.put(memalloc)
queue.put(vcpualloc)
# Keepalive update function # Keepalive update function
def node_keepalive(): def node_keepalive():
@ -1374,86 +1402,45 @@ def node_keepalive():
if zkhandler.readdata(zk_conn, '/primary_node') != this_node.name: if zkhandler.readdata(zk_conn, '/primary_node') != this_node.name:
zkhandler.writedata(zk_conn, {'/primary_node': this_node.name}) zkhandler.writedata(zk_conn, {'/primary_node': this_node.name})
# Run Ceph status collection in separate thread for parallelization
if enable_storage:
ceph_thread_queue = Queue()
ceph_stats_thread = threading.Thread(target=collect_ceph_stats, args=(ceph_thread_queue,), kwargs={})
ceph_stats_thread.start()
# Run VM statistics collection in separate thread for parallelization # Run VM statistics collection in separate thread for parallelization
if enable_hypervisor: if enable_hypervisor:
vm_thread_queue = Queue() vm_thread_queue = Queue()
vm_stats_thread = threading.Thread(target=collect_vm_stats, args=(vm_thread_queue,), kwargs={}) vm_stats_thread = threading.Thread(target=collect_vm_stats, args=(vm_thread_queue,), kwargs={})
vm_stats_thread.start() vm_stats_thread.start()
# Normalize running VM status # Run Ceph status collection in separate thread for parallelization
memalloc = 0 if enable_storage:
vcpualloc = 0 ceph_thread_queue = Queue()
if enable_hypervisor: ceph_stats_thread = threading.Thread(target=collect_ceph_stats, args=(ceph_thread_queue,), kwargs={})
# Toggle state management of dead VMs to restart them ceph_stats_thread.start()
if debug:
print("Toggle state management of dead VMs to restart them")
for domain, instance in this_node.d_domain.items():
if domain in this_node.domain_list:
# Add the allocated memory to our memalloc value
memalloc += instance.getmemory()
vcpualloc += instance.getvcpus()
if instance.getstate() == 'start' and instance.getnode() == this_node.name:
if instance.getdom() != None:
try:
if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
raise
except Exception as e:
# Toggle a state "change"
zkhandler.writedata(zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() })
# Connect to libvirt
if debug:
print("Connect to libvirt")
libvirt_name = "qemu:///system"
lv_conn = libvirt.open(libvirt_name)
if lv_conn == None:
logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e')
return
# Ensure that any running VMs are readded to the domain_list
if debug:
print("Ensure that any running VMs are readded to the domain_list")
running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
for domain in running_domains:
domain_uuid = domain.UUIDString()
if domain_uuid not in this_node.domain_list:
this_node.domain_list.append(domain_uuid)
# Close the Libvirt connection
lv_conn.close()
# Get node performance statistics # Get node performance statistics
this_node.memtotal = int(psutil.virtual_memory().total / 1024 / 1024) this_node.memtotal = int(psutil.virtual_memory().total / 1024 / 1024)
this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024) this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024)
this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024) this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
this_node.memalloc = memalloc
this_node.vcpualloc = vcpualloc
this_node.cpuload = os.getloadavg()[0] this_node.cpuload = os.getloadavg()[0]
if enable_hypervisor:
this_node.domains_count = len(running_domains)
else:
this_node.domains_count = 0
# Wait for Ceph thread completion # Join against running threads
if enable_hypervisor:
vm_stats_thread.join()
if enable_storage: if enable_storage:
ceph_stats_thread.join() ceph_stats_thread.join()
# Get information from thread queues
if enable_hypervisor:
this_node.domains_count = vm_thread_queue.get()
this_node.memalloc = vm_thread_queue.get()
this_node.vcpualloc = vm_thread_queue.get()
else:
this_node.domains_count = 0
this_node.memalloc = 0
this_node.vcpualloc = 0
if enable_storage:
ceph_health_colour = ceph_thread_queue.get() ceph_health_colour = ceph_thread_queue.get()
ceph_health = ceph_thread_queue.get() ceph_health = ceph_thread_queue.get()
osds_this_node = ceph_thread_queue.get() osds_this_node = ceph_thread_queue.get()
# Wait for VM thread completion
if enable_hypervisor:
vm_stats_thread.join()
vm_stats = vm_thread_queue.get()
# Set our information in zookeeper # Set our information in zookeeper
keepalive_time = int(time.time()) keepalive_time = int(time.time())
if debug: if debug: