Compare commits

9 commits: d0de4f1825 ... v0.9.87

| SHA1 |
|---|
| 8083b7a3e6 |
| 3346ce9bb0 |
| 572596c575 |
| e654fbba08 |
| 52bf5ad0ef |
| 576afc1e94 |
| 4375f66793 |
| 3df3ca5b44 |
| cb3c2cd86d |
@@ -1,5 +1,13 @@
 ## PVC Changelog
 
+###### [v0.9.87](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.87)
+
+  * [API Daemon] Adds cluster Prometheus resource utilization metrics and an updated Grafana dashboard.
+  * [Node Daemon] Adds network traffic rate calculation subsystem.
+  * [All Daemons] Fixes a printing bug where newlines were not added atomically.
+  * [CLI Client] Fixes a bug listing connections if no default is specified.
+  * [All Daemons] Simplifies debug logging conditionals by moving into the Logger instance itself.
+
 ###### [v0.9.86](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.86)
 
   * [API Daemon] Significantly improves the performance of several commands via async Zookeeper calls and removal of superfluous backend calls.
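The "[All Daemons] Fixes a printing bug where newlines were not added atomically" entry refers to interleaved log output. As a rough sketch of the general failure mode (not necessarily PVC's exact change): Python's `print()` may issue the message and the trailing newline as separate writes, so two writers sharing a terminal can interleave between them, while emitting the newline as part of a single write avoids that.

```python
import sys

message = "keepalive OK"

# Potentially non-atomic: print() can write the message and the "\n"
# end-string separately, so concurrent writers may interleave.
print(message)

# One write call including the newline; atomic enough for a log line.
sys.stdout.write(message + "\n")
```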
@@ -27,7 +27,7 @@ from distutils.util import strtobool as dustrtobool
 import daemon_lib.config as cfg
 
 # Daemon version
-version = "0.9.86"
+version = "0.9.87"
 
 # API version
 API_VERSION = 1.0
@@ -6116,13 +6116,14 @@ def cli(
     else:
         CLI_CONFIG = get_config(store_data, _connection)
 
+    CLI_CONFIG["store_path"] = store_path
+
     if not CLI_CONFIG.get("badcfg", None):
         CLI_CONFIG["debug"] = _debug
         CLI_CONFIG["unsafe"] = _unsafe
         CLI_CONFIG["colour"] = _colour
         CLI_CONFIG["quiet"] = _quiet
         CLI_CONFIG["silent"] = _silent
-        CLI_CONFIG["store_path"] = store_path
 
     audit()
@@ -2,7 +2,7 @@ from setuptools import setup
 
 setup(
     name="pvc",
-    version="0.9.86",
+    version="0.9.87",
     packages=["pvc.cli", "pvc.lib"],
     install_requires=[
         "Click",
@@ -878,7 +878,7 @@ def get_resource_metrics(zkhandler):
     )
     output_lines.append("# TYPE pvc_node_network_utilization gauge")
     for node in node_data:
-        used_network_percentage = per_node_network_utilization[node["name"]]
+        used_network_percentage = per_node_network_utilization.get(node["name"], 0)
        output_lines.append(
            f"pvc_node_network_utilization{{node=\"{node['name']}\"}} {used_network_percentage:2.2f}"
        )
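This change, and the ten hunks that follow, all swap bare dictionary indexing for `dict.get()` with a default. A minimal sketch of why, using invented node names:

```python
# "hv1"/"hv2" are invented node names for illustration.
per_node_network_utilization = {"hv1": 12.5}

# Old form: raises KeyError if a node has no rate sample yet.
# used = per_node_network_utilization["hv2"]

# New form: falls back to 0, so one missing sample cannot abort the
# whole metrics scrape.
used = per_node_network_utilization.get("hv2", 0)
print(used)  # 0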
@@ -1117,11 +1117,7 @@ def get_resource_metrics(zkhandler):
     output_lines.append("# HELP pvc_vm_memory_stats_unused PVC VM unused memory")
     output_lines.append("# TYPE pvc_vm_memory_stats_unused gauge")
     for vm in vm_data:
-        unused_memory = (
-            vm["memory_stats"]["unused"]
-            if vm["memory_stats"].get("unused") is not None
-            else 0
-        )
+        unused_memory = vm["memory_stats"].get("unused", 0)
         output_lines.append(
             f"pvc_vm_memory_stats_unused{{vm=\"{vm['name']}\"}} {unused_memory}"
         )
@@ -1129,11 +1125,7 @@ def get_resource_metrics(zkhandler):
     output_lines.append("# HELP pvc_vm_memory_stats_available PVC VM available memory")
     output_lines.append("# TYPE pvc_vm_memory_stats_available gauge")
     for vm in vm_data:
-        available_memory = (
-            vm["memory_stats"]["available"]
-            if vm["memory_stats"].get("available") is not None
-            else 0
-        )
+        available_memory = vm["memory_stats"].get("available", 0)
         output_lines.append(
             f"pvc_vm_memory_stats_available{{vm=\"{vm['name']}\"}} {available_memory}"
         )
@@ -1141,11 +1133,7 @@ def get_resource_metrics(zkhandler):
     output_lines.append("# HELP pvc_vm_memory_stats_usable PVC VM usable memory")
     output_lines.append("# TYPE pvc_vm_memory_stats_usable gauge")
     for vm in vm_data:
-        usable_memory = (
-            vm["memory_stats"]["usable"]
-            if vm["memory_stats"].get("usable") is not None
-            else 0
-        )
+        usable_memory = vm["memory_stats"].get("usable", 0)
         output_lines.append(
             f"pvc_vm_memory_stats_usable{{vm=\"{vm['name']}\"}} {usable_memory}"
         )
@@ -1155,11 +1143,7 @@ def get_resource_metrics(zkhandler):
     )
     output_lines.append("# TYPE pvc_vm_memory_stats_disk_caches gauge")
     for vm in vm_data:
-        disk_caches_memory = (
-            vm["memory_stats"]["disk_caches"]
-            if vm["memory_stats"].get("disk_caches") is not None
-            else 0
-        )
+        disk_caches_memory = vm["memory_stats"].get("disk_caches", 0)
         output_lines.append(
             f"pvc_vm_memory_stats_disk_caches{{vm=\"{vm['name']}\"}} {disk_caches_memory}"
         )
@@ -1167,11 +1151,7 @@ def get_resource_metrics(zkhandler):
     output_lines.append("# HELP pvc_vm_memory_swap_in PVC VM memory swap in")
     output_lines.append("# TYPE pvc_vm_memory_swap_in gauge")
     for vm in vm_data:
-        swap_in_memory = (
-            vm["memory_stats"]["swap_in"]
-            if vm["memory_stats"].get("swap_in") is not None
-            else 0
-        )
+        swap_in_memory = vm["memory_stats"].get("swap_in", 0)
         output_lines.append(
             f"pvc_vm_memory_stats_swap_in{{vm=\"{vm['name']}\"}} {swap_in_memory}"
         )
@@ -1179,11 +1159,7 @@ def get_resource_metrics(zkhandler):
     output_lines.append("# HELP pvc_vm_memory_swap_out PVC VM memory swap out")
     output_lines.append("# TYPE pvc_vm_memory_swap_out gauge")
     for vm in vm_data:
-        swap_out_memory = (
-            vm["memory_stats"]["swap_out"]
-            if vm["memory_stats"].get("swap_out") is not None
-            else 0
-        )
+        swap_out_memory = vm["memory_stats"].get("swap_out", 0)
         output_lines.append(
             f"pvc_vm_memory_stats_swap_out{{vm=\"{vm['name']}\"}} {swap_out_memory}"
         )
@@ -1191,11 +1167,7 @@ def get_resource_metrics(zkhandler):
     output_lines.append("# HELP pvc_vm_memory_major_fault PVC VM memory major faults")
     output_lines.append("# TYPE pvc_vm_memory_major_fault gauge")
     for vm in vm_data:
-        major_fault_memory = (
-            vm["memory_stats"]["major_fault"]
-            if vm["memory_stats"].get("major_fault") is not None
-            else 0
-        )
+        major_fault_memory = vm["memory_stats"].get("major_fault", 0)
         output_lines.append(
             f"pvc_vm_memory_stats_major_fault{{vm=\"{vm['name']}\"}} {major_fault_memory}"
         )
@@ -1203,11 +1175,7 @@ def get_resource_metrics(zkhandler):
     output_lines.append("# HELP pvc_vm_memory_minor_fault PVC VM memory minor faults")
     output_lines.append("# TYPE pvc_vm_memory_minor_fault gauge")
     for vm in vm_data:
-        minor_fault_memory = (
-            vm["memory_stats"]["minor_fault"]
-            if vm["memory_stats"].get("minor_fault") is not None
-            else 0
-        )
+        minor_fault_memory = vm["memory_stats"].get("minor_fault", 0)
         output_lines.append(
             f"pvc_vm_memory_stats_minor_fault{{vm=\"{vm['name']}\"}} {minor_fault_memory}"
         )
@@ -1217,11 +1185,7 @@ def get_resource_metrics(zkhandler):
     )
     output_lines.append("# TYPE pvc_vm_memory_hugetlb_pgalloc gauge")
     for vm in vm_data:
-        hugetlb_pgalloc_memory = (
-            vm["memory_stats"]["hugetlb_pgalloc"]
-            if vm["memory_stats"].get("hugetlb_pgalloc") is not None
-            else 0
-        )
+        hugetlb_pgalloc_memory = vm["memory_stats"].get("hugetlb_pgalloc", 0)
         output_lines.append(
             f"pvc_vm_memory_stats_hugetlb_pgalloc{{vm=\"{vm['name']}\"}} {hugetlb_pgalloc_memory}"
         )
@@ -1231,11 +1195,7 @@ def get_resource_metrics(zkhandler):
     )
     output_lines.append("# TYPE pvc_vm_memory_hugetlb_pgfail gauge")
     for vm in vm_data:
-        hugetlb_pgfail_memory = (
-            vm["memory_stats"]["hugetlb_pgfail"]
-            if vm["memory_stats"].get("hugetlb_pgfail") is not None
-            else 0
-        )
+        hugetlb_pgfail_memory = vm["memory_stats"].get("hugetlb_pgfail", 0)
         output_lines.append(
             f"pvc_vm_memory_stats_hugetlb_pgfail{{vm=\"{vm['name']}\"}} {hugetlb_pgfail_memory}"
         )
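Each loop above emits one gauge sample per VM in the Prometheus text exposition format. Reproducing a single emitted line outside the daemon (the VM name and empty stats dict are invented):

```python
# A VM whose hypervisor returned no "unused" figure.
vm = {"name": "vm1", "memory_stats": {}}

unused_memory = vm["memory_stats"].get("unused", 0)
print(f"pvc_vm_memory_stats_unused{{vm=\"{vm['name']}\"}} {unused_memory}")
# -> pvc_vm_memory_stats_unused{vm="vm1"} 0
```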
@@ -1454,58 +1414,58 @@ def get_resource_metrics(zkhandler):
             continue
         output_lines.append(f"pvc_ceph_osd_size{{osd=\"{osd['id']}\"}} {osd_size}")
 
-    output_lines.append("# HELP pvc_ceph_osd_used PVC OSD used KB")
+    output_lines.append("# HELP pvc_ceph_osd_used PVC OSD used bytes")
     output_lines.append("# TYPE pvc_ceph_osd_used gauge")
     for osd in osd_data:
         try:
-            osd_used = osd["stats"]["kb_used"]
+            osd_used = osd["stats"]["kb_used"] * 1024
         except Exception:
             continue
         output_lines.append(f"pvc_ceph_osd_used{{osd=\"{osd['id']}\"}} {osd_used}")
 
-    output_lines.append("# HELP pvc_ceph_osd_used_data PVC OSD used (data) KB")
+    output_lines.append("# HELP pvc_ceph_osd_used_data PVC OSD used (data) bytes")
     output_lines.append("# TYPE pvc_ceph_osd_used_data gauge")
     for osd in osd_data:
         try:
-            osd_used_data = osd["stats"]["kb_used_data"]
+            osd_used_data = osd["stats"]["kb_used_data"] * 1024
         except Exception:
             continue
         output_lines.append(
             f"pvc_ceph_osd_used_data{{osd=\"{osd['id']}\"}} {osd_used_data}"
         )
 
-    output_lines.append("# HELP pvc_ceph_osd_used_omap PVC OSD used (omap) KB")
+    output_lines.append("# HELP pvc_ceph_osd_used_omap PVC OSD used (omap) bytes")
     output_lines.append("# TYPE pvc_ceph_osd_used_omap gauge")
     for osd in osd_data:
         try:
-            osd_used_omap = osd["stats"]["kb_used_omap"]
+            osd_used_omap = osd["stats"]["kb_used_omap"] * 1024
         except Exception:
             continue
         output_lines.append(
             f"pvc_ceph_osd_used_omap{{osd=\"{osd['id']}\"}} {osd_used_omap}"
         )
 
-    output_lines.append("# HELP pvc_ceph_osd_used_meta PVC OSD used (meta) KB")
+    output_lines.append("# HELP pvc_ceph_osd_used_meta PVC OSD used (meta) bytes")
     output_lines.append("# TYPE pvc_ceph_osd_used_meta gauge")
     for osd in osd_data:
         try:
-            osd_used_meta = osd["stats"]["kb_used_meta"]
+            osd_used_meta = osd["stats"]["kb_used_meta"] * 1024
         except Exception:
             continue
         output_lines.append(
             f"pvc_ceph_osd_used_meta{{osd=\"{osd['id']}\"}} {osd_used_meta}"
         )
 
-    output_lines.append("# HELP pvc_ceph_osd_avail PVC OSD available KB")
+    output_lines.append("# HELP pvc_ceph_osd_avail PVC OSD available bytes")
     output_lines.append("# TYPE pvc_ceph_osd_avail gauge")
     for osd in osd_data:
         try:
-            osd_avail = osd["stats"]["kb_avail"]
+            osd_avail = osd["stats"]["kb_avail"] * 1024
         except Exception:
             continue
         output_lines.append(f"pvc_ceph_osd_avail{{osd=\"{osd['id']}\"}} {osd_avail}")
 
-    output_lines.append("# HELP pvc_ceph_osd_weight PVC OSD weight KB")
+    output_lines.append("# HELP pvc_ceph_osd_weight PVC OSD weight")
     output_lines.append("# TYPE pvc_ceph_osd_weight gauge")
     for osd in osd_data:
         try:
@@ -1514,7 +1474,7 @@ def get_resource_metrics(zkhandler):
             continue
         output_lines.append(f"pvc_ceph_osd_weight{{osd=\"{osd['id']}\"}} {osd_weight}")
 
-    output_lines.append("# HELP pvc_ceph_osd_reweight PVC OSD reweight KB")
+    output_lines.append("# HELP pvc_ceph_osd_reweight PVC OSD reweight")
     output_lines.append("# TYPE pvc_ceph_osd_reweight gauge")
     for osd in osd_data:
         try:
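These hunks pair each HELP-text change (KB to bytes) with a matching value conversion, since Ceph reports these fields in KiB while Prometheus convention favours base units. A standalone sketch with an invented stats dict:

```python
# Invented sample of the kb_* fields Ceph reports for an OSD.
stats = {"kb_used": 2048}

# Convert KiB to bytes so the value matches the "bytes" HELP text.
osd_used = stats["kb_used"] * 1024
print(osd_used)  # 2097152
```

The weight and reweight HELP strings drop "KB" entirely, since CRUSH weights are unitless ratios rather than sizes.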
@@ -53,6 +53,10 @@ node_state_combinations = [
     "init,flush",
     "init,flushed",
     "init,unflush",
+    "shutdown,ready",
+    "shutdown,flush",
+    "shutdown,flushed",
+    "shutdown,unflush",
     "stop,ready",
     "stop,flush",
     "stop,flushed",
@@ -115,6 +115,10 @@ class Logger(object):
 
     # Output function
     def out(self, message, state=None, prefix=""):
+        # Only handle d-state (debug) messages if we're in debug mode
+        if state in ["d"] and not self.config["debug"]:
+            return
+
         # Get the date
         if self.config["log_dates"]:
             date = "{} ".format(datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f"))
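This is the change the "[All Daemons] Simplifies debug logging conditionals by moving into the Logger instance itself" entry refers to: callers no longer need their own `if debug:` guard, because `Logger.out()` now drops "d"-state messages itself. A minimal standalone sketch of the new behaviour:

```python
class Logger:
    """Sketch of the new guard, assuming a config dict with a "debug" key."""

    def __init__(self, config):
        self.config = config

    def out(self, message, state=None, prefix=""):
        # Only handle d-state (debug) messages if we're in debug mode
        if state in ["d"] and not self.config["debug"]:
            return
        print(f"{prefix}: {message}" if prefix else message)


logger = Logger({"debug": False})
logger.out("Thread starting", state="d", prefix="ceph-thread")  # suppressed
logger.out("Node is primary", state="o")                        # printed
```

The large blocks of removed `if debug:` / `if self.config["debug"]:` wrappers in the hunks below are the call-site half of this refactor.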
debian/changelog (vendored, 10 lines changed)
@@ -1,3 +1,13 @@
+pvc (0.9.87-0) unstable; urgency=high
+
+  * [API Daemon] Adds cluster Prometheus resource utilization metrics and an updated Grafana dashboard.
+  * [Node Daemon] Adds network traffic rate calculation subsystem.
+  * [All Daemons] Fixes a printing bug where newlines were not added atomically.
+  * [CLI Client] Fixes a bug listing connections if no default is specified.
+  * [All Daemons] Simplifies debug logging conditionals by moving into the Logger instance itself.
+
+ -- Joshua M. Boniface <joshua@boniface.me>  Wed, 27 Dec 2023 13:40:51 -0500
+
 pvc (0.9.86-0) unstable; urgency=high
 
   * [API Daemon] Significantly improves the performance of several commands via async Zookeeper calls and removal of superfluous backend calls.
@@ -33,7 +33,7 @@ import os
 import signal
 
 # Daemon version
-version = "0.9.86"
+version = "0.9.87"
 
 
 ##########################################################
@@ -157,9 +157,6 @@ class MonitoringPlugin(object):
         "w": warning
         "e": error
         """
-        if state == "d" and not self.config["debug"]:
-            return
-
         self.logger.out(message, state=state, prefix=self.plugin_name)
 
     #
@@ -523,11 +520,10 @@ class MonitoringInstance(object):
 
         entries = fault_data["entries"]()
 
-        if self.config["debug"]:
-            self.logger.out(
-                f"Entries for fault check {fault_type}: {dumps(entries)}",
-                state="d",
-            )
+        self.logger.out(
+            f"Entries for fault check {fault_type}: {dumps(entries)}",
+            state="d",
+        )
 
         for _entry in entries:
             entry = _entry["entry"]
@@ -699,7 +695,7 @@ class MonitoringInstance(object):
             health_text = f"{health_colour}{self.this_node.health}%{self.logger.fmt_end} node health"
             result_text.append(health_text)
         else:
-            health_text = "{self.logger.fmt_blue}N/A{self.logger.fmt_end} node health"
+            health_text = f"{self.logger.fmt_blue}N/A{self.logger.fmt_end} node health"
             result_text.append(health_text)
 
         self.logger.out(
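The one-character fix above is easy to miss: without the `f` prefix the braces are literal text, so the node-health line printed `{self.logger.fmt_blue}N/A{self.logger.fmt_end} node health` verbatim instead of a colourized `N/A`. A quick illustration with stand-in ANSI codes:

```python
fmt_blue, fmt_end = "\x1b[34m", "\x1b[0m"  # stand-ins for self.logger.fmt_*

broken = "{fmt_blue}N/A{fmt_end} node health"   # braces printed literally
fixed = f"{fmt_blue}N/A{fmt_end} node health"   # interpolated and colourized
print(broken)
print(fixed)
```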
monitoring/prometheus/grafana-pvc-cluster-dashboard.json (new file, 7022 lines)
File diff suppressed because it is too large
@@ -49,7 +49,7 @@ import re
 import json
 
 # Daemon version
-version = "0.9.86"
+version = "0.9.87"
 
 
 ##########################################################
@@ -333,12 +333,11 @@ class AXFRDaemonInstance(object):
                     z = dns.zone.from_xfr(axfr)
                     records_raw = [z[n].to_text(n) for n in z.nodes.keys()]
                 except Exception as e:
-                    if self.config["debug"]:
-                        self.logger.out(
-                            "{} {} ({})".format(e, dnsmasq_ip, domain),
-                            state="d",
-                            prefix="dns-aggregator",
-                        )
+                    self.logger.out(
+                        "{} {} ({})".format(e, dnsmasq_ip, domain),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )
                     continue
 
                 # Fix the formatting because it's useless
@@ -370,12 +369,11 @@ class AXFRDaemonInstance(object):
                         "SELECT * FROM records WHERE domain_id=%s", (domain_id,)
                     )
                     results = list(sql_curs.fetchall())
-                    if self.config["debug"]:
-                        self.logger.out(
-                            "SQL query results: {}".format(results),
-                            state="d",
-                            prefix="dns-aggregator",
-                        )
+                    self.logger.out(
+                        "SQL query results: {}".format(results),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )
                 except Exception as e:
                     self.logger.out(
                         "ERROR: Failed to obtain DNS records from database: {}".format(
@@ -388,12 +386,11 @@ class AXFRDaemonInstance(object):
                 records_old = list()
                 records_old_ids = list()
                 if not results:
-                    if self.config["debug"]:
-                        self.logger.out(
-                            "No results found, skipping.",
-                            state="d",
-                            prefix="dns-aggregator",
-                        )
+                    self.logger.out(
+                        "No results found, skipping.",
+                        state="d",
+                        prefix="dns-aggregator",
+                    )
                     continue
                 for record in results:
                     # Skip the non-A
@@ -404,23 +401,21 @@ class AXFRDaemonInstance(object):
                     r_data = record[4]
                     # Assemble a list element in the same format as the AXFR data
                     entry = "{} {} IN {} {}".format(r_name, r_ttl, r_type, r_data)
-                    if self.config["debug"]:
-                        self.logger.out(
-                            "Found record: {}".format(entry),
-                            state="d",
-                            prefix="dns-aggregator",
-                        )
+                    self.logger.out(
+                        "Found record: {}".format(entry),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )
 
                     # Skip non-A or AAAA records
                     if r_type != "A" and r_type != "AAAA":
-                        if self.config["debug"]:
-                            self.logger.out(
-                                'Skipping record {}, not A or AAAA: "{}"'.format(
-                                    entry, r_type
-                                ),
-                                state="d",
-                                prefix="dns-aggregator",
-                            )
+                        self.logger.out(
+                            'Skipping record {}, not A or AAAA: "{}"'.format(
+                                entry, r_type
+                            ),
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
                         continue
 
                     records_old.append(entry)
@@ -429,17 +424,16 @@ class AXFRDaemonInstance(object):
                 records_new.sort()
                 records_old.sort()
 
-                if self.config["debug"]:
-                    self.logger.out(
-                        "New: {}".format(records_new),
-                        state="d",
-                        prefix="dns-aggregator",
-                    )
-                    self.logger.out(
-                        "Old: {}".format(records_old),
-                        state="d",
-                        prefix="dns-aggregator",
-                    )
+                self.logger.out(
+                    "New: {}".format(records_new),
+                    state="d",
+                    prefix="dns-aggregator",
+                )
+                self.logger.out(
+                    "Old: {}".format(records_old),
+                    state="d",
+                    prefix="dns-aggregator",
+                )
 
                 # Find the differences between the lists
                 # Basic check one: are they completely equal
@@ -450,17 +444,16 @@ class AXFRDaemonInstance(object):
                 in_new_not_in_old = in_new - in_old
                 in_old_not_in_new = in_old - in_new
 
-                if self.config["debug"]:
-                    self.logger.out(
-                        "New but not old: {}".format(in_new_not_in_old),
-                        state="d",
-                        prefix="dns-aggregator",
-                    )
-                    self.logger.out(
-                        "Old but not new: {}".format(in_old_not_in_new),
-                        state="d",
-                        prefix="dns-aggregator",
-                    )
+                self.logger.out(
+                    "New but not old: {}".format(in_new_not_in_old),
+                    state="d",
+                    prefix="dns-aggregator",
+                )
+                self.logger.out(
+                    "Old but not new: {}".format(in_old_not_in_new),
+                    state="d",
+                    prefix="dns-aggregator",
+                )
 
                 # Go through the old list
                 remove_records = list()  # list of database IDs
@@ -487,12 +480,11 @@ class AXFRDaemonInstance(object):
                 if len(remove_records) > 0:
                     # Remove the invalid old records
                     for record_id in remove_records:
-                        if self.config["debug"]:
-                            self.logger.out(
-                                "Removing record: {}".format(record_id),
-                                state="d",
-                                prefix="dns-aggregator",
-                            )
+                        self.logger.out(
+                            "Removing record: {}".format(record_id),
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
                         sql_curs.execute(
                             "DELETE FROM records WHERE id=%s", (record_id,)
                         )
@@ -507,12 +499,11 @@ class AXFRDaemonInstance(object):
                     r_ttl = record[1]
                     r_type = record[3]
                     r_data = record[4]
-                    if self.config["debug"]:
-                        self.logger.out(
-                            "Add record: {}".format(name),
-                            state="d",
-                            prefix="dns-aggregator",
-                        )
+                    self.logger.out(
+                        "Add record: {}".format(name),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )
                     try:
                         sql_curs.execute(
                             "INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)",
@@ -520,23 +511,21 @@ class AXFRDaemonInstance(object):
                         )
                         changed = True
                     except psycopg2.IntegrityError as e:
-                        if self.config["debug"]:
-                            self.logger.out(
-                                "Failed to add record due to {}: {}".format(
-                                    e, name
-                                ),
-                                state="d",
-                                prefix="dns-aggregator",
-                            )
+                        self.logger.out(
+                            "Failed to add record due to {}: {}".format(
+                                e, name
+                            ),
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
                     except psycopg2.errors.InFailedSqlTransaction as e:
-                        if self.config["debug"]:
-                            self.logger.out(
-                                "Failed to add record due to {}: {}".format(
-                                    e, name
-                                ),
-                                state="d",
-                                prefix="dns-aggregator",
-                            )
+                        self.logger.out(
+                            "Failed to add record due to {}: {}".format(
+                                e, name
+                            ),
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
 
                 if changed:
                     # Increase SOA serial
@@ -548,24 +537,22 @@ class AXFRDaemonInstance(object):
                     current_serial = int(soa_record[2])
                     new_serial = current_serial + 1
                     soa_record[2] = str(new_serial)
-                    if self.config["debug"]:
-                        self.logger.out(
-                            "Records changed; bumping SOA: {}".format(new_serial),
-                            state="d",
-                            prefix="dns-aggregator",
-                        )
+                    self.logger.out(
+                        "Records changed; bumping SOA: {}".format(new_serial),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )
                     sql_curs.execute(
                         "UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'",
                         (" ".join(soa_record), domain_id),
                    )
 
                 # Commit all the previous changes
-                if self.config["debug"]:
-                    self.logger.out(
-                        "Committing database changes and reloading PDNS",
-                        state="d",
-                        prefix="dns-aggregator",
-                    )
+                self.logger.out(
+                    "Committing database changes and reloading PDNS",
+                    state="d",
+                    prefix="dns-aggregator",
+                )
                 try:
                     self.sql_conn.commit()
                 except Exception as e:
@@ -80,9 +80,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
     pool_list = zkhandler.children("base.pool")
     osd_list = zkhandler.children("base.osd")
 
-    debug = config["debug"]
-    if debug:
-        logger.out("Thread starting", state="d", prefix="ceph-thread")
+    logger.out("Thread starting", state="d", prefix="ceph-thread")
 
     # Connect to the Ceph cluster
     try:
@@ -90,8 +88,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             conffile=config["ceph_config_file"],
             conf=dict(keyring=config["ceph_admin_keyring"]),
         )
-        if debug:
-            logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
+        logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
         ceph_conn.connect(timeout=1)
     except Exception as e:
         logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
@@ -100,12 +97,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
     # Primary-only functions
     if this_node.coordinator_state == "primary":
         # Get Ceph status information (pretty)
-        if debug:
-            logger.out(
-                "Set Ceph status information in zookeeper (primary only)",
-                state="d",
-                prefix="ceph-thread",
-            )
+        logger.out(
+            "Set Ceph status information in zookeeper (primary only)",
+            state="d",
+            prefix="ceph-thread",
+        )
 
         command = {"prefix": "status", "format": "pretty"}
         ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
@@ -117,12 +113,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
            logger.out("Failed to set Ceph status data: {}".format(e), state="e")
 
        # Get Ceph health information (JSON)
-        if debug:
-            logger.out(
-                "Set Ceph health information in zookeeper (primary only)",
-                state="d",
-                prefix="ceph-thread",
-            )
+        logger.out(
+            "Set Ceph health information in zookeeper (primary only)",
+            state="d",
+            prefix="ceph-thread",
+        )
 
         command = {"prefix": "health", "format": "json"}
         ceph_health = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
@@ -134,12 +129,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
            logger.out("Failed to set Ceph health data: {}".format(e), state="e")
 
        # Get Ceph df information (pretty)
-        if debug:
-            logger.out(
-                "Set Ceph rados df information in zookeeper (primary only)",
-                state="d",
-                prefix="ceph-thread",
-            )
+        logger.out(
+            "Set Ceph rados df information in zookeeper (primary only)",
+            state="d",
+            prefix="ceph-thread",
+        )
 
         # Get rados df info
         command = {"prefix": "df", "format": "pretty"}
@@ -151,12 +145,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
         except Exception as e:
             logger.out("Failed to set Ceph utilization data: {}".format(e), state="e")
 
-        if debug:
-            logger.out(
-                "Set pool information in zookeeper (primary only)",
-                state="d",
-                prefix="ceph-thread",
-            )
+        logger.out(
+            "Set pool information in zookeeper (primary only)",
+            state="d",
+            prefix="ceph-thread",
+        )
 
         # Get pool info
         command = {"prefix": "df", "format": "json"}
@@ -179,12 +172,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             rados_pool_df_raw = []
 
         pool_count = len(ceph_pool_df_raw)
-        if debug:
-            logger.out(
-                "Getting info for {} pools".format(pool_count),
-                state="d",
-                prefix="ceph-thread",
-            )
+        logger.out(
+            "Getting info for {} pools".format(pool_count),
+            state="d",
+            prefix="ceph-thread",
+        )
         for pool_idx in range(0, pool_count):
             try:
                 # Combine all the data for this pool
@@ -195,22 +187,18 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 
                 # Ignore any pools that aren't in our pool list
                 if pool["name"] not in pool_list:
-                    if debug:
-                        logger.out(
-                            "Pool {} not in pool list {}".format(
-                                pool["name"], pool_list
-                            ),
-                            state="d",
-                            prefix="ceph-thread",
-                        )
+                    logger.out(
+                        "Pool {} not in pool list {}".format(pool["name"], pool_list),
+                        state="d",
+                        prefix="ceph-thread",
+                    )
                     continue
                 else:
-                    if debug:
-                        logger.out(
-                            "Parsing data for pool {}".format(pool["name"]),
-                            state="d",
-                            prefix="ceph-thread",
-                        )
+                    logger.out(
+                        "Parsing data for pool {}".format(pool["name"]),
+                        state="d",
+                        prefix="ceph-thread",
+                    )
 
                 # Assemble a useful data structure
                 pool_df = {
@@ -248,8 +236,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
     osds_this_node = 0
     if len(osd_list) > 0:
         # Get data from Ceph OSDs
-        if debug:
-            logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
+        logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
 
         # Parse the dump data
         osd_dump = dict()
@@ -264,8 +251,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             logger.out("Failed to obtain OSD data: {}".format(e), state="w")
             osd_dump_raw = []
 
-        if debug:
-            logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
+        logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
         for osd in osd_dump_raw:
             osd_dump.update(
                 {
@@ -279,8 +265,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             )
 
         # Parse the df data
-        if debug:
-            logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
+        logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
 
         osd_df = dict()
 
@@ -293,8 +278,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             logger.out("Failed to obtain OSD data: {}".format(e), state="w")
             osd_df_raw = []
 
-        if debug:
-            logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
+        logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
         for osd in osd_df_raw:
             osd_df.update(
                 {
@@ -316,8 +300,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             )
 
         # Parse the status data
-        if debug:
-            logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
+        logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
 
         osd_status = dict()
 
@@ -330,8 +313,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             logger.out("Failed to obtain OSD status data: {}".format(e), state="w")
             osd_status_raw = []
 
-        if debug:
-            logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
+        logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
 
         for line in osd_status_raw.split("\n"):
             # Strip off colour
@@ -400,8 +382,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
         )
 
         # Merge them together into a single meaningful dict
-        if debug:
-            logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
+        logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
 
         osd_stats = dict()
 
@@ -421,10 +402,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 
     # Upload OSD data for the cluster (primary-only)
     if this_node.coordinator_state == "primary":
-        if debug:
-            logger.out(
-                "Trigger updates for each OSD", state="d", prefix="ceph-thread"
-            )
+        logger.out("Trigger updates for each OSD", state="d", prefix="ceph-thread")
 
         for osd in osd_list:
             try:
@@ -441,20 +419,16 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 
     queue.put(osds_this_node)
 
-    if debug:
-        logger.out("Thread finished", state="d", prefix="ceph-thread")
+    logger.out("Thread finished", state="d", prefix="ceph-thread")
 
 
 # VM stats update function
 def collect_vm_stats(logger, config, zkhandler, this_node, queue):
-    debug = config["debug"]
-    if debug:
-        logger.out("Thread starting", state="d", prefix="vm-thread")
+    logger.out("Thread starting", state="d", prefix="vm-thread")
 
     # Connect to libvirt
     libvirt_name = "qemu:///system"
-    if debug:
-        logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
+    logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
     try:
         lv_conn = libvirt.open(libvirt_name)
         if lv_conn is None:
@@ -467,12 +441,11 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
     memprov = 0
     vcpualloc = 0
     # Toggle state management of dead VMs to restart them
-    if debug:
-        logger.out(
-            "Toggle state management of dead VMs to restart them",
-            state="d",
-            prefix="vm-thread",
-        )
+    logger.out(
+        "Toggle state management of dead VMs to restart them",
+        state="d",
+        prefix="vm-thread",
+    )
     # Make a copy of the d_domain; if not, and it changes in flight, this can fail
     fixed_d_domain = this_node.d_domain.copy()
     for domain, instance in fixed_d_domain.items():
@@ -518,12 +491,11 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
         domain_name = domain.name()
 
         # Get all the raw information about the VM
-        if debug:
-            logger.out(
-                "Getting general statistics for VM {}".format(domain_name),
-                state="d",
-                prefix="vm-thread",
-            )
+        logger.out(
+            "Getting general statistics for VM {}".format(domain_name),
+            state="d",
+            prefix="vm-thread",
+        )
         (
             domain_state,
             domain_maxmem,
@@ -537,29 +509,25 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
             domain_memory_stats = domain.memoryStats()
             domain_cpu_stats = domain.getCPUStats(True)[0]
         except Exception as e:
-            if debug:
-                try:
-                    logger.out(
-                        "Failed getting VM information for {}: {}".format(
-                            domain.name(), e
-                        ),
-                        state="d",
-                        prefix="vm-thread",
-                    )
-                except Exception:
-                    pass
+            try:
+                logger.out(
+                    "Failed getting VM information for {}: {}".format(domain.name(), e),
+                    state="d",
+                    prefix="vm-thread",
+                )
+            except Exception:
+                pass
             continue
 
         # Ensure VM is present in the domain_list
         if domain_uuid not in this_node.domain_list:
             this_node.domain_list.append(domain_uuid)
 
-        if debug:
-            logger.out(
-                "Getting disk statistics for VM {}".format(domain_name),
-                state="d",
-                prefix="vm-thread",
-            )
+        logger.out(
+            "Getting disk statistics for VM {}".format(domain_name),
+            state="d",
+            prefix="vm-thread",
+        )
         domain_disk_stats = []
         try:
             for disk in tree.findall("devices/disk"):
@@ -578,23 +546,21 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
                     }
                 )
         except Exception as e:
-            if debug:
-                try:
-                    logger.out(
-                        "Failed getting disk stats for {}: {}".format(domain.name(), e),
-                        state="d",
-                        prefix="vm-thread",
-                    )
-                except Exception:
-                    pass
+            try:
+                logger.out(
+                    "Failed getting disk stats for {}: {}".format(domain.name(), e),
+                    state="d",
+                    prefix="vm-thread",
+                )
+            except Exception:
+                pass
             continue
 
-        if debug:
-            logger.out(
-                "Getting network statistics for VM {}".format(domain_name),
-                state="d",
-                prefix="vm-thread",
-            )
+        logger.out(
+            "Getting network statistics for VM {}".format(domain_name),
+            state="d",
+            prefix="vm-thread",
+        )
         domain_network_stats = []
         try:
             for interface in tree.findall("devices/interface"):
@@ -619,17 +585,14 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
                     }
                 )
         except Exception as e:
-            if debug:
-                try:
-                    logger.out(
-                        "Failed getting network stats for {}: {}".format(
-                            domain.name(), e
-                        ),
-                        state="d",
-                        prefix="vm-thread",
-                    )
-                except Exception:
-                    pass
+            try:
+                logger.out(
+                    "Failed getting network stats for {}: {}".format(domain.name(), e),
+                    state="d",
+                    prefix="vm-thread",
+                )
+            except Exception:
+                pass
             continue
 
         # Create the final dictionary
@@ -645,48 +608,42 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
             "net_stats": domain_network_stats,
         }
 
-        if debug:
-            logger.out(
-                "Writing statistics for VM {} to Zookeeper".format(domain_name),
-                state="d",
-                prefix="vm-thread",
-            )
+        logger.out(
+            "Writing statistics for VM {} to Zookeeper".format(domain_name),
+            state="d",
+            prefix="vm-thread",
+        )
 
         try:
             zkhandler.write(
                 [(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))]
             )
         except Exception as e:
-            if debug:
-                logger.out(
-                    "Failed to write domain statistics: {}".format(e),
-                    state="d",
-                    prefix="vm-thread",
-                )
+            logger.out(
+                "Failed to write domain statistics: {}".format(e),
+                state="d",
+                prefix="vm-thread",
+            )
 
     # Close the Libvirt connection
     lv_conn.close()
 
-    if debug:
-        logger.out(
-            f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
-            state="d",
-            prefix="vm-thread",
-        )
+    logger.out(
+        f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
+        state="d",
+        prefix="vm-thread",
+    )
 
     queue.put(len(running_domains))
     queue.put(memalloc)
     queue.put(memprov)
     queue.put(vcpualloc)
 
-    if debug:
-        logger.out("Thread finished", state="d", prefix="vm-thread")
+    logger.out("Thread finished", state="d", prefix="vm-thread")
 
 
 # Keepalive update function
 def node_keepalive(logger, config, zkhandler, this_node, netstats):
-    debug = config["debug"]
-
     # Display node information to the terminal
     if config["log_keepalives"]:
         if this_node.coordinator_state == "primary":
@@ -746,10 +703,7 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
     )
 
     # Get past state and update if needed
-    if debug:
-        logger.out(
-            "Get past state and update if needed", state="d", prefix="main-thread"
-        )
+    logger.out("Get past state and update if needed", state="d", prefix="main-thread")
 
     past_state = zkhandler.read(("node.state.daemon", this_node.name))
     if past_state != "run" and past_state != "shutdown":
@@ -759,10 +713,9 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
         this_node.daemon_state = "run"
 
     # Ensure the primary key is properly set
-    if debug:
-        logger.out(
-            "Ensure the primary key is properly set", state="d", prefix="main-thread"
-        )
+    logger.out(
+        "Ensure the primary key is properly set", state="d", prefix="main-thread"
+    )
     if this_node.coordinator_state == "primary":
         if zkhandler.read("base.config.primary_node") != this_node.name:
             zkhandler.write([("base.config.primary_node", this_node.name)])
@@ -843,8 +796,7 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
 
     # Set our information in zookeeper
     keepalive_time = int(time.time())
-    if debug:
-        logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
+    logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
     try:
         zkhandler.write(
             [
@@ -932,10 +884,9 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
 
     # Look for dead nodes and fence them
     if not this_node.maintenance:
-        if debug:
-            logger.out(
-                "Look for dead nodes and fence them", state="d", prefix="main-thread"
-            )
+        logger.out(
+            "Look for dead nodes and fence them", state="d", prefix="main-thread"
+        )
         if config["daemon_mode"] == "coordinator":
             for node_name in zkhandler.children("base.node"):
                 try:
@@ -44,7 +44,7 @@ from daemon_lib.vmbuilder import (
 )
 
 # Daemon version
-version = "0.9.86"
+version = "0.9.87"
 
 
 config = cfg.get_configuration()