Implement recording of VM stats during keepalive

This commit is contained in:
Joshua Boniface 2020-06-06 15:31:26 -04:00
parent 2ad6860dfe
commit e1310a05f2
1 changed file with 115 additions and 0 deletions

View File

@ -46,6 +46,8 @@ from distutils.util import strtobool
from queue import Queue from queue import Queue
from xml.etree import ElementTree
import pvcnoded.log as log import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler import pvcnoded.zkhandler as zkhandler
import pvcnoded.fencing as fencing import pvcnoded.fencing as fencing
@ -1243,6 +1245,107 @@ def collect_ceph_stats(queue):
queue.put(ceph_health) queue.put(ceph_health)
queue.put(osds_this_node) queue.put(osds_this_node)
# State table for pretty stats: maps the numeric domain state code to a
# readable name. The position of each name in the tuple is its state code.
libvirt_vm_states = dict(enumerate((
    "NOSTATE",
    "RUNNING",
    "BLOCKED",
    "PAUSED",
    "SHUTDOWN",
    "SHUTOFF",
    "CRASHED",
    "PMSUSPENDED",
)))
def _collect_domain_disk_stats(domain, tree):
    """Return a list of block I/O counter dicts, one per disk of *domain* with a source."""
    domain_disk_stats = []
    for disk in tree.findall('devices/disk'):
        source = disk.find('source')
        target = disk.find('target')
        # A drive without backing media (e.g. an empty CD-ROM) has no <source>
        # element; skip it instead of failing on a None lookup.
        if source is None or target is None:
            continue
        disk_name = source.get('name') or source.get('file')
        disk_stats = domain.blockStats(target.get('dev'))
        domain_disk_stats.append({
            "name": disk_name,
            "rd_req": disk_stats[0],
            "rd_bytes": disk_stats[1],
            "wr_req": disk_stats[2],
            "wr_bytes": disk_stats[3],
            "err": disk_stats[4]
        })
    return domain_disk_stats


def _collect_domain_network_stats(domain, tree):
    """Return a list of traffic counter dicts, one per interface of *domain* with a target."""
    domain_network_stats = []
    for interface in tree.findall('devices/interface'):
        target = interface.find('target')
        # Without a <target dev=...> there is no device name to query.
        if target is None:
            continue
        source = interface.find('source')
        interface_name = target.get('dev')
        interface_bridge = source.get('bridge') if source is not None else None
        interface_stats = domain.interfaceStats(interface_name)
        domain_network_stats.append({
            "name": interface_name,
            "bridge": interface_bridge,
            "rd_bytes": interface_stats[0],
            "rd_packets": interface_stats[1],
            "rd_errors": interface_stats[2],
            "rd_drops": interface_stats[3],
            "wr_bytes": interface_stats[4],
            "wr_packets": interface_stats[5],
            "wr_errors": interface_stats[6],
            "wr_drops": interface_stats[7]
        })
    return domain_network_stats


# VM stats update function
def collect_vm_stats(queue):
    """Record statistics for every active libvirt domain in Zookeeper.

    Connects to the "qemu:///system" hypervisor, gathers the general info,
    memory, CPU, per-disk and per-interface counters of each active domain,
    and writes them as a JSON document to "/domains/<uuid>/stats".

    Args:
        queue: Queue used to signal completion. Exactly one empty string is
            put on it when the function finishes, whether it succeeded or
            not, so a caller blocking on ``queue.get()`` is always released.
    """
    if debug:
        print("Get VM statistics")

    lv_conn = None
    try:
        # Connect to libvirt
        if debug:
            print("Connect to libvirt")
        libvirt_name = "qemu:///system"
        try:
            lv_conn = libvirt.open(libvirt_name)
        except libvirt.libvirtError:
            # Treat a raised connection error like a None connection
            lv_conn = None
        if lv_conn is None:
            logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e')
            return

        # Get list of running domains from Libvirt
        try:
            running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
        except libvirt.libvirtError as e:
            logger.out('Failed to list running domains: {}'.format(e), state='e')
            return

        # Loop through running domains
        for domain in running_domains:
            try:
                # Get all the raw information about the VM
                tree = ElementTree.fromstring(domain.XMLDesc())
                domain_uuid = domain.UUIDString()
                domain_name = domain.name()
                if debug:
                    print("Getting statistics for VM {}".format(domain_name))

                domain_state, domain_maxmem, domain_mem, domain_vcpus, domain_cputime = domain.info()
                domain_memory_stats = domain.memoryStats()
                domain_cpu_stats = domain.getCPUStats(True)[0]
                domain_disk_stats = _collect_domain_disk_stats(domain, tree)
                domain_network_stats = _collect_domain_network_stats(domain, tree)
            except libvirt.libvirtError as e:
                # A domain can stop or migrate between being listed and being
                # queried; skip it and carry on with the remaining domains.
                logger.out('Failed to get statistics for a VM: {}'.format(e), state='e')
                continue

            # Create the final dictionary
            domain_stats = {
                "state": libvirt_vm_states[domain_state],
                "maxmem": domain_maxmem,
                "livemem": domain_mem,
                "cpus": domain_vcpus,
                "cputime": domain_cputime,
                "mem_stats": domain_memory_stats,
                "cpu_stats": domain_cpu_stats,
                "disk_stats": domain_disk_stats,
                "net_stats": domain_network_stats
            }

            if debug:
                print("Writing statistics for VM {} to Zookeeper".format(domain_name))

            # Best effort: a failed write for one VM must not stop the others
            try:
                zkhandler.writedata(zk_conn, {
                    "/domains/{}/stats".format(domain_uuid): json.dumps(domain_stats)
                })
            except Exception as e:
                if debug:
                    print(e)
    finally:
        try:
            # Close the Libvirt connection
            if lv_conn is not None:
                lv_conn.close()
        finally:
            # Always signal completion, even if collection or close failed
            queue.put('')
# Keepalive update function # Keepalive update function
def node_keepalive(): def node_keepalive():
# Set the upstream IP in Zookeeper for clients to read # Set the upstream IP in Zookeeper for clients to read
@ -1277,6 +1380,12 @@ def node_keepalive():
ceph_stats_thread = threading.Thread(target=collect_ceph_stats, args=(ceph_thread_queue,), kwargs={}) ceph_stats_thread = threading.Thread(target=collect_ceph_stats, args=(ceph_thread_queue,), kwargs={})
ceph_stats_thread.start() ceph_stats_thread.start()
# Run VM statistics collection in separate thread for parallelization
if enable_hypervisor:
vm_thread_queue = Queue()
vm_stats_thread = threading.Thread(target=collect_vm_stats, args=(vm_thread_queue,), kwargs={})
vm_stats_thread.start()
# Normalize running VM status # Normalize running VM status
memalloc = 0 memalloc = 0
vcpualloc = 0 vcpualloc = 0
@ -1339,6 +1448,12 @@ def node_keepalive():
ceph_health = ceph_thread_queue.get() ceph_health = ceph_thread_queue.get()
osds_this_node = ceph_thread_queue.get() osds_this_node = ceph_thread_queue.get()
# Wait for VM thread completion
if enable_hypervisor:
vm_stats_thread.join()
vm_stats = vm_thread_queue.get()
# Set our information in zookeeper # Set our information in zookeeper
keepalive_time = int(time.time()) keepalive_time = int(time.time())
if debug: if debug: