Move Ceph cluster health reporting to plugin
Also removes several outputs from the normal keepalive that were superfluous/static so that the main output fits on one line.
This commit is contained in:
		
							
								
								
									
										126
									
								
								node-daemon/plugins/ceph
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										126
									
								
								node-daemon/plugins/ceph
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,126 @@
 | 
				
			|||||||
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# ceph.py - PVC Monitoring example plugin for ceph status
 | 
				
			||||||
 | 
					# Part of the Parallel Virtual Cluster (PVC) system
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					#    Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					#    This program is free software: you can redistribute it and/or modify
 | 
				
			||||||
 | 
					#    it under the terms of the GNU General Public License as published by
 | 
				
			||||||
 | 
					#    the Free Software Foundation, version 3.
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					#    This program is distributed in the hope that it will be useful,
 | 
				
			||||||
 | 
					#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
				
			||||||
 | 
					#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
				
			||||||
 | 
					#    GNU General Public License for more details.
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					#    You should have received a copy of the GNU General Public License
 | 
				
			||||||
 | 
					#    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					###############################################################################
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# This script provides an example of a PVC monitoring plugin script. It will create
 | 
				
			||||||
 | 
					# a simple plugin to check the Ceph cluster health for anomalies, and return a health
 | 
				
			||||||
 | 
					# delta reflective of the overall Ceph status (HEALTH_WARN = 10, HEALTH_ERR = 50).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# This script can thus be used as an example or reference implementation of a
 | 
				
			||||||
 | 
					# PVC monitoring pluginscript and expanded upon as required.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# A monitoring plugin script must implement the class "MonitoringPluginScript" which
 | 
				
			||||||
 | 
					# extends "MonitoringPlugin", providing the 3 functions indicated. Detailed explanation
 | 
				
			||||||
 | 
					# of the role of each function is provided in context of the example; see the other
 | 
				
			||||||
 | 
					# examples for more potential uses.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# WARNING:
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					# This script will run in the context of the node daemon keepalives as root.
 | 
				
			||||||
 | 
					# DO NOT install untrusted, unvetted plugins under any circumstances.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# This import is always required here, as MonitoringPlugin is used by the
 | 
				
			||||||
 | 
					# MonitoringPluginScript class
 | 
				
			||||||
 | 
					from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# A monitoring plugin script must always expose its nice name, which must be identical to
 | 
				
			||||||
 | 
					# the file name
 | 
				
			||||||
 | 
					PLUGIN_NAME = "ceph"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# The MonitoringPluginScript class must be named as such, and extend MonitoringPlugin.
 | 
				
			||||||
 | 
					class MonitoringPluginScript(MonitoringPlugin):
 | 
				
			||||||
 | 
					    def setup(self):
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        setup(): Perform special setup steps during node daemon startup
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        This step is optional and should be used sparingly.
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def run(self):
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        run(): Perform the check actions and return a PluginResult object
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Run any imports first
 | 
				
			||||||
 | 
					        from rados import Rados
 | 
				
			||||||
 | 
					        from json import loads, dumps
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Connect to the Ceph cluster
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            ceph_conn = Rados(
 | 
				
			||||||
 | 
					                conffile=self.config["ceph_config_file"],
 | 
				
			||||||
 | 
					                conf=dict(keyring=self.config["ceph_admin_keyring"]),
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            ceph_conn.connect(timeout=1)
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            self.log(f"Failed to connect to Ceph cluster: {e}", state="e")
 | 
				
			||||||
 | 
					            return self.plugin_result
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					        # Get the Ceph cluster health
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            health_status = loads(
 | 
				
			||||||
 | 
					                ceph_conn.mon_command(dumps({"prefix": "health", "format": "json"}), b"", timeout=1)[1]
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            ceph_health = health_status["status"]
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            self.log(f"Failed to get health data from Ceph cluster: {e}", state="e")
 | 
				
			||||||
 | 
					            return self.plugin_result
 | 
				
			||||||
 | 
					        finally:
 | 
				
			||||||
 | 
					            ceph_conn.shutdown()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Get a list of error entries in the health status output
 | 
				
			||||||
 | 
					        error_entries = health_status["checks"].keys()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Set the health delta based on the errors presented
 | 
				
			||||||
 | 
					        if ceph_health == "HEALTH_ERR":
 | 
				
			||||||
 | 
					            health_delta = 50
 | 
				
			||||||
 | 
					            message = f"Ceph cluster in ERROR state: {', '.join(error_entries)}"
 | 
				
			||||||
 | 
					        elif ceph_health == "HEALTH_WARN":
 | 
				
			||||||
 | 
					            health_delta = 10
 | 
				
			||||||
 | 
					            message = f"Ceph cluster in WARNING state: {', '.join(error_entries)}"
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            health_delta = 0
 | 
				
			||||||
 | 
					            message = "Ceph cluster in OK state"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Set the health delta in our local PluginResult object
 | 
				
			||||||
 | 
					        self.plugin_result.set_health_delta(health_delta)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Set the message in our local PluginResult object
 | 
				
			||||||
 | 
					        self.plugin_result.set_message(message)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Set the detailed data in our local PluginResult object
 | 
				
			||||||
 | 
					        self.plugin_result.set_data(dumps(health_status))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Return our local PluginResult object
 | 
				
			||||||
 | 
					        return self.plugin_result
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def cleanup(self):
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        cleanup(): Perform special cleanup steps during node daemon termination
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        This step is optional and should be used sparingly.
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        pass
 | 
				
			||||||
@@ -97,29 +97,6 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
        logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
 | 
					        logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
 | 
				
			||||||
        return
 | 
					        return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if debug:
 | 
					 | 
				
			||||||
        logger.out("Getting health stats from monitor", state="d", prefix="ceph-thread")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # Get Ceph cluster health for local status output
 | 
					 | 
				
			||||||
    command = {"prefix": "health", "format": "json"}
 | 
					 | 
				
			||||||
    try:
 | 
					 | 
				
			||||||
        health_status = json.loads(
 | 
					 | 
				
			||||||
            ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1]
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        ceph_health = health_status["status"]
 | 
					 | 
				
			||||||
    except Exception as e:
 | 
					 | 
				
			||||||
        logger.out("Failed to obtain Ceph health data: {}".format(e), state="e")
 | 
					 | 
				
			||||||
        ceph_health = "HEALTH_UNKN"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    if ceph_health in ["HEALTH_OK"]:
 | 
					 | 
				
			||||||
        ceph_health_colour = logger.fmt_green
 | 
					 | 
				
			||||||
    elif ceph_health in ["HEALTH_UNKN"]:
 | 
					 | 
				
			||||||
        ceph_health_colour = logger.fmt_cyan
 | 
					 | 
				
			||||||
    elif ceph_health in ["HEALTH_WARN"]:
 | 
					 | 
				
			||||||
        ceph_health_colour = logger.fmt_yellow
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        ceph_health_colour = logger.fmt_red
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # Primary-only functions
 | 
					    # Primary-only functions
 | 
				
			||||||
    if this_node.router_state == "primary":
 | 
					    if this_node.router_state == "primary":
 | 
				
			||||||
        if debug:
 | 
					        if debug:
 | 
				
			||||||
@@ -408,8 +385,6 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    ceph_conn.shutdown()
 | 
					    ceph_conn.shutdown()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    queue.put(ceph_health_colour)
 | 
					 | 
				
			||||||
    queue.put(ceph_health)
 | 
					 | 
				
			||||||
    queue.put(osds_this_node)
 | 
					    queue.put(osds_this_node)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if debug:
 | 
					    if debug:
 | 
				
			||||||
@@ -777,16 +752,14 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    if config["enable_storage"]:
 | 
					    if config["enable_storage"]:
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            ceph_health_colour = ceph_thread_queue.get(
 | 
					            osds_this_node = ceph_thread_queue.get(
 | 
				
			||||||
                timeout=config["keepalive_interval"]
 | 
					                timeout=(config["keepalive_interval"] - 1)
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            ceph_health = ceph_thread_queue.get(timeout=config["keepalive_interval"])
 | 
					 | 
				
			||||||
            osds_this_node = ceph_thread_queue.get(timeout=config["keepalive_interval"])
 | 
					 | 
				
			||||||
        except Exception:
 | 
					        except Exception:
 | 
				
			||||||
            logger.out("Ceph stats queue get exceeded timeout, continuing", state="w")
 | 
					            logger.out("Ceph stats queue get exceeded timeout, continuing", state="w")
 | 
				
			||||||
            ceph_health_colour = logger.fmt_cyan
 | 
					 | 
				
			||||||
            ceph_health = "UNKNOWN"
 | 
					 | 
				
			||||||
            osds_this_node = "?"
 | 
					            osds_this_node = "?"
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        osds_this_node = "0"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Set our information in zookeeper
 | 
					    # Set our information in zookeeper
 | 
				
			||||||
    keepalive_time = int(time.time())
 | 
					    keepalive_time = int(time.time())
 | 
				
			||||||
@@ -839,8 +812,8 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
 | 
				
			|||||||
        if config["log_keepalive_cluster_details"]:
 | 
					        if config["log_keepalive_cluster_details"]:
 | 
				
			||||||
            logger.out(
 | 
					            logger.out(
 | 
				
			||||||
                "{bold}Maintenance:{nofmt} {maint}  "
 | 
					                "{bold}Maintenance:{nofmt} {maint}  "
 | 
				
			||||||
                "{bold}Active VMs:{nofmt} {domcount}  "
 | 
					                "{bold}Node VMs:{nofmt} {domcount}  "
 | 
				
			||||||
                "{bold}Networks:{nofmt} {netcount}  "
 | 
					                "{bold}Node OSDs:{nofmt} {osdcount}  "
 | 
				
			||||||
                "{bold}Load:{nofmt} {load}  "
 | 
					                "{bold}Load:{nofmt} {load}  "
 | 
				
			||||||
                "{bold}Memory [MiB]: VMs:{nofmt} {allocmem}  "
 | 
					                "{bold}Memory [MiB]: VMs:{nofmt} {allocmem}  "
 | 
				
			||||||
                "{bold}Used:{nofmt} {usedmem}  "
 | 
					                "{bold}Used:{nofmt} {usedmem}  "
 | 
				
			||||||
@@ -849,7 +822,7 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
 | 
				
			|||||||
                    nofmt=logger.fmt_end,
 | 
					                    nofmt=logger.fmt_end,
 | 
				
			||||||
                    maint=this_node.maintenance,
 | 
					                    maint=this_node.maintenance,
 | 
				
			||||||
                    domcount=this_node.domains_count,
 | 
					                    domcount=this_node.domains_count,
 | 
				
			||||||
                    netcount=len(zkhandler.children("base.network")),
 | 
					                    osdcount=osds_this_node,
 | 
				
			||||||
                    load=this_node.cpuload,
 | 
					                    load=this_node.cpuload,
 | 
				
			||||||
                    freemem=this_node.memfree,
 | 
					                    freemem=this_node.memfree,
 | 
				
			||||||
                    usedmem=this_node.memused,
 | 
					                    usedmem=this_node.memused,
 | 
				
			||||||
@@ -857,22 +830,6 @@ def node_keepalive(logger, config, zkhandler, this_node, monitoring_instance):
 | 
				
			|||||||
                ),
 | 
					                ),
 | 
				
			||||||
                state="t",
 | 
					                state="t",
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
        if config["enable_storage"] and config["log_keepalive_storage_details"]:
 | 
					 | 
				
			||||||
            logger.out(
 | 
					 | 
				
			||||||
                "{bold}Ceph cluster status:{nofmt} {health_colour}{health}{nofmt}  "
 | 
					 | 
				
			||||||
                "{bold}Total OSDs:{nofmt} {total_osds}  "
 | 
					 | 
				
			||||||
                "{bold}Node OSDs:{nofmt} {node_osds}  "
 | 
					 | 
				
			||||||
                "{bold}Pools:{nofmt} {total_pools}  ".format(
 | 
					 | 
				
			||||||
                    bold=logger.fmt_bold,
 | 
					 | 
				
			||||||
                    health_colour=ceph_health_colour,
 | 
					 | 
				
			||||||
                    nofmt=logger.fmt_end,
 | 
					 | 
				
			||||||
                    health=ceph_health,
 | 
					 | 
				
			||||||
                    total_osds=len(zkhandler.children("base.osd")),
 | 
					 | 
				
			||||||
                    node_osds=osds_this_node,
 | 
					 | 
				
			||||||
                    total_pools=len(zkhandler.children("base.pool")),
 | 
					 | 
				
			||||||
                ),
 | 
					 | 
				
			||||||
                state="t",
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Look for dead nodes and fence them
 | 
					    # Look for dead nodes and fence them
 | 
				
			||||||
    if not this_node.maintenance:
 | 
					    if not this_node.maintenance:
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user