diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py
index 6a1cd3d4..534fb1ce 100755
--- a/api-daemon/pvcapid/flaskapi.py
+++ b/api-daemon/pvcapid/flaskapi.py
@@ -1168,6 +1168,9 @@ class API_Node_Root(Resource):
                 provisioned:
                   type: integer
                   description: The total amount of RAM provisioned to all domains (regardless of state) on this node in MB
+                interfaces:
+                  type: object
+                  description: Details on speed, bytes, and packets per second of each node physical network interface
         parameters:
           - in: query
             name: limit
diff --git a/daemon-common/cluster.py b/daemon-common/cluster.py
index adf3cb96..ba5a1ada 100644
--- a/daemon-common/cluster.py
+++ b/daemon-common/cluster.py
@@ -619,6 +619,54 @@ def get_resource_metrics(zkhandler):
 
     output_lines = list()
 
+    #
+    # Network Utilization stats
+    #
+    # This is a bit of a doozy. First, for each node, we have to determine the %
+    # utilization of all the (active) network interfaces on that node, averaged
+    # together. Then we average the values of all the nodes together.
+    # This is very rough, but should give some idea as to the total network
+    # bandwidth used and available.
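+    # Worked example (hypothetical numbers): a node with two active 10 Gbps
+    # interfaces counts 20 Gbps of full-duplex capacity per interface (the
+    # * 2 below), so avg_speed = 20 Gbps; if each interface also averages
+    # 4 Gbps of combined rx+tx traffic, avg_util = 4 Gbps and the node
+    # contributes a 20% utilization figure to the cluster-wide average.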
sum([n["memory"]["provisioned"] for n in node_data]) @@ -668,7 +724,7 @@ def get_resource_metrics(zkhandler): ) output_lines.append("# TYPE pvc_cluster_memory_provisioned_utilization gauge") output_lines.append( - f"pvc_cluster_memory_provisioned_utilization {provisioned_memory_percentage:.2f}" + f"pvc_cluster_memory_provisioned_utilization {provisioned_memory_percentage:2.2f}" ) output_lines.append( @@ -685,7 +741,7 @@ def get_resource_metrics(zkhandler): except Exception: continue used_disk_percentage = used_disk / total_disk * 100 - output_lines.append(f"pvc_cluster_disk_utilization {used_disk_percentage:.2f}") + output_lines.append(f"pvc_cluster_disk_utilization {used_disk_percentage:2.2f}") # # Node stats diff --git a/daemon-common/node.py b/daemon-common/node.py index 265b1db7..d0cb9164 100644 --- a/daemon-common/node.py +++ b/daemon-common/node.py @@ -103,6 +103,7 @@ def getNodeInformation(zkhandler, node_name): _node_running_domains, _node_health, _node_health_plugins, + _node_network_stats, ) = zkhandler.read_many( [ ("node.state.daemon", node_name), @@ -121,6 +122,7 @@ def getNodeInformation(zkhandler, node_name): ("node.running_domains", node_name), ("node.monitoring.health", node_name), ("node.monitoring.plugins", node_name), + ("node.network.stats", node_name), ] ) @@ -154,6 +156,8 @@ def getNodeInformation(zkhandler, node_name): zkhandler, node_name, node_health_plugins ) + node_network_stats = json.loads(_node_network_stats) + # Construct a data structure to represent the data node_information = { "name": node_name, @@ -182,6 +186,7 @@ def getNodeInformation(zkhandler, node_name): "used": node_mem_used, "free": node_mem_free, }, + "interfaces": node_network_stats, } return node_information diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py index a12fe927..c638d235 100644 --- a/node-daemon/pvcnoded/Daemon.py +++ b/node-daemon/pvcnoded/Daemon.py @@ -31,6 +31,7 @@ import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance import pvcnoded.objects.VMInstance as VMInstance import pvcnoded.objects.NodeInstance as NodeInstance import pvcnoded.objects.VXNetworkInstance as VXNetworkInstance +import pvcnoded.objects.NetstatsInstance as NetstatsInstance import pvcnoded.objects.SRIOVVFInstance as SRIOVVFInstance import pvcnoded.objects.CephInstance as CephInstance @@ -200,9 +201,9 @@ def entrypoint(): # Define a cleanup function def cleanup(failure=False): - nonlocal logger, zkhandler, keepalive_timer, d_domain + nonlocal logger, zkhandler, keepalive_timer, d_domain, netstats - logger.out("Terminating pvcnoded and cleaning up", state="s") + logger.out("Terminating pvcnoded", state="s") # Set shutdown state in Zookeeper zkhandler.write([(("node.state.daemon", config["node_hostname"]), "shutdown")]) @@ -249,12 +250,20 @@ def entrypoint(): except Exception: pass - # Set stop state in Zookeeper - zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")]) + logger.out("Cleaning up", state="s") + + # Stop netstats instance + try: + netstats.shutdown() + except Exception: + pass # Forcibly terminate dnsmasq because it gets stuck sometimes common.run_os_command("killall dnsmasq") + # Set stop state in Zookeeper + zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")]) + # Close the Zookeeper connection try: zkhandler.disconnect(persistent=True) @@ -1000,9 +1009,12 @@ def entrypoint(): state="s", ) + # Set up netstats + netstats = NetstatsInstance.NetstatsInstance(logger, config, zkhandler, this_node) + # Start keepalived 
+        self.rx_bits_rolling = deque(list(), self.avg_samples + 1)
+        self.rx_bps = 0
+
+        self.rx_packets_rolling = deque(list(), self.avg_samples + 1)
+        self.rx_pps = 0
+
+        self.tx_bits_rolling = deque(list(), self.avg_samples + 1)
+        self.tx_bps = 0
+
+        self.tx_packets_rolling = deque(list(), self.avg_samples + 1)
+        self.tx_pps = 0
+
+        self.total_bps = 0
+        self.total_pps = 0
+
+    def get_iface_stats(self):
+        """
+        Reads the interface statistics from the sysfs for the interface.
+        """
+        iface_state_path = f"/sys/class/net/{self.iface}/operstate"
+        with open(iface_state_path) as stfh:
+            self.state = stfh.read().strip()
+
+        iface_speed_path = f"/sys/class/net/{self.iface}/speed"
+        try:
+            with open(iface_speed_path) as spfh:
+                # The speed key is always in Mbps so multiply by 1000*1000 to get bps
+                self.link_speed = int(spfh.read()) * 1000 * 1000
+        except OSError:
+            self.link_speed = 0
+
+        iface_stats_path = f"/sys/class/net/{self.iface}/statistics"
+        with open(f"{iface_stats_path}/rx_bytes") as rxbfh:
+            self.rx_bits_rolling.append(int(rxbfh.read()) * 8)
+        with open(f"{iface_stats_path}/tx_bytes") as txbfh:
+            self.tx_bits_rolling.append(int(txbfh.read()) * 8)
+        # Packet counters are plain counts, so no bits conversion is needed here
+        with open(f"{iface_stats_path}/rx_packets") as rxpfh:
+            self.rx_packets_rolling.append(int(rxpfh.read()))
+        with open(f"{iface_stats_path}/tx_packets") as txpfh:
+            self.tx_packets_rolling.append(int(txpfh.read()))
+
+    def calculate_averages(self):
+        """
+        Calculates the bps/pps values from the rolling values.
+        """
+
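+        # Each deque holds cumulative counters sampled one second apart, so
+        # every pairwise difference below is a one-second delta; the mean of
+        # the avg_samples deltas is the rolling per-second rate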
+ """ + iface_state_path = f"/sys/class/net/{self.iface}/operstate" + with open(iface_state_path) as stfh: + self.state = stfh.read().strip() + + iface_speed_path = f"/sys/class/net/{self.iface}/speed" + try: + with open(iface_speed_path) as spfh: + # The speed key is always in Mbps so multiply by 1000*1000 to get bps + self.link_speed = int(spfh.read()) * 1000 * 1000 + except OSError: + self.link_speed = 0 + + iface_stats_path = f"/sys/class/net/{self.iface}/statistics" + with open(f"{iface_stats_path}/rx_bytes") as rxbfh: + self.rx_bits_rolling.append(int(rxbfh.read()) * 8) + with open(f"{iface_stats_path}/tx_bytes") as txbfh: + self.tx_bits_rolling.append(int(txbfh.read()) * 8) + with open(f"{iface_stats_path}/rx_packets") as rxpfh: + self.rx_packets_rolling.append(int(rxpfh.read()) * 8) + with open(f"{iface_stats_path}/tx_packets") as txpfh: + self.tx_packets_rolling.append(int(txpfh.read()) * 8) + + def calculate_averages(self): + """ + Calculates the bps/pps values from the rolling values. + """ + + rx_bits_diffs = list() + for sample_idx in range(self.avg_samples, 0, -1): + rx_bits_diffs.append( + self.rx_bits_rolling[sample_idx] - self.rx_bits_rolling[sample_idx - 1] + ) + self.rx_bps = int(sum(rx_bits_diffs) / self.avg_samples) + + rx_packets_diffs = list() + for sample_idx in range(self.avg_samples, 0, -1): + rx_packets_diffs.append( + self.rx_packets_rolling[sample_idx] + - self.rx_packets_rolling[sample_idx - 1] + ) + self.rx_pps = int(sum(rx_packets_diffs) / self.avg_samples) + + tx_bits_diffs = list() + for sample_idx in range(self.avg_samples, 0, -1): + tx_bits_diffs.append( + self.tx_bits_rolling[sample_idx] - self.tx_bits_rolling[sample_idx - 1] + ) + self.tx_bps = int(sum(tx_bits_diffs) / self.avg_samples) + + tx_packets_diffs = list() + for sample_idx in range(self.avg_samples, 0, -1): + tx_packets_diffs.append( + self.tx_packets_rolling[sample_idx] + - self.tx_packets_rolling[sample_idx - 1] + ) + self.tx_pps = int(sum(tx_packets_diffs) / self.avg_samples) + + self.total_bps = self.rx_bps + self.tx_bps + self.total_pps = self.rx_pps + self.tx_pps + + def gather_stats(self): + """ + Gathers the current stats and then calculates the averages. + + Runs via the BackgroundScheduler timer every 1 second. + """ + self.get_iface_stats() + if self.data_valid: + self.calculate_averages() + + # Handle data validity: our data is invalid until we hit enough polls + # to make a valid average (avg_samples plus 1). + if not self.data_valid: + self.data_polls += 1 + if self.data_polls > self.avg_samples: + self.data_valid = True + + def start(self): + """ + Starts the timer. + """ + self.timer.start() + + def stop(self): + """ + Stops the timer. + """ + self.timer.shutdown() + + def get_stats(self): + """ + Returns a tuple of the current statistics. + """ + if not self.data_valid: + return None + + return ( + self.rx_bps, + self.rx_pps, + self.tx_bps, + self.tx_pps, + self.total_bps, + self.total_pps, + self.link_speed, + self.state, + ) + + +class NetstatsInstance(object): + """ + NetstatsInstance + + This class implements a rolling statistics poller for all PHYSICAL network interfaces, + on the system, initializing a NetstatsIfaceInstance for each, as well as handling + value updates into Zookeeper. + """ + + def __init__(self, logger, config, zkhandler, this_node): + """ + Initialize the class instance. 
+ """ + self.logger = logger + self.config = config + self.zkhandler = zkhandler + self.node_name = this_node.name + + self.interfaces = dict() + + self.logger.out( + f"Starting netstats collector ({self.config['keepalive_interval']} second interval)", + state="s", + ) + + self.set_interfaces() + + def shutdown(self): + """ + Stop all pollers and delete the NetstatsIfaceInstance objects + """ + # Empty the network stats object + self.zkhandler.write([(("node.network.stats", self.node_name), dumps({}))]) + + for iface in self.interfaces.keys(): + self.interfaces[iface].stop() + + def set_interfaces(self): + """ + Sets the list of interfaces on the system, and then ensures that each + interface has a NetstatsIfaceInstance assigned to it and polling. + """ + # Get a list of all active interfaces + net_root_path = "/sys/class/net" + all_ifaces = list() + for (_, dirnames, _) in walk(net_root_path): + all_ifaces.extend(dirnames) + all_ifaces.sort() + + self.logger.out( + f"Parsing network list: {all_ifaces}", state="d", prefix="netstats-thread" + ) + + # Add any missing interfaces + for iface in all_ifaces: + if not exists(f"{net_root_path}/{iface}/device"): + # This is not a physical interface; skip it + continue + + if iface not in self.interfaces.keys(): + # Set the number of samples to be equal to the keepalive interval, so that each + # keepalive has a fresh set of data from the last keepalive_interval seconds. + self.interfaces[iface] = NetstatsIfaceInstance( + self.logger, iface, self.config["keepalive_interval"] + ) + self.interfaces[iface].start() + # Remove any superfluous interfaces + for iface in self.interfaces.keys(): + if iface not in all_ifaces: + self.interfaces[iface].stop() + del self.interfaces[iface] + + def set_data(self): + data = dict() + for iface in self.interfaces.keys(): + self.logger.out( + f"Getting data for interface {iface}", + state="d", + prefix="netstats-thread", + ) + iface_stats = self.interfaces[iface].get_stats() + if iface_stats is None: + continue + ( + iface_rx_bps, + iface_rx_pps, + iface_tx_bps, + iface_tx_pps, + iface_total_bps, + iface_total_pps, + iface_link_speed, + iface_state, + ) = iface_stats + data[iface] = { + "rx_bps": iface_rx_bps, + "rx_pps": iface_rx_pps, + "tx_bps": iface_tx_bps, + "tx_pps": iface_tx_pps, + "total_bps": iface_total_bps, + "total_pps": iface_total_pps, + "link_speed": iface_link_speed, + "state": iface_state, + } + + self.zkhandler.write([(("node.network.stats", self.node_name), dumps(data))]) diff --git a/node-daemon/pvcnoded/util/keepalive.py b/node-daemon/pvcnoded/util/keepalive.py index 5cf7a1e5..9b56280d 100644 --- a/node-daemon/pvcnoded/util/keepalive.py +++ b/node-daemon/pvcnoded/util/keepalive.py @@ -51,7 +51,7 @@ libvirt_vm_states = { } -def start_keepalive_timer(logger, config, zkhandler, this_node): +def start_keepalive_timer(logger, config, zkhandler, this_node, netstats): keepalive_interval = config["keepalive_interval"] logger.out( f"Starting keepalive timer ({keepalive_interval} second interval)", state="s" @@ -59,7 +59,7 @@ def start_keepalive_timer(logger, config, zkhandler, this_node): keepalive_timer = BackgroundScheduler() keepalive_timer.add_job( node_keepalive, - args=(logger, config, zkhandler, this_node), + args=(logger, config, zkhandler, this_node, netstats), trigger="interval", seconds=keepalive_interval, ) @@ -684,7 +684,7 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue): # Keepalive update function -def node_keepalive(logger, config, zkhandler, this_node): +def 
+    netstats.set_interfaces()
+    netstats.set_data()
+
     # Join against running threads
     if config["enable_hypervisor"]:
         vm_stats_thread.join(timeout=config["keepalive_interval"])