Compare commits

..

4 Commits

Author SHA1 Message Date
8083b7a3e6 Bump version to 0.9.87 2023-12-27 13:40:51 -05:00
3346ce9bb0 Add missing shutdown state from combinations 2023-12-27 13:40:30 -05:00
572596c575 Fix missing f-string placeholder 2023-12-27 13:21:20 -05:00
e654fbba08 Move debug condition handling to Logger
Avoids many dozens of conditionals sprinkled throughout the code by
centralizing this check into the main Logger instance.
2023-12-27 13:01:45 -05:00
8 changed files with 266 additions and 340 deletions

View File

@@ -6,6 +6,7 @@
* [Node Daemon] Adds network traffic rate calculation subsystem.
* [All Daemons] Fixes a printing bug where newlines were not added atomically.
* [CLI Client] Fixes a bug listing connections if no default is specified.
* [All Daemons] Simplifies debug logging conditionals by moving into the Logger instance itself.
###### [v0.9.86](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.86)

View File

@@ -53,6 +53,10 @@ node_state_combinations = [
"init,flush",
"init,flushed",
"init,unflush",
"shutdown,ready",
"shutdown,flush",
"shutdown,flushed",
"shutdown,unflush",
"stop,ready",
"stop,flush",
"stop,flushed",

View File

@@ -115,6 +115,10 @@ class Logger(object):
# Output function
def out(self, message, state=None, prefix=""):
# Only handle d-state (debug) messages if we're in debug mode
if state in ["d"] and not self.config["debug"]:
return
# Get the date
if self.config["log_dates"]:
date = "{} ".format(datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f"))

3
debian/changelog vendored
View File

@@ -4,8 +4,9 @@ pvc (0.9.87-0) unstable; urgency=high
* [Node Daemon] Adds network traffic rate calculation subsystem.
* [All Daemons] Fixes a printing bug where newlines were not added atomically.
* [CLI Client] Fixes a bug listing connections if no default is specified.
* [All Daemons] Simplifies debug logging conditionals by moving into the Logger instance itself.
-- Joshua M. Boniface <joshua@boniface.me> Wed, 27 Dec 2023 12:42:52 -0500
-- Joshua M. Boniface <joshua@boniface.me> Wed, 27 Dec 2023 13:40:51 -0500
pvc (0.9.86-0) unstable; urgency=high

View File

@@ -157,9 +157,6 @@ class MonitoringPlugin(object):
"w": warning
"e": error
"""
if state == "d" and not self.config["debug"]:
return
self.logger.out(message, state=state, prefix=self.plugin_name)
#
@@ -523,11 +520,10 @@ class MonitoringInstance(object):
entries = fault_data["entries"]()
if self.config["debug"]:
self.logger.out(
f"Entries for fault check {fault_type}: {dumps(entries)}",
state="d",
)
self.logger.out(
f"Entries for fault check {fault_type}: {dumps(entries)}",
state="d",
)
for _entry in entries:
entry = _entry["entry"]
@@ -699,7 +695,7 @@ class MonitoringInstance(object):
health_text = f"{health_colour}{self.this_node.health}%{self.logger.fmt_end} node health"
result_text.append(health_text)
else:
health_text = "{self.logger.fmt_blue}N/A{self.logger.fmt_end} node health"
health_text = f"{self.logger.fmt_blue}N/A{self.logger.fmt_end} node health"
result_text.append(health_text)
self.logger.out(

View File

@@ -1633,6 +1633,21 @@
}
]
},
{
"matcher": {
"id": "byName",
"options": "Shutdown"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "dark-yellow",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
@@ -1773,12 +1788,13 @@
},
"indexByName": {
"Time": 0,
"dead": 5,
"fenced": 6,
"dead": 6,
"fenced": 7,
"init": 3,
"pvc_nodes": 1,
"run": 2,
"stop": 4
"shutdown": 4,
"stop": 5
},
"renameByName": {
"Time": "",
@@ -3152,8 +3168,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -3263,8 +3278,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -3340,8 +3354,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -3451,8 +3464,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -3528,8 +3540,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -3639,8 +3650,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -3716,8 +3726,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -3827,8 +3836,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -3917,8 +3925,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -4027,8 +4034,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -4158,8 +4164,7 @@
"mode": "absolute",
"steps": [
{
"color": "transparent",
"value": null
"color": "transparent"
}
]
},
@@ -4247,8 +4252,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -4357,8 +4361,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -4490,8 +4493,7 @@
"mode": "absolute",
"steps": [
{
"color": "transparent",
"value": null
"color": "transparent"
}
]
},
@@ -4592,8 +4594,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -4702,8 +4703,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -4833,8 +4833,7 @@
"mode": "absolute",
"steps": [
{
"color": "transparent",
"value": null
"color": "transparent"
}
]
},
@@ -4923,8 +4922,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -5034,8 +5032,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -5165,8 +5162,7 @@
"mode": "absolute",
"steps": [
{
"color": "transparent",
"value": null
"color": "transparent"
}
]
},
@@ -5255,8 +5251,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -5366,8 +5361,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -5498,8 +5492,7 @@
"mode": "absolute",
"steps": [
{
"color": "transparent",
"value": null
"color": "transparent"
}
]
},
@@ -5601,8 +5594,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -5713,8 +5705,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -5827,8 +5818,7 @@
"mode": "absolute",
"steps": [
{
"color": "transparent",
"value": null
"color": "transparent"
}
]
},
@@ -5931,8 +5921,7 @@
"mode": "absolute",
"steps": [
{
"color": "transparent",
"value": null
"color": "transparent"
}
]
},
@@ -6210,8 +6199,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
}
]
},
@@ -6337,8 +6325,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@@ -6454,8 +6441,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -6564,8 +6550,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "yellow",
@@ -6677,8 +6662,7 @@
"mode": "absolute",
"steps": [
{
"color": "transparent",
"value": null
"color": "transparent"
}
]
},
@@ -6781,8 +6765,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
}
]
},
@@ -6908,8 +6891,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",

View File

@@ -333,12 +333,11 @@ class AXFRDaemonInstance(object):
z = dns.zone.from_xfr(axfr)
records_raw = [z[n].to_text(n) for n in z.nodes.keys()]
except Exception as e:
if self.config["debug"]:
self.logger.out(
"{} {} ({})".format(e, dnsmasq_ip, domain),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"{} {} ({})".format(e, dnsmasq_ip, domain),
state="d",
prefix="dns-aggregator",
)
continue
# Fix the formatting because it's useless
@@ -370,12 +369,11 @@ class AXFRDaemonInstance(object):
"SELECT * FROM records WHERE domain_id=%s", (domain_id,)
)
results = list(sql_curs.fetchall())
if self.config["debug"]:
self.logger.out(
"SQL query results: {}".format(results),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"SQL query results: {}".format(results),
state="d",
prefix="dns-aggregator",
)
except Exception as e:
self.logger.out(
"ERROR: Failed to obtain DNS records from database: {}".format(
@@ -388,12 +386,11 @@ class AXFRDaemonInstance(object):
records_old = list()
records_old_ids = list()
if not results:
if self.config["debug"]:
self.logger.out(
"No results found, skipping.",
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"No results found, skipping.",
state="d",
prefix="dns-aggregator",
)
continue
for record in results:
# Skip the non-A
@@ -404,23 +401,21 @@ class AXFRDaemonInstance(object):
r_data = record[4]
# Assemble a list element in the same format as the AXFR data
entry = "{} {} IN {} {}".format(r_name, r_ttl, r_type, r_data)
if self.config["debug"]:
self.logger.out(
"Found record: {}".format(entry),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Found record: {}".format(entry),
state="d",
prefix="dns-aggregator",
)
# Skip non-A or AAAA records
if r_type != "A" and r_type != "AAAA":
if self.config["debug"]:
self.logger.out(
'Skipping record {}, not A or AAAA: "{}"'.format(
entry, r_type
),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
'Skipping record {}, not A or AAAA: "{}"'.format(
entry, r_type
),
state="d",
prefix="dns-aggregator",
)
continue
records_old.append(entry)
@@ -429,17 +424,16 @@ class AXFRDaemonInstance(object):
records_new.sort()
records_old.sort()
if self.config["debug"]:
self.logger.out(
"New: {}".format(records_new),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Old: {}".format(records_old),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"New: {}".format(records_new),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Old: {}".format(records_old),
state="d",
prefix="dns-aggregator",
)
# Find the differences between the lists
# Basic check one: are they completely equal
@@ -450,17 +444,16 @@ class AXFRDaemonInstance(object):
in_new_not_in_old = in_new - in_old
in_old_not_in_new = in_old - in_new
if self.config["debug"]:
self.logger.out(
"New but not old: {}".format(in_new_not_in_old),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Old but not new: {}".format(in_old_not_in_new),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"New but not old: {}".format(in_new_not_in_old),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Old but not new: {}".format(in_old_not_in_new),
state="d",
prefix="dns-aggregator",
)
# Go through the old list
remove_records = list() # list of database IDs
@@ -487,12 +480,11 @@ class AXFRDaemonInstance(object):
if len(remove_records) > 0:
# Remove the invalid old records
for record_id in remove_records:
if self.config["debug"]:
self.logger.out(
"Removing record: {}".format(record_id),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Removing record: {}".format(record_id),
state="d",
prefix="dns-aggregator",
)
sql_curs.execute(
"DELETE FROM records WHERE id=%s", (record_id,)
)
@@ -507,12 +499,11 @@ class AXFRDaemonInstance(object):
r_ttl = record[1]
r_type = record[3]
r_data = record[4]
if self.config["debug"]:
self.logger.out(
"Add record: {}".format(name),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Add record: {}".format(name),
state="d",
prefix="dns-aggregator",
)
try:
sql_curs.execute(
"INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)",
@@ -520,23 +511,21 @@ class AXFRDaemonInstance(object):
)
changed = True
except psycopg2.IntegrityError as e:
if self.config["debug"]:
self.logger.out(
"Failed to add record due to {}: {}".format(
e, name
),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Failed to add record due to {}: {}".format(
e, name
),
state="d",
prefix="dns-aggregator",
)
except psycopg2.errors.InFailedSqlTransaction as e:
if self.config["debug"]:
self.logger.out(
"Failed to add record due to {}: {}".format(
e, name
),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Failed to add record due to {}: {}".format(
e, name
),
state="d",
prefix="dns-aggregator",
)
if changed:
# Increase SOA serial
@@ -548,24 +537,22 @@ class AXFRDaemonInstance(object):
current_serial = int(soa_record[2])
new_serial = current_serial + 1
soa_record[2] = str(new_serial)
if self.config["debug"]:
self.logger.out(
"Records changed; bumping SOA: {}".format(new_serial),
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Records changed; bumping SOA: {}".format(new_serial),
state="d",
prefix="dns-aggregator",
)
sql_curs.execute(
"UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'",
(" ".join(soa_record), domain_id),
)
# Commit all the previous changes
if self.config["debug"]:
self.logger.out(
"Committing database changes and reloading PDNS",
state="d",
prefix="dns-aggregator",
)
self.logger.out(
"Committing database changes and reloading PDNS",
state="d",
prefix="dns-aggregator",
)
try:
self.sql_conn.commit()
except Exception as e:

View File

@@ -80,9 +80,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
pool_list = zkhandler.children("base.pool")
osd_list = zkhandler.children("base.osd")
debug = config["debug"]
if debug:
logger.out("Thread starting", state="d", prefix="ceph-thread")
logger.out("Thread starting", state="d", prefix="ceph-thread")
# Connect to the Ceph cluster
try:
@@ -90,8 +88,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
conffile=config["ceph_config_file"],
conf=dict(keyring=config["ceph_admin_keyring"]),
)
if debug:
logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
ceph_conn.connect(timeout=1)
except Exception as e:
logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
@@ -100,12 +97,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
# Primary-only functions
if this_node.coordinator_state == "primary":
# Get Ceph status information (pretty)
if debug:
logger.out(
"Set Ceph status information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
logger.out(
"Set Ceph status information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
command = {"prefix": "status", "format": "pretty"}
ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
@@ -117,12 +113,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
logger.out("Failed to set Ceph status data: {}".format(e), state="e")
# Get Ceph health information (JSON)
if debug:
logger.out(
"Set Ceph health information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
logger.out(
"Set Ceph health information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
command = {"prefix": "health", "format": "json"}
ceph_health = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
@@ -134,12 +129,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
logger.out("Failed to set Ceph health data: {}".format(e), state="e")
# Get Ceph df information (pretty)
if debug:
logger.out(
"Set Ceph rados df information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
logger.out(
"Set Ceph rados df information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
# Get rados df info
command = {"prefix": "df", "format": "pretty"}
@@ -151,12 +145,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
except Exception as e:
logger.out("Failed to set Ceph utilization data: {}".format(e), state="e")
if debug:
logger.out(
"Set pool information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
logger.out(
"Set pool information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
# Get pool info
command = {"prefix": "df", "format": "json"}
@@ -179,12 +172,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
rados_pool_df_raw = []
pool_count = len(ceph_pool_df_raw)
if debug:
logger.out(
"Getting info for {} pools".format(pool_count),
state="d",
prefix="ceph-thread",
)
logger.out(
"Getting info for {} pools".format(pool_count),
state="d",
prefix="ceph-thread",
)
for pool_idx in range(0, pool_count):
try:
# Combine all the data for this pool
@@ -195,22 +187,18 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
# Ignore any pools that aren't in our pool list
if pool["name"] not in pool_list:
if debug:
logger.out(
"Pool {} not in pool list {}".format(
pool["name"], pool_list
),
state="d",
prefix="ceph-thread",
)
logger.out(
"Pool {} not in pool list {}".format(pool["name"], pool_list),
state="d",
prefix="ceph-thread",
)
continue
else:
if debug:
logger.out(
"Parsing data for pool {}".format(pool["name"]),
state="d",
prefix="ceph-thread",
)
logger.out(
"Parsing data for pool {}".format(pool["name"]),
state="d",
prefix="ceph-thread",
)
# Assemble a useful data structure
pool_df = {
@@ -248,8 +236,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
osds_this_node = 0
if len(osd_list) > 0:
# Get data from Ceph OSDs
if debug:
logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
# Parse the dump data
osd_dump = dict()
@@ -264,8 +251,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
logger.out("Failed to obtain OSD data: {}".format(e), state="w")
osd_dump_raw = []
if debug:
logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
for osd in osd_dump_raw:
osd_dump.update(
{
@@ -279,8 +265,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
)
# Parse the df data
if debug:
logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
osd_df = dict()
@@ -293,8 +278,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
logger.out("Failed to obtain OSD data: {}".format(e), state="w")
osd_df_raw = []
if debug:
logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
for osd in osd_df_raw:
osd_df.update(
{
@@ -316,8 +300,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
)
# Parse the status data
if debug:
logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
osd_status = dict()
@@ -330,8 +313,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
logger.out("Failed to obtain OSD status data: {}".format(e), state="w")
osd_status_raw = []
if debug:
logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
for line in osd_status_raw.split("\n"):
# Strip off colour
@@ -400,8 +382,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
)
# Merge them together into a single meaningful dict
if debug:
logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
osd_stats = dict()
@@ -421,10 +402,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
# Upload OSD data for the cluster (primary-only)
if this_node.coordinator_state == "primary":
if debug:
logger.out(
"Trigger updates for each OSD", state="d", prefix="ceph-thread"
)
logger.out("Trigger updates for each OSD", state="d", prefix="ceph-thread")
for osd in osd_list:
try:
@@ -441,20 +419,16 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
queue.put(osds_this_node)
if debug:
logger.out("Thread finished", state="d", prefix="ceph-thread")
logger.out("Thread finished", state="d", prefix="ceph-thread")
# VM stats update function
def collect_vm_stats(logger, config, zkhandler, this_node, queue):
debug = config["debug"]
if debug:
logger.out("Thread starting", state="d", prefix="vm-thread")
logger.out("Thread starting", state="d", prefix="vm-thread")
# Connect to libvirt
libvirt_name = "qemu:///system"
if debug:
logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
try:
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
@@ -467,12 +441,11 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
memprov = 0
vcpualloc = 0
# Toggle state management of dead VMs to restart them
if debug:
logger.out(
"Toggle state management of dead VMs to restart them",
state="d",
prefix="vm-thread",
)
logger.out(
"Toggle state management of dead VMs to restart them",
state="d",
prefix="vm-thread",
)
# Make a copy of the d_domain; if not, and it changes in flight, this can fail
fixed_d_domain = this_node.d_domain.copy()
for domain, instance in fixed_d_domain.items():
@@ -518,12 +491,11 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
domain_name = domain.name()
# Get all the raw information about the VM
if debug:
logger.out(
"Getting general statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
logger.out(
"Getting general statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
(
domain_state,
domain_maxmem,
@@ -537,29 +509,25 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
domain_memory_stats = domain.memoryStats()
domain_cpu_stats = domain.getCPUStats(True)[0]
except Exception as e:
if debug:
try:
logger.out(
"Failed getting VM information for {}: {}".format(
domain.name(), e
),
state="d",
prefix="vm-thread",
)
except Exception:
pass
try:
logger.out(
"Failed getting VM information for {}: {}".format(domain.name(), e),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
# Ensure VM is present in the domain_list
if domain_uuid not in this_node.domain_list:
this_node.domain_list.append(domain_uuid)
if debug:
logger.out(
"Getting disk statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
logger.out(
"Getting disk statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
domain_disk_stats = []
try:
for disk in tree.findall("devices/disk"):
@@ -578,23 +546,21 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
}
)
except Exception as e:
if debug:
try:
logger.out(
"Failed getting disk stats for {}: {}".format(domain.name(), e),
state="d",
prefix="vm-thread",
)
except Exception:
pass
try:
logger.out(
"Failed getting disk stats for {}: {}".format(domain.name(), e),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
if debug:
logger.out(
"Getting network statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
logger.out(
"Getting network statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
domain_network_stats = []
try:
for interface in tree.findall("devices/interface"):
@@ -619,17 +585,14 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
}
)
except Exception as e:
if debug:
try:
logger.out(
"Failed getting network stats for {}: {}".format(
domain.name(), e
),
state="d",
prefix="vm-thread",
)
except Exception:
pass
try:
logger.out(
"Failed getting network stats for {}: {}".format(domain.name(), e),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
# Create the final dictionary
@@ -645,48 +608,42 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
"net_stats": domain_network_stats,
}
if debug:
logger.out(
"Writing statistics for VM {} to Zookeeper".format(domain_name),
state="d",
prefix="vm-thread",
)
logger.out(
"Writing statistics for VM {} to Zookeeper".format(domain_name),
state="d",
prefix="vm-thread",
)
try:
zkhandler.write(
[(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))]
)
except Exception as e:
if debug:
logger.out(
"Failed to write domain statistics: {}".format(e),
state="d",
prefix="vm-thread",
)
logger.out(
"Failed to write domain statistics: {}".format(e),
state="d",
prefix="vm-thread",
)
# Close the Libvirt connection
lv_conn.close()
if debug:
logger.out(
f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
state="d",
prefix="vm-thread",
)
logger.out(
f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
state="d",
prefix="vm-thread",
)
queue.put(len(running_domains))
queue.put(memalloc)
queue.put(memprov)
queue.put(vcpualloc)
if debug:
logger.out("Thread finished", state="d", prefix="vm-thread")
logger.out("Thread finished", state="d", prefix="vm-thread")
# Keepalive update function
def node_keepalive(logger, config, zkhandler, this_node, netstats):
debug = config["debug"]
# Display node information to the terminal
if config["log_keepalives"]:
if this_node.coordinator_state == "primary":
@@ -746,10 +703,7 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
)
# Get past state and update if needed
if debug:
logger.out(
"Get past state and update if needed", state="d", prefix="main-thread"
)
logger.out("Get past state and update if needed", state="d", prefix="main-thread")
past_state = zkhandler.read(("node.state.daemon", this_node.name))
if past_state != "run" and past_state != "shutdown":
@@ -759,10 +713,9 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
this_node.daemon_state = "run"
# Ensure the primary key is properly set
if debug:
logger.out(
"Ensure the primary key is properly set", state="d", prefix="main-thread"
)
logger.out(
"Ensure the primary key is properly set", state="d", prefix="main-thread"
)
if this_node.coordinator_state == "primary":
if zkhandler.read("base.config.primary_node") != this_node.name:
zkhandler.write([("base.config.primary_node", this_node.name)])
@@ -843,8 +796,7 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
# Set our information in zookeeper
keepalive_time = int(time.time())
if debug:
logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
try:
zkhandler.write(
[
@@ -932,10 +884,9 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
# Look for dead nodes and fence them
if not this_node.maintenance:
if debug:
logger.out(
"Look for dead nodes and fence them", state="d", prefix="main-thread"
)
logger.out(
"Look for dead nodes and fence them", state="d", prefix="main-thread"
)
if config["daemon_mode"] == "coordinator":
for node_name in zkhandler.children("base.node"):
try: