#!/usr/bin/env python3

# keepalive.py - Utility functions for pvcnoded Keepalives
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

import pvcnoded.util.fencing

import daemon_lib.common as common

from apscheduler.schedulers.background import BackgroundScheduler
from rados import Rados
from xml.etree import ElementTree
from queue import Queue
from threading import Thread
from datetime import datetime

import json
import re
import libvirt
import psutil
import os
import time


# State table for pretty stats
libvirt_vm_states = {
    0: "NOSTATE",
    1: "RUNNING",
    2: "BLOCKED",
    3: "PAUSED",
    4: "SHUTDOWN",
    5: "SHUTOFF",
    6: "CRASHED",
    7: "PMSUSPENDED"
}

# Matches terminal escape sequences so they can be stripped from the
# "pretty" formatted `osd status` output before it is parsed.
_ANSI_ESCAPE_RE = re.compile(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))')


def start_keepalive_timer(logger, config, zkhandler, this_node):
    """Start a background scheduler that runs node_keepalive periodically.

    The interval, in seconds, is read from config['keepalive_interval'].

    Returns the started BackgroundScheduler so the caller can later pass it
    to stop_keepalive_timer.
    """
    keepalive_interval = config['keepalive_interval']
    logger.out(f'Starting keepalive timer ({keepalive_interval} second interval)', state='s')
    keepalive_timer = BackgroundScheduler()
    keepalive_timer.add_job(
        node_keepalive,
        args=(logger, config, zkhandler, this_node),
        trigger='interval',
        seconds=keepalive_interval)
    keepalive_timer.start()
    return keepalive_timer


def stop_keepalive_timer(logger, keepalive_timer):
    """Shut down a scheduler returned by start_keepalive_timer.

    Any failure during shutdown is logged as a warning instead of raised.
    """
    try:
        keepalive_timer.shutdown()
        logger.out('Stopping keepalive timer', state='s')
    except Exception:
        logger.out('Failed to stop keepalive timer', state='w')


def _ceph_mon_command(ceph_conn, prefix, output_format):
    """Run one Ceph monitor command and return its raw output buffer.

    The command is sent with a one second timeout. Exceptions raised by the
    call are not handled here; callers decide how to recover.
    """
    command = {"prefix": prefix, "format": output_format}
    return ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1]


def _update_ceph_cluster_stats(logger, zkhandler, ceph_conn, pool_list, debug):
    """Write cluster status, utilization and per-pool stats to Zookeeper.

    Called by collect_ceph_stats only when this node is the primary. Each
    step is best-effort: a failure is logged and the next step still runs.
    """
    if debug:
        logger.out("Set ceph health information in zookeeper (primary only)", state='d', prefix='ceph-thread')

    # The monitor command is inside the try so that a failing or timed-out
    # command is logged rather than aborting the whole stats thread.
    try:
        ceph_status = _ceph_mon_command(ceph_conn, "status", "pretty").decode('ascii')
        zkhandler.write([
            ('base.storage', str(ceph_status))
        ])
    except Exception as e:
        logger.out('Failed to set Ceph status data: {}'.format(e), state='e')

    if debug:
        logger.out("Set ceph rados df information in zookeeper (primary only)", state='d', prefix='ceph-thread')

    # Get rados df info
    try:
        ceph_df = _ceph_mon_command(ceph_conn, "df", "pretty").decode('ascii')
        zkhandler.write([
            ('base.storage.util', str(ceph_df))
        ])
    except Exception as e:
        logger.out('Failed to set Ceph utilization data: {}'.format(e), state='e')

    if debug:
        logger.out("Set pool information in zookeeper (primary only)", state='d', prefix='ceph-thread')

    # Get pool info
    try:
        ceph_df_output = _ceph_mon_command(ceph_conn, "df", "json").decode('ascii')
        ceph_pool_df_raw = json.loads(ceph_df_output)['pools']
    except Exception as e:
        logger.out('Failed to obtain Pool data (ceph df): {}'.format(e), state='w')
        ceph_pool_df_raw = []

    retcode, stdout, stderr = common.run_os_command('rados df --format json', timeout=1)
    try:
        rados_pool_df_raw = json.loads(stdout)['pools']
    except Exception as e:
        logger.out('Failed to obtain Pool data (rados df): {}'.format(e), state='w')
        rados_pool_df_raw = []

    pool_count = len(ceph_pool_df_raw)
    if debug:
        logger.out("Getting info for {} pools".format(pool_count), state='d', prefix='ceph-thread')
    for pool_idx, pool in enumerate(ceph_pool_df_raw):
        try:
            # Combine all the data for this pool; the two outputs are paired
            # by list position, and a missing rados entry is handled below
            pool.update(rados_pool_df_raw[pool_idx])

            # Ignore any pools that aren't in our pool list
            if pool['name'] not in pool_list:
                if debug:
                    logger.out("Pool {} not in pool list {}".format(pool['name'], pool_list), state='d', prefix='ceph-thread')
                continue
            if debug:
                logger.out("Parsing data for pool {}".format(pool['name']), state='d', prefix='ceph-thread')

            # Assemble a useful data structure
            pool_df = {
                'id': pool['id'],
                'stored_bytes': pool['stats']['stored'],
                'free_bytes': pool['stats']['max_avail'],
                'used_bytes': pool['stats']['bytes_used'],
                'used_percent': pool['stats']['percent_used'],
                'num_objects': pool['stats']['objects'],
                'num_object_clones': pool['num_object_clones'],
                'num_object_copies': pool['num_object_copies'],
                'num_objects_missing_on_primary': pool['num_objects_missing_on_primary'],
                'num_objects_unfound': pool['num_objects_unfound'],
                'num_objects_degraded': pool['num_objects_degraded'],
                'read_ops': pool['read_ops'],
                'read_bytes': pool['read_bytes'],
                'write_ops': pool['write_ops'],
                'write_bytes': pool['write_bytes']
            }

            # Write the pool data to Zookeeper
            zkhandler.write([
                (('pool.stats', pool['name']), str(json.dumps(pool_df)))
            ])
        except Exception as e:
            # One or more of the status commands timed out, just continue
            logger.out('Failed to format and send pool data: {}'.format(e), state='w')


def _update_ceph_osd_stats(logger, config, zkhandler, this_node, ceph_conn, osd_list, debug):
    """Gather per-OSD statistics and, on the primary, write them to Zookeeper.

    Merges the output of `osd dump`, `osd df` and `osd status` for every OSD
    in osd_list. OSDs missing from any of the three outputs are logged and
    skipped.

    Returns the number of OSDs in osd_list whose 'osd.node' value in
    Zookeeper equals config['node_hostname'].
    """
    # Get data from Ceph OSDs
    if debug:
        logger.out("Get data from Ceph OSDs", state='d', prefix='ceph-thread')

    # Parse the dump data
    try:
        osd_dump_output = _ceph_mon_command(ceph_conn, "osd dump", "json").decode('ascii')
        osd_dump_raw = json.loads(osd_dump_output)['osds']
    except Exception as e:
        logger.out('Failed to obtain OSD data: {}'.format(e), state='w')
        osd_dump_raw = []

    if debug:
        logger.out("Loop through OSD dump", state='d', prefix='ceph-thread')
    osd_dump = dict()
    for osd in osd_dump_raw:
        osd_dump[str(osd['osd'])] = {
            'uuid': osd['uuid'],
            'up': osd['up'],
            'in': osd['in'],
            'primary_affinity': osd['primary_affinity']
        }

    # Parse the df data
    if debug:
        logger.out("Parse the OSD df data", state='d', prefix='ceph-thread')
    try:
        osd_df_raw = json.loads(_ceph_mon_command(ceph_conn, "osd df", "json"))['nodes']
    except Exception as e:
        logger.out('Failed to obtain OSD data: {}'.format(e), state='w')
        osd_df_raw = []

    if debug:
        logger.out("Loop through OSD df", state='d', prefix='ceph-thread')
    osd_df = dict()
    for osd in osd_df_raw:
        osd_df[str(osd['id'])] = {
            'utilization': osd['utilization'],
            'var': osd['var'],
            'pgs': osd['pgs'],
            'kb': osd['kb'],
            'weight': osd['crush_weight'],
            'reweight': osd['reweight'],
        }

    # Parse the status data
    if debug:
        logger.out("Parse the OSD status data", state='d', prefix='ceph-thread')
    try:
        osd_status_raw = _ceph_mon_command(ceph_conn, "osd status", "pretty").decode('ascii')
    except Exception as e:
        logger.out('Failed to obtain OSD status data: {}'.format(e), state='w')
        # Must be a string: the parsing loop below calls .split() on it
        osd_status_raw = ''

    if debug:
        logger.out("Loop through OSD status data", state='d', prefix='ceph-thread')
    osd_status = dict()
    for line in osd_status_raw.split('\n'):
        # Strip off colour, then split it for parsing
        fields = _ANSI_ESCAPE_RE.sub('', line).split()
        # An OSD line has a numeric ID in field 1; field 17 is the highest
        # index read below, so shorter lines cannot be parsed and are skipped
        if len(fields) > 17 and fields[1].isdigit():
            osd_status[str(fields[1])] = {
                'node': fields[3].split('.')[0],
                'used': fields[5],
                'avail': fields[7],
                'wr_ops': fields[9],
                'wr_data': fields[11],
                'rd_ops': fields[13],
                'rd_data': fields[15],
                'state': fields[17]
            }

    # Merge them together into a single meaningful dict
    if debug:
        logger.out("Merge OSD data together", state='d', prefix='ceph-thread')
    osds_this_node = 0
    osd_stats = dict()
    for osd in osd_list:
        if zkhandler.read(('osd.node', osd)) == config['node_hostname']:
            osds_this_node += 1
        try:
            this_dump = osd_dump[osd]
            this_dump.update(osd_df[osd])
            this_dump.update(osd_status[osd])
            osd_stats[osd] = this_dump
        except KeyError as e:
            # One or more of the status commands timed out, just continue
            logger.out('Failed to parse OSD stats into dictionary: {}'.format(e), state='w')

    # Upload OSD data for the cluster (primary-only)
    if this_node.router_state == 'primary':
        if debug:
            logger.out("Trigger updates for each OSD", state='d', prefix='ceph-thread')

        for osd in osd_list:
            try:
                stats = json.dumps(osd_stats[osd])
                zkhandler.write([
                    (('osd.stats', osd), str(stats))
                ])
            except KeyError as e:
                # One or more of the status commands timed out, just continue
                logger.out('Failed to upload OSD stats from dictionary: {}'.format(e), state='w')

    return osds_this_node


# Ceph stats update function
def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
    """Collect Ceph health, pool and OSD statistics (thread target).

    On success, three items are put on queue in this order: the logger
    colour for the health state, the health string, and the number of OSDs
    on this node. If the cluster connection cannot be opened, the failure is
    logged and nothing is put on the queue.

    Cluster-wide data is written to Zookeeper only when
    this_node.router_state is 'primary'.
    """
    pool_list = zkhandler.children('base.pool')
    osd_list = zkhandler.children('base.osd')

    debug = config['debug']
    if debug:
        logger.out("Thread starting", state='d', prefix='ceph-thread')

    # Connect to the Ceph cluster
    try:
        ceph_conn = Rados(conffile=config['ceph_config_file'], conf=dict(keyring=config['ceph_admin_keyring']))
        if debug:
            logger.out("Connecting to cluster", state='d', prefix='ceph-thread')
        ceph_conn.connect(timeout=1)
    except Exception as e:
        logger.out('Failed to open connection to Ceph cluster: {}'.format(e), state='e')
        return

    # From here on the connection is always shut down, even if a step raises
    try:
        if debug:
            logger.out("Getting health stats from monitor", state='d', prefix='ceph-thread')

        # Get Ceph cluster health for local status output
        try:
            health_status = json.loads(_ceph_mon_command(ceph_conn, "health", "json"))
            ceph_health = health_status['status']
        except Exception as e:
            logger.out('Failed to obtain Ceph health data: {}'.format(e), state='e')
            ceph_health = 'HEALTH_UNKN'

        if ceph_health == 'HEALTH_OK':
            ceph_health_colour = logger.fmt_green
        elif ceph_health == 'HEALTH_UNKN':
            ceph_health_colour = logger.fmt_cyan
        elif ceph_health == 'HEALTH_WARN':
            ceph_health_colour = logger.fmt_yellow
        else:
            ceph_health_colour = logger.fmt_red

        # Primary-only functions
        if this_node.router_state == 'primary':
            _update_ceph_cluster_stats(logger, zkhandler, ceph_conn, pool_list, debug)

        # Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs)
        osds_this_node = 0
        if len(osd_list) > 0:
            osds_this_node = _update_ceph_osd_stats(logger, config, zkhandler, this_node, ceph_conn, osd_list, debug)
    finally:
        ceph_conn.shutdown()

    queue.put(ceph_health_colour)
    queue.put(ceph_health)
    queue.put(osds_this_node)

    if debug:
        logger.out("Thread finished", state='d', prefix='ceph-thread')


def _tally_vm_resources(logger, zkhandler, this_node, debug):
    """Sum VM resource figures and re-trigger the state of failed VMs.

    For a VM in this_node.domain_list whose state is 'start' on this node
    but whose libvirt domain is not running (or cannot be queried), its
    current state is rewritten to Zookeeper to toggle a state "change".

    Returns a (memalloc, memprov, vcpualloc) tuple.
    """
    memalloc = 0
    memprov = 0
    vcpualloc = 0

    # Toggle state management of dead VMs to restart them
    if debug:
        logger.out("Toggle state management of dead VMs to restart them", state='d', prefix='vm-thread')
    # Make a copy of the d_domain; if not, and it changes in flight, this can fail
    fixed_d_domain = this_node.d_domain.copy()
    for domain, instance in fixed_d_domain.items():
        if domain in this_node.domain_list:
            # Add the allocated memory to our memalloc value
            memalloc += instance.getmemory()
            memprov += instance.getmemory()
            vcpualloc += instance.getvcpus()
            if instance.getstate() == 'start' and instance.getnode() == this_node.name:
                if instance.getdom() is not None:
                    # A VM counts as failed when it is not running or when
                    # its state cannot be read at all
                    try:
                        vm_failed = instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING
                        if vm_failed:
                            logger.out("VM {} has failed".format(instance.domname), state='w', prefix='vm-thread')
                    except Exception:
                        vm_failed = True

                    if vm_failed:
                        # Toggle a state "change"
                        logger.out("Resetting state to {} for VM {}".format(instance.getstate(), instance.domname), state='i', prefix='vm-thread')
                        zkhandler.write([
                            (('domain.state', domain), instance.getstate())
                        ])
        elif instance.getnode() == this_node.name:
            memprov += instance.getmemory()

    return memalloc, memprov, vcpualloc


def _log_vm_failure(logger, debug, domain, what, error):
    """Emit a debug message about a failed stats query; never raises."""
    if not debug:
        return
    try:
        logger.out("Failed getting {} for {}: {}".format(what, domain.name(), error), state='d', prefix='vm-thread')
    except Exception:
        # domain.name() itself may fail; there is nothing more to report
        pass


def _write_vm_stats(logger, zkhandler, this_node, domain, debug):
    """Gather statistics for one libvirt domain and write them to Zookeeper.

    Returns without writing anything if the domain is not running or if any
    of the general, disk or network statistics cannot be gathered.
    """
    try:
        # Get basic information about the VM
        tree = ElementTree.fromstring(domain.XMLDesc())
        domain_uuid = domain.UUIDString()
        domain_name = domain.name()

        # Get all the raw information about the VM
        if debug:
            logger.out("Getting general statistics for VM {}".format(domain_name), state='d', prefix='vm-thread')
        domain_state, domain_maxmem, domain_mem, domain_vcpus, domain_cputime = domain.info()
        # We can't properly gather stats from a non-running VMs so skip it
        if domain_state != libvirt.VIR_DOMAIN_RUNNING:
            return
        domain_memory_stats = domain.memoryStats()
        domain_cpu_stats = domain.getCPUStats(True)[0]
    except Exception as e:
        _log_vm_failure(logger, debug, domain, 'VM information', e)
        return

    # Ensure VM is present in the domain_list
    if domain_uuid not in this_node.domain_list:
        this_node.domain_list.append(domain_uuid)

    if debug:
        logger.out("Getting disk statistics for VM {}".format(domain_name), state='d', prefix='vm-thread')
    domain_disk_stats = []
    try:
        for disk in tree.findall('devices/disk'):
            # Prefer the source "name" attribute, falling back to "file"
            disk_name = disk.find('source').get('name')
            if not disk_name:
                disk_name = disk.find('source').get('file')
            disk_stats = domain.blockStats(disk.find('target').get('dev'))
            domain_disk_stats.append({
                "name": disk_name,
                "rd_req": disk_stats[0],
                "rd_bytes": disk_stats[1],
                "wr_req": disk_stats[2],
                "wr_bytes": disk_stats[3],
                "err": disk_stats[4]
            })
    except Exception as e:
        _log_vm_failure(logger, debug, domain, 'disk stats', e)
        return

    if debug:
        logger.out("Getting network statistics for VM {}".format(domain_name), state='d', prefix='vm-thread')
    domain_network_stats = []
    try:
        for interface in tree.findall('devices/interface'):
            # Only bridge-type interfaces are reported
            if interface.get('type') != 'bridge':
                continue
            interface_name = interface.find('target').get('dev')
            interface_bridge = interface.find('source').get('bridge')
            interface_stats = domain.interfaceStats(interface_name)
            domain_network_stats.append({
                "name": interface_name,
                "bridge": interface_bridge,
                "rd_bytes": interface_stats[0],
                "rd_packets": interface_stats[1],
                "rd_errors": interface_stats[2],
                "rd_drops": interface_stats[3],
                "wr_bytes": interface_stats[4],
                "wr_packets": interface_stats[5],
                "wr_errors": interface_stats[6],
                "wr_drops": interface_stats[7]
            })
    except Exception as e:
        _log_vm_failure(logger, debug, domain, 'network stats', e)
        return

    # Create the final dictionary
    domain_stats = {
        "state": libvirt_vm_states[domain_state],
        "maxmem": domain_maxmem,
        "livemem": domain_mem,
        "cpus": domain_vcpus,
        "cputime": domain_cputime,
        "mem_stats": domain_memory_stats,
        "cpu_stats": domain_cpu_stats,
        "disk_stats": domain_disk_stats,
        "net_stats": domain_network_stats
    }

    if debug:
        logger.out("Writing statistics for VM {} to Zookeeper".format(domain_name), state='d', prefix='vm-thread')

    try:
        zkhandler.write([
            (('domain.stats', domain_uuid), str(json.dumps(domain_stats)))
        ])
    except Exception as e:
        if debug:
            logger.out("Failed to write domain statistics: {}".format(e), state='d', prefix='vm-thread')


# VM stats update function
def collect_vm_stats(logger, config, zkhandler, this_node, queue):
    """Collect VM statistics from libvirt (thread target).

    On success, four items are put on queue in this order: the number of
    active libvirt domains, memalloc, memprov and vcpualloc. If the libvirt
    connection cannot be opened, the failure is logged and nothing is put on
    the queue.
    """
    debug = config['debug']
    if debug:
        logger.out("Thread starting", state='d', prefix='vm-thread')

    # Connect to libvirt
    libvirt_name = "qemu:///system"
    if debug:
        logger.out("Connecting to libvirt", state='d', prefix='vm-thread')
    try:
        lv_conn = libvirt.open(libvirt_name)
        if lv_conn is None:
            raise Exception
    except Exception:
        logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e')
        return

    # From here on the connection is always closed, even if a step raises
    try:
        memalloc, memprov, vcpualloc = _tally_vm_resources(logger, zkhandler, this_node, debug)

        # Get list of running domains from Libvirt
        running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)

        # Get statistics from any running VMs
        for domain in running_domains:
            _write_vm_stats(logger, zkhandler, this_node, domain, debug)
    finally:
        # Close the Libvirt connection
        lv_conn.close()

    queue.put(len(running_domains))
    queue.put(memalloc)
    queue.put(memprov)
    queue.put(vcpualloc)

    if debug:
        logger.out("Thread finished", state='d', prefix='vm-thread')


def _ensure_zk_value(zkhandler, key, value):
    """Write value to key unless Zookeeper already holds exactly that value.

    A failed read is treated the same as a mismatch, so the value is written.
    """
    try:
        needs_update = zkhandler.read(key) != value
    except Exception:
        needs_update = True

    if needs_update:
        zkhandler.write([
            (key, value)
        ])


# Keepalive update function
def node_keepalive(logger, config, zkhandler, this_node):
    """Run one keepalive cycle for this node.

    In order: refresh primary-only configuration keys, mark the daemon state
    as 'run', gather VM and Ceph statistics in worker threads, write this
    node's statistics and keepalive timestamp to Zookeeper, optionally log a
    summary, and (on coordinators not in maintenance) start fencing for nodes
    whose keepalive is out of date.
    """
    debug = config['debug']
    if debug:
        logger.out("Keepalive starting", state='d', prefix='main-thread')

    is_primary = this_node.router_state == 'primary'

    # Set the migration selector in Zookeeper for clients to read
    if config['enable_hypervisor'] and is_primary:
        _ensure_zk_value(zkhandler, 'base.config.migration_target_selector', config['migration_target_selector'])

    # Set the upstream IP in Zookeeper for clients to read
    if config['enable_networking'] and is_primary:
        _ensure_zk_value(zkhandler, 'base.config.upstream_ip', config['upstream_floating_ip'])

    # Get past state and update if needed
    if debug:
        logger.out("Get past state and update if needed", state='d', prefix='main-thread')

    past_state = zkhandler.read(('node.state.daemon', this_node.name))
    this_node.daemon_state = 'run'
    if past_state not in ('run', 'shutdown'):
        zkhandler.write([
            (('node.state.daemon', this_node.name), 'run')
        ])

    # Ensure the primary key is properly set
    if debug:
        logger.out("Ensure the primary key is properly set", state='d', prefix='main-thread')
    if this_node.router_state == 'primary':
        if zkhandler.read('base.config.primary_node') != this_node.name:
            zkhandler.write([
                ('base.config.primary_node', this_node.name)
            ])

    # Run VM statistics collection in separate thread for parallelization
    if config['enable_hypervisor']:
        vm_thread_queue = Queue()
        vm_stats_thread = Thread(target=collect_vm_stats, args=(logger, config, zkhandler, this_node, vm_thread_queue), kwargs={})
        vm_stats_thread.start()

    # Run Ceph status collection in separate thread for parallelization
    if config['enable_storage']:
        ceph_thread_queue = Queue()
        ceph_stats_thread = Thread(target=collect_ceph_stats, args=(logger, config, zkhandler, this_node, ceph_thread_queue), kwargs={})
        ceph_stats_thread.start()

    # Get node performance statistics; take a single memory snapshot so the
    # three figures are consistent with each other
    memory = psutil.virtual_memory()
    this_node.memtotal = int(memory.total / 1024 / 1024)
    this_node.memused = int(memory.used / 1024 / 1024)
    this_node.memfree = int(memory.free / 1024 / 1024)
    this_node.cpuload = os.getloadavg()[0]

    # Join against running threads
    if config['enable_hypervisor']:
        vm_stats_thread.join(timeout=config['keepalive_interval'])
        if vm_stats_thread.is_alive():
            logger.out('VM stats gathering exceeded timeout, continuing', state='w')
    if config['enable_storage']:
        ceph_stats_thread.join(timeout=config['keepalive_interval'])
        if ceph_stats_thread.is_alive():
            logger.out('Ceph stats gathering exceeded timeout, continuing', state='w')

    # Get information from thread queues
    if config['enable_hypervisor']:
        try:
            this_node.domains_count = vm_thread_queue.get(timeout=config['keepalive_interval'])
            this_node.memalloc = vm_thread_queue.get(timeout=config['keepalive_interval'])
            this_node.memprov = vm_thread_queue.get(timeout=config['keepalive_interval'])
            this_node.vcpualloc = vm_thread_queue.get(timeout=config['keepalive_interval'])
        except Exception:
            logger.out('VM stats queue get exceeded timeout, continuing', state='w')
    else:
        this_node.domains_count = 0
        this_node.memalloc = 0
        this_node.memprov = 0
        this_node.vcpualloc = 0

    if config['enable_storage']:
        try:
            ceph_health_colour = ceph_thread_queue.get(timeout=config['keepalive_interval'])
            ceph_health = ceph_thread_queue.get(timeout=config['keepalive_interval'])
            osds_this_node = ceph_thread_queue.get(timeout=config['keepalive_interval'])
        except Exception:
            logger.out('Ceph stats queue get exceeded timeout, continuing', state='w')
            ceph_health_colour = logger.fmt_cyan
            ceph_health = 'UNKNOWN'
            osds_this_node = '?'

    # Set our information in zookeeper
    keepalive_time = int(time.time())
    if debug:
        logger.out("Set our information in zookeeper", state='d', prefix='main-thread')
    try:
        zkhandler.write([
            (('node.memory.total', this_node.name), str(this_node.memtotal)),
            (('node.memory.used', this_node.name), str(this_node.memused)),
            (('node.memory.free', this_node.name), str(this_node.memfree)),
            (('node.memory.allocated', this_node.name), str(this_node.memalloc)),
            (('node.memory.provisioned', this_node.name), str(this_node.memprov)),
            (('node.vcpu.allocated', this_node.name), str(this_node.vcpualloc)),
            (('node.cpu.load', this_node.name), str(this_node.cpuload)),
            (('node.count.provisioned_domains', this_node.name), str(this_node.domains_count)),
            (('node.running_domains', this_node.name), ' '.join(this_node.domain_list)),
            (('node.keepalive', this_node.name), str(keepalive_time)),
        ])
    except Exception:
        logger.out('Failed to set keepalive data', state='e')

    # Display node information to the terminal
    if config['log_keepalives']:
        if this_node.router_state == 'primary':
            cst_colour = logger.fmt_green
        elif this_node.router_state == 'secondary':
            cst_colour = logger.fmt_blue
        else:
            cst_colour = logger.fmt_cyan
        logger.out(
            '{}{} keepalive @ {}{} [{}{}{}]'.format(
                logger.fmt_purple,
                config['node_hostname'],
                datetime.now(),
                logger.fmt_end,
                logger.fmt_bold + cst_colour,
                this_node.router_state,
                logger.fmt_end
            ),
            state='t'
        )
        if config['log_keepalive_cluster_details']:
            logger.out(
                '{bold}Maintenance:{nofmt} {maint} '
                '{bold}Active VMs:{nofmt} {domcount} '
                '{bold}Networks:{nofmt} {netcount} '
                '{bold}Load:{nofmt} {load} '
                '{bold}Memory [MiB]: VMs:{nofmt} {allocmem} '
                '{bold}Used:{nofmt} {usedmem} '
                '{bold}Free:{nofmt} {freemem}'.format(
                    bold=logger.fmt_bold,
                    nofmt=logger.fmt_end,
                    maint=this_node.maintenance,
                    domcount=this_node.domains_count,
                    netcount=len(zkhandler.children('base.network')),
                    load=this_node.cpuload,
                    freemem=this_node.memfree,
                    usedmem=this_node.memused,
                    allocmem=this_node.memalloc,
                ),
                state='t'
            )
        if config['enable_storage'] and config['log_keepalive_storage_details']:
            logger.out(
                '{bold}Ceph cluster status:{nofmt} {health_colour}{health}{nofmt} '
                '{bold}Total OSDs:{nofmt} {total_osds} '
                '{bold}Node OSDs:{nofmt} {node_osds} '
                '{bold}Pools:{nofmt} {total_pools} '.format(
                    bold=logger.fmt_bold,
                    health_colour=ceph_health_colour,
                    nofmt=logger.fmt_end,
                    health=ceph_health,
                    total_osds=len(zkhandler.children('base.osd')),
                    node_osds=osds_this_node,
                    total_pools=len(zkhandler.children('base.pool'))
                ),
                state='t'
            )

    # Look for dead nodes and fence them
    if not this_node.maintenance:
        if debug:
            logger.out("Look for dead nodes and fence them", state='d', prefix='main-thread')
        if config['daemon_mode'] == 'coordinator':
            for node_name in zkhandler.children('base.node'):
                try:
                    node_daemon_state = zkhandler.read(('node.state.daemon', node_name))
                    node_keepalive_time = int(zkhandler.read(('node.keepalive', node_name)))
                except Exception:
                    node_daemon_state = 'unknown'
                    node_keepalive_time = 0

                # Handle deadtime and fencing if needed
                # (A node is considered dead when its keepalive timestamp is more than
                # keepalive_interval * fence_intervals seconds out-of-date while its
                # daemon state is 'run')
                node_deadtime = int(time.time()) - (int(config['keepalive_interval']) * int(config['fence_intervals']))
                if node_keepalive_time < node_deadtime and node_daemon_state == 'run':
                    logger.out('Node {} seems dead - starting monitor for fencing'.format(node_name), state='w')
                    zk_lock = zkhandler.writelock(('node.state.daemon', node_name))
                    with zk_lock:
                        # Ensures that, if we lost the lock race and come out of waiting,
                        # we won't try to trigger our own fence thread.
                        if zkhandler.read(('node.state.daemon', node_name)) != 'dead':
                            fence_thread = Thread(target=pvcnoded.util.fencing.fence_node, args=(node_name, zkhandler, config, logger), kwargs={})
                            fence_thread.start()
                            # Write the updated data after we start the fence thread
                            zkhandler.write([
                                (('node.state.daemon', node_name), 'dead')
                            ])

    if debug:
        logger.out("Keepalive finished", state='d', prefix='main-thread')