923 lines
		
	
	
		
			34 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			923 lines
		
	
	
		
			34 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3
 | 
						|
 | 
						|
# keepalive.py - Utility functions for pvcnoded Keepalives
 | 
						|
# Part of the Parallel Virtual Cluster (PVC) system
 | 
						|
#
 | 
						|
#    Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
 | 
						|
#
 | 
						|
#    This program is free software: you can redistribute it and/or modify
 | 
						|
#    it under the terms of the GNU General Public License as published by
 | 
						|
#    the Free Software Foundation, version 3.
 | 
						|
#
 | 
						|
#    This program is distributed in the hope that it will be useful,
 | 
						|
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
#    GNU General Public License for more details.
 | 
						|
#
 | 
						|
#    You should have received a copy of the GNU General Public License
 | 
						|
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
						|
#
 | 
						|
###############################################################################
 | 
						|
 | 
						|
import pvcnoded.util.fencing
 | 
						|
 | 
						|
import daemon_lib.common as common
 | 
						|
 | 
						|
from apscheduler.schedulers.background import BackgroundScheduler
 | 
						|
from rados import Rados
 | 
						|
from xml.etree import ElementTree
 | 
						|
from queue import Queue
 | 
						|
from threading import Thread
 | 
						|
from datetime import datetime
 | 
						|
 | 
						|
import json
 | 
						|
import re
 | 
						|
import libvirt
 | 
						|
import psutil
 | 
						|
import os
 | 
						|
import time
 | 
						|
 | 
						|
 | 
						|
# State table for pretty stats
 | 
						|
libvirt_vm_states = {
 | 
						|
    0: "NOSTATE",
 | 
						|
    1: "RUNNING",
 | 
						|
    2: "BLOCKED",
 | 
						|
    3: "PAUSED",
 | 
						|
    4: "SHUTDOWN",
 | 
						|
    5: "SHUTOFF",
 | 
						|
    6: "CRASHED",
 | 
						|
    7: "PMSUSPENDED",
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
def start_keepalive_timer(logger, config, zkhandler, this_node, monitoring_instance):
 | 
						|
    keepalive_interval = config["keepalive_interval"]
 | 
						|
    logger.out(
 | 
						|
        f"Starting keepalive timer ({keepalive_interval} second interval)", state="s"
 | 
						|
    )
 | 
						|
    keepalive_timer = BackgroundScheduler()
 | 
						|
    keepalive_timer.add_job(
 | 
						|
        node_keepalive,
 | 
						|
        args=(logger, config, zkhandler, this_node, monitoring_instance),
 | 
						|
        trigger="interval",
 | 
						|
        seconds=keepalive_interval,
 | 
						|
    )
 | 
						|
    keepalive_timer.start()
 | 
						|
    return keepalive_timer
 | 
						|
 | 
						|
 | 
						|
def stop_keepalive_timer(logger, keepalive_timer):
 | 
						|
    try:
 | 
						|
        keepalive_timer.shutdown()
 | 
						|
        logger.out("Stopping keepalive timer", state="s")
 | 
						|
    except Exception:
 | 
						|
        logger.out("Failed to stop keepalive timer", state="w")
 | 
						|
 | 
						|
 | 
						|
# Ceph stats update function
 | 
						|
def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
						|
    pool_list = zkhandler.children("base.pool")
 | 
						|
    osd_list = zkhandler.children("base.osd")
 | 
						|
 | 
						|
    debug = config["debug"]
 | 
						|
    if debug:
 | 
						|
        logger.out("Thread starting", state="d", prefix="ceph-thread")
 | 
						|
 | 
						|
    # Connect to the Ceph cluster
 | 
						|
    try:
 | 
						|
        ceph_conn = Rados(
 | 
						|
            conffile=config["ceph_config_file"],
 | 
						|
            conf=dict(keyring=config["ceph_admin_keyring"]),
 | 
						|
        )
 | 
						|
        if debug:
 | 
						|
            logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
 | 
						|
        ceph_conn.connect(timeout=1)
 | 
						|
    except Exception as e:
 | 
						|
        logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
 | 
						|
        return
 | 
						|
 | 
						|
    if debug:
 | 
						|
        logger.out("Getting health stats from monitor", state="d", prefix="ceph-thread")
 | 
						|
 | 
						|
    # Get Ceph cluster health for local status output
 | 
						|
    command = {"prefix": "health", "format": "json"}
 | 
						|
    try:
 | 
						|
        health_status = json.loads(
 | 
						|
            ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1]
 | 
						|
        )
 | 
						|
        ceph_health = health_status["status"]
 | 
						|
    except Exception as e:
 | 
						|
        logger.out("Failed to obtain Ceph health data: {}".format(e), state="e")
 | 
						|
        ceph_health = "HEALTH_UNKN"
 | 
						|
 | 
						|
    if ceph_health in ["HEALTH_OK"]:
 | 
						|
        ceph_health_colour = logger.fmt_green
 | 
						|
    elif ceph_health in ["HEALTH_UNKN"]:
 | 
						|
        ceph_health_colour = logger.fmt_cyan
 | 
						|
    elif ceph_health in ["HEALTH_WARN"]:
 | 
						|
        ceph_health_colour = logger.fmt_yellow
 | 
						|
    else:
 | 
						|
        ceph_health_colour = logger.fmt_red
 | 
						|
 | 
						|
    # Primary-only functions
 | 
						|
    if this_node.router_state == "primary":
 | 
						|
        if debug:
 | 
						|
            logger.out(
 | 
						|
                "Set ceph health information in zookeeper (primary only)",
 | 
						|
                state="d",
 | 
						|
                prefix="ceph-thread",
 | 
						|
            )
 | 
						|
 | 
						|
        command = {"prefix": "status", "format": "pretty"}
 | 
						|
        ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
 | 
						|
            1
 | 
						|
        ].decode("ascii")
 | 
						|
        try:
 | 
						|
            zkhandler.write([("base.storage", str(ceph_status))])
 | 
						|
        except Exception as e:
 | 
						|
            logger.out("Failed to set Ceph status data: {}".format(e), state="e")
 | 
						|
 | 
						|
        if debug:
 | 
						|
            logger.out(
 | 
						|
                "Set ceph rados df information in zookeeper (primary only)",
 | 
						|
                state="d",
 | 
						|
                prefix="ceph-thread",
 | 
						|
            )
 | 
						|
 | 
						|
        # Get rados df info
 | 
						|
        command = {"prefix": "df", "format": "pretty"}
 | 
						|
        ceph_df = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1].decode(
 | 
						|
            "ascii"
 | 
						|
        )
 | 
						|
        try:
 | 
						|
            zkhandler.write([("base.storage.util", str(ceph_df))])
 | 
						|
        except Exception as e:
 | 
						|
            logger.out("Failed to set Ceph utilization data: {}".format(e), state="e")
 | 
						|
 | 
						|
        if debug:
 | 
						|
            logger.out(
 | 
						|
                "Set pool information in zookeeper (primary only)",
 | 
						|
                state="d",
 | 
						|
                prefix="ceph-thread",
 | 
						|
            )
 | 
						|
 | 
						|
        # Get pool info
 | 
						|
        command = {"prefix": "df", "format": "json"}
 | 
						|
        ceph_df_output = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
 | 
						|
            1
 | 
						|
        ].decode("ascii")
 | 
						|
        try:
 | 
						|
            ceph_pool_df_raw = json.loads(ceph_df_output)["pools"]
 | 
						|
        except Exception as e:
 | 
						|
            logger.out("Failed to obtain Pool data (ceph df): {}".format(e), state="w")
 | 
						|
            ceph_pool_df_raw = []
 | 
						|
 | 
						|
        retcode, stdout, stderr = common.run_os_command(
 | 
						|
            "rados df --format json", timeout=1
 | 
						|
        )
 | 
						|
        try:
 | 
						|
            rados_pool_df_raw = json.loads(stdout)["pools"]
 | 
						|
        except Exception as e:
 | 
						|
            logger.out("Failed to obtain Pool data (rados df): {}".format(e), state="w")
 | 
						|
            rados_pool_df_raw = []
 | 
						|
 | 
						|
        pool_count = len(ceph_pool_df_raw)
 | 
						|
        if debug:
 | 
						|
            logger.out(
 | 
						|
                "Getting info for {} pools".format(pool_count),
 | 
						|
                state="d",
 | 
						|
                prefix="ceph-thread",
 | 
						|
            )
 | 
						|
        for pool_idx in range(0, pool_count):
 | 
						|
            try:
 | 
						|
                # Combine all the data for this pool
 | 
						|
                ceph_pool_df = ceph_pool_df_raw[pool_idx]
 | 
						|
                rados_pool_df = rados_pool_df_raw[pool_idx]
 | 
						|
                pool = ceph_pool_df
 | 
						|
                pool.update(rados_pool_df)
 | 
						|
 | 
						|
                # Ignore any pools that aren't in our pool list
 | 
						|
                if pool["name"] not in pool_list:
 | 
						|
                    if debug:
 | 
						|
                        logger.out(
 | 
						|
                            "Pool {} not in pool list {}".format(
 | 
						|
                                pool["name"], pool_list
 | 
						|
                            ),
 | 
						|
                            state="d",
 | 
						|
                            prefix="ceph-thread",
 | 
						|
                        )
 | 
						|
                    continue
 | 
						|
                else:
 | 
						|
                    if debug:
 | 
						|
                        logger.out(
 | 
						|
                            "Parsing data for pool {}".format(pool["name"]),
 | 
						|
                            state="d",
 | 
						|
                            prefix="ceph-thread",
 | 
						|
                        )
 | 
						|
 | 
						|
                # Assemble a useful data structure
 | 
						|
                pool_df = {
 | 
						|
                    "id": pool["id"],
 | 
						|
                    "stored_bytes": pool["stats"]["stored"],
 | 
						|
                    "free_bytes": pool["stats"]["max_avail"],
 | 
						|
                    "used_bytes": pool["stats"]["bytes_used"],
 | 
						|
                    "used_percent": pool["stats"]["percent_used"],
 | 
						|
                    "num_objects": pool["stats"]["objects"],
 | 
						|
                    "num_object_clones": pool["num_object_clones"],
 | 
						|
                    "num_object_copies": pool["num_object_copies"],
 | 
						|
                    "num_objects_missing_on_primary": pool[
 | 
						|
                        "num_objects_missing_on_primary"
 | 
						|
                    ],
 | 
						|
                    "num_objects_unfound": pool["num_objects_unfound"],
 | 
						|
                    "num_objects_degraded": pool["num_objects_degraded"],
 | 
						|
                    "read_ops": pool["read_ops"],
 | 
						|
                    "read_bytes": pool["read_bytes"],
 | 
						|
                    "write_ops": pool["write_ops"],
 | 
						|
                    "write_bytes": pool["write_bytes"],
 | 
						|
                }
 | 
						|
 | 
						|
                # Write the pool data to Zookeeper
 | 
						|
                zkhandler.write(
 | 
						|
                    [(("pool.stats", pool["name"]), str(json.dumps(pool_df)))]
 | 
						|
                )
 | 
						|
            except Exception as e:
 | 
						|
                # One or more of the status commands timed out, just continue
 | 
						|
                logger.out(
 | 
						|
                    "Failed to format and send pool data: {}".format(e), state="w"
 | 
						|
                )
 | 
						|
                pass
 | 
						|
 | 
						|
    # Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs)
 | 
						|
    osds_this_node = 0
 | 
						|
    if len(osd_list) > 0:
 | 
						|
        # Get data from Ceph OSDs
 | 
						|
        if debug:
 | 
						|
            logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
 | 
						|
 | 
						|
        # Parse the dump data
 | 
						|
        osd_dump = dict()
 | 
						|
 | 
						|
        command = {"prefix": "osd dump", "format": "json"}
 | 
						|
        osd_dump_output = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
 | 
						|
            1
 | 
						|
        ].decode("ascii")
 | 
						|
        try:
 | 
						|
            osd_dump_raw = json.loads(osd_dump_output)["osds"]
 | 
						|
        except Exception as e:
 | 
						|
            logger.out("Failed to obtain OSD data: {}".format(e), state="w")
 | 
						|
            osd_dump_raw = []
 | 
						|
 | 
						|
        if debug:
 | 
						|
            logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
 | 
						|
        for osd in osd_dump_raw:
 | 
						|
            osd_dump.update(
 | 
						|
                {
 | 
						|
                    str(osd["osd"]): {
 | 
						|
                        "uuid": osd["uuid"],
 | 
						|
                        "up": osd["up"],
 | 
						|
                        "in": osd["in"],
 | 
						|
                        "primary_affinity": osd["primary_affinity"],
 | 
						|
                    }
 | 
						|
                }
 | 
						|
            )
 | 
						|
 | 
						|
        # Parse the df data
 | 
						|
        if debug:
 | 
						|
            logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
 | 
						|
 | 
						|
        osd_df = dict()
 | 
						|
 | 
						|
        command = {"prefix": "osd df", "format": "json"}
 | 
						|
        try:
 | 
						|
            osd_df_raw = json.loads(
 | 
						|
                ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1]
 | 
						|
            )["nodes"]
 | 
						|
        except Exception as e:
 | 
						|
            logger.out("Failed to obtain OSD data: {}".format(e), state="w")
 | 
						|
            osd_df_raw = []
 | 
						|
 | 
						|
        if debug:
 | 
						|
            logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
 | 
						|
        for osd in osd_df_raw:
 | 
						|
            osd_df.update(
 | 
						|
                {
 | 
						|
                    str(osd["id"]): {
 | 
						|
                        "utilization": osd["utilization"],
 | 
						|
                        "var": osd["var"],
 | 
						|
                        "pgs": osd["pgs"],
 | 
						|
                        "kb": osd["kb"],
 | 
						|
                        "kb_used": osd["kb_used"],
 | 
						|
                        "kb_used_data": osd["kb_used_data"],
 | 
						|
                        "kb_used_omap": osd["kb_used_omap"],
 | 
						|
                        "kb_used_meta": osd["kb_used_meta"],
 | 
						|
                        "kb_avail": osd["kb_avail"],
 | 
						|
                        "weight": osd["crush_weight"],
 | 
						|
                        "reweight": osd["reweight"],
 | 
						|
                        "class": osd["device_class"],
 | 
						|
                    }
 | 
						|
                }
 | 
						|
            )
 | 
						|
 | 
						|
        # Parse the status data
 | 
						|
        if debug:
 | 
						|
            logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
 | 
						|
 | 
						|
        osd_status = dict()
 | 
						|
 | 
						|
        command = {"prefix": "osd status", "format": "pretty"}
 | 
						|
        try:
 | 
						|
            osd_status_raw = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
 | 
						|
                1
 | 
						|
            ].decode("ascii")
 | 
						|
        except Exception as e:
 | 
						|
            logger.out("Failed to obtain OSD status data: {}".format(e), state="w")
 | 
						|
            osd_status_raw = []
 | 
						|
 | 
						|
        if debug:
 | 
						|
            logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
 | 
						|
 | 
						|
        for line in osd_status_raw.split("\n"):
 | 
						|
            # Strip off colour
 | 
						|
            line = re.sub(r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))", "", line)
 | 
						|
            # Split it for parsing
 | 
						|
            line = line.split()
 | 
						|
            if len(line) > 1 and line[1].isdigit():
 | 
						|
                # This is an OSD line so parse it
 | 
						|
                osd_id = line[1]
 | 
						|
                node = line[3].split(".")[0]
 | 
						|
                used = line[5]
 | 
						|
                avail = line[7]
 | 
						|
                wr_ops = line[9]
 | 
						|
                wr_data = line[11]
 | 
						|
                rd_ops = line[13]
 | 
						|
                rd_data = line[15]
 | 
						|
                state = line[17]
 | 
						|
                osd_status.update(
 | 
						|
                    {
 | 
						|
                        str(osd_id): {
 | 
						|
                            "node": node,
 | 
						|
                            "used": used,
 | 
						|
                            "avail": avail,
 | 
						|
                            "wr_ops": wr_ops,
 | 
						|
                            "wr_data": wr_data,
 | 
						|
                            "rd_ops": rd_ops,
 | 
						|
                            "rd_data": rd_data,
 | 
						|
                            "state": state,
 | 
						|
                        }
 | 
						|
                    }
 | 
						|
                )
 | 
						|
 | 
						|
        # Merge them together into a single meaningful dict
 | 
						|
        if debug:
 | 
						|
            logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
 | 
						|
 | 
						|
        osd_stats = dict()
 | 
						|
 | 
						|
        for osd in osd_list:
 | 
						|
            if zkhandler.read(("osd.node", osd)) == config["node_hostname"]:
 | 
						|
                osds_this_node += 1
 | 
						|
            try:
 | 
						|
                this_dump = osd_dump[osd]
 | 
						|
                this_dump.update(osd_df[osd])
 | 
						|
                this_dump.update(osd_status[osd])
 | 
						|
                osd_stats[osd] = this_dump
 | 
						|
            except KeyError as e:
 | 
						|
                # One or more of the status commands timed out, just continue
 | 
						|
                logger.out(
 | 
						|
                    "Failed to parse OSD stats into dictionary: {}".format(e), state="w"
 | 
						|
                )
 | 
						|
 | 
						|
        # Upload OSD data for the cluster (primary-only)
 | 
						|
        if this_node.router_state == "primary":
 | 
						|
            if debug:
 | 
						|
                logger.out(
 | 
						|
                    "Trigger updates for each OSD", state="d", prefix="ceph-thread"
 | 
						|
                )
 | 
						|
 | 
						|
            for osd in osd_list:
 | 
						|
                try:
 | 
						|
                    stats = json.dumps(osd_stats[osd])
 | 
						|
                    zkhandler.write([(("osd.stats", osd), str(stats))])
 | 
						|
                except KeyError as e:
 | 
						|
                    # One or more of the status commands timed out, just continue
 | 
						|
                    logger.out(
 | 
						|
                        "Failed to upload OSD stats from dictionary: {}".format(e),
 | 
						|
                        state="w",
 | 
						|
                    )
 | 
						|
 | 
						|
    ceph_conn.shutdown()
 | 
						|
 | 
						|
    queue.put(ceph_health_colour)
 | 
						|
    queue.put(ceph_health)
 | 
						|
    queue.put(osds_this_node)
 | 
						|
 | 
						|
    if debug:
 | 
						|
        logger.out("Thread finished", state="d", prefix="ceph-thread")
 | 
						|
 | 
						|
 | 
						|
# VM stats update function
 | 
						|
def collect_vm_stats(logger, config, zkhandler, this_node, queue):
 | 
						|
    debug = config["debug"]
 | 
						|
    if debug:
 | 
						|
        logger.out("Thread starting", state="d", prefix="vm-thread")
 | 
						|
 | 
						|
    # Connect to libvirt
 | 
						|
    libvirt_name = "qemu:///system"
 | 
						|
    if debug:
 | 
						|
        logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
 | 
						|
    try:
 | 
						|
        lv_conn = libvirt.open(libvirt_name)
 | 
						|
        if lv_conn is None:
 | 
						|
            raise Exception
 | 
						|
    except Exception:
 | 
						|
        logger.out('Failed to open connection to "{}"'.format(libvirt_name), state="e")
 | 
						|
        return
 | 
						|
 | 
						|
    memalloc = 0
 | 
						|
    memprov = 0
 | 
						|
    vcpualloc = 0
 | 
						|
    # Toggle state management of dead VMs to restart them
 | 
						|
    if debug:
 | 
						|
        logger.out(
 | 
						|
            "Toggle state management of dead VMs to restart them",
 | 
						|
            state="d",
 | 
						|
            prefix="vm-thread",
 | 
						|
        )
 | 
						|
    # Make a copy of the d_domain; if not, and it changes in flight, this can fail
 | 
						|
    fixed_d_domain = this_node.d_domain.copy()
 | 
						|
    for domain, instance in fixed_d_domain.items():
 | 
						|
        if domain in this_node.domain_list:
 | 
						|
            # Add the allocated memory to our memalloc value
 | 
						|
            memalloc += instance.getmemory()
 | 
						|
            memprov += instance.getmemory()
 | 
						|
            vcpualloc += instance.getvcpus()
 | 
						|
            if instance.getstate() == "start" and instance.getnode() == this_node.name:
 | 
						|
                if instance.getdom() is not None:
 | 
						|
                    try:
 | 
						|
                        if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
 | 
						|
                            logger.out(
 | 
						|
                                "VM {} has failed".format(instance.domname),
 | 
						|
                                state="w",
 | 
						|
                                prefix="vm-thread",
 | 
						|
                            )
 | 
						|
                            raise
 | 
						|
                    except Exception:
 | 
						|
                        # Toggle a state "change"
 | 
						|
                        logger.out(
 | 
						|
                            "Resetting state to {} for VM {}".format(
 | 
						|
                                instance.getstate(), instance.domname
 | 
						|
                            ),
 | 
						|
                            state="i",
 | 
						|
                            prefix="vm-thread",
 | 
						|
                        )
 | 
						|
                        zkhandler.write(
 | 
						|
                            [(("domain.state", domain), instance.getstate())]
 | 
						|
                        )
 | 
						|
        elif instance.getnode() == this_node.name:
 | 
						|
            memprov += instance.getmemory()
 | 
						|
 | 
						|
    # Get list of running domains from Libvirt
 | 
						|
    running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
 | 
						|
 | 
						|
    # Get statistics from any running VMs
 | 
						|
    for domain in running_domains:
 | 
						|
        try:
 | 
						|
            # Get basic information about the VM
 | 
						|
            tree = ElementTree.fromstring(domain.XMLDesc())
 | 
						|
            domain_uuid = domain.UUIDString()
 | 
						|
            domain_name = domain.name()
 | 
						|
 | 
						|
            # Get all the raw information about the VM
 | 
						|
            if debug:
 | 
						|
                logger.out(
 | 
						|
                    "Getting general statistics for VM {}".format(domain_name),
 | 
						|
                    state="d",
 | 
						|
                    prefix="vm-thread",
 | 
						|
                )
 | 
						|
            (
 | 
						|
                domain_state,
 | 
						|
                domain_maxmem,
 | 
						|
                domain_mem,
 | 
						|
                domain_vcpus,
 | 
						|
                domain_cputime,
 | 
						|
            ) = domain.info()
 | 
						|
            # We can't properly gather stats from a non-running VMs so continue
 | 
						|
            if domain_state != libvirt.VIR_DOMAIN_RUNNING:
 | 
						|
                continue
 | 
						|
            domain_memory_stats = domain.memoryStats()
 | 
						|
            domain_cpu_stats = domain.getCPUStats(True)[0]
 | 
						|
        except Exception as e:
 | 
						|
            if debug:
 | 
						|
                try:
 | 
						|
                    logger.out(
 | 
						|
                        "Failed getting VM information for {}: {}".format(
 | 
						|
                            domain.name(), e
 | 
						|
                        ),
 | 
						|
                        state="d",
 | 
						|
                        prefix="vm-thread",
 | 
						|
                    )
 | 
						|
                except Exception:
 | 
						|
                    pass
 | 
						|
            continue
 | 
						|
 | 
						|
        # Ensure VM is present in the domain_list
 | 
						|
        if domain_uuid not in this_node.domain_list:
 | 
						|
            this_node.domain_list.append(domain_uuid)
 | 
						|
 | 
						|
        if debug:
 | 
						|
            logger.out(
 | 
						|
                "Getting disk statistics for VM {}".format(domain_name),
 | 
						|
                state="d",
 | 
						|
                prefix="vm-thread",
 | 
						|
            )
 | 
						|
        domain_disk_stats = []
 | 
						|
        try:
 | 
						|
            for disk in tree.findall("devices/disk"):
 | 
						|
                disk_name = disk.find("source").get("name")
 | 
						|
                if not disk_name:
 | 
						|
                    disk_name = disk.find("source").get("file")
 | 
						|
                disk_stats = domain.blockStats(disk.find("target").get("dev"))
 | 
						|
                domain_disk_stats.append(
 | 
						|
                    {
 | 
						|
                        "name": disk_name,
 | 
						|
                        "rd_req": disk_stats[0],
 | 
						|
                        "rd_bytes": disk_stats[1],
 | 
						|
                        "wr_req": disk_stats[2],
 | 
						|
                        "wr_bytes": disk_stats[3],
 | 
						|
                        "err": disk_stats[4],
 | 
						|
                    }
 | 
						|
                )
 | 
						|
        except Exception as e:
 | 
						|
            if debug:
 | 
						|
                try:
 | 
						|
                    logger.out(
 | 
						|
                        "Failed getting disk stats for {}: {}".format(domain.name(), e),
 | 
						|
                        state="d",
 | 
						|
                        prefix="vm-thread",
 | 
						|
                    )
 | 
						|
                except Exception:
 | 
						|
                    pass
 | 
						|
            continue
 | 
						|
 | 
						|
        if debug:
 | 
						|
            logger.out(
 | 
						|
                "Getting network statistics for VM {}".format(domain_name),
 | 
						|
                state="d",
 | 
						|
                prefix="vm-thread",
 | 
						|
            )
 | 
						|
        domain_network_stats = []
 | 
						|
        try:
 | 
						|
            for interface in tree.findall("devices/interface"):
 | 
						|
                interface_type = interface.get("type")
 | 
						|
                if interface_type not in ["bridge"]:
 | 
						|
                    continue
 | 
						|
                interface_name = interface.find("target").get("dev")
 | 
						|
                interface_bridge = interface.find("source").get("bridge")
 | 
						|
                interface_stats = domain.interfaceStats(interface_name)
 | 
						|
                domain_network_stats.append(
 | 
						|
                    {
 | 
						|
                        "name": interface_name,
 | 
						|
                        "bridge": interface_bridge,
 | 
						|
                        "rd_bytes": interface_stats[0],
 | 
						|
                        "rd_packets": interface_stats[1],
 | 
						|
                        "rd_errors": interface_stats[2],
 | 
						|
                        "rd_drops": interface_stats[3],
 | 
						|
                        "wr_bytes": interface_stats[4],
 | 
						|
                        "wr_packets": interface_stats[5],
 | 
						|
                        "wr_errors": interface_stats[6],
 | 
						|
                        "wr_drops": interface_stats[7],
 | 
						|
                    }
 | 
						|
                )
 | 
						|
        except Exception as e:
 | 
						|
            if debug:
 | 
						|
                try:
 | 
						|
                    logger.out(
 | 
						|
                        "Failed getting network stats for {}: {}".format(
 | 
						|
                            domain.name(), e
 | 
						|
                        ),
 | 
						|
                        state="d",
 | 
						|
                        prefix="vm-thread",
 | 
						|
                    )
 | 
						|
                except Exception:
 | 
						|
                    pass
 | 
						|
            continue
 | 
						|
 | 
						|
        # Create the final dictionary
 | 
						|
        domain_stats = {
 | 
						|
            "state": libvirt_vm_states[domain_state],
 | 
						|
            "maxmem": domain_maxmem,
 | 
						|
            "livemem": domain_mem,
 | 
						|
            "cpus": domain_vcpus,
 | 
						|
            "cputime": domain_cputime,
 | 
						|
            "mem_stats": domain_memory_stats,
 | 
						|
            "cpu_stats": domain_cpu_stats,
 | 
						|
            "disk_stats": domain_disk_stats,
 | 
						|
            "net_stats": domain_network_stats,
 | 
						|
        }
 | 
						|
 | 
						|
        if debug:
 | 
						|
            logger.out(
 | 
						|
                "Writing statistics for VM {} to Zookeeper".format(domain_name),
 | 
						|
                state="d",
 | 
						|
                prefix="vm-thread",
 | 
						|
            )
 | 
						|
 | 
						|
        try:
 | 
						|
            zkhandler.write(
 | 
						|
                [(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))]
 | 
						|
            )
 | 
						|
        except Exception as e:
 | 
						|
            if debug:
 | 
						|
                logger.out(
 | 
						|
                    "Failed to write domain statistics: {}".format(e),
 | 
						|
                    state="d",
 | 
						|
                    prefix="vm-thread",
 | 
						|
                )
 | 
						|
 | 
						|
    # Close the Libvirt connection
 | 
						|
    lv_conn.close()
 | 
						|
 | 
						|
    queue.put(len(running_domains))
 | 
						|
    queue.put(memalloc)
 | 
						|
    queue.put(memprov)
 | 
						|
    queue.put(vcpualloc)
 | 
						|
 | 
						|
    if debug:
 | 
						|
        logger.out("Thread finished", state="d", prefix="vm-thread")
 | 
						|
 | 
						|
 | 
						|
# Keepalive update function
 | 
						|
def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
 | 
						|
    debug = config["debug"]
 | 
						|
    if debug:
 | 
						|
        logger.out("Keepalive starting", state="d", prefix="main-thread")
 | 
						|
 | 
						|
    # Set the migration selector in Zookeeper for clients to read
 | 
						|
    if config["enable_hypervisor"]:
 | 
						|
        if this_node.router_state == "primary":
 | 
						|
            try:
 | 
						|
                if (
 | 
						|
                    zkhandler.read("base.config.migration_target_selector")
 | 
						|
                    != config["migration_target_selector"]
 | 
						|
                ):
 | 
						|
                    zkhandler.write(
 | 
						|
                        [
 | 
						|
                            (
 | 
						|
                                "base.config.migration_target_selector",
 | 
						|
                                config["migration_target_selector"],
 | 
						|
                            )
 | 
						|
                        ]
 | 
						|
                    )
 | 
						|
            except Exception:
 | 
						|
                logger.out(
 | 
						|
                    "Failed to set migration target selector in Zookeeper",
 | 
						|
                    state="e",
 | 
						|
                    prefix="main-thread",
 | 
						|
                )
 | 
						|
 | 
						|
    # Set the upstream IP in Zookeeper for clients to read
 | 
						|
    if config["enable_networking"]:
 | 
						|
        if this_node.router_state == "primary":
 | 
						|
            try:
 | 
						|
                if (
 | 
						|
                    zkhandler.read("base.config.upstream_ip")
 | 
						|
                    != config["upstream_floating_ip"]
 | 
						|
                ):
 | 
						|
                    zkhandler.write(
 | 
						|
                        [("base.config.upstream_ip", config["upstream_floating_ip"])]
 | 
						|
                    )
 | 
						|
            except Exception:
 | 
						|
                logger.out(
 | 
						|
                    "Failed to set upstream floating IP in Zookeeper",
 | 
						|
                    state="e",
 | 
						|
                    prefix="main-thread",
 | 
						|
                )
 | 
						|
 | 
						|
    # Get past state and update if needed
 | 
						|
    if debug:
 | 
						|
        logger.out(
 | 
						|
            "Get past state and update if needed", state="d", prefix="main-thread"
 | 
						|
        )
 | 
						|
 | 
						|
    past_state = zkhandler.read(("node.state.daemon", this_node.name))
 | 
						|
    if past_state != "run" and past_state != "shutdown":
 | 
						|
        this_node.daemon_state = "run"
 | 
						|
        zkhandler.write([(("node.state.daemon", this_node.name), "run")])
 | 
						|
    else:
 | 
						|
        this_node.daemon_state = "run"
 | 
						|
 | 
						|
    # Ensure the primary key is properly set
 | 
						|
    if debug:
 | 
						|
        logger.out(
 | 
						|
            "Ensure the primary key is properly set", state="d", prefix="main-thread"
 | 
						|
        )
 | 
						|
    if this_node.router_state == "primary":
 | 
						|
        if zkhandler.read("base.config.primary_node") != this_node.name:
 | 
						|
            zkhandler.write([("base.config.primary_node", this_node.name)])
 | 
						|
 | 
						|
    # Run VM statistics collection in separate thread for parallelization
 | 
						|
    if config["enable_hypervisor"]:
 | 
						|
        vm_thread_queue = Queue()
 | 
						|
        vm_stats_thread = Thread(
 | 
						|
            target=collect_vm_stats,
 | 
						|
            args=(logger, config, zkhandler, this_node, vm_thread_queue),
 | 
						|
            kwargs={},
 | 
						|
        )
 | 
						|
        vm_stats_thread.start()
 | 
						|
 | 
						|
    # Run Ceph status collection in separate thread for parallelization
 | 
						|
    if config["enable_storage"]:
 | 
						|
        ceph_thread_queue = Queue()
 | 
						|
        ceph_stats_thread = Thread(
 | 
						|
            target=collect_ceph_stats,
 | 
						|
            args=(logger, config, zkhandler, this_node, ceph_thread_queue),
 | 
						|
            kwargs={},
 | 
						|
        )
 | 
						|
        ceph_stats_thread.start()
 | 
						|
 | 
						|
    # Get node performance statistics
 | 
						|
    this_node.memtotal = int(psutil.virtual_memory().total / 1024 / 1024)
 | 
						|
    this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024)
 | 
						|
    this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
 | 
						|
    this_node.cpuload = os.getloadavg()[0]
 | 
						|
 | 
						|
    # Join against running threads
 | 
						|
    if config["enable_hypervisor"]:
 | 
						|
        vm_stats_thread.join(timeout=config["keepalive_interval"])
 | 
						|
        if vm_stats_thread.is_alive():
 | 
						|
            logger.out("VM stats gathering exceeded timeout, continuing", state="w")
 | 
						|
    if config["enable_storage"]:
 | 
						|
        ceph_stats_thread.join(timeout=config["keepalive_interval"])
 | 
						|
        if ceph_stats_thread.is_alive():
 | 
						|
            logger.out("Ceph stats gathering exceeded timeout, continuing", state="w")
 | 
						|
 | 
						|
    # Get information from thread queues
 | 
						|
    if config["enable_hypervisor"]:
 | 
						|
        try:
 | 
						|
            this_node.domains_count = vm_thread_queue.get(
 | 
						|
                timeout=config["keepalive_interval"]
 | 
						|
            )
 | 
						|
            this_node.memalloc = vm_thread_queue.get(
 | 
						|
                timeout=config["keepalive_interval"]
 | 
						|
            )
 | 
						|
            this_node.memprov = vm_thread_queue.get(
 | 
						|
                timeout=config["keepalive_interval"]
 | 
						|
            )
 | 
						|
            this_node.vcpualloc = vm_thread_queue.get(
 | 
						|
                timeout=config["keepalive_interval"]
 | 
						|
            )
 | 
						|
        except Exception:
 | 
						|
            logger.out("VM stats queue get exceeded timeout, continuing", state="w")
 | 
						|
    else:
 | 
						|
        this_node.domains_count = 0
 | 
						|
        this_node.memalloc = 0
 | 
						|
        this_node.memprov = 0
 | 
						|
        this_node.vcpualloc = 0
 | 
						|
 | 
						|
    if config["enable_storage"]:
 | 
						|
        try:
 | 
						|
            ceph_health_colour = ceph_thread_queue.get(
 | 
						|
                timeout=config["keepalive_interval"]
 | 
						|
            )
 | 
						|
            ceph_health = ceph_thread_queue.get(timeout=config["keepalive_interval"])
 | 
						|
            osds_this_node = ceph_thread_queue.get(timeout=config["keepalive_interval"])
 | 
						|
        except Exception:
 | 
						|
            logger.out("Ceph stats queue get exceeded timeout, continuing", state="w")
 | 
						|
            ceph_health_colour = logger.fmt_cyan
 | 
						|
            ceph_health = "UNKNOWN"
 | 
						|
            osds_this_node = "?"
 | 
						|
 | 
						|
    # Set our information in zookeeper
 | 
						|
    keepalive_time = int(time.time())
 | 
						|
    if debug:
 | 
						|
        logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
 | 
						|
    try:
 | 
						|
        zkhandler.write(
 | 
						|
            [
 | 
						|
                (("node.memory.total", this_node.name), str(this_node.memtotal)),
 | 
						|
                (("node.memory.used", this_node.name), str(this_node.memused)),
 | 
						|
                (("node.memory.free", this_node.name), str(this_node.memfree)),
 | 
						|
                (("node.memory.allocated", this_node.name), str(this_node.memalloc)),
 | 
						|
                (("node.memory.provisioned", this_node.name), str(this_node.memprov)),
 | 
						|
                (("node.vcpu.allocated", this_node.name), str(this_node.vcpualloc)),
 | 
						|
                (("node.cpu.load", this_node.name), str(this_node.cpuload)),
 | 
						|
                (
 | 
						|
                    ("node.count.provisioned_domains", this_node.name),
 | 
						|
                    str(this_node.domains_count),
 | 
						|
                ),
 | 
						|
                (
 | 
						|
                    ("node.running_domains", this_node.name),
 | 
						|
                    " ".join(this_node.domain_list),
 | 
						|
                ),
 | 
						|
                (("node.keepalive", this_node.name), str(keepalive_time)),
 | 
						|
            ]
 | 
						|
        )
 | 
						|
    except Exception:
 | 
						|
        logger.out("Failed to set keepalive data", state="e")
 | 
						|
 | 
						|
    # Display node information to the terminal
 | 
						|
    if config["log_keepalives"]:
 | 
						|
        if this_node.router_state == "primary":
 | 
						|
            cst_colour = logger.fmt_green
 | 
						|
        elif this_node.router_state == "secondary":
 | 
						|
            cst_colour = logger.fmt_blue
 | 
						|
        else:
 | 
						|
            cst_colour = logger.fmt_cyan
 | 
						|
        logger.out(
 | 
						|
            "{}{} keepalive @ {}{} [{}{}{}]".format(
 | 
						|
                logger.fmt_purple,
 | 
						|
                config["node_hostname"],
 | 
						|
                datetime.now(),
 | 
						|
                logger.fmt_end,
 | 
						|
                logger.fmt_bold + cst_colour,
 | 
						|
                this_node.router_state,
 | 
						|
                logger.fmt_end,
 | 
						|
            ),
 | 
						|
            state="t",
 | 
						|
        )
 | 
						|
        if config["log_keepalive_cluster_details"]:
 | 
						|
            logger.out(
 | 
						|
                "{bold}Maintenance:{nofmt} {maint}  "
 | 
						|
                "{bold}Active VMs:{nofmt} {domcount}  "
 | 
						|
                "{bold}Networks:{nofmt} {netcount}  "
 | 
						|
                "{bold}Load:{nofmt} {load}  "
 | 
						|
                "{bold}Memory [MiB]: VMs:{nofmt} {allocmem}  "
 | 
						|
                "{bold}Used:{nofmt} {usedmem}  "
 | 
						|
                "{bold}Free:{nofmt} {freemem}".format(
 | 
						|
                    bold=logger.fmt_bold,
 | 
						|
                    nofmt=logger.fmt_end,
 | 
						|
                    maint=this_node.maintenance,
 | 
						|
                    domcount=this_node.domains_count,
 | 
						|
                    netcount=len(zkhandler.children("base.network")),
 | 
						|
                    load=this_node.cpuload,
 | 
						|
                    freemem=this_node.memfree,
 | 
						|
                    usedmem=this_node.memused,
 | 
						|
                    allocmem=this_node.memalloc,
 | 
						|
                ),
 | 
						|
                state="t",
 | 
						|
            )
 | 
						|
        if config["enable_storage"] and config["log_keepalive_storage_details"]:
 | 
						|
            logger.out(
 | 
						|
                "{bold}Ceph cluster status:{nofmt} {health_colour}{health}{nofmt}  "
 | 
						|
                "{bold}Total OSDs:{nofmt} {total_osds}  "
 | 
						|
                "{bold}Node OSDs:{nofmt} {node_osds}  "
 | 
						|
                "{bold}Pools:{nofmt} {total_pools}  ".format(
 | 
						|
                    bold=logger.fmt_bold,
 | 
						|
                    health_colour=ceph_health_colour,
 | 
						|
                    nofmt=logger.fmt_end,
 | 
						|
                    health=ceph_health,
 | 
						|
                    total_osds=len(zkhandler.children("base.osd")),
 | 
						|
                    node_osds=osds_this_node,
 | 
						|
                    total_pools=len(zkhandler.children("base.pool")),
 | 
						|
                ),
 | 
						|
                state="t",
 | 
						|
            )
 | 
						|
 | 
						|
    # Look for dead nodes and fence them
 | 
						|
    if not this_node.maintenance:
 | 
						|
        if debug:
 | 
						|
            logger.out(
 | 
						|
                "Look for dead nodes and fence them", state="d", prefix="main-thread"
 | 
						|
            )
 | 
						|
        if config["daemon_mode"] == "coordinator":
 | 
						|
            for node_name in zkhandler.children("base.node"):
 | 
						|
                try:
 | 
						|
                    node_daemon_state = zkhandler.read(("node.state.daemon", node_name))
 | 
						|
                    node_keepalive = int(zkhandler.read(("node.keepalive", node_name)))
 | 
						|
                except Exception:
 | 
						|
                    node_daemon_state = "unknown"
 | 
						|
                    node_keepalive = 0
 | 
						|
 | 
						|
                # Handle deadtime and fencng if needed
 | 
						|
                # (A node is considered dead when its keepalive timer is >6*keepalive_interval seconds
 | 
						|
                # out-of-date while in 'start' state)
 | 
						|
                node_deadtime = int(time.time()) - (
 | 
						|
                    int(config["keepalive_interval"]) * int(config["fence_intervals"])
 | 
						|
                )
 | 
						|
                if node_keepalive < node_deadtime and node_daemon_state == "run":
 | 
						|
                    logger.out(
 | 
						|
                        "Node {} seems dead - starting monitor for fencing".format(
 | 
						|
                            node_name
 | 
						|
                        ),
 | 
						|
                        state="w",
 | 
						|
                    )
 | 
						|
                    zk_lock = zkhandler.writelock(("node.state.daemon", node_name))
 | 
						|
                    with zk_lock:
 | 
						|
                        # Ensures that, if we lost the lock race and come out of waiting,
 | 
						|
                        # we won't try to trigger our own fence thread.
 | 
						|
                        if zkhandler.read(("node.state.daemon", node_name)) != "dead":
 | 
						|
                            fence_thread = Thread(
 | 
						|
                                target=pvcnoded.util.fencing.fence_node,
 | 
						|
                                args=(node_name, zkhandler, config, logger),
 | 
						|
                                kwargs={},
 | 
						|
                            )
 | 
						|
                            fence_thread.start()
 | 
						|
                            # Write the updated data after we start the fence thread
 | 
						|
                            zkhandler.write(
 | 
						|
                                [(("node.state.daemon", node_name), "dead")]
 | 
						|
                            )
 | 
						|
 | 
						|
    if debug:
 | 
						|
        logger.out("Keepalive finished", state="d", prefix="main-thread")
 |