Compare commits

..

20 Commits

Author SHA1 Message Date
5e0b0080d9 Bump version to 0.9.87 2023-12-27 13:21:28 -05:00
572596c575 Fix missing f-string placeholder 2023-12-27 13:21:20 -05:00
e654fbba08 Move debug condition handling to Logger
Avoids many dozens of conditionals sprinkled throughout the code by
centralizing this check into the main Logger instance.
2023-12-27 13:01:45 -05:00
52bf5ad0ef Update store_path set location
Prevents a bug if no cluster is selected while doing connection list
commands.
2023-12-27 12:42:19 -05:00
576afc1e94 Update Grafana dashboard layouts 2023-12-27 12:24:46 -05:00
4375f66793 Use proper get() for invalid values 2023-12-27 12:03:48 -05:00
3df3ca5b44 Fix value for OSD utilization
Ceph provides in KB; convert to bytes.
2023-12-27 11:56:50 -05:00
cb3c2cd86d Adjust name of PVC cluster dashboard 2023-12-27 11:42:58 -05:00
d0de4f1825 Update Grafana dashboard to overview
Adds resource utilization in addition to health.
2023-12-27 11:38:39 -05:00
494c20263d Move monitoring folder to top level 2023-12-27 11:37:49 -05:00
431ee69620 Use proper percentage for pool util 2023-12-27 10:03:00 -05:00
88f4d79d5a Handle invalid values on older Libvirt versions 2023-12-27 09:51:24 -05:00
84d22751d8 Fix bad JSON data handler 2023-12-27 09:43:37 -05:00
40ff005a09 Fix handling of Ceph OSD bytes 2023-12-26 12:43:51 -05:00
ab4ec7a5fa Remove WebUI from README 2023-12-25 02:48:44 -05:00
9604f655d0 Improve node utilization metrics and fix bugs 2023-12-25 02:47:41 -05:00
3e4cc53fdd Add node network statistics and utilization values
Adds a new physical network interface stats parser to the node
keepalives, and leverages this information to provide a network
utilization overview in the Prometheus metrics.
2023-12-21 15:45:01 -05:00
d2d2a9c617 Include our newline atomically
Sometimes clashing log entries would print on the same line, likely due
to some sort of race condition in Python's print() built-in.

Instead, add a newline to our actual message and print without an end
character. This ensures atomic printing of our log messages.
2023-12-21 13:12:43 -05:00
6ed4efad33 Add new network.stats key to nodes 2023-12-21 12:48:48 -05:00
39f9f3640c Rename health metrics and add resource metrics 2023-12-21 09:40:49 -05:00
33 changed files with 8830 additions and 2921 deletions

View File

@ -8,7 +8,7 @@
ignore = W503, E501, F403, F405
extend-ignore = E203
# We exclude the Debian, monitoring, migrations, and provisioner examples
exclude = debian,api-daemon/migrations/versions,api-daemon/provisioner/examples,node-daemon/monitoring
exclude = debian,monitoring,api-daemon/migrations/versions,api-daemon/provisioner/examples
# Set the max line length to 88 for Black
max-line-length = 88

View File

@ -1 +1 @@
0.9.86
0.9.87

View File

@ -1,5 +1,13 @@
## PVC Changelog
###### [v0.9.87](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.87)
* [API Daemon] Adds cluster Prometheus resource utilization metrics and an updated Grafana dashboard.
* [Node Daemon] Adds network traffic rate calculation subsystem.
* [All Daemons] Fixes a printing bug where newlines were not added atomically.
* [CLI Client] Fixes a bug listing connections if no default is specified.
* [All Daemons] Simplifies debug logging conditionals by moving into the Logger instance itself.
###### [v0.9.86](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.86)
* [API Daemon] Significantly improves the performance of several commands via async Zookeeper calls and removal of superfluous backend calls.

View File

@ -19,7 +19,7 @@ As a consequence of its features, PVC makes administrating very high-uptime VMs
PVC also features an optional, fully customizable VM provisioning framework, designed to automate and simplify VM deployments using custom provisioning profiles, scripts, and CloudInit userdata API support.
Installation of PVC is accomplished by two main components: a [Node installer ISO](https://github.com/parallelvirtualcluster/pvc-installer) which creates on-demand installer ISOs, and an [Ansible role framework](https://github.com/parallelvirtualcluster/pvc-ansible) to configure, bootstrap, and administrate the nodes. Installation can also be fully automated with a companion [cluster bootstrapping system](https://github.com/parallelvirtualcluster/pvc-bootstrap). Once up, the cluster is managed via an HTTP REST API, accessible via a Python Click CLI client or WebUI.
Installation of PVC is accomplished by two main components: a [Node installer ISO](https://github.com/parallelvirtualcluster/pvc-installer) which creates on-demand installer ISOs, and an [Ansible role framework](https://github.com/parallelvirtualcluster/pvc-ansible) to configure, bootstrap, and administrate the nodes. Installation can also be fully automated with a companion [cluster bootstrapping system](https://github.com/parallelvirtualcluster/pvc-bootstrap). Once up, the cluster is managed via an HTTP REST API, accessible via a Python Click CLI client ~~or WebUI~~ (eventually).
Just give it physical servers, and it will run your VMs without you having to think about it, all in just an hour or two of setup time.

View File

@ -27,7 +27,7 @@ from distutils.util import strtobool as dustrtobool
import daemon_lib.config as cfg
# Daemon version
version = "0.9.86"
version = "0.9.87"
# API version
API_VERSION = 1.0

View File

@ -640,14 +640,15 @@ class API_Metrics(Resource):
400:
description: Bad request
"""
cluster_output, cluster_retcode = api_helper.cluster_metrics()
health_output, health_retcode = api_helper.cluster_health_metrics()
resource_output, resource_retcode = api_helper.cluster_resource_metrics()
ceph_output, ceph_retcode = api_helper.ceph_metrics()
if cluster_retcode != 200 or ceph_retcode != 200:
if health_retcode != 200 or resource_retcode != 200 or ceph_retcode != 200:
output = "Error: Failed to obtain data"
retcode = 400
else:
output = cluster_output + ceph_output
output = health_output + resource_output + ceph_output
retcode = 200
response = flask.make_response(output, retcode)
@ -658,11 +659,11 @@ class API_Metrics(Resource):
api.add_resource(API_Metrics, "/metrics")
# /metrics/pvc
class API_Metrics_PVC(Resource):
# /metrics/health
class API_Metrics_Health(Resource):
def get(self):
"""
Return the current PVC cluster status in Prometheus-compatible metrics format
Return the current PVC cluster health status in Prometheus-compatible metrics format
Endpoint is unauthenticated to allow metrics exfiltration without having to deal
with the Prometheus compatibility later.
@ -675,13 +676,13 @@ class API_Metrics_PVC(Resource):
400:
description: Bad request
"""
cluster_output, cluster_retcode = api_helper.cluster_metrics()
health_output, health_retcode = api_helper.cluster_health_metrics()
if cluster_retcode != 200:
if health_retcode != 200:
output = "Error: Failed to obtain data"
retcode = 400
else:
output = cluster_output
output = health_output
retcode = 200
response = flask.make_response(output, retcode)
@ -689,7 +690,41 @@ class API_Metrics_PVC(Resource):
return response
api.add_resource(API_Metrics_PVC, "/metrics/pvc")
api.add_resource(API_Metrics_Health, "/metrics/health")
# /metrics/resource
class API_Metrics_Resource(Resource):
    def get(self):
        """
        Return the current PVC cluster resource utilizations in Prometheus-compatible metrics format

        Endpoint is unauthenticated to allow metrics exfiltration without having to deal
        with the Prometheus compatibility later.
        ---
        tags:
          - root
        responses:
          200:
            description: OK
          400:
            description: Bad request
        """
        metrics_text, helper_retcode = api_helper.cluster_resource_metrics()

        # Anything other than a clean 200 from the helper is reported to the
        # scraper as a generic 400 with a fixed error body.
        if helper_retcode == 200:
            body, status = metrics_text, 200
        else:
            body, status = "Error: Failed to obtain data", 400

        response = flask.make_response(body, status)
        response.mimetype = "text/plain"
        return response
api.add_resource(API_Metrics_Resource, "/metrics/resource")
# /metrics/ceph
@ -1133,6 +1168,9 @@ class API_Node_Root(Resource):
provisioned:
type: integer
description: The total amount of RAM provisioned to all domains (regardless of state) on this node in MB
interfaces:
type: object
description: Details on speed, bytes, and packets per second of each node physical network interface
parameters:
- in: query
name: limit

View File

@ -126,12 +126,27 @@ def cluster_maintenance(zkhandler, maint_state="false"):
#
@pvc_common.Profiler(config)
@ZKConnection(config)
def cluster_metrics(zkhandler):
def cluster_health_metrics(zkhandler):
"""
Format status data from cluster_status into Prometheus-compatible metrics
Get cluster-wide Prometheus metrics for health
"""
retflag, retdata = pvc_cluster.get_metrics(zkhandler)
retflag, retdata = pvc_cluster.get_health_metrics(zkhandler)
if retflag:
retcode = 200
else:
retcode = 400
return retdata, retcode
@pvc_common.Profiler(config)
@ZKConnection(config)
def cluster_resource_metrics(zkhandler):
"""
Get cluster-wide Prometheus metrics for resource utilization
"""
retflag, retdata = pvc_cluster.get_resource_metrics(zkhandler)
if retflag:
retcode = 200
else:

View File

@ -6116,13 +6116,14 @@ def cli(
else:
CLI_CONFIG = get_config(store_data, _connection)
CLI_CONFIG["store_path"] = store_path
if not CLI_CONFIG.get("badcfg", None):
CLI_CONFIG["debug"] = _debug
CLI_CONFIG["unsafe"] = _unsafe
CLI_CONFIG["colour"] = _colour
CLI_CONFIG["quiet"] = _quiet
CLI_CONFIG["silent"] = _silent
CLI_CONFIG["store_path"] = store_path
audit()

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name="pvc",
version="0.9.86",
version="0.9.87",
packages=["pvc.cli", "pvc.lib"],
install_requires=[
"Click",

View File

@ -123,13 +123,13 @@ def format_bytes_tohuman(databytes):
def format_bytes_fromhuman(datahuman):
if not re.search(r"[A-Za-z]+", datahuman):
dataunit = "B"
datasize = int(datahuman)
datasize = float(datahuman)
else:
dataunit = str(re.match(r"[0-9]+([A-Za-z])[iBb]*", datahuman).group(1))
datasize = int(re.match(r"([0-9]+)[A-Za-z]+", datahuman).group(1))
dataunit = str(re.match(r"[0-9\.]+([A-Za-z])[iBb]*", datahuman).group(1))
datasize = float(re.match(r"([0-9\.]+)[A-Za-z]+", datahuman).group(1))
if byte_unit_matrix.get(dataunit):
databytes = datasize * byte_unit_matrix[dataunit]
if byte_unit_matrix.get(dataunit.upper()):
databytes = int(datasize * byte_unit_matrix[dataunit.upper()])
return databytes
else:
return None
@ -155,7 +155,7 @@ def format_ops_fromhuman(datahuman):
# Trim off human-readable character
dataunit = datahuman[-1]
datasize = int(datahuman[:-1])
dataops = datasize * ops_unit_matrix[dataunit]
dataops = datasize * ops_unit_matrix[dataunit.upper()]
return "{}".format(dataops)

File diff suppressed because it is too large Load Diff

View File

@ -115,6 +115,10 @@ class Logger(object):
# Output function
def out(self, message, state=None, prefix=""):
# Only handle d-state (debug) messages if we're in debug mode
if state in ["d"] and not self.config["debug"]:
return
# Get the date
if self.config["log_dates"]:
date = "{} ".format(datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f"))
@ -146,7 +150,7 @@ class Logger(object):
if self.config["stdout_logging"]:
# Assemble output string
output = colour + prompt + endc + date + prefix + message
print(output)
print(output + "\n", end="")
# Log to file
if self.config["file_logging"]:

View File

@ -0,0 +1 @@
{"version": "12", "root": "", "base": {"root": "", "schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.primary_node.sync_lock": "/config/primary_node/sync_lock", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "logs": "/logs", "faults": "/faults", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.health": "/ceph/health", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", "snapshot": "/ceph/snapshots"}, "logs": {"node": "", "messages": "/messages"}, "faults": {"id": "", "last_time": "/last_time", "first_time": "/first_time", "ack_time": "/ack_time", "status": "/status", "delta": "/delta", "message": "/message"}, "node": {"name": "", "keepalive": "/keepalive", "mode": "/daemonmode", "data.active_schema": "/activeschema", "data.latest_schema": "/latestschema", "data.static": "/staticdata", "data.pvc_version": "/pvcversion", "running_domains": "/runningdomains", "count.provisioned_domains": "/domainscount", "count.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "cpu.load": "/cpuload", "vcpu.allocated": "/vcpualloc", "memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword", "sriov": "/sriov", "sriov.pf": "/sriov/pf", "sriov.vf": "/sriov/vf", "monitoring.plugins": "/monitoring_plugins", "monitoring.data": "/monitoring_data", "monitoring.health": "/monitoring_health", "network.stats": "/network_stats"}, "monitoring_plugin": {"name": "", "last_run": "/last_run", "health_delta": "/health_delta", "message": "/message", 
"data": "/data", "runtime": "/runtime"}, "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, "sriov_vf": {"phy": "", "pf": "/pf", "mtu": "/mtu", "mac": "/mac", "phy_mac": "/phy_mac", "config": "/config", "config.vlan_id": "/config/vlan_id", "config.vlan_qos": "/config/vlan_qos", "config.tx_rate_min": "/config/tx_rate_min", "config.tx_rate_max": "/config/tx_rate_max", "config.spoof_check": "/config/spoof_check", "config.link_state": "/config/link_state", "config.trust": "/config/trust", "config.query_rss": "/config/query_rss", "pci": "/pci", "pci.domain": "/pci/domain", "pci.bus": "/pci/bus", "pci.slot": "/pci/slot", "pci.function": "/pci/function", "used": "/used", "used_by": "/used_by"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "storage.volumes": "/rbdlist", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", "meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit", "meta.tags": "/tags", "migrate.sync_lock": "/migrate_sync_lock"}, "tag": {"name": "", "type": "/type", "protected": "/protected"}, "network": {"vni": "", "type": "/nettype", "mtu": "/mtu", "rule": "/firewall_rules", "rule.in": "/firewall_rules/in", "rule.out": "/firewall_rules/out", "nameservers": "/name_servers", "domain": "/domain", "reservation": "/dhcp4_reservations", "lease": "/dhcp4_leases", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", "ip4.dhcp": "/dhcp4_flag", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "reservation": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname"}, "lease": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname", "expiry": "/expiry", "client_id": "/clientid"}, "rule": {"description": "", "rule": "/rule", 
"order": "/order"}, "osd": {"id": "", "node": "/node", "device": "/device", "db_device": "/db_device", "fsid": "/fsid", "ofsid": "/fsid/osd", "cfsid": "/fsid/cluster", "lvm": "/lvm", "vg": "/lvm/vg", "lv": "/lvm/lv", "is_split": "/is_split", "stats": "/stats"}, "pool": {"name": "", "pgs": "/pgs", "tier": "/tier", "stats": "/stats"}, "volume": {"name": "", "stats": "/stats"}, "snapshot": {"name": "", "stats": "/stats"}}

View File

@ -103,6 +103,7 @@ def getNodeInformation(zkhandler, node_name):
_node_running_domains,
_node_health,
_node_health_plugins,
_node_network_stats,
) = zkhandler.read_many(
[
("node.state.daemon", node_name),
@ -121,6 +122,7 @@ def getNodeInformation(zkhandler, node_name):
("node.running_domains", node_name),
("node.monitoring.health", node_name),
("node.monitoring.plugins", node_name),
("node.network.stats", node_name),
]
)
@ -154,6 +156,11 @@ def getNodeInformation(zkhandler, node_name):
zkhandler, node_name, node_health_plugins
)
if _node_network_stats is not None:
node_network_stats = json.loads(_node_network_stats)
else:
node_network_stats = dict()
# Construct a data structure to represent the data
node_information = {
"name": node_name,
@ -182,6 +189,7 @@ def getNodeInformation(zkhandler, node_name):
"used": node_mem_used,
"free": node_mem_free,
},
"interfaces": node_network_stats,
}
return node_information

View File

@ -572,7 +572,7 @@ class ZKHandler(object):
#
class ZKSchema(object):
# Current version
_version = 11
_version = 12
# Root for doing nested keys
_schema_root = ""
@ -651,6 +651,7 @@ class ZKSchema(object):
"monitoring.plugins": "/monitoring_plugins",
"monitoring.data": "/monitoring_data",
"monitoring.health": "/monitoring_health",
"network.stats": "/network_stats",
},
# The schema of an individual monitoring plugin data entry (/nodes/{node_name}/monitoring_data/{plugin})
"monitoring_plugin": {

10
debian/changelog vendored
View File

@ -1,3 +1,13 @@
pvc (0.9.87-0) unstable; urgency=high
* [API Daemon] Adds cluster Prometheus resource utilization metrics and an updated Grafana dashboard.
* [Node Daemon] Adds network traffic rate calculation subsystem.
* [All Daemons] Fixes a printing bug where newlines were not added atomically.
* [CLI Client] Fixes a bug listing connections if no default is specified.
* [All Daemons] Simplifies debug logging conditionals by moving into the Logger instance itself.
-- Joshua M. Boniface <joshua@boniface.me> Wed, 27 Dec 2023 13:21:28 -0500
pvc (0.9.86-0) unstable; urgency=high
* [API Daemon] Significantly improves the performance of several commands via async Zookeeper calls and removal of superfluous backend calls.

View File

@ -3,4 +3,4 @@ node-daemon/pvcnoded usr/share/pvc
node-daemon/pvcnoded.service lib/systemd/system
node-daemon/pvc.target lib/systemd/system
node-daemon/pvcautoready.service lib/systemd/system
node-daemon/monitoring usr/share/pvc
monitoring usr/share/pvc

View File

@ -33,7 +33,7 @@ import os
import signal
# Daemon version
version = "0.9.86"
version = "0.9.87"
##########################################################

View File

@ -157,9 +157,6 @@ class MonitoringPlugin(object):
"w": warning
"e": error
"""
if state == "d" and not self.config["debug"]:
return
self.logger.out(message, state=state, prefix=self.plugin_name)
#
@ -523,11 +520,10 @@ class MonitoringInstance(object):
entries = fault_data["entries"]()
if self.config["debug"]:
self.logger.out(
f"Entries for fault check {fault_type}: {dumps(entries)}",
state="d",
)
self.logger.out(
f"Entries for fault check {fault_type}: {dumps(entries)}",
state="d",
)
for _entry in entries:
entry = _entry["entry"]
@ -699,7 +695,7 @@ class MonitoringInstance(object):
health_text = f"{health_colour}{self.this_node.health}%{self.logger.fmt_end} node health"
result_text.append(health_text)
else:
health_text = "{self.logger.fmt_blue}N/A{self.logger.fmt_end} node health"
health_text = f"{self.logger.fmt_blue}N/A{self.logger.fmt_end} node health"
result_text.append(health_text)
self.logger.out(

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -31,6 +31,7 @@ import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
import pvcnoded.objects.VMInstance as VMInstance
import pvcnoded.objects.NodeInstance as NodeInstance
import pvcnoded.objects.VXNetworkInstance as VXNetworkInstance
import pvcnoded.objects.NetstatsInstance as NetstatsInstance
import pvcnoded.objects.SRIOVVFInstance as SRIOVVFInstance
import pvcnoded.objects.CephInstance as CephInstance
@ -48,7 +49,7 @@ import re
import json
# Daemon version
version = "0.9.86"
version = "0.9.87"
##########################################################
@ -200,9 +201,9 @@ def entrypoint():
# Define a cleanup function
def cleanup(failure=False):
nonlocal logger, zkhandler, keepalive_timer, d_domain
nonlocal logger, zkhandler, keepalive_timer, d_domain, netstats
logger.out("Terminating pvcnoded and cleaning up", state="s")
logger.out("Terminating pvcnoded", state="s")
# Set shutdown state in Zookeeper
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "shutdown")])
@ -249,12 +250,20 @@ def entrypoint():
except Exception:
pass
# Set stop state in Zookeeper
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
logger.out("Cleaning up", state="s")
# Stop netstats instance
try:
netstats.shutdown()
except Exception:
pass
# Forcibly terminate dnsmasq because it gets stuck sometimes
common.run_os_command("killall dnsmasq")
# Set stop state in Zookeeper
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
# Close the Zookeeper connection
try:
zkhandler.disconnect(persistent=True)
@ -1000,9 +1009,12 @@ def entrypoint():
state="s",
)
# Set up netstats
netstats = NetstatsInstance.NetstatsInstance(logger, config, zkhandler, this_node)
# Start keepalived thread
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
logger, config, zkhandler, this_node
logger, config, zkhandler, this_node, netstats
)
# Tick loop; does nothing since everything is async

View File

@ -333,12 +333,11 @@ class AXFRDaemonInstance(object):
z = dns.zone.from_xfr(axfr)
records_raw = [z[n].to_text(n) for n in z.nodes.keys()]
except Exception as e:
if self.config["debug"]:
self.logger.out(
"{} {} ({})".format(e, dnsmasq_ip, domain),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"{} {} ({})".format(e, dnsmasq_ip, domain),
state="d",
prefix="dns-aggregator",
)
continue
# Fix the formatting because it's useless
@ -370,12 +369,11 @@ class AXFRDaemonInstance(object):
"SELECT * FROM records WHERE domain_id=%s", (domain_id,)
)
results = list(sql_curs.fetchall())
if self.config["debug"]:
self.logger.out(
"SQL query results: {}".format(results),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"SQL query results: {}".format(results),
state="d",
prefix="dns-aggregator",
)
except Exception as e:
self.logger.out(
"ERROR: Failed to obtain DNS records from database: {}".format(
@ -388,12 +386,11 @@ class AXFRDaemonInstance(object):
records_old = list()
records_old_ids = list()
if not results:
if self.config["debug"]:
self.logger.out(
"No results found, skipping.",
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"No results found, skipping.",
state="d",
prefix="dns-aggregator",
)
continue
for record in results:
# Skip the non-A
@ -404,23 +401,21 @@ class AXFRDaemonInstance(object):
r_data = record[4]
# Assemble a list element in the same format as the AXFR data
entry = "{} {} IN {} {}".format(r_name, r_ttl, r_type, r_data)
if self.config["debug"]:
self.logger.out(
"Found record: {}".format(entry),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Found record: {}".format(entry),
state="d",
prefix="dns-aggregator",
)
# Skip non-A or AAAA records
if r_type != "A" and r_type != "AAAA":
if self.config["debug"]:
self.logger.out(
'Skipping record {}, not A or AAAA: "{}"'.format(
entry, r_type
),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
'Skipping record {}, not A or AAAA: "{}"'.format(
entry, r_type
),
state="d",
prefix="dns-aggregator",
)
continue
records_old.append(entry)
@ -429,17 +424,16 @@ class AXFRDaemonInstance(object):
records_new.sort()
records_old.sort()
if self.config["debug"]:
self.logger.out(
"New: {}".format(records_new),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Old: {}".format(records_old),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"New: {}".format(records_new),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Old: {}".format(records_old),
state="d",
prefix="dns-aggregator",
)
# Find the differences between the lists
# Basic check one: are they completely equal
@ -450,17 +444,16 @@ class AXFRDaemonInstance(object):
in_new_not_in_old = in_new - in_old
in_old_not_in_new = in_old - in_new
if self.config["debug"]:
self.logger.out(
"New but not old: {}".format(in_new_not_in_old),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Old but not new: {}".format(in_old_not_in_new),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"New but not old: {}".format(in_new_not_in_old),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Old but not new: {}".format(in_old_not_in_new),
state="d",
prefix="dns-aggregator",
)
# Go through the old list
remove_records = list() # list of database IDs
@ -487,12 +480,11 @@ class AXFRDaemonInstance(object):
if len(remove_records) > 0:
# Remove the invalid old records
for record_id in remove_records:
if self.config["debug"]:
self.logger.out(
"Removing record: {}".format(record_id),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Removing record: {}".format(record_id),
state="d",
prefix="dns-aggregator",
)
sql_curs.execute(
"DELETE FROM records WHERE id=%s", (record_id,)
)
@ -507,12 +499,11 @@ class AXFRDaemonInstance(object):
r_ttl = record[1]
r_type = record[3]
r_data = record[4]
if self.config["debug"]:
self.logger.out(
"Add record: {}".format(name),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Add record: {}".format(name),
state="d",
prefix="dns-aggregator",
)
try:
sql_curs.execute(
"INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)",
@ -520,23 +511,21 @@ class AXFRDaemonInstance(object):
)
changed = True
except psycopg2.IntegrityError as e:
if self.config["debug"]:
self.logger.out(
"Failed to add record due to {}: {}".format(
e, name
),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Failed to add record due to {}: {}".format(
e, name
),
state="d",
prefix="dns-aggregator",
)
except psycopg2.errors.InFailedSqlTransaction as e:
if self.config["debug"]:
self.logger.out(
"Failed to add record due to {}: {}".format(
e, name
),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Failed to add record due to {}: {}".format(
e, name
),
state="d",
prefix="dns-aggregator",
)
if changed:
# Increase SOA serial
@ -548,24 +537,22 @@ class AXFRDaemonInstance(object):
current_serial = int(soa_record[2])
new_serial = current_serial + 1
soa_record[2] = str(new_serial)
if self.config["debug"]:
self.logger.out(
"Records changed; bumping SOA: {}".format(new_serial),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Records changed; bumping SOA: {}".format(new_serial),
state="d",
prefix="dns-aggregator",
)
sql_curs.execute(
"UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'",
(" ".join(soa_record), domain_id),
)
# Commit all the previous changes
if self.config["debug"]:
self.logger.out(
"Committing database changes and reloading PDNS",
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Committing database changes and reloading PDNS",
state="d",
prefix="dns-aggregator",
)
try:
self.sql_conn.commit()
except Exception as e:

View File

@ -0,0 +1,293 @@
#!/usr/bin/env python3
# NetstatsInstance.py - Class implementing a PVC network stats gatherer and run by pvcnoded
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2023 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
from apscheduler.schedulers.background import BackgroundScheduler
from collections import deque
from json import dumps
from os import walk
from os.path import exists
class NetstatsIfaceInstance(object):
"""
NetstatsIfaceInstance
This class implements a rolling statistics poller for a network interface,
collecting stats on the bits and packets per second in both directions every
second.
Via the get_stats() function, it returns the rolling average of all 4 values,
as well as totals, over the last 5 seconds (self.avg_samples) as a tuple of:
(rx_bps, rx_pps, tx_bps, tx_pps, total_bps, total_pps, link_speed, state)
"""
def __init__(self, logger, iface, avg_samples):
"""
Initialize the class instance, creating our BackgroundScheduler, setting
the average sample rate, and creating the deques and average values.
"""
self.logger = logger
self.iface = iface
self.data_valid = False
self.data_polls = 0
self.timer = BackgroundScheduler()
self.timer.add_job(self.gather_stats, trigger="interval", seconds=1)
self.avg_samples = avg_samples
self.link_speed = 0
self.state = "down"
self.rx_bits_rolling = deque(list(), self.avg_samples + 1)
self.rx_bps = 0
self.rx_packets_rolling = deque(list(), self.avg_samples + 1)
self.rx_pps = 0
self.tx_bits_rolling = deque(list(), self.avg_samples + 1)
self.tx_bps = 0
self.tx_packets_rolling = deque(list(), self.avg_samples + 1)
self.tx_pps = 0
self.total_bps = 0
self.total_pps = 0
def get_iface_stats(self):
"""
Reads the interface statistics from the sysfs for the interface.
"""
iface_state_path = f"/sys/class/net/{self.iface}/operstate"
with open(iface_state_path) as stfh:
self.state = stfh.read().strip()
iface_speed_path = f"/sys/class/net/{self.iface}/speed"
try:
with open(iface_speed_path) as spfh:
# The speed key is always in Mbps so multiply by 1000*1000 to get bps
self.link_speed = int(spfh.read()) * 1000 * 1000
except OSError:
self.link_speed = 0
iface_stats_path = f"/sys/class/net/{self.iface}/statistics"
with open(f"{iface_stats_path}/rx_bytes") as rxbfh:
self.rx_bits_rolling.append(int(rxbfh.read()) * 8)
with open(f"{iface_stats_path}/tx_bytes") as txbfh:
self.tx_bits_rolling.append(int(txbfh.read()) * 8)
with open(f"{iface_stats_path}/rx_packets") as rxpfh:
self.rx_packets_rolling.append(int(rxpfh.read()) * 8)
with open(f"{iface_stats_path}/tx_packets") as txpfh:
self.tx_packets_rolling.append(int(txpfh.read()) * 8)
def calculate_averages(self):
"""
Calculates the bps/pps values from the rolling values.
"""
rx_bits_diffs = list()
for sample_idx in range(self.avg_samples, 0, -1):
rx_bits_diffs.append(
self.rx_bits_rolling[sample_idx] - self.rx_bits_rolling[sample_idx - 1]
)
self.rx_bps = int(sum(rx_bits_diffs) / self.avg_samples)
rx_packets_diffs = list()
for sample_idx in range(self.avg_samples, 0, -1):
rx_packets_diffs.append(
self.rx_packets_rolling[sample_idx]
- self.rx_packets_rolling[sample_idx - 1]
)
self.rx_pps = int(sum(rx_packets_diffs) / self.avg_samples)
tx_bits_diffs = list()
for sample_idx in range(self.avg_samples, 0, -1):
tx_bits_diffs.append(
self.tx_bits_rolling[sample_idx] - self.tx_bits_rolling[sample_idx - 1]
)
self.tx_bps = int(sum(tx_bits_diffs) / self.avg_samples)
tx_packets_diffs = list()
for sample_idx in range(self.avg_samples, 0, -1):
tx_packets_diffs.append(
self.tx_packets_rolling[sample_idx]
- self.tx_packets_rolling[sample_idx - 1]
)
self.tx_pps = int(sum(tx_packets_diffs) / self.avg_samples)
self.total_bps = self.rx_bps + self.tx_bps
self.total_pps = self.rx_pps + self.tx_pps
def gather_stats(self):
"""
Gathers the current stats and then calculates the averages.
Runs via the BackgroundScheduler timer every 1 second.
"""
self.get_iface_stats()
if self.data_valid:
self.calculate_averages()
# Handle data validity: our data is invalid until we hit enough polls
# to make a valid average (avg_samples plus 1).
if not self.data_valid:
self.data_polls += 1
if self.data_polls > self.avg_samples:
self.data_valid = True
def start(self):
"""
Starts the timer.
"""
self.timer.start()
def stop(self):
"""
Stops the timer.
"""
self.timer.shutdown()
def get_stats(self):
    """
    Returns the current statistics, or None while the data is not yet valid.

    The tuple is ordered as: rx_bps, rx_pps, tx_bps, tx_pps, total_bps,
    total_pps, link_speed, state.
    """
    if self.data_valid:
        return (
            self.rx_bps,
            self.rx_pps,
            self.tx_bps,
            self.tx_pps,
            self.total_bps,
            self.total_pps,
            self.link_speed,
            self.state,
        )
    return None
class NetstatsInstance(object):
    """
    NetstatsInstance

    This class implements a rolling statistics poller for all PHYSICAL network interfaces
    on the system, initializing a NetstatsIfaceInstance for each, as well as handling
    value updates into Zookeeper.
    """

    # Keys of the per-interface stats dict, in the same order as the tuple
    # returned by NetstatsIfaceInstance.get_stats().
    _STAT_KEYS = (
        "rx_bps",
        "rx_pps",
        "tx_bps",
        "tx_pps",
        "total_bps",
        "total_pps",
        "link_speed",
        "state",
    )

    def __init__(self, logger, config, zkhandler, this_node):
        """
        Initialize the class instance.

        Stores the logger, config and zkhandler, records this_node.name as the
        node name, logs a startup message and performs an initial interface
        scan via set_interfaces().
        """
        self.logger = logger
        self.config = config
        self.zkhandler = zkhandler
        self.node_name = this_node.name
        self.interfaces = dict()

        self.logger.out(
            f"Starting netstats collector ({self.config['keepalive_interval']} second interval)",
            state="s",
        )

        self.set_interfaces()

    def shutdown(self):
        """
        Empty this node's network stats key in Zookeeper and stop all pollers.

        The stopped NetstatsIfaceInstance objects are left in self.interfaces.
        """
        # Empty the network stats object
        self.zkhandler.write([(("node.network.stats", self.node_name), dumps({}))])

        for iface_instance in self.interfaces.values():
            iface_instance.stop()

    def set_interfaces(self):
        """
        Sets the list of interfaces on the system, and then ensures that each
        interface has a NetstatsIfaceInstance assigned to it and polling.
        Interfaces which are no longer present have their poller stopped and
        are dropped from self.interfaces.
        """
        # Get a list of all active interfaces
        net_root_path = "/sys/class/net"
        all_ifaces = list()
        for (_, dirnames, _) in walk(net_root_path):
            all_ifaces.extend(dirnames)
            # Only the entries directly under net_root_path are interfaces;
            # never descend into nested directories.
            break
        all_ifaces.sort()

        self.logger.out(
            f"Parsing network list: {all_ifaces}", state="d", prefix="netstats-thread"
        )

        # Add any missing interfaces
        for iface in all_ifaces:
            if not exists(f"{net_root_path}/{iface}/device"):
                # This is not a physical interface; skip it
                continue

            if iface not in self.interfaces:
                # Set the number of samples to be equal to the keepalive interval, so that each
                # keepalive has a fresh set of data from the last keepalive_interval seconds.
                self.interfaces[iface] = NetstatsIfaceInstance(
                    self.logger, iface, self.config["keepalive_interval"]
                )
                self.interfaces[iface].start()

        # Remove any superfluous interfaces. Iterate over a snapshot of the
        # names: deleting from a dict while iterating over it directly raises
        # "RuntimeError: dictionary changed size during iteration".
        for iface in list(self.interfaces):
            if iface not in all_ifaces:
                self.interfaces[iface].stop()
                del self.interfaces[iface]

    def set_data(self):
        """
        Collects the current stats of every tracked interface and writes them,
        as a JSON object keyed by interface name, to this node's network stats
        key in Zookeeper. Interfaces whose get_stats() returns None (data not
        yet valid) are left out.
        """
        data = dict()
        for iface, iface_instance in self.interfaces.items():
            self.logger.out(
                f"Getting data for interface {iface}",
                state="d",
                prefix="netstats-thread",
            )
            iface_stats = iface_instance.get_stats()
            if iface_stats is None:
                continue

            data[iface] = dict(zip(self._STAT_KEYS, iface_stats))

        self.zkhandler.write([(("node.network.stats", self.node_name), dumps(data))])

View File

@ -51,7 +51,7 @@ libvirt_vm_states = {
}
def start_keepalive_timer(logger, config, zkhandler, this_node):
def start_keepalive_timer(logger, config, zkhandler, this_node, netstats):
keepalive_interval = config["keepalive_interval"]
logger.out(
f"Starting keepalive timer ({keepalive_interval} second interval)", state="s"
@ -59,7 +59,7 @@ def start_keepalive_timer(logger, config, zkhandler, this_node):
keepalive_timer = BackgroundScheduler()
keepalive_timer.add_job(
node_keepalive,
args=(logger, config, zkhandler, this_node),
args=(logger, config, zkhandler, this_node, netstats),
trigger="interval",
seconds=keepalive_interval,
)
@ -80,9 +80,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
pool_list = zkhandler.children("base.pool")
osd_list = zkhandler.children("base.osd")
debug = config["debug"]
if debug:
logger.out("Thread starting", state="d", prefix="ceph-thread")
logger.out("Thread starting", state="d", prefix="ceph-thread")
# Connect to the Ceph cluster
try:
@ -90,8 +88,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
conffile=config["ceph_config_file"],
conf=dict(keyring=config["ceph_admin_keyring"]),
)
if debug:
logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
ceph_conn.connect(timeout=1)
except Exception as e:
logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
@ -100,12 +97,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
# Primary-only functions
if this_node.coordinator_state == "primary":
# Get Ceph status information (pretty)
if debug:
logger.out(
"Set Ceph status information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
logger.out(
"Set Ceph status information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
command = {"prefix": "status", "format": "pretty"}
ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
@ -117,12 +113,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
logger.out("Failed to set Ceph status data: {}".format(e), state="e")
# Get Ceph health information (JSON)
if debug:
logger.out(
"Set Ceph health information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
logger.out(
"Set Ceph health information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
command = {"prefix": "health", "format": "json"}
ceph_health = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
@ -134,12 +129,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
logger.out("Failed to set Ceph health data: {}".format(e), state="e")
# Get Ceph df information (pretty)
if debug:
logger.out(
"Set Ceph rados df information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
logger.out(
"Set Ceph rados df information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
# Get rados df info
command = {"prefix": "df", "format": "pretty"}
@ -151,12 +145,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
except Exception as e:
logger.out("Failed to set Ceph utilization data: {}".format(e), state="e")
if debug:
logger.out(
"Set pool information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
logger.out(
"Set pool information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
# Get pool info
command = {"prefix": "df", "format": "json"}
@ -179,12 +172,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
rados_pool_df_raw = []
pool_count = len(ceph_pool_df_raw)
if debug:
logger.out(
"Getting info for {} pools".format(pool_count),
state="d",
prefix="ceph-thread",
)
logger.out(
"Getting info for {} pools".format(pool_count),
state="d",
prefix="ceph-thread",
)
for pool_idx in range(0, pool_count):
try:
# Combine all the data for this pool
@ -195,22 +187,18 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
# Ignore any pools that aren't in our pool list
if pool["name"] not in pool_list:
if debug:
logger.out(
"Pool {} not in pool list {}".format(
pool["name"], pool_list
),
state="d",
prefix="ceph-thread",
)
logger.out(
"Pool {} not in pool list {}".format(pool["name"], pool_list),
state="d",
prefix="ceph-thread",
)
continue
else:
if debug:
logger.out(
"Parsing data for pool {}".format(pool["name"]),
state="d",
prefix="ceph-thread",
)
logger.out(
"Parsing data for pool {}".format(pool["name"]),
state="d",
prefix="ceph-thread",
)
# Assemble a useful data structure
pool_df = {
@ -248,8 +236,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
osds_this_node = 0
if len(osd_list) > 0:
# Get data from Ceph OSDs
if debug:
logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
# Parse the dump data
osd_dump = dict()
@ -264,8 +251,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
logger.out("Failed to obtain OSD data: {}".format(e), state="w")
osd_dump_raw = []
if debug:
logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
for osd in osd_dump_raw:
osd_dump.update(
{
@ -279,8 +265,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
)
# Parse the df data
if debug:
logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
osd_df = dict()
@ -293,8 +278,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
logger.out("Failed to obtain OSD data: {}".format(e), state="w")
osd_df_raw = []
if debug:
logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
for osd in osd_df_raw:
osd_df.update(
{
@ -316,8 +300,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
)
# Parse the status data
if debug:
logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
osd_status = dict()
@ -330,8 +313,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
logger.out("Failed to obtain OSD status data: {}".format(e), state="w")
osd_status_raw = []
if debug:
logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
for line in osd_status_raw.split("\n"):
# Strip off colour
@ -400,8 +382,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
)
# Merge them together into a single meaningful dict
if debug:
logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
osd_stats = dict()
@ -421,10 +402,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
# Upload OSD data for the cluster (primary-only)
if this_node.coordinator_state == "primary":
if debug:
logger.out(
"Trigger updates for each OSD", state="d", prefix="ceph-thread"
)
logger.out("Trigger updates for each OSD", state="d", prefix="ceph-thread")
for osd in osd_list:
try:
@ -441,20 +419,16 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
queue.put(osds_this_node)
if debug:
logger.out("Thread finished", state="d", prefix="ceph-thread")
logger.out("Thread finished", state="d", prefix="ceph-thread")
# VM stats update function
def collect_vm_stats(logger, config, zkhandler, this_node, queue):
debug = config["debug"]
if debug:
logger.out("Thread starting", state="d", prefix="vm-thread")
logger.out("Thread starting", state="d", prefix="vm-thread")
# Connect to libvirt
libvirt_name = "qemu:///system"
if debug:
logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
try:
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
@ -467,12 +441,11 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
memprov = 0
vcpualloc = 0
# Toggle state management of dead VMs to restart them
if debug:
logger.out(
"Toggle state management of dead VMs to restart them",
state="d",
prefix="vm-thread",
)
logger.out(
"Toggle state management of dead VMs to restart them",
state="d",
prefix="vm-thread",
)
# Make a copy of the d_domain; if not, and it changes in flight, this can fail
fixed_d_domain = this_node.d_domain.copy()
for domain, instance in fixed_d_domain.items():
@ -518,12 +491,11 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
domain_name = domain.name()
# Get all the raw information about the VM
if debug:
logger.out(
"Getting general statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
logger.out(
"Getting general statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
(
domain_state,
domain_maxmem,
@ -537,29 +509,25 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
domain_memory_stats = domain.memoryStats()
domain_cpu_stats = domain.getCPUStats(True)[0]
except Exception as e:
if debug:
try:
logger.out(
"Failed getting VM information for {}: {}".format(
domain.name(), e
),
state="d",
prefix="vm-thread",
)
except Exception:
pass
try:
logger.out(
"Failed getting VM information for {}: {}".format(domain.name(), e),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
# Ensure VM is present in the domain_list
if domain_uuid not in this_node.domain_list:
this_node.domain_list.append(domain_uuid)
if debug:
logger.out(
"Getting disk statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
logger.out(
"Getting disk statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
domain_disk_stats = []
try:
for disk in tree.findall("devices/disk"):
@ -578,23 +546,21 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
}
)
except Exception as e:
if debug:
try:
logger.out(
"Failed getting disk stats for {}: {}".format(domain.name(), e),
state="d",
prefix="vm-thread",
)
except Exception:
pass
try:
logger.out(
"Failed getting disk stats for {}: {}".format(domain.name(), e),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
if debug:
logger.out(
"Getting network statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
logger.out(
"Getting network statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
domain_network_stats = []
try:
for interface in tree.findall("devices/interface"):
@ -619,17 +585,14 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
}
)
except Exception as e:
if debug:
try:
logger.out(
"Failed getting network stats for {}: {}".format(
domain.name(), e
),
state="d",
prefix="vm-thread",
)
except Exception:
pass
try:
logger.out(
"Failed getting network stats for {}: {}".format(domain.name(), e),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
# Create the final dictionary
@ -645,48 +608,42 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
"net_stats": domain_network_stats,
}
if debug:
logger.out(
"Writing statistics for VM {} to Zookeeper".format(domain_name),
state="d",
prefix="vm-thread",
)
logger.out(
"Writing statistics for VM {} to Zookeeper".format(domain_name),
state="d",
prefix="vm-thread",
)
try:
zkhandler.write(
[(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))]
)
except Exception as e:
if debug:
logger.out(
"Failed to write domain statistics: {}".format(e),
state="d",
prefix="vm-thread",
)
logger.out(
"Failed to write domain statistics: {}".format(e),
state="d",
prefix="vm-thread",
)
# Close the Libvirt connection
lv_conn.close()
if debug:
logger.out(
f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
state="d",
prefix="vm-thread",
)
logger.out(
f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
state="d",
prefix="vm-thread",
)
queue.put(len(running_domains))
queue.put(memalloc)
queue.put(memprov)
queue.put(vcpualloc)
if debug:
logger.out("Thread finished", state="d", prefix="vm-thread")
logger.out("Thread finished", state="d", prefix="vm-thread")
# Keepalive update function
def node_keepalive(logger, config, zkhandler, this_node):
debug = config["debug"]
def node_keepalive(logger, config, zkhandler, this_node, netstats):
# Display node information to the terminal
if config["log_keepalives"]:
if this_node.coordinator_state == "primary":
@ -746,10 +703,7 @@ def node_keepalive(logger, config, zkhandler, this_node):
)
# Get past state and update if needed
if debug:
logger.out(
"Get past state and update if needed", state="d", prefix="main-thread"
)
logger.out("Get past state and update if needed", state="d", prefix="main-thread")
past_state = zkhandler.read(("node.state.daemon", this_node.name))
if past_state != "run" and past_state != "shutdown":
@ -759,10 +713,9 @@ def node_keepalive(logger, config, zkhandler, this_node):
this_node.daemon_state = "run"
# Ensure the primary key is properly set
if debug:
logger.out(
"Ensure the primary key is properly set", state="d", prefix="main-thread"
)
logger.out(
"Ensure the primary key is properly set", state="d", prefix="main-thread"
)
if this_node.coordinator_state == "primary":
if zkhandler.read("base.config.primary_node") != this_node.name:
zkhandler.write([("base.config.primary_node", this_node.name)])
@ -793,6 +746,10 @@ def node_keepalive(logger, config, zkhandler, this_node):
this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
this_node.cpuload = round(os.getloadavg()[0], 2)
# Get node network statistics via netstats instance
netstats.set_interfaces()
netstats.set_data()
# Join against running threads
if config["enable_hypervisor"]:
vm_stats_thread.join(timeout=config["keepalive_interval"])
@ -839,8 +796,7 @@ def node_keepalive(logger, config, zkhandler, this_node):
# Set our information in zookeeper
keepalive_time = int(time.time())
if debug:
logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
try:
zkhandler.write(
[
@ -928,10 +884,9 @@ def node_keepalive(logger, config, zkhandler, this_node):
# Look for dead nodes and fence them
if not this_node.maintenance:
if debug:
logger.out(
"Look for dead nodes and fence them", state="d", prefix="main-thread"
)
logger.out(
"Look for dead nodes and fence them", state="d", prefix="main-thread"
)
if config["daemon_mode"] == "coordinator":
for node_name in zkhandler.children("base.node"):
try:

View File

@ -44,7 +44,7 @@ from daemon_lib.vmbuilder import (
)
# Daemon version
version = "0.9.86"
version = "0.9.87"
config = cfg.get_configuration()