Add node network statistics and utilization values

Adds a new physical network interface stats parser to the node
keepalives, and leverages this information to provide a network
utilization overview in the Prometheus metrics.
This commit is contained in:
Joshua Boniface 2023-12-21 15:12:20 -05:00
parent d2d2a9c617
commit 3e4cc53fdd
6 changed files with 386 additions and 13 deletions

View File

@ -1168,6 +1168,9 @@ class API_Node_Root(Resource):
provisioned: provisioned:
type: integer type: integer
description: The total amount of RAM provisioned to all domains (regardless of state) on this node in MB description: The total amount of RAM provisioned to all domains (regardless of state) on this node in MB
interfaces:
type: object
description: Details on speed, bytes, and packets per second of each node physical network interface
parameters: parameters:
- in: query - in: query
name: limit name: limit

View File

@ -619,6 +619,54 @@ def get_resource_metrics(zkhandler):
output_lines = list() output_lines = list()
#
# Network Utilization stats
#
# This is a bit of a doozie. First, for each node, we have to determine the % utilization
of all the (active) network interfaces on that node, averaged together. Then we average
# the values of all the nodes together.
# This is very rough, but should give some idea as to the total network bandwidth used
# and available.
all_total_speed = 0
all_total_util = 0
all_total_count = 0
for node in node_data:
if node["daemon_state"] != "run":
continue
total_speed = 0
total_util = 0
total_count = 0
for iface in node["interfaces"].keys():
link_state = node["interfaces"][iface]["state"]
if link_state != "up":
continue
link_speed = node["interfaces"][iface]["link_speed"] * 2 # full-duplex
total_speed += link_speed
total_bps = node["interfaces"][iface]["total_bps"]
total_util += total_bps
total_count += 1
if total_count > 0:
# Average the speed and util by the count
avg_speed = int(total_speed / total_count)
all_total_speed += avg_speed
avg_util = int(total_util / total_count)
all_total_util += avg_util
all_total_count += 1
if all_total_count > 0:
all_avg_speed = all_total_speed / all_total_count
all_avg_util = all_total_util / all_total_count
used_network_percentage = all_avg_util / all_avg_speed * 100
else:
used_network_percentage = 0
# #
# Cluster stats # Cluster stats
# #
@ -633,7 +681,15 @@ def get_resource_metrics(zkhandler):
total_cpu = sum(node_sorted_cpu[:-2]) total_cpu = sum(node_sorted_cpu[:-2])
used_cpu = sum([n["load"] for n in node_data]) used_cpu = sum([n["load"] for n in node_data])
used_cpu_percentage = used_cpu / total_cpu * 100 used_cpu_percentage = used_cpu / total_cpu * 100
output_lines.append(f"pvc_cluster_cpu_utilization {used_cpu_percentage:.2f}") output_lines.append(f"pvc_cluster_cpu_utilization {used_cpu_percentage:2.2f}")
output_lines.append(
"# HELP pvc_cluster_network_utilization PVC cluster network utilization percentage"
)
output_lines.append("# TYPE pvc_cluster_network_utilization gauge")
output_lines.append(
f"pvc_cluster_network_utilization {used_network_percentage:2.2f}"
)
node_sorted_memory = [ node_sorted_memory = [
n["memory"]["total"] n["memory"]["total"]
@ -648,7 +704,7 @@ def get_resource_metrics(zkhandler):
) )
output_lines.append("# TYPE pvc_cluster_memory_real_utilization gauge") output_lines.append("# TYPE pvc_cluster_memory_real_utilization gauge")
output_lines.append( output_lines.append(
f"pvc_cluster_memory_real_utilization {used_memory_percentage:.2f}" f"pvc_cluster_memory_real_utilization {used_memory_percentage:2.2f}"
) )
allocated_memory = sum([n["memory"]["allocated"] for n in node_data]) allocated_memory = sum([n["memory"]["allocated"] for n in node_data])
@ -658,7 +714,7 @@ def get_resource_metrics(zkhandler):
) )
output_lines.append("# TYPE pvc_cluster_memory_allocated_utilization gauge") output_lines.append("# TYPE pvc_cluster_memory_allocated_utilization gauge")
output_lines.append( output_lines.append(
f"pvc_cluster_memory_allocated_utilization {allocated_memory_percentage:.2f}" f"pvc_cluster_memory_allocated_utilization {allocated_memory_percentage:2.2f}"
) )
provisioned_memory = sum([n["memory"]["provisioned"] for n in node_data]) provisioned_memory = sum([n["memory"]["provisioned"] for n in node_data])
@ -668,7 +724,7 @@ def get_resource_metrics(zkhandler):
) )
output_lines.append("# TYPE pvc_cluster_memory_provisioned_utilization gauge") output_lines.append("# TYPE pvc_cluster_memory_provisioned_utilization gauge")
output_lines.append( output_lines.append(
f"pvc_cluster_memory_provisioned_utilization {provisioned_memory_percentage:.2f}" f"pvc_cluster_memory_provisioned_utilization {provisioned_memory_percentage:2.2f}"
) )
output_lines.append( output_lines.append(
@ -685,7 +741,7 @@ def get_resource_metrics(zkhandler):
except Exception: except Exception:
continue continue
used_disk_percentage = used_disk / total_disk * 100 used_disk_percentage = used_disk / total_disk * 100
output_lines.append(f"pvc_cluster_disk_utilization {used_disk_percentage:.2f}") output_lines.append(f"pvc_cluster_disk_utilization {used_disk_percentage:2.2f}")
# #
# Node stats # Node stats

View File

@ -103,6 +103,7 @@ def getNodeInformation(zkhandler, node_name):
_node_running_domains, _node_running_domains,
_node_health, _node_health,
_node_health_plugins, _node_health_plugins,
_node_network_stats,
) = zkhandler.read_many( ) = zkhandler.read_many(
[ [
("node.state.daemon", node_name), ("node.state.daemon", node_name),
@ -121,6 +122,7 @@ def getNodeInformation(zkhandler, node_name):
("node.running_domains", node_name), ("node.running_domains", node_name),
("node.monitoring.health", node_name), ("node.monitoring.health", node_name),
("node.monitoring.plugins", node_name), ("node.monitoring.plugins", node_name),
("node.network.stats", node_name),
] ]
) )
@ -154,6 +156,8 @@ def getNodeInformation(zkhandler, node_name):
zkhandler, node_name, node_health_plugins zkhandler, node_name, node_health_plugins
) )
node_network_stats = json.loads(_node_network_stats)
# Construct a data structure to represent the data # Construct a data structure to represent the data
node_information = { node_information = {
"name": node_name, "name": node_name,
@ -182,6 +186,7 @@ def getNodeInformation(zkhandler, node_name):
"used": node_mem_used, "used": node_mem_used,
"free": node_mem_free, "free": node_mem_free,
}, },
"interfaces": node_network_stats,
} }
return node_information return node_information

View File

@ -31,6 +31,7 @@ import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
import pvcnoded.objects.VMInstance as VMInstance import pvcnoded.objects.VMInstance as VMInstance
import pvcnoded.objects.NodeInstance as NodeInstance import pvcnoded.objects.NodeInstance as NodeInstance
import pvcnoded.objects.VXNetworkInstance as VXNetworkInstance import pvcnoded.objects.VXNetworkInstance as VXNetworkInstance
import pvcnoded.objects.NetstatsInstance as NetstatsInstance
import pvcnoded.objects.SRIOVVFInstance as SRIOVVFInstance import pvcnoded.objects.SRIOVVFInstance as SRIOVVFInstance
import pvcnoded.objects.CephInstance as CephInstance import pvcnoded.objects.CephInstance as CephInstance
@ -200,9 +201,9 @@ def entrypoint():
# Define a cleanup function # Define a cleanup function
def cleanup(failure=False): def cleanup(failure=False):
nonlocal logger, zkhandler, keepalive_timer, d_domain nonlocal logger, zkhandler, keepalive_timer, d_domain, netstats
logger.out("Terminating pvcnoded and cleaning up", state="s") logger.out("Terminating pvcnoded", state="s")
# Set shutdown state in Zookeeper # Set shutdown state in Zookeeper
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "shutdown")]) zkhandler.write([(("node.state.daemon", config["node_hostname"]), "shutdown")])
@ -249,12 +250,20 @@ def entrypoint():
except Exception: except Exception:
pass pass
# Set stop state in Zookeeper logger.out("Cleaning up", state="s")
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
# Stop netstats instance
try:
netstats.shutdown()
except Exception:
pass
# Forcibly terminate dnsmasq because it gets stuck sometimes # Forcibly terminate dnsmasq because it gets stuck sometimes
common.run_os_command("killall dnsmasq") common.run_os_command("killall dnsmasq")
# Set stop state in Zookeeper
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
# Close the Zookeeper connection # Close the Zookeeper connection
try: try:
zkhandler.disconnect(persistent=True) zkhandler.disconnect(persistent=True)
@ -1000,9 +1009,12 @@ def entrypoint():
state="s", state="s",
) )
# Set up netstats
netstats = NetstatsInstance.NetstatsInstance(logger, config, zkhandler, this_node)
# Start keepalived thread # Start keepalived thread
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer( keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
logger, config, zkhandler, this_node logger, config, zkhandler, this_node, netstats
) )
# Tick loop; does nothing since everything is async # Tick loop; does nothing since everything is async

View File

@ -0,0 +1,293 @@
#!/usr/bin/env python3
# NetstatsInstance.py - Class implementing a PVC network stats gatherer and run by pvcnoded
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2023 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
from apscheduler.schedulers.background import BackgroundScheduler
from collections import deque
from json import dumps
from os import walk
from os.path import exists
class NetstatsIfaceInstance(object):
    """
    NetstatsIfaceInstance

    This class implements a rolling statistics poller for a network interface,
    collecting stats on the bits and packets per second in both directions every
    second.

    Via the get_stats() function, it returns the rolling average of all 4 values,
    as well as totals, over the last 5 seconds (self.avg_samples) as a tuple of:
    (rx_bps, rx_pps, tx_bps, tx_pps, total_bps, total_pps, link_speed, state)
    """

    def __init__(self, logger, iface, avg_samples):
        """
        Initialize the class instance, creating our BackgroundScheduler, setting
        the average sample rate, and creating the deques and average values.
        """
        self.logger = logger
        self.iface = iface

        self.data_valid = False
        self.data_polls = 0

        self.timer = BackgroundScheduler()
        self.timer.add_job(self.gather_stats, trigger="interval", seconds=1)

        self.avg_samples = avg_samples

        self.link_speed = 0
        self.state = "down"

        # Rolling absolute counter samples; one extra slot so that we can
        # compute avg_samples consecutive per-second deltas.
        self.rx_bits_rolling = deque(list(), self.avg_samples + 1)
        self.rx_bps = 0
        self.rx_packets_rolling = deque(list(), self.avg_samples + 1)
        self.rx_pps = 0
        self.tx_bits_rolling = deque(list(), self.avg_samples + 1)
        self.tx_bps = 0
        self.tx_packets_rolling = deque(list(), self.avg_samples + 1)
        self.tx_pps = 0
        self.total_bps = 0
        self.total_pps = 0

    def get_iface_stats(self):
        """
        Reads the interface statistics from the sysfs for the interface.

        Appends the current absolute counter values to the rolling deques;
        byte counters are converted to bits, packet counters are kept as-is.
        """
        iface_state_path = f"/sys/class/net/{self.iface}/operstate"
        with open(iface_state_path) as stfh:
            self.state = stfh.read().strip()

        iface_speed_path = f"/sys/class/net/{self.iface}/speed"
        try:
            with open(iface_speed_path) as spfh:
                # The speed key is always in Mbps so multiply by 1000*1000 to get bps
                self.link_speed = int(spfh.read()) * 1000 * 1000
        except OSError:
            # Interfaces without a negotiated link (or virtual devices) raise
            # EINVAL when reading the speed key; treat that as "no speed"
            self.link_speed = 0

        iface_stats_path = f"/sys/class/net/{self.iface}/statistics"
        with open(f"{iface_stats_path}/rx_bytes") as rxbfh:
            self.rx_bits_rolling.append(int(rxbfh.read()) * 8)
        with open(f"{iface_stats_path}/tx_bytes") as txbfh:
            self.tx_bits_rolling.append(int(txbfh.read()) * 8)
        # BUGFIX: packet counters are counts, not bytes; the previous *8
        # (copy-pasted from the bytes->bits conversion) inflated pps by 8x
        with open(f"{iface_stats_path}/rx_packets") as rxpfh:
            self.rx_packets_rolling.append(int(rxpfh.read()))
        with open(f"{iface_stats_path}/tx_packets") as txpfh:
            self.tx_packets_rolling.append(int(txpfh.read()))

    def _average_rate(self, rolling):
        """
        Return the average per-second delta across the rolling counter samples.

        Expects `rolling` to hold avg_samples + 1 absolute counter readings.
        """
        diffs = [
            rolling[sample_idx] - rolling[sample_idx - 1]
            for sample_idx in range(self.avg_samples, 0, -1)
        ]
        return int(sum(diffs) / self.avg_samples)

    def calculate_averages(self):
        """
        Calculates the bps/pps values from the rolling values.
        """
        self.rx_bps = self._average_rate(self.rx_bits_rolling)
        self.rx_pps = self._average_rate(self.rx_packets_rolling)
        self.tx_bps = self._average_rate(self.tx_bits_rolling)
        self.tx_pps = self._average_rate(self.tx_packets_rolling)
        self.total_bps = self.rx_bps + self.tx_bps
        self.total_pps = self.rx_pps + self.tx_pps

    def gather_stats(self):
        """
        Gathers the current stats and then calculates the averages.

        Runs via the BackgroundScheduler timer every 1 second.
        """
        self.get_iface_stats()
        if self.data_valid:
            self.calculate_averages()

        # Handle data validity: our data is invalid until we hit enough polls
        # to make a valid average (avg_samples plus 1).
        if not self.data_valid:
            self.data_polls += 1
            if self.data_polls > self.avg_samples:
                self.data_valid = True

    def start(self):
        """
        Starts the timer.
        """
        self.timer.start()

    def stop(self):
        """
        Stops the timer.
        """
        self.timer.shutdown()

    def get_stats(self):
        """
        Returns a tuple of the current statistics, or None if the rolling
        window has not yet collected enough samples to be meaningful.
        """
        if not self.data_valid:
            return None

        return (
            self.rx_bps,
            self.rx_pps,
            self.tx_bps,
            self.tx_pps,
            self.total_bps,
            self.total_pps,
            self.link_speed,
            self.state,
        )
class NetstatsInstance(object):
    """
    NetstatsInstance

    This class implements a rolling statistics poller for all PHYSICAL network
    interfaces on the system, initializing a NetstatsIfaceInstance for each, as
    well as handling value updates into Zookeeper.
    """

    def __init__(self, logger, config, zkhandler, this_node):
        """
        Initialize the class instance.
        """
        self.logger = logger
        self.config = config
        self.zkhandler = zkhandler
        self.node_name = this_node.name

        # Maps interface name -> NetstatsIfaceInstance poller
        self.interfaces = dict()

        self.logger.out(
            f"Starting netstats collector ({self.config['keepalive_interval']} second interval)",
            state="s",
        )

        self.set_interfaces()

    def shutdown(self):
        """
        Stop all pollers and delete the NetstatsIfaceInstance objects
        """
        # Empty the network stats object
        self.zkhandler.write([(("node.network.stats", self.node_name), dumps({}))])

        for iface in self.interfaces.keys():
            self.interfaces[iface].stop()

    def set_interfaces(self):
        """
        Sets the list of interfaces on the system, and then ensures that each
        interface has a NetstatsIfaceInstance assigned to it and polling.
        """
        # Get a list of all active interfaces
        net_root_path = "/sys/class/net"
        all_ifaces = list()
        for (_, dirnames, _) in walk(net_root_path):
            all_ifaces.extend(dirnames)
        all_ifaces.sort()

        self.logger.out(
            f"Parsing network list: {all_ifaces}", state="d", prefix="netstats-thread"
        )

        # Add any missing interfaces
        for iface in all_ifaces:
            if not exists(f"{net_root_path}/{iface}/device"):
                # This is not a physical interface; skip it
                continue

            if iface not in self.interfaces.keys():
                # Set the number of samples to be equal to the keepalive interval, so that each
                # keepalive has a fresh set of data from the last keepalive_interval seconds.
                self.interfaces[iface] = NetstatsIfaceInstance(
                    self.logger, iface, self.config["keepalive_interval"]
                )
                self.interfaces[iface].start()

        # Remove any superfluous interfaces
        # BUGFIX: iterate a snapshot of the keys; deleting entries while
        # iterating the live dict view raises "RuntimeError: dictionary
        # changed size during iteration" the first time an interface vanishes
        for iface in list(self.interfaces.keys()):
            if iface not in all_ifaces:
                self.interfaces[iface].stop()
                del self.interfaces[iface]

    def set_data(self):
        """
        Collect the current stats from every interface poller and write the
        resulting JSON structure to the node.network.stats Zookeeper key.

        Interfaces whose pollers have not yet accumulated a full sample window
        (get_stats() returns None) are omitted from this keepalive's data.
        """
        data = dict()
        for iface in self.interfaces.keys():
            self.logger.out(
                f"Getting data for interface {iface}",
                state="d",
                prefix="netstats-thread",
            )
            iface_stats = self.interfaces[iface].get_stats()
            if iface_stats is None:
                continue
            (
                iface_rx_bps,
                iface_rx_pps,
                iface_tx_bps,
                iface_tx_pps,
                iface_total_bps,
                iface_total_pps,
                iface_link_speed,
                iface_state,
            ) = iface_stats
            data[iface] = {
                "rx_bps": iface_rx_bps,
                "rx_pps": iface_rx_pps,
                "tx_bps": iface_tx_bps,
                "tx_pps": iface_tx_pps,
                "total_bps": iface_total_bps,
                "total_pps": iface_total_pps,
                "link_speed": iface_link_speed,
                "state": iface_state,
            }

        self.zkhandler.write([(("node.network.stats", self.node_name), dumps(data))])

View File

@ -51,7 +51,7 @@ libvirt_vm_states = {
} }
def start_keepalive_timer(logger, config, zkhandler, this_node): def start_keepalive_timer(logger, config, zkhandler, this_node, netstats):
keepalive_interval = config["keepalive_interval"] keepalive_interval = config["keepalive_interval"]
logger.out( logger.out(
f"Starting keepalive timer ({keepalive_interval} second interval)", state="s" f"Starting keepalive timer ({keepalive_interval} second interval)", state="s"
@ -59,7 +59,7 @@ def start_keepalive_timer(logger, config, zkhandler, this_node):
keepalive_timer = BackgroundScheduler() keepalive_timer = BackgroundScheduler()
keepalive_timer.add_job( keepalive_timer.add_job(
node_keepalive, node_keepalive,
args=(logger, config, zkhandler, this_node), args=(logger, config, zkhandler, this_node, netstats),
trigger="interval", trigger="interval",
seconds=keepalive_interval, seconds=keepalive_interval,
) )
@ -684,7 +684,7 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
# Keepalive update function # Keepalive update function
def node_keepalive(logger, config, zkhandler, this_node): def node_keepalive(logger, config, zkhandler, this_node, netstats):
debug = config["debug"] debug = config["debug"]
# Display node information to the terminal # Display node information to the terminal
@ -793,6 +793,10 @@ def node_keepalive(logger, config, zkhandler, this_node):
this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024) this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
this_node.cpuload = round(os.getloadavg()[0], 2) this_node.cpuload = round(os.getloadavg()[0], 2)
# Get node network statistics via netstats instance
netstats.set_interfaces()
netstats.set_data()
# Join against running threads # Join against running threads
if config["enable_hypervisor"]: if config["enable_hypervisor"]:
vm_stats_thread.join(timeout=config["keepalive_interval"]) vm_stats_thread.join(timeout=config["keepalive_interval"])