diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py index 03fd7869..bb1369cc 100644 --- a/node-daemon/pvcnoded/Daemon.py +++ b/node-daemon/pvcnoded/Daemon.py @@ -46,6 +46,8 @@ from distutils.util import strtobool from queue import Queue +from xml.etree import ElementTree + import pvcnoded.log as log import pvcnoded.zkhandler as zkhandler import pvcnoded.fencing as fencing @@ -1243,6 +1245,107 @@ def collect_ceph_stats(queue): queue.put(ceph_health) queue.put(osds_this_node) +# State table for pretty stats +libvirt_vm_states = { + 0: "NOSTATE", + 1: "RUNNING", + 2: "BLOCKED", + 3: "PAUSED", + 4: "SHUTDOWN", + 5: "SHUTOFF", + 6: "CRASHED", + 7: "PMSUSPENDED" +} + +# VM stats update function +def collect_vm_stats(queue): + if debug: + print("Get VM statistics") + + # Connect to libvirt + if debug: + print("Connect to libvirt") + libvirt_name = "qemu:///system" + lv_conn = libvirt.open(libvirt_name) + if lv_conn == None: + logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e') + return + + # Get list of running domains from Libvirt + running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE) + # Loop through running domains + for domain in running_domains: + # Get all the raw information about the VM + tree = ElementTree.fromstring(domain.XMLDesc()) + domain_uuid = domain.UUIDString() + domain_name = domain.name() + if debug: + print("Getting statistics for VM {}".format(domain_name)) + domain_state, domain_maxmem, domain_mem, domain_vcpus, domain_cputime = domain.info() + domain_memory_stats = domain.memoryStats() + domain_cpu_stats = domain.getCPUStats(True)[0] + domain_disk_stats = [] + for disk in tree.findall('devices/disk'): + disk_name = disk.find('source').get('name') + if not disk_name: + disk_name = disk.find('source').get('file') + disk_stats = domain.blockStats(disk.find('target').get('dev')) + domain_disk_stats.append({ + "name": disk_name, + "rd_req": disk_stats[0], + "rd_bytes": disk_stats[1], + "wr_req": disk_stats[2], + "wr_bytes": disk_stats[3], + "err": disk_stats[4] + }) + + domain_network_stats = [] + for interface in tree.findall('devices/interface'): + interface_name = interface.find('target').get('dev') + interface_bridge = interface.find('source').get('bridge') + interface_stats = domain.interfaceStats(interface_name) + domain_network_stats.append({ + "name": interface_name, + "bridge": interface_bridge, + "rd_bytes": interface_stats[0], + "rd_packets": interface_stats[1], + "rd_errors": interface_stats[2], + "rd_drops": interface_stats[3], + "wr_bytes": interface_stats[4], + "wr_packets": interface_stats[5], + "wr_errors": interface_stats[6], + "wr_drops": interface_stats[7] + }) + + # Create the final dictionary + domain_stats = { + "state": libvirt_vm_states[domain_state], + "maxmem": domain_maxmem, + "livemem": domain_mem, + "cpus": domain_vcpus, + "cputime": domain_cputime, + "mem_stats": domain_memory_stats, + "cpu_stats": domain_cpu_stats, + "disk_stats": domain_disk_stats, + "net_stats": domain_network_stats + } + + if debug: + print("Writing statistics for VM {} to Zookeeper".format(domain_name)) + + try: + zkhandler.writedata(zk_conn, { + "/domains/{}/stats".format(domain_uuid): str(json.dumps(domain_stats)) + }) + except Exception as e: + if debug: + print(e) + + # Close the Libvirt connection + lv_conn.close() + + queue.put('') + # Keepalive update function def node_keepalive(): # Set the upstream IP in Zookeeper for clients to read @@ -1277,6 +1380,12 @@ def node_keepalive(): ceph_stats_thread = threading.Thread(target=collect_ceph_stats, args=(ceph_thread_queue,), kwargs={}) ceph_stats_thread.start() + # Run VM statistics collection in separate thread for parallelization + if enable_hypervisor: + vm_thread_queue = Queue() + vm_stats_thread = threading.Thread(target=collect_vm_stats, args=(vm_thread_queue,), kwargs={}) + vm_stats_thread.start() + # Normalize running VM status memalloc = 0 vcpualloc = 0 @@ -1339,6 +1448,12 @@ def node_keepalive(): ceph_health = ceph_thread_queue.get() osds_this_node = ceph_thread_queue.get() + # Wait for VM thread completion + if enable_hypervisor: + vm_stats_thread.join() + + vm_stats = vm_thread_queue.get() + # Set our information in zookeeper keepalive_time = int(time.time()) if debug: