From 70b787d1fd34a440e9487274685d04722594804f Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Sat, 6 Jun 2020 15:44:05 -0400 Subject: [PATCH] Move all VM functions into thread --- node-daemon/pvcnoded/Daemon.py | 115 +++++++++++++++------------------ 1 file changed, 51 insertions(+), 64 deletions(-) diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py index bb1369cc..0114b4d4 100644 --- a/node-daemon/pvcnoded/Daemon.py +++ b/node-daemon/pvcnoded/Daemon.py @@ -1271,14 +1271,40 @@ def collect_vm_stats(queue): logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e') return + memalloc = 0 + vcpualloc = 0 + # Toggle state management of dead VMs to restart them + if debug: + print("Toggle state management of dead VMs to restart them") + for domain, instance in this_node.d_domain.items(): + if domain in this_node.domain_list: + # Add the allocated memory to our memalloc value + memalloc += instance.getmemory() + vcpualloc += instance.getvcpus() + if instance.getstate() == 'start' and instance.getnode() == this_node.name: + if instance.getdom() != None: + try: + if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING: + raise + except Exception as e: + # Toggle a state "change" + zkhandler.writedata(zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() }) + # Get list of running domains from Libvirt running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE) - # Loop through running domains + + # Get statistics from any running VMs for domain in running_domains: - # Get all the raw information about the VM + # Get basic information about the VM tree = ElementTree.fromstring(domain.XMLDesc()) domain_uuid = domain.UUIDString() domain_name = domain.name() + + # Ensure VM is present in the domain_list + if domain_uuid not in this_node.domain_list: + this_node.domain_list.append(domain_uuid) + + # Get all the raw information about the VM if debug: print("Getting statistics for VM {}".format(domain_name)) domain_state, domain_maxmem, domain_mem, domain_vcpus, domain_cputime = domain.info() @@ -1344,7 +1370,9 @@ def collect_vm_stats(queue): # Close the Libvirt connection lv_conn.close() - queue.put('') + queue.put(len(running_domains)) + queue.put(memalloc) + queue.put(vcpualloc) # Keepalive update function def node_keepalive(): @@ -1374,86 +1402,45 @@ def node_keepalive(): if zkhandler.readdata(zk_conn, '/primary_node') != this_node.name: zkhandler.writedata(zk_conn, {'/primary_node': this_node.name}) - # Run Ceph status collection in separate thread for parallelization - if enable_storage: - ceph_thread_queue = Queue() - ceph_stats_thread = threading.Thread(target=collect_ceph_stats, args=(ceph_thread_queue,), kwargs={}) - ceph_stats_thread.start() - # Run VM statistics collection in separate thread for parallelization if enable_hypervisor: vm_thread_queue = Queue() vm_stats_thread = threading.Thread(target=collect_vm_stats, args=(vm_thread_queue,), kwargs={}) vm_stats_thread.start() - # Normalize running VM status - memalloc = 0 - vcpualloc = 0 - if enable_hypervisor: - # Toggle state management of dead VMs to restart them - if debug: - print("Toggle state management of dead VMs to restart them") - for domain, instance in this_node.d_domain.items(): - if domain in this_node.domain_list: - # Add the allocated memory to our memalloc value - memalloc += instance.getmemory() - vcpualloc += instance.getvcpus() - if instance.getstate() == 'start' and instance.getnode() == this_node.name: - if instance.getdom() != None: - try: - if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING: - raise - except Exception as e: - # Toggle a state "change" - zkhandler.writedata(zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() }) - - # Connect to libvirt - if debug: - print("Connect to libvirt") - libvirt_name = "qemu:///system" - lv_conn = libvirt.open(libvirt_name) - if lv_conn == None: - logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e') - return - - # Ensure that any running VMs are readded to the domain_list - if debug: - print("Ensure that any running VMs are readded to the domain_list") - running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE) - for domain in running_domains: - domain_uuid = domain.UUIDString() - if domain_uuid not in this_node.domain_list: - this_node.domain_list.append(domain_uuid) - - # Close the Libvirt connection - lv_conn.close() - + # Run Ceph status collection in separate thread for parallelization + if enable_storage: + ceph_thread_queue = Queue() + ceph_stats_thread = threading.Thread(target=collect_ceph_stats, args=(ceph_thread_queue,), kwargs={}) + ceph_stats_thread.start() + # Get node performance statistics this_node.memtotal = int(psutil.virtual_memory().total / 1024 / 1024) this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024) this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024) - this_node.memalloc = memalloc - this_node.vcpualloc = vcpualloc this_node.cpuload = os.getloadavg()[0] - if enable_hypervisor: - this_node.domains_count = len(running_domains) - else: - this_node.domains_count = 0 - # Wait for Ceph thread completion + # Join against running threads + if enable_hypervisor: + vm_stats_thread.join() if enable_storage: ceph_stats_thread.join() + # Get information from thread queues + if enable_hypervisor: + this_node.domains_count = vm_thread_queue.get() + this_node.memalloc = vm_thread_queue.get() + this_node.vcpualloc = vm_thread_queue.get() + else: + this_node.domains_count = 0 + this_node.memalloc = 0 + this_node.vcpualloc = 0 + + if enable_storage: ceph_health_colour = ceph_thread_queue.get() ceph_health = ceph_thread_queue.get() osds_this_node = ceph_thread_queue.get() - # Wait for VM thread completion - if enable_hypervisor: - vm_stats_thread.join() - - vm_stats = vm_thread_queue.get() - # Set our information in zookeeper keepalive_time = int(time.time()) if debug: