Compare commits
19 Commits
v0.9.86
...
cf59e85eeb
Author | SHA1 | Date | |
---|---|---|---|
cf59e85eeb | |||
e654fbba08 | |||
52bf5ad0ef | |||
576afc1e94 | |||
4375f66793 | |||
3df3ca5b44 | |||
cb3c2cd86d | |||
d0de4f1825 | |||
494c20263d | |||
431ee69620 | |||
88f4d79d5a | |||
84d22751d8 | |||
40ff005a09 | |||
ab4ec7a5fa | |||
9604f655d0 | |||
3e4cc53fdd | |||
d2d2a9c617 | |||
6ed4efad33 | |||
39f9f3640c |
2
.flake8
2
.flake8
@ -8,7 +8,7 @@
|
||||
ignore = W503, E501, F403, F405
|
||||
extend-ignore = E203
|
||||
# We exclude the Debian, migrations, and provisioner examples
|
||||
exclude = debian,api-daemon/migrations/versions,api-daemon/provisioner/examples,node-daemon/monitoring
|
||||
exclude = debian,monitoring,api-daemon/migrations/versions,api-daemon/provisioner/examples
|
||||
# Set the max line length to 88 for Black
|
||||
max-line-length = 88
|
||||
|
||||
|
@ -1,5 +1,13 @@
|
||||
## PVC Changelog
|
||||
|
||||
###### [v0.9.87](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.87)
|
||||
|
||||
* [API Daemon] Adds cluster Prometheus resource utilization metrics and an updated Grafana dashboard.
|
||||
* [Node Daemon] Adds network traffic rate calculation subsystem.
|
||||
* [All Daemons] Fixes a printing bug where newlines were not added atomically.
|
||||
* [CLI Client] Fixes a bug listing connections if no default is specified.
|
||||
* [All Daemons] Simplifies debug logging conditionals by moving into the Logger instance itself.
|
||||
|
||||
###### [v0.9.86](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.86)
|
||||
|
||||
* [API Daemon] Significantly improves the performance of several commands via async Zookeeper calls and removal of superfluous backend calls.
|
||||
|
@ -19,7 +19,7 @@ As a consequence of its features, PVC makes administrating very high-uptime VMs
|
||||
|
||||
PVC also features an optional, fully customizable VM provisioning framework, designed to automate and simplify VM deployments using custom provisioning profiles, scripts, and CloudInit userdata API support.
|
||||
|
||||
Installation of PVC is accomplished by two main components: a [Node installer ISO](https://github.com/parallelvirtualcluster/pvc-installer) which creates on-demand installer ISOs, and an [Ansible role framework](https://github.com/parallelvirtualcluster/pvc-ansible) to configure, bootstrap, and administrate the nodes. Installation can also be fully automated with a companion [cluster bootstrapping system](https://github.com/parallelvirtualcluster/pvc-bootstrap). Once up, the cluster is managed via an HTTP REST API, accessible via a Python Click CLI client or WebUI.
|
||||
Installation of PVC is accomplished by two main components: a [Node installer ISO](https://github.com/parallelvirtualcluster/pvc-installer) which creates on-demand installer ISOs, and an [Ansible role framework](https://github.com/parallelvirtualcluster/pvc-ansible) to configure, bootstrap, and administrate the nodes. Installation can also be fully automated with a companion [cluster bootstrapping system](https://github.com/parallelvirtualcluster/pvc-bootstrap). Once up, the cluster is managed via an HTTP REST API, accessible via a Python Click CLI client ~~or WebUI~~ (eventually).
|
||||
|
||||
Just give it physical servers, and it will run your VMs without you having to think about it, all in just an hour or two of setup time.
|
||||
|
||||
|
@ -27,7 +27,7 @@ from distutils.util import strtobool as dustrtobool
|
||||
import daemon_lib.config as cfg
|
||||
|
||||
# Daemon version
|
||||
version = "0.9.86"
|
||||
version = "0.9.87"
|
||||
|
||||
# API version
|
||||
API_VERSION = 1.0
|
||||
|
@ -640,14 +640,15 @@ class API_Metrics(Resource):
|
||||
400:
|
||||
description: Bad request
|
||||
"""
|
||||
cluster_output, cluster_retcode = api_helper.cluster_metrics()
|
||||
health_output, health_retcode = api_helper.cluster_health_metrics()
|
||||
resource_output, resource_retcode = api_helper.cluster_resource_metrics()
|
||||
ceph_output, ceph_retcode = api_helper.ceph_metrics()
|
||||
|
||||
if cluster_retcode != 200 or ceph_retcode != 200:
|
||||
if health_retcode != 200 or resource_retcode != 200 or ceph_retcode != 200:
|
||||
output = "Error: Failed to obtain data"
|
||||
retcode = 400
|
||||
else:
|
||||
output = cluster_output + ceph_output
|
||||
output = health_output + resource_output + ceph_output
|
||||
retcode = 200
|
||||
|
||||
response = flask.make_response(output, retcode)
|
||||
@ -658,11 +659,11 @@ class API_Metrics(Resource):
|
||||
api.add_resource(API_Metrics, "/metrics")
|
||||
|
||||
|
||||
# /metrics/pvc
|
||||
class API_Metrics_PVC(Resource):
|
||||
# /metrics/health
|
||||
class API_Metrics_Health(Resource):
|
||||
def get(self):
|
||||
"""
|
||||
Return the current PVC cluster status in Prometheus-compatible metrics format
|
||||
Return the current PVC cluster health status in Prometheus-compatible metrics format
|
||||
|
||||
Endpoint is unauthenticated to allow metrics exfiltration without having to deal
|
||||
with the Prometheus compatibility later.
|
||||
@ -675,13 +676,13 @@ class API_Metrics_PVC(Resource):
|
||||
400:
|
||||
description: Bad request
|
||||
"""
|
||||
cluster_output, cluster_retcode = api_helper.cluster_metrics()
|
||||
health_output, health_retcode = api_helper.cluster_health_metrics()
|
||||
|
||||
if cluster_retcode != 200:
|
||||
if health_retcode != 200:
|
||||
output = "Error: Failed to obtain data"
|
||||
retcode = 400
|
||||
else:
|
||||
output = cluster_output
|
||||
output = health_output
|
||||
retcode = 200
|
||||
|
||||
response = flask.make_response(output, retcode)
|
||||
@ -689,7 +690,41 @@ class API_Metrics_PVC(Resource):
|
||||
return response
|
||||
|
||||
|
||||
api.add_resource(API_Metrics_PVC, "/metrics/pvc")
|
||||
api.add_resource(API_Metrics_Health, "/metrics/health")
|
||||
|
||||
|
||||
# /metrics/resource
|
||||
class API_Metrics_Resource(Resource):
|
||||
def get(self):
|
||||
"""
|
||||
Return the current PVC cluster resource utilizations in Prometheus-compatible metrics format
|
||||
|
||||
Endpoint is unauthenticated to allow metrics exfiltration without having to deal
|
||||
with the Prometheus compatibility later.
|
||||
---
|
||||
tags:
|
||||
- root
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
400:
|
||||
description: Bad request
|
||||
"""
|
||||
resource_output, resource_retcode = api_helper.cluster_resource_metrics()
|
||||
|
||||
if resource_retcode != 200:
|
||||
output = "Error: Failed to obtain data"
|
||||
retcode = 400
|
||||
else:
|
||||
output = resource_output
|
||||
retcode = 200
|
||||
|
||||
response = flask.make_response(output, retcode)
|
||||
response.mimetype = "text/plain"
|
||||
return response
|
||||
|
||||
|
||||
api.add_resource(API_Metrics_Resource, "/metrics/resource")
|
||||
|
||||
|
||||
# /metrics/ceph
|
||||
@ -1133,6 +1168,9 @@ class API_Node_Root(Resource):
|
||||
provisioned:
|
||||
type: integer
|
||||
description: The total amount of RAM provisioned to all domains (regardless of state) on this node in MB
|
||||
interfaces:
|
||||
type: object
|
||||
description: Details on speed, bytes, and packets per second of each node physical network interface
|
||||
parameters:
|
||||
- in: query
|
||||
name: limit
|
||||
|
@ -126,12 +126,27 @@ def cluster_maintenance(zkhandler, maint_state="false"):
|
||||
#
|
||||
@pvc_common.Profiler(config)
|
||||
@ZKConnection(config)
|
||||
def cluster_metrics(zkhandler):
|
||||
def cluster_health_metrics(zkhandler):
|
||||
"""
|
||||
Format status data from cluster_status into Prometheus-compatible metrics
|
||||
Get cluster-wide Prometheus metrics for health
|
||||
"""
|
||||
|
||||
retflag, retdata = pvc_cluster.get_metrics(zkhandler)
|
||||
retflag, retdata = pvc_cluster.get_health_metrics(zkhandler)
|
||||
if retflag:
|
||||
retcode = 200
|
||||
else:
|
||||
retcode = 400
|
||||
return retdata, retcode
|
||||
|
||||
|
||||
@pvc_common.Profiler(config)
|
||||
@ZKConnection(config)
|
||||
def cluster_resource_metrics(zkhandler):
|
||||
"""
|
||||
Get cluster-wide Prometheus metrics for resource utilization
|
||||
"""
|
||||
|
||||
retflag, retdata = pvc_cluster.get_resource_metrics(zkhandler)
|
||||
if retflag:
|
||||
retcode = 200
|
||||
else:
|
||||
|
@ -6116,13 +6116,14 @@ def cli(
|
||||
else:
|
||||
CLI_CONFIG = get_config(store_data, _connection)
|
||||
|
||||
CLI_CONFIG["store_path"] = store_path
|
||||
|
||||
if not CLI_CONFIG.get("badcfg", None):
|
||||
CLI_CONFIG["debug"] = _debug
|
||||
CLI_CONFIG["unsafe"] = _unsafe
|
||||
CLI_CONFIG["colour"] = _colour
|
||||
CLI_CONFIG["quiet"] = _quiet
|
||||
CLI_CONFIG["silent"] = _silent
|
||||
CLI_CONFIG["store_path"] = store_path
|
||||
|
||||
audit()
|
||||
|
||||
|
@ -2,7 +2,7 @@ from setuptools import setup
|
||||
|
||||
setup(
|
||||
name="pvc",
|
||||
version="0.9.86",
|
||||
version="0.9.87",
|
||||
packages=["pvc.cli", "pvc.lib"],
|
||||
install_requires=[
|
||||
"Click",
|
||||
|
@ -123,13 +123,13 @@ def format_bytes_tohuman(databytes):
|
||||
def format_bytes_fromhuman(datahuman):
|
||||
if not re.search(r"[A-Za-z]+", datahuman):
|
||||
dataunit = "B"
|
||||
datasize = int(datahuman)
|
||||
datasize = float(datahuman)
|
||||
else:
|
||||
dataunit = str(re.match(r"[0-9]+([A-Za-z])[iBb]*", datahuman).group(1))
|
||||
datasize = int(re.match(r"([0-9]+)[A-Za-z]+", datahuman).group(1))
|
||||
dataunit = str(re.match(r"[0-9\.]+([A-Za-z])[iBb]*", datahuman).group(1))
|
||||
datasize = float(re.match(r"([0-9\.]+)[A-Za-z]+", datahuman).group(1))
|
||||
|
||||
if byte_unit_matrix.get(dataunit):
|
||||
databytes = datasize * byte_unit_matrix[dataunit]
|
||||
if byte_unit_matrix.get(dataunit.upper()):
|
||||
databytes = int(datasize * byte_unit_matrix[dataunit.upper()])
|
||||
return databytes
|
||||
else:
|
||||
return None
|
||||
@ -155,7 +155,7 @@ def format_ops_fromhuman(datahuman):
|
||||
# Trim off human-readable character
|
||||
dataunit = datahuman[-1]
|
||||
datasize = int(datahuman[:-1])
|
||||
dataops = datasize * ops_unit_matrix[dataunit]
|
||||
dataops = datasize * ops_unit_matrix[dataunit.upper()]
|
||||
return "{}".format(dataops)
|
||||
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -115,6 +115,10 @@ class Logger(object):
|
||||
|
||||
# Output function
|
||||
def out(self, message, state=None, prefix=""):
|
||||
# Only handle d-state (debug) messages if we're in debug mode
|
||||
if state in ["d"] and not self.config["debug"]:
|
||||
return
|
||||
|
||||
# Get the date
|
||||
if self.config["log_dates"]:
|
||||
date = "{} ".format(datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f"))
|
||||
@ -146,7 +150,7 @@ class Logger(object):
|
||||
if self.config["stdout_logging"]:
|
||||
# Assemble output string
|
||||
output = colour + prompt + endc + date + prefix + message
|
||||
print(output)
|
||||
print(output + "\n", end="")
|
||||
|
||||
# Log to file
|
||||
if self.config["file_logging"]:
|
||||
|
1
daemon-common/migrations/versions/12.json
Normal file
1
daemon-common/migrations/versions/12.json
Normal file
@ -0,0 +1 @@
|
||||
{"version": "12", "root": "", "base": {"root": "", "schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.primary_node.sync_lock": "/config/primary_node/sync_lock", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "logs": "/logs", "faults": "/faults", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.health": "/ceph/health", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", "snapshot": "/ceph/snapshots"}, "logs": {"node": "", "messages": "/messages"}, "faults": {"id": "", "last_time": "/last_time", "first_time": "/first_time", "ack_time": "/ack_time", "status": "/status", "delta": "/delta", "message": "/message"}, "node": {"name": "", "keepalive": "/keepalive", "mode": "/daemonmode", "data.active_schema": "/activeschema", "data.latest_schema": "/latestschema", "data.static": "/staticdata", "data.pvc_version": "/pvcversion", "running_domains": "/runningdomains", "count.provisioned_domains": "/domainscount", "count.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "cpu.load": "/cpuload", "vcpu.allocated": "/vcpualloc", "memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword", "sriov": "/sriov", "sriov.pf": "/sriov/pf", "sriov.vf": "/sriov/vf", "monitoring.plugins": "/monitoring_plugins", "monitoring.data": "/monitoring_data", "monitoring.health": "/monitoring_health", "network.stats": "/network_stats"}, "monitoring_plugin": {"name": "", "last_run": "/last_run", "health_delta": "/health_delta", "message": "/message", "data": "/data", "runtime": "/runtime"}, "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, "sriov_vf": {"phy": "", "pf": "/pf", "mtu": "/mtu", "mac": "/mac", "phy_mac": "/phy_mac", "config": "/config", "config.vlan_id": "/config/vlan_id", "config.vlan_qos": "/config/vlan_qos", "config.tx_rate_min": "/config/tx_rate_min", "config.tx_rate_max": "/config/tx_rate_max", "config.spoof_check": "/config/spoof_check", "config.link_state": "/config/link_state", "config.trust": "/config/trust", "config.query_rss": "/config/query_rss", "pci": "/pci", "pci.domain": "/pci/domain", "pci.bus": "/pci/bus", "pci.slot": "/pci/slot", "pci.function": "/pci/function", "used": "/used", "used_by": "/used_by"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "storage.volumes": "/rbdlist", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", "meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit", "meta.tags": "/tags", "migrate.sync_lock": "/migrate_sync_lock"}, "tag": {"name": "", "type": "/type", "protected": "/protected"}, "network": {"vni": "", "type": "/nettype", "mtu": "/mtu", "rule": "/firewall_rules", "rule.in": "/firewall_rules/in", "rule.out": "/firewall_rules/out", "nameservers": "/name_servers", "domain": "/domain", "reservation": "/dhcp4_reservations", "lease": "/dhcp4_leases", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", "ip4.dhcp": "/dhcp4_flag", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "reservation": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname"}, "lease": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname", "expiry": "/expiry", "client_id": "/clientid"}, "rule": {"description": "", "rule": "/rule", "order": "/order"}, "osd": {"id": "", "node": "/node", "device": "/device", "db_device": "/db_device", "fsid": "/fsid", "ofsid": "/fsid/osd", "cfsid": "/fsid/cluster", "lvm": "/lvm", "vg": "/lvm/vg", "lv": "/lvm/lv", "is_split": "/is_split", "stats": "/stats"}, "pool": {"name": "", "pgs": "/pgs", "tier": "/tier", "stats": "/stats"}, "volume": {"name": "", "stats": "/stats"}, "snapshot": {"name": "", "stats": "/stats"}}
|
@ -103,6 +103,7 @@ def getNodeInformation(zkhandler, node_name):
|
||||
_node_running_domains,
|
||||
_node_health,
|
||||
_node_health_plugins,
|
||||
_node_network_stats,
|
||||
) = zkhandler.read_many(
|
||||
[
|
||||
("node.state.daemon", node_name),
|
||||
@ -121,6 +122,7 @@ def getNodeInformation(zkhandler, node_name):
|
||||
("node.running_domains", node_name),
|
||||
("node.monitoring.health", node_name),
|
||||
("node.monitoring.plugins", node_name),
|
||||
("node.network.stats", node_name),
|
||||
]
|
||||
)
|
||||
|
||||
@ -154,6 +156,11 @@ def getNodeInformation(zkhandler, node_name):
|
||||
zkhandler, node_name, node_health_plugins
|
||||
)
|
||||
|
||||
if _node_network_stats is not None:
|
||||
node_network_stats = json.loads(_node_network_stats)
|
||||
else:
|
||||
node_network_stats = dict()
|
||||
|
||||
# Construct a data structure to represent the data
|
||||
node_information = {
|
||||
"name": node_name,
|
||||
@ -182,6 +189,7 @@ def getNodeInformation(zkhandler, node_name):
|
||||
"used": node_mem_used,
|
||||
"free": node_mem_free,
|
||||
},
|
||||
"interfaces": node_network_stats,
|
||||
}
|
||||
return node_information
|
||||
|
||||
|
@ -572,7 +572,7 @@ class ZKHandler(object):
|
||||
#
|
||||
class ZKSchema(object):
|
||||
# Current version
|
||||
_version = 11
|
||||
_version = 12
|
||||
|
||||
# Root for doing nested keys
|
||||
_schema_root = ""
|
||||
@ -651,6 +651,7 @@ class ZKSchema(object):
|
||||
"monitoring.plugins": "/monitoring_plugins",
|
||||
"monitoring.data": "/monitoring_data",
|
||||
"monitoring.health": "/monitoring_health",
|
||||
"network.stats": "/network_stats",
|
||||
},
|
||||
# The schema of an individual monitoring plugin data entry (/nodes/{node_name}/monitoring_data/{plugin})
|
||||
"monitoring_plugin": {
|
||||
|
10
debian/changelog
vendored
10
debian/changelog
vendored
@ -1,3 +1,13 @@
|
||||
pvc (0.9.87-0) unstable; urgency=high
|
||||
|
||||
* [API Daemon] Adds cluster Prometheus resource utilization metrics and an updated Grafana dashboard.
|
||||
* [Node Daemon] Adds network traffic rate calculation subsystem.
|
||||
* [All Daemons] Fixes a printing bug where newlines were not added atomically.
|
||||
* [CLI Client] Fixes a bug listing connections if no default is specified.
|
||||
* [All Daemons] Simplifies debug logging conditionals by moving into the Logger instance itself.
|
||||
|
||||
-- Joshua M. Boniface <joshua@boniface.me> Wed, 27 Dec 2023 13:03:14 -0500
|
||||
|
||||
pvc (0.9.86-0) unstable; urgency=high
|
||||
|
||||
* [API Daemon] Significantly improves the performance of several commands via async Zookeeper calls and removal of superfluous backend calls.
|
||||
|
2
debian/pvc-daemon-node.install
vendored
2
debian/pvc-daemon-node.install
vendored
@ -3,4 +3,4 @@ node-daemon/pvcnoded usr/share/pvc
|
||||
node-daemon/pvcnoded.service lib/systemd/system
|
||||
node-daemon/pvc.target lib/systemd/system
|
||||
node-daemon/pvcautoready.service lib/systemd/system
|
||||
node-daemon/monitoring usr/share/pvc
|
||||
monitoring usr/share/pvc
|
||||
|
@ -33,7 +33,7 @@ import os
|
||||
import signal
|
||||
|
||||
# Daemon version
|
||||
version = "0.9.86"
|
||||
version = "0.9.87"
|
||||
|
||||
|
||||
##########################################################
|
||||
|
@ -157,9 +157,6 @@ class MonitoringPlugin(object):
|
||||
"w": warning
|
||||
"e": error
|
||||
"""
|
||||
if state == "d" and not self.config["debug"]:
|
||||
return
|
||||
|
||||
self.logger.out(message, state=state, prefix=self.plugin_name)
|
||||
|
||||
#
|
||||
@ -523,11 +520,10 @@ class MonitoringInstance(object):
|
||||
|
||||
entries = fault_data["entries"]()
|
||||
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
f"Entries for fault check {fault_type}: {dumps(entries)}",
|
||||
state="d",
|
||||
)
|
||||
self.logger.out(
|
||||
f"Entries for fault check {fault_type}: {dumps(entries)}",
|
||||
state="d",
|
||||
)
|
||||
|
||||
for _entry in entries:
|
||||
entry = _entry["entry"]
|
||||
|
7040
monitoring/prometheus/grafana-pvc-cluster-dashboard.json
Normal file
7040
monitoring/prometheus/grafana-pvc-cluster-dashboard.json
Normal file
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -31,6 +31,7 @@ import pvcnoded.objects.MetadataAPIInstance as MetadataAPIInstance
|
||||
import pvcnoded.objects.VMInstance as VMInstance
|
||||
import pvcnoded.objects.NodeInstance as NodeInstance
|
||||
import pvcnoded.objects.VXNetworkInstance as VXNetworkInstance
|
||||
import pvcnoded.objects.NetstatsInstance as NetstatsInstance
|
||||
import pvcnoded.objects.SRIOVVFInstance as SRIOVVFInstance
|
||||
import pvcnoded.objects.CephInstance as CephInstance
|
||||
|
||||
@ -48,7 +49,7 @@ import re
|
||||
import json
|
||||
|
||||
# Daemon version
|
||||
version = "0.9.86"
|
||||
version = "0.9.87"
|
||||
|
||||
|
||||
##########################################################
|
||||
@ -200,9 +201,9 @@ def entrypoint():
|
||||
|
||||
# Define a cleanup function
|
||||
def cleanup(failure=False):
|
||||
nonlocal logger, zkhandler, keepalive_timer, d_domain
|
||||
nonlocal logger, zkhandler, keepalive_timer, d_domain, netstats
|
||||
|
||||
logger.out("Terminating pvcnoded and cleaning up", state="s")
|
||||
logger.out("Terminating pvcnoded", state="s")
|
||||
|
||||
# Set shutdown state in Zookeeper
|
||||
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "shutdown")])
|
||||
@ -249,12 +250,20 @@ def entrypoint():
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Set stop state in Zookeeper
|
||||
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
|
||||
logger.out("Cleaning up", state="s")
|
||||
|
||||
# Stop netstats instance
|
||||
try:
|
||||
netstats.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Forcibly terminate dnsmasq because it gets stuck sometimes
|
||||
common.run_os_command("killall dnsmasq")
|
||||
|
||||
# Set stop state in Zookeeper
|
||||
zkhandler.write([(("node.state.daemon", config["node_hostname"]), "stop")])
|
||||
|
||||
# Close the Zookeeper connection
|
||||
try:
|
||||
zkhandler.disconnect(persistent=True)
|
||||
@ -1000,9 +1009,12 @@ def entrypoint():
|
||||
state="s",
|
||||
)
|
||||
|
||||
# Set up netstats
|
||||
netstats = NetstatsInstance.NetstatsInstance(logger, config, zkhandler, this_node)
|
||||
|
||||
# Start keepalived thread
|
||||
keepalive_timer = pvcnoded.util.keepalive.start_keepalive_timer(
|
||||
logger, config, zkhandler, this_node
|
||||
logger, config, zkhandler, this_node, netstats
|
||||
)
|
||||
|
||||
# Tick loop; does nothing since everything is async
|
||||
|
@ -333,12 +333,11 @@ class AXFRDaemonInstance(object):
|
||||
z = dns.zone.from_xfr(axfr)
|
||||
records_raw = [z[n].to_text(n) for n in z.nodes.keys()]
|
||||
except Exception as e:
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
"{} {} ({})".format(e, dnsmasq_ip, domain),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"{} {} ({})".format(e, dnsmasq_ip, domain),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
continue
|
||||
|
||||
# Fix the formatting because it's useless
|
||||
@ -370,12 +369,11 @@ class AXFRDaemonInstance(object):
|
||||
"SELECT * FROM records WHERE domain_id=%s", (domain_id,)
|
||||
)
|
||||
results = list(sql_curs.fetchall())
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
"SQL query results: {}".format(results),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"SQL query results: {}".format(results),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.out(
|
||||
"ERROR: Failed to obtain DNS records from database: {}".format(
|
||||
@ -388,12 +386,11 @@ class AXFRDaemonInstance(object):
|
||||
records_old = list()
|
||||
records_old_ids = list()
|
||||
if not results:
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
"No results found, skipping.",
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"No results found, skipping.",
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
continue
|
||||
for record in results:
|
||||
# Skip the non-A
|
||||
@ -404,23 +401,21 @@ class AXFRDaemonInstance(object):
|
||||
r_data = record[4]
|
||||
# Assemble a list element in the same format as the AXFR data
|
||||
entry = "{} {} IN {} {}".format(r_name, r_ttl, r_type, r_data)
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
"Found record: {}".format(entry),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"Found record: {}".format(entry),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
|
||||
# Skip non-A or AAAA records
|
||||
if r_type != "A" and r_type != "AAAA":
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
'Skipping record {}, not A or AAAA: "{}"'.format(
|
||||
entry, r_type
|
||||
),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
'Skipping record {}, not A or AAAA: "{}"'.format(
|
||||
entry, r_type
|
||||
),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
continue
|
||||
|
||||
records_old.append(entry)
|
||||
@ -429,17 +424,16 @@ class AXFRDaemonInstance(object):
|
||||
records_new.sort()
|
||||
records_old.sort()
|
||||
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
"New: {}".format(records_new),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"Old: {}".format(records_old),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"New: {}".format(records_new),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"Old: {}".format(records_old),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
|
||||
# Find the differences between the lists
|
||||
# Basic check one: are they completely equal
|
||||
@ -450,17 +444,16 @@ class AXFRDaemonInstance(object):
|
||||
in_new_not_in_old = in_new - in_old
|
||||
in_old_not_in_new = in_old - in_new
|
||||
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
"New but not old: {}".format(in_new_not_in_old),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"Old but not new: {}".format(in_old_not_in_new),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"New but not old: {}".format(in_new_not_in_old),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"Old but not new: {}".format(in_old_not_in_new),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
|
||||
# Go through the old list
|
||||
remove_records = list() # list of database IDs
|
||||
@ -487,12 +480,11 @@ class AXFRDaemonInstance(object):
|
||||
if len(remove_records) > 0:
|
||||
# Remove the invalid old records
|
||||
for record_id in remove_records:
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
"Removing record: {}".format(record_id),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"Removing record: {}".format(record_id),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
sql_curs.execute(
|
||||
"DELETE FROM records WHERE id=%s", (record_id,)
|
||||
)
|
||||
@ -507,12 +499,11 @@ class AXFRDaemonInstance(object):
|
||||
r_ttl = record[1]
|
||||
r_type = record[3]
|
||||
r_data = record[4]
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
"Add record: {}".format(name),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"Add record: {}".format(name),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
try:
|
||||
sql_curs.execute(
|
||||
"INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)",
|
||||
@ -520,23 +511,21 @@ class AXFRDaemonInstance(object):
|
||||
)
|
||||
changed = True
|
||||
except psycopg2.IntegrityError as e:
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
"Failed to add record due to {}: {}".format(
|
||||
e, name
|
||||
),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"Failed to add record due to {}: {}".format(
|
||||
e, name
|
||||
),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
except psycopg2.errors.InFailedSqlTransaction as e:
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
"Failed to add record due to {}: {}".format(
|
||||
e, name
|
||||
),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"Failed to add record due to {}: {}".format(
|
||||
e, name
|
||||
),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
|
||||
if changed:
|
||||
# Increase SOA serial
|
||||
@ -548,24 +537,22 @@ class AXFRDaemonInstance(object):
|
||||
current_serial = int(soa_record[2])
|
||||
new_serial = current_serial + 1
|
||||
soa_record[2] = str(new_serial)
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
"Records changed; bumping SOA: {}".format(new_serial),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"Records changed; bumping SOA: {}".format(new_serial),
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
sql_curs.execute(
|
||||
"UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'",
|
||||
(" ".join(soa_record), domain_id),
|
||||
)
|
||||
|
||||
# Commit all the previous changes
|
||||
if self.config["debug"]:
|
||||
self.logger.out(
|
||||
"Committing database changes and reloading PDNS",
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
self.logger.out(
|
||||
"Committing database changes and reloading PDNS",
|
||||
state="d",
|
||||
prefix="dns-aggregator",
|
||||
)
|
||||
try:
|
||||
self.sql_conn.commit()
|
||||
except Exception as e:
|
||||
|
293
node-daemon/pvcnoded/objects/NetstatsInstance.py
Normal file
293
node-daemon/pvcnoded/objects/NetstatsInstance.py
Normal file
@ -0,0 +1,293 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# NetstatsInstance.py - Class implementing a PVC network stats gatherer and run by pvcnoded
|
||||
# Part of the Parallel Virtual Cluster (PVC) system
|
||||
#
|
||||
# Copyright (C) 2018-2023 Joshua M. Boniface <joshua@boniface.me>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, version 3.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
#
|
||||
###############################################################################
|
||||
|
||||
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from collections import deque
|
||||
from json import dumps
|
||||
from os import walk
|
||||
from os.path import exists
|
||||
|
||||
|
||||
class NetstatsIfaceInstance(object):
|
||||
"""
|
||||
NetstatsIfaceInstance
|
||||
|
||||
This class implements a rolling statistics poller for a network interface,
|
||||
collecting stats on the bits and packets per second in both directions every
|
||||
second.
|
||||
|
||||
Via the get_stats() function, it returns the rolling average of all 4 values,
|
||||
as well as totals, over the last 5 seconds (self.avg_samples) as a tuple of:
|
||||
(rx_bps, rx_pps, tx_bps, tx_pps, total_bps, total_pps, link_speed, state)
|
||||
"""
|
||||
|
||||
def __init__(self, logger, iface, avg_samples):
|
||||
"""
|
||||
Initialize the class instance, creating our BackgroundScheduler, setting
|
||||
the average sample rate, and creating the deques and average values.
|
||||
"""
|
||||
self.logger = logger
|
||||
self.iface = iface
|
||||
|
||||
self.data_valid = False
|
||||
self.data_polls = 0
|
||||
|
||||
self.timer = BackgroundScheduler()
|
||||
self.timer.add_job(self.gather_stats, trigger="interval", seconds=1)
|
||||
|
||||
self.avg_samples = avg_samples
|
||||
|
||||
self.link_speed = 0
|
||||
self.state = "down"
|
||||
|
||||
self.rx_bits_rolling = deque(list(), self.avg_samples + 1)
|
||||
self.rx_bps = 0
|
||||
|
||||
self.rx_packets_rolling = deque(list(), self.avg_samples + 1)
|
||||
self.rx_pps = 0
|
||||
|
||||
self.tx_bits_rolling = deque(list(), self.avg_samples + 1)
|
||||
self.tx_bps = 0
|
||||
|
||||
self.tx_packets_rolling = deque(list(), self.avg_samples + 1)
|
||||
self.tx_pps = 0
|
||||
|
||||
self.total_bps = 0
|
||||
self.total_pps = 0
|
||||
|
||||
def get_iface_stats(self):
|
||||
"""
|
||||
Reads the interface statistics from the sysfs for the interface.
|
||||
"""
|
||||
iface_state_path = f"/sys/class/net/{self.iface}/operstate"
|
||||
with open(iface_state_path) as stfh:
|
||||
self.state = stfh.read().strip()
|
||||
|
||||
iface_speed_path = f"/sys/class/net/{self.iface}/speed"
|
||||
try:
|
||||
with open(iface_speed_path) as spfh:
|
||||
# The speed key is always in Mbps so multiply by 1000*1000 to get bps
|
||||
self.link_speed = int(spfh.read()) * 1000 * 1000
|
||||
except OSError:
|
||||
self.link_speed = 0
|
||||
|
||||
iface_stats_path = f"/sys/class/net/{self.iface}/statistics"
|
||||
with open(f"{iface_stats_path}/rx_bytes") as rxbfh:
|
||||
self.rx_bits_rolling.append(int(rxbfh.read()) * 8)
|
||||
with open(f"{iface_stats_path}/tx_bytes") as txbfh:
|
||||
self.tx_bits_rolling.append(int(txbfh.read()) * 8)
|
||||
with open(f"{iface_stats_path}/rx_packets") as rxpfh:
|
||||
self.rx_packets_rolling.append(int(rxpfh.read()) * 8)
|
||||
with open(f"{iface_stats_path}/tx_packets") as txpfh:
|
||||
self.tx_packets_rolling.append(int(txpfh.read()) * 8)
|
||||
|
||||
def calculate_averages(self):
|
||||
"""
|
||||
Calculates the bps/pps values from the rolling values.
|
||||
"""
|
||||
|
||||
rx_bits_diffs = list()
|
||||
for sample_idx in range(self.avg_samples, 0, -1):
|
||||
rx_bits_diffs.append(
|
||||
self.rx_bits_rolling[sample_idx] - self.rx_bits_rolling[sample_idx - 1]
|
||||
)
|
||||
self.rx_bps = int(sum(rx_bits_diffs) / self.avg_samples)
|
||||
|
||||
rx_packets_diffs = list()
|
||||
for sample_idx in range(self.avg_samples, 0, -1):
|
||||
rx_packets_diffs.append(
|
||||
self.rx_packets_rolling[sample_idx]
|
||||
- self.rx_packets_rolling[sample_idx - 1]
|
||||
)
|
||||
self.rx_pps = int(sum(rx_packets_diffs) / self.avg_samples)
|
||||
|
||||
tx_bits_diffs = list()
|
||||
for sample_idx in range(self.avg_samples, 0, -1):
|
||||
tx_bits_diffs.append(
|
||||
self.tx_bits_rolling[sample_idx] - self.tx_bits_rolling[sample_idx - 1]
|
||||
)
|
||||
self.tx_bps = int(sum(tx_bits_diffs) / self.avg_samples)
|
||||
|
||||
tx_packets_diffs = list()
|
||||
for sample_idx in range(self.avg_samples, 0, -1):
|
||||
tx_packets_diffs.append(
|
||||
self.tx_packets_rolling[sample_idx]
|
||||
- self.tx_packets_rolling[sample_idx - 1]
|
||||
)
|
||||
self.tx_pps = int(sum(tx_packets_diffs) / self.avg_samples)
|
||||
|
||||
self.total_bps = self.rx_bps + self.tx_bps
|
||||
self.total_pps = self.rx_pps + self.tx_pps
|
||||
|
||||
def gather_stats(self):
|
||||
"""
|
||||
Gathers the current stats and then calculates the averages.
|
||||
|
||||
Runs via the BackgroundScheduler timer every 1 second.
|
||||
"""
|
||||
self.get_iface_stats()
|
||||
if self.data_valid:
|
||||
self.calculate_averages()
|
||||
|
||||
# Handle data validity: our data is invalid until we hit enough polls
|
||||
# to make a valid average (avg_samples plus 1).
|
||||
if not self.data_valid:
|
||||
self.data_polls += 1
|
||||
if self.data_polls > self.avg_samples:
|
||||
self.data_valid = True
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Starts the timer.
|
||||
"""
|
||||
self.timer.start()
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stops the timer.
|
||||
"""
|
||||
self.timer.shutdown()
|
||||
|
||||
def get_stats(self):
|
||||
"""
|
||||
Returns a tuple of the current statistics.
|
||||
"""
|
||||
if not self.data_valid:
|
||||
return None
|
||||
|
||||
return (
|
||||
self.rx_bps,
|
||||
self.rx_pps,
|
||||
self.tx_bps,
|
||||
self.tx_pps,
|
||||
self.total_bps,
|
||||
self.total_pps,
|
||||
self.link_speed,
|
||||
self.state,
|
||||
)
|
||||
|
||||
|
||||
class NetstatsInstance(object):
|
||||
"""
|
||||
NetstatsInstance
|
||||
|
||||
This class implements a rolling statistics poller for all PHYSICAL network interfaces,
|
||||
on the system, initializing a NetstatsIfaceInstance for each, as well as handling
|
||||
value updates into Zookeeper.
|
||||
"""
|
||||
|
||||
def __init__(self, logger, config, zkhandler, this_node):
|
||||
"""
|
||||
Initialize the class instance.
|
||||
"""
|
||||
self.logger = logger
|
||||
self.config = config
|
||||
self.zkhandler = zkhandler
|
||||
self.node_name = this_node.name
|
||||
|
||||
self.interfaces = dict()
|
||||
|
||||
self.logger.out(
|
||||
f"Starting netstats collector ({self.config['keepalive_interval']} second interval)",
|
||||
state="s",
|
||||
)
|
||||
|
||||
self.set_interfaces()
|
||||
|
||||
def shutdown(self):
|
||||
"""
|
||||
Stop all pollers and delete the NetstatsIfaceInstance objects
|
||||
"""
|
||||
# Empty the network stats object
|
||||
self.zkhandler.write([(("node.network.stats", self.node_name), dumps({}))])
|
||||
|
||||
for iface in self.interfaces.keys():
|
||||
self.interfaces[iface].stop()
|
||||
|
||||
def set_interfaces(self):
|
||||
"""
|
||||
Sets the list of interfaces on the system, and then ensures that each
|
||||
interface has a NetstatsIfaceInstance assigned to it and polling.
|
||||
"""
|
||||
# Get a list of all active interfaces
|
||||
net_root_path = "/sys/class/net"
|
||||
all_ifaces = list()
|
||||
for (_, dirnames, _) in walk(net_root_path):
|
||||
all_ifaces.extend(dirnames)
|
||||
all_ifaces.sort()
|
||||
|
||||
self.logger.out(
|
||||
f"Parsing network list: {all_ifaces}", state="d", prefix="netstats-thread"
|
||||
)
|
||||
|
||||
# Add any missing interfaces
|
||||
for iface in all_ifaces:
|
||||
if not exists(f"{net_root_path}/{iface}/device"):
|
||||
# This is not a physical interface; skip it
|
||||
continue
|
||||
|
||||
if iface not in self.interfaces.keys():
|
||||
# Set the number of samples to be equal to the keepalive interval, so that each
|
||||
# keepalive has a fresh set of data from the last keepalive_interval seconds.
|
||||
self.interfaces[iface] = NetstatsIfaceInstance(
|
||||
self.logger, iface, self.config["keepalive_interval"]
|
||||
)
|
||||
self.interfaces[iface].start()
|
||||
# Remove any superfluous interfaces
|
||||
for iface in self.interfaces.keys():
|
||||
if iface not in all_ifaces:
|
||||
self.interfaces[iface].stop()
|
||||
del self.interfaces[iface]
|
||||
|
||||
def set_data(self):
|
||||
data = dict()
|
||||
for iface in self.interfaces.keys():
|
||||
self.logger.out(
|
||||
f"Getting data for interface {iface}",
|
||||
state="d",
|
||||
prefix="netstats-thread",
|
||||
)
|
||||
iface_stats = self.interfaces[iface].get_stats()
|
||||
if iface_stats is None:
|
||||
continue
|
||||
(
|
||||
iface_rx_bps,
|
||||
iface_rx_pps,
|
||||
iface_tx_bps,
|
||||
iface_tx_pps,
|
||||
iface_total_bps,
|
||||
iface_total_pps,
|
||||
iface_link_speed,
|
||||
iface_state,
|
||||
) = iface_stats
|
||||
data[iface] = {
|
||||
"rx_bps": iface_rx_bps,
|
||||
"rx_pps": iface_rx_pps,
|
||||
"tx_bps": iface_tx_bps,
|
||||
"tx_pps": iface_tx_pps,
|
||||
"total_bps": iface_total_bps,
|
||||
"total_pps": iface_total_pps,
|
||||
"link_speed": iface_link_speed,
|
||||
"state": iface_state,
|
||||
}
|
||||
|
||||
self.zkhandler.write([(("node.network.stats", self.node_name), dumps(data))])
|
@ -51,7 +51,7 @@ libvirt_vm_states = {
|
||||
}
|
||||
|
||||
|
||||
def start_keepalive_timer(logger, config, zkhandler, this_node):
|
||||
def start_keepalive_timer(logger, config, zkhandler, this_node, netstats):
|
||||
keepalive_interval = config["keepalive_interval"]
|
||||
logger.out(
|
||||
f"Starting keepalive timer ({keepalive_interval} second interval)", state="s"
|
||||
@ -59,7 +59,7 @@ def start_keepalive_timer(logger, config, zkhandler, this_node):
|
||||
keepalive_timer = BackgroundScheduler()
|
||||
keepalive_timer.add_job(
|
||||
node_keepalive,
|
||||
args=(logger, config, zkhandler, this_node),
|
||||
args=(logger, config, zkhandler, this_node, netstats),
|
||||
trigger="interval",
|
||||
seconds=keepalive_interval,
|
||||
)
|
||||
@ -80,9 +80,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
pool_list = zkhandler.children("base.pool")
|
||||
osd_list = zkhandler.children("base.osd")
|
||||
|
||||
debug = config["debug"]
|
||||
if debug:
|
||||
logger.out("Thread starting", state="d", prefix="ceph-thread")
|
||||
logger.out("Thread starting", state="d", prefix="ceph-thread")
|
||||
|
||||
# Connect to the Ceph cluster
|
||||
try:
|
||||
@ -90,8 +88,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
conffile=config["ceph_config_file"],
|
||||
conf=dict(keyring=config["ceph_admin_keyring"]),
|
||||
)
|
||||
if debug:
|
||||
logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
|
||||
logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
|
||||
ceph_conn.connect(timeout=1)
|
||||
except Exception as e:
|
||||
logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
|
||||
@ -100,12 +97,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
# Primary-only functions
|
||||
if this_node.coordinator_state == "primary":
|
||||
# Get Ceph status information (pretty)
|
||||
if debug:
|
||||
logger.out(
|
||||
"Set Ceph status information in zookeeper (primary only)",
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Set Ceph status information in zookeeper (primary only)",
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
|
||||
command = {"prefix": "status", "format": "pretty"}
|
||||
ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
|
||||
@ -117,12 +113,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
logger.out("Failed to set Ceph status data: {}".format(e), state="e")
|
||||
|
||||
# Get Ceph health information (JSON)
|
||||
if debug:
|
||||
logger.out(
|
||||
"Set Ceph health information in zookeeper (primary only)",
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Set Ceph health information in zookeeper (primary only)",
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
|
||||
command = {"prefix": "health", "format": "json"}
|
||||
ceph_health = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
|
||||
@ -134,12 +129,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
logger.out("Failed to set Ceph health data: {}".format(e), state="e")
|
||||
|
||||
# Get Ceph df information (pretty)
|
||||
if debug:
|
||||
logger.out(
|
||||
"Set Ceph rados df information in zookeeper (primary only)",
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Set Ceph rados df information in zookeeper (primary only)",
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
|
||||
# Get rados df info
|
||||
command = {"prefix": "df", "format": "pretty"}
|
||||
@ -151,12 +145,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
except Exception as e:
|
||||
logger.out("Failed to set Ceph utilization data: {}".format(e), state="e")
|
||||
|
||||
if debug:
|
||||
logger.out(
|
||||
"Set pool information in zookeeper (primary only)",
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Set pool information in zookeeper (primary only)",
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
|
||||
# Get pool info
|
||||
command = {"prefix": "df", "format": "json"}
|
||||
@ -179,12 +172,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
rados_pool_df_raw = []
|
||||
|
||||
pool_count = len(ceph_pool_df_raw)
|
||||
if debug:
|
||||
logger.out(
|
||||
"Getting info for {} pools".format(pool_count),
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Getting info for {} pools".format(pool_count),
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
for pool_idx in range(0, pool_count):
|
||||
try:
|
||||
# Combine all the data for this pool
|
||||
@ -195,22 +187,18 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
|
||||
# Ignore any pools that aren't in our pool list
|
||||
if pool["name"] not in pool_list:
|
||||
if debug:
|
||||
logger.out(
|
||||
"Pool {} not in pool list {}".format(
|
||||
pool["name"], pool_list
|
||||
),
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Pool {} not in pool list {}".format(pool["name"], pool_list),
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
continue
|
||||
else:
|
||||
if debug:
|
||||
logger.out(
|
||||
"Parsing data for pool {}".format(pool["name"]),
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Parsing data for pool {}".format(pool["name"]),
|
||||
state="d",
|
||||
prefix="ceph-thread",
|
||||
)
|
||||
|
||||
# Assemble a useful data structure
|
||||
pool_df = {
|
||||
@ -248,8 +236,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
osds_this_node = 0
|
||||
if len(osd_list) > 0:
|
||||
# Get data from Ceph OSDs
|
||||
if debug:
|
||||
logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
|
||||
logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
|
||||
|
||||
# Parse the dump data
|
||||
osd_dump = dict()
|
||||
@ -264,8 +251,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
logger.out("Failed to obtain OSD data: {}".format(e), state="w")
|
||||
osd_dump_raw = []
|
||||
|
||||
if debug:
|
||||
logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
|
||||
logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
|
||||
for osd in osd_dump_raw:
|
||||
osd_dump.update(
|
||||
{
|
||||
@ -279,8 +265,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
)
|
||||
|
||||
# Parse the df data
|
||||
if debug:
|
||||
logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
|
||||
logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
|
||||
|
||||
osd_df = dict()
|
||||
|
||||
@ -293,8 +278,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
logger.out("Failed to obtain OSD data: {}".format(e), state="w")
|
||||
osd_df_raw = []
|
||||
|
||||
if debug:
|
||||
logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
|
||||
logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
|
||||
for osd in osd_df_raw:
|
||||
osd_df.update(
|
||||
{
|
||||
@ -316,8 +300,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
)
|
||||
|
||||
# Parse the status data
|
||||
if debug:
|
||||
logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
|
||||
logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
|
||||
|
||||
osd_status = dict()
|
||||
|
||||
@ -330,8 +313,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
logger.out("Failed to obtain OSD status data: {}".format(e), state="w")
|
||||
osd_status_raw = []
|
||||
|
||||
if debug:
|
||||
logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
|
||||
logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
|
||||
|
||||
for line in osd_status_raw.split("\n"):
|
||||
# Strip off colour
|
||||
@ -400,8 +382,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
)
|
||||
|
||||
# Merge them together into a single meaningful dict
|
||||
if debug:
|
||||
logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
|
||||
logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
|
||||
|
||||
osd_stats = dict()
|
||||
|
||||
@ -421,10 +402,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
|
||||
# Upload OSD data for the cluster (primary-only)
|
||||
if this_node.coordinator_state == "primary":
|
||||
if debug:
|
||||
logger.out(
|
||||
"Trigger updates for each OSD", state="d", prefix="ceph-thread"
|
||||
)
|
||||
logger.out("Trigger updates for each OSD", state="d", prefix="ceph-thread")
|
||||
|
||||
for osd in osd_list:
|
||||
try:
|
||||
@ -441,20 +419,16 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
|
||||
|
||||
queue.put(osds_this_node)
|
||||
|
||||
if debug:
|
||||
logger.out("Thread finished", state="d", prefix="ceph-thread")
|
||||
logger.out("Thread finished", state="d", prefix="ceph-thread")
|
||||
|
||||
|
||||
# VM stats update function
|
||||
def collect_vm_stats(logger, config, zkhandler, this_node, queue):
|
||||
debug = config["debug"]
|
||||
if debug:
|
||||
logger.out("Thread starting", state="d", prefix="vm-thread")
|
||||
logger.out("Thread starting", state="d", prefix="vm-thread")
|
||||
|
||||
# Connect to libvirt
|
||||
libvirt_name = "qemu:///system"
|
||||
if debug:
|
||||
logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
|
||||
logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
|
||||
try:
|
||||
lv_conn = libvirt.open(libvirt_name)
|
||||
if lv_conn is None:
|
||||
@ -467,12 +441,11 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
|
||||
memprov = 0
|
||||
vcpualloc = 0
|
||||
# Toggle state management of dead VMs to restart them
|
||||
if debug:
|
||||
logger.out(
|
||||
"Toggle state management of dead VMs to restart them",
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Toggle state management of dead VMs to restart them",
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
# Make a copy of the d_domain; if not, and it changes in flight, this can fail
|
||||
fixed_d_domain = this_node.d_domain.copy()
|
||||
for domain, instance in fixed_d_domain.items():
|
||||
@ -518,12 +491,11 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
|
||||
domain_name = domain.name()
|
||||
|
||||
# Get all the raw information about the VM
|
||||
if debug:
|
||||
logger.out(
|
||||
"Getting general statistics for VM {}".format(domain_name),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Getting general statistics for VM {}".format(domain_name),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
(
|
||||
domain_state,
|
||||
domain_maxmem,
|
||||
@ -537,29 +509,25 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
|
||||
domain_memory_stats = domain.memoryStats()
|
||||
domain_cpu_stats = domain.getCPUStats(True)[0]
|
||||
except Exception as e:
|
||||
if debug:
|
||||
try:
|
||||
logger.out(
|
||||
"Failed getting VM information for {}: {}".format(
|
||||
domain.name(), e
|
||||
),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
logger.out(
|
||||
"Failed getting VM information for {}: {}".format(domain.name(), e),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
||||
# Ensure VM is present in the domain_list
|
||||
if domain_uuid not in this_node.domain_list:
|
||||
this_node.domain_list.append(domain_uuid)
|
||||
|
||||
if debug:
|
||||
logger.out(
|
||||
"Getting disk statistics for VM {}".format(domain_name),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Getting disk statistics for VM {}".format(domain_name),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
domain_disk_stats = []
|
||||
try:
|
||||
for disk in tree.findall("devices/disk"):
|
||||
@ -578,23 +546,21 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
if debug:
|
||||
try:
|
||||
logger.out(
|
||||
"Failed getting disk stats for {}: {}".format(domain.name(), e),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
logger.out(
|
||||
"Failed getting disk stats for {}: {}".format(domain.name(), e),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
||||
if debug:
|
||||
logger.out(
|
||||
"Getting network statistics for VM {}".format(domain_name),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Getting network statistics for VM {}".format(domain_name),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
domain_network_stats = []
|
||||
try:
|
||||
for interface in tree.findall("devices/interface"):
|
||||
@ -619,17 +585,14 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
if debug:
|
||||
try:
|
||||
logger.out(
|
||||
"Failed getting network stats for {}: {}".format(
|
||||
domain.name(), e
|
||||
),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
logger.out(
|
||||
"Failed getting network stats for {}: {}".format(domain.name(), e),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
||||
# Create the final dictionary
|
||||
@ -645,48 +608,42 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
|
||||
"net_stats": domain_network_stats,
|
||||
}
|
||||
|
||||
if debug:
|
||||
logger.out(
|
||||
"Writing statistics for VM {} to Zookeeper".format(domain_name),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Writing statistics for VM {} to Zookeeper".format(domain_name),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
|
||||
try:
|
||||
zkhandler.write(
|
||||
[(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))]
|
||||
)
|
||||
except Exception as e:
|
||||
if debug:
|
||||
logger.out(
|
||||
"Failed to write domain statistics: {}".format(e),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
logger.out(
|
||||
"Failed to write domain statistics: {}".format(e),
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
|
||||
# Close the Libvirt connection
|
||||
lv_conn.close()
|
||||
|
||||
if debug:
|
||||
logger.out(
|
||||
f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
logger.out(
|
||||
f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
|
||||
state="d",
|
||||
prefix="vm-thread",
|
||||
)
|
||||
|
||||
queue.put(len(running_domains))
|
||||
queue.put(memalloc)
|
||||
queue.put(memprov)
|
||||
queue.put(vcpualloc)
|
||||
|
||||
if debug:
|
||||
logger.out("Thread finished", state="d", prefix="vm-thread")
|
||||
logger.out("Thread finished", state="d", prefix="vm-thread")
|
||||
|
||||
|
||||
# Keepalive update function
|
||||
def node_keepalive(logger, config, zkhandler, this_node):
|
||||
debug = config["debug"]
|
||||
|
||||
def node_keepalive(logger, config, zkhandler, this_node, netstats):
|
||||
# Display node information to the terminal
|
||||
if config["log_keepalives"]:
|
||||
if this_node.coordinator_state == "primary":
|
||||
@ -746,10 +703,7 @@ def node_keepalive(logger, config, zkhandler, this_node):
|
||||
)
|
||||
|
||||
# Get past state and update if needed
|
||||
if debug:
|
||||
logger.out(
|
||||
"Get past state and update if needed", state="d", prefix="main-thread"
|
||||
)
|
||||
logger.out("Get past state and update if needed", state="d", prefix="main-thread")
|
||||
|
||||
past_state = zkhandler.read(("node.state.daemon", this_node.name))
|
||||
if past_state != "run" and past_state != "shutdown":
|
||||
@ -759,10 +713,9 @@ def node_keepalive(logger, config, zkhandler, this_node):
|
||||
this_node.daemon_state = "run"
|
||||
|
||||
# Ensure the primary key is properly set
|
||||
if debug:
|
||||
logger.out(
|
||||
"Ensure the primary key is properly set", state="d", prefix="main-thread"
|
||||
)
|
||||
logger.out(
|
||||
"Ensure the primary key is properly set", state="d", prefix="main-thread"
|
||||
)
|
||||
if this_node.coordinator_state == "primary":
|
||||
if zkhandler.read("base.config.primary_node") != this_node.name:
|
||||
zkhandler.write([("base.config.primary_node", this_node.name)])
|
||||
@ -793,6 +746,10 @@ def node_keepalive(logger, config, zkhandler, this_node):
|
||||
this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
|
||||
this_node.cpuload = round(os.getloadavg()[0], 2)
|
||||
|
||||
# Get node network statistics via netstats instance
|
||||
netstats.set_interfaces()
|
||||
netstats.set_data()
|
||||
|
||||
# Join against running threads
|
||||
if config["enable_hypervisor"]:
|
||||
vm_stats_thread.join(timeout=config["keepalive_interval"])
|
||||
@ -839,8 +796,7 @@ def node_keepalive(logger, config, zkhandler, this_node):
|
||||
|
||||
# Set our information in zookeeper
|
||||
keepalive_time = int(time.time())
|
||||
if debug:
|
||||
logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
|
||||
logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
|
||||
try:
|
||||
zkhandler.write(
|
||||
[
|
||||
@ -928,10 +884,9 @@ def node_keepalive(logger, config, zkhandler, this_node):
|
||||
|
||||
# Look for dead nodes and fence them
|
||||
if not this_node.maintenance:
|
||||
if debug:
|
||||
logger.out(
|
||||
"Look for dead nodes and fence them", state="d", prefix="main-thread"
|
||||
)
|
||||
logger.out(
|
||||
"Look for dead nodes and fence them", state="d", prefix="main-thread"
|
||||
)
|
||||
if config["daemon_mode"] == "coordinator":
|
||||
for node_name in zkhandler.children("base.node"):
|
||||
try:
|
||||
|
@ -44,7 +44,7 @@ from daemon_lib.vmbuilder import (
|
||||
)
|
||||
|
||||
# Daemon version
|
||||
version = "0.9.86"
|
||||
version = "0.9.87"
|
||||
|
||||
|
||||
config = cfg.get_configuration()
|
||||
|
Reference in New Issue
Block a user