Compare commits

1 commit

SHA1: a1c5b8aad8
Message: Bump version to 0.9.87
Date: 2023-12-27 12:42:52 -05:00
6 changed files with 267 additions and 207 deletions

@@ -6,7 +6,6 @@
 * [Node Daemon] Adds network traffic rate calculation subsystem.
 * [All Daemons] Fixes a printing bug where newlines were not added atomically.
 * [CLI Client] Fixes a bug listing connections if no default is specified.
-* [All Daemons] Simplifies debug logging conditionals by moving into the Logger instance itself.

 ###### [v0.9.86](https://github.com/parallelvirtualcluster/pvc/releases/tag/v0.9.86)

@@ -115,10 +115,6 @@ class Logger(object):
     # Output function
     def out(self, message, state=None, prefix=""):
-        # Only handle d-state (debug) messages if we're in debug mode
-        if state in ["d"] and not self.config["debug"]:
-            return
-
         # Get the date
         if self.config["log_dates"]:
             date = "{} ".format(datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f"))
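
The hunk above removes the centralized filter that made Logger.out silently drop "d"-state (debug) messages when debug mode is off; the hunks that follow reinstate an explicit guard at each call site instead. A minimal, self-contained sketch of the difference, using a toy stand-in for the real Logger (the config dict, print() output, and expensive_payload below are placeholders, not code from this repository):

    from datetime import datetime

    class Logger:
        def __init__(self, config):
            self.config = config

        def out(self, message, state=None, prefix=""):
            # Centralized filter (the approach removed in this commit): the
            # caller has already built the message string by the time it is
            # discarded here.
            if state in ["d"] and not self.config["debug"]:
                return
            print("{} {}{}".format(datetime.now(), prefix, message))

    config = {"debug": False}
    logger = Logger(config)

    # Call-site guard (the approach reinstated throughout the rest of this
    # diff): when debug is off, the expensive formatting is never evaluated.
    expensive_payload = list(range(100000))
    if config["debug"]:
        logger.out("Entries: {}".format(expensive_payload), state="d")

The practical trade-off is extra verbosity at every call site in exchange for not paying the message-formatting cost (str.format, f-strings, dumps() of whole record lists) on clusters where debug logging is disabled.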

debian/changelog (vendored)

@@ -4,9 +4,8 @@ pvc (0.9.87-0) unstable; urgency=high
   * [Node Daemon] Adds network traffic rate calculation subsystem.
   * [All Daemons] Fixes a printing bug where newlines were not added atomically.
   * [CLI Client] Fixes a bug listing connections if no default is specified.
-  * [All Daemons] Simplifies debug logging conditionals by moving into the Logger instance itself.

- -- Joshua M. Boniface <joshua@boniface.me>  Wed, 27 Dec 2023 13:03:14 -0500
+ -- Joshua M. Boniface <joshua@boniface.me>  Wed, 27 Dec 2023 12:42:52 -0500

 pvc (0.9.86-0) unstable; urgency=high

@@ -157,6 +157,9 @@ class MonitoringPlugin(object):
             "w": warning
             "e": error
         """
+        if state == "d" and not self.config["debug"]:
+            return
+
         self.logger.out(message, state=state, prefix=self.plugin_name)

     #
@@ -520,10 +523,11 @@ class MonitoringInstance(object):
                 entries = fault_data["entries"]()

-                self.logger.out(
-                    f"Entries for fault check {fault_type}: {dumps(entries)}",
-                    state="d",
-                )
+                if self.config["debug"]:
+                    self.logger.out(
+                        f"Entries for fault check {fault_type}: {dumps(entries)}",
+                        state="d",
+                    )

                 for _entry in entries:
                     entry = _entry["entry"]

@@ -333,11 +333,12 @@ class AXFRDaemonInstance(object):
                     z = dns.zone.from_xfr(axfr)
                     records_raw = [z[n].to_text(n) for n in z.nodes.keys()]
                 except Exception as e:
-                    self.logger.out(
-                        "{} {} ({})".format(e, dnsmasq_ip, domain),
-                        state="d",
-                        prefix="dns-aggregator",
-                    )
+                    if self.config["debug"]:
+                        self.logger.out(
+                            "{} {} ({})".format(e, dnsmasq_ip, domain),
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
                     continue

                 # Fix the formatting because it's useless
@@ -369,11 +370,12 @@ class AXFRDaemonInstance(object):
                         "SELECT * FROM records WHERE domain_id=%s", (domain_id,)
                     )
                     results = list(sql_curs.fetchall())
-                    self.logger.out(
-                        "SQL query results: {}".format(results),
-                        state="d",
-                        prefix="dns-aggregator",
-                    )
+                    if self.config["debug"]:
+                        self.logger.out(
+                            "SQL query results: {}".format(results),
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
                 except Exception as e:
                     self.logger.out(
                         "ERROR: Failed to obtain DNS records from database: {}".format(
@@ -386,11 +388,12 @@ class AXFRDaemonInstance(object):
                 records_old = list()
                 records_old_ids = list()
                 if not results:
-                    self.logger.out(
-                        "No results found, skipping.",
-                        state="d",
-                        prefix="dns-aggregator",
-                    )
+                    if self.config["debug"]:
+                        self.logger.out(
+                            "No results found, skipping.",
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
                     continue

                 for record in results:
                     # Skip the non-A
@@ -401,21 +404,23 @@ class AXFRDaemonInstance(object):
                     r_data = record[4]
                     # Assemble a list element in the same format as the AXFR data
                     entry = "{} {} IN {} {}".format(r_name, r_ttl, r_type, r_data)
-                    self.logger.out(
-                        "Found record: {}".format(entry),
-                        state="d",
-                        prefix="dns-aggregator",
-                    )
+                    if self.config["debug"]:
+                        self.logger.out(
+                            "Found record: {}".format(entry),
+                            state="d",
+                            prefix="dns-aggregator",
+                        )

                     # Skip non-A or AAAA records
                     if r_type != "A" and r_type != "AAAA":
-                        self.logger.out(
-                            'Skipping record {}, not A or AAAA: "{}"'.format(
-                                entry, r_type
-                            ),
-                            state="d",
-                            prefix="dns-aggregator",
-                        )
+                        if self.config["debug"]:
+                            self.logger.out(
+                                'Skipping record {}, not A or AAAA: "{}"'.format(
+                                    entry, r_type
+                                ),
+                                state="d",
+                                prefix="dns-aggregator",
+                            )
                         continue

                     records_old.append(entry)
@@ -424,16 +429,17 @@ class AXFRDaemonInstance(object):
                 records_new.sort()
                 records_old.sort()

-                self.logger.out(
-                    "New: {}".format(records_new),
-                    state="d",
-                    prefix="dns-aggregator",
-                )
-                self.logger.out(
-                    "Old: {}".format(records_old),
-                    state="d",
-                    prefix="dns-aggregator",
-                )
+                if self.config["debug"]:
+                    self.logger.out(
+                        "New: {}".format(records_new),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )
+                    self.logger.out(
+                        "Old: {}".format(records_old),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )

                 # Find the differences between the lists
                 # Basic check one: are they completely equal
@@ -444,16 +450,17 @@ class AXFRDaemonInstance(object):
                 in_new_not_in_old = in_new - in_old
                 in_old_not_in_new = in_old - in_new

-                self.logger.out(
-                    "New but not old: {}".format(in_new_not_in_old),
-                    state="d",
-                    prefix="dns-aggregator",
-                )
-                self.logger.out(
-                    "Old but not new: {}".format(in_old_not_in_new),
-                    state="d",
-                    prefix="dns-aggregator",
-                )
+                if self.config["debug"]:
+                    self.logger.out(
+                        "New but not old: {}".format(in_new_not_in_old),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )
+                    self.logger.out(
+                        "Old but not new: {}".format(in_old_not_in_new),
+                        state="d",
+                        prefix="dns-aggregator",
+                    )

                 # Go through the old list
                 remove_records = list()  # list of database IDs
@@ -480,11 +487,12 @@ class AXFRDaemonInstance(object):
                 if len(remove_records) > 0:
                     # Remove the invalid old records
                     for record_id in remove_records:
-                        self.logger.out(
-                            "Removing record: {}".format(record_id),
-                            state="d",
-                            prefix="dns-aggregator",
-                        )
+                        if self.config["debug"]:
+                            self.logger.out(
+                                "Removing record: {}".format(record_id),
+                                state="d",
+                                prefix="dns-aggregator",
+                            )
                         sql_curs.execute(
                             "DELETE FROM records WHERE id=%s", (record_id,)
                         )
@@ -499,11 +507,12 @@ class AXFRDaemonInstance(object):
                         r_ttl = record[1]
                         r_type = record[3]
                         r_data = record[4]
-                        self.logger.out(
-                            "Add record: {}".format(name),
-                            state="d",
-                            prefix="dns-aggregator",
-                        )
+                        if self.config["debug"]:
+                            self.logger.out(
+                                "Add record: {}".format(name),
+                                state="d",
+                                prefix="dns-aggregator",
+                            )
                         try:
                             sql_curs.execute(
                                 "INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)",
@@ -511,21 +520,23 @@ class AXFRDaemonInstance(object):
                             )
                             changed = True
                         except psycopg2.IntegrityError as e:
-                            self.logger.out(
-                                "Failed to add record due to {}: {}".format(
-                                    e, name
-                                ),
-                                state="d",
-                                prefix="dns-aggregator",
-                            )
+                            if self.config["debug"]:
+                                self.logger.out(
+                                    "Failed to add record due to {}: {}".format(
+                                        e, name
+                                    ),
+                                    state="d",
+                                    prefix="dns-aggregator",
+                                )
                         except psycopg2.errors.InFailedSqlTransaction as e:
-                            self.logger.out(
-                                "Failed to add record due to {}: {}".format(
-                                    e, name
-                                ),
-                                state="d",
-                                prefix="dns-aggregator",
-                            )
+                            if self.config["debug"]:
+                                self.logger.out(
+                                    "Failed to add record due to {}: {}".format(
+                                        e, name
+                                    ),
+                                    state="d",
+                                    prefix="dns-aggregator",
+                                )

                 if changed:
                     # Increase SOA serial
@@ -537,22 +548,24 @@ class AXFRDaemonInstance(object):
                     current_serial = int(soa_record[2])
                     new_serial = current_serial + 1
                     soa_record[2] = str(new_serial)
-                    self.logger.out(
-                        "Records changed; bumping SOA: {}".format(new_serial),
-                        state="d",
-                        prefix="dns-aggregator",
-                    )
+                    if self.config["debug"]:
+                        self.logger.out(
+                            "Records changed; bumping SOA: {}".format(new_serial),
+                            state="d",
+                            prefix="dns-aggregator",
+                        )
                     sql_curs.execute(
                         "UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'",
                         (" ".join(soa_record), domain_id),
                     )

                 # Commit all the previous changes
-                self.logger.out(
-                    "Committing database changes and reloading PDNS",
-                    state="d",
-                    prefix="dns-aggregator",
-                )
+                if self.config["debug"]:
+                    self.logger.out(
+                        "Committing database changes and reloading PDNS",
+                        state="d",
+                        prefix="dns-aggregator",
+                    )
                 try:
                     self.sql_conn.commit()
                 except Exception as e:

@@ -80,7 +80,9 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
     pool_list = zkhandler.children("base.pool")
     osd_list = zkhandler.children("base.osd")

-    logger.out("Thread starting", state="d", prefix="ceph-thread")
+    debug = config["debug"]
+    if debug:
+        logger.out("Thread starting", state="d", prefix="ceph-thread")

     # Connect to the Ceph cluster
     try:
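
The collector functions in this file all take the same shape after this change: read config["debug"] once into a local variable at the top of the function, then guard every "d"-state message with it. A condensed, runnable sketch of that shape (StubLogger is a placeholder, and the zkhandler, this_node, and queue parameters of the real functions are omitted here):

    class StubLogger:
        def out(self, message, state=None, prefix=""):
            print("[{}] {}".format(prefix, message))

    def collect_ceph_stats(logger, config):
        debug = config["debug"]  # read once per invocation
        if debug:
            logger.out("Thread starting", state="d", prefix="ceph-thread")

        # ... gather stats here, guarding each debug message the same way ...

        if debug:
            logger.out("Thread finished", state="d", prefix="ceph-thread")

    collect_ceph_stats(StubLogger(), {"debug": True})

collect_vm_stats and node_keepalive gain the same local flag, so each guard is a cheap local-variable check rather than a config lookup per message.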
@@ -88,7 +90,8 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             conffile=config["ceph_config_file"],
             conf=dict(keyring=config["ceph_admin_keyring"]),
         )
-        logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
+        if debug:
+            logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
         ceph_conn.connect(timeout=1)
     except Exception as e:
         logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
@@ -97,11 +100,12 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
     # Primary-only functions
     if this_node.coordinator_state == "primary":
         # Get Ceph status information (pretty)
-        logger.out(
-            "Set Ceph status information in zookeeper (primary only)",
-            state="d",
-            prefix="ceph-thread",
-        )
+        if debug:
+            logger.out(
+                "Set Ceph status information in zookeeper (primary only)",
+                state="d",
+                prefix="ceph-thread",
+            )

         command = {"prefix": "status", "format": "pretty"}
         ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
@@ -113,11 +117,12 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             logger.out("Failed to set Ceph status data: {}".format(e), state="e")

         # Get Ceph health information (JSON)
-        logger.out(
-            "Set Ceph health information in zookeeper (primary only)",
-            state="d",
-            prefix="ceph-thread",
-        )
+        if debug:
+            logger.out(
+                "Set Ceph health information in zookeeper (primary only)",
+                state="d",
+                prefix="ceph-thread",
+            )

         command = {"prefix": "health", "format": "json"}
         ceph_health = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
@@ -129,11 +134,12 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             logger.out("Failed to set Ceph health data: {}".format(e), state="e")

         # Get Ceph df information (pretty)
-        logger.out(
-            "Set Ceph rados df information in zookeeper (primary only)",
-            state="d",
-            prefix="ceph-thread",
-        )
+        if debug:
+            logger.out(
+                "Set Ceph rados df information in zookeeper (primary only)",
+                state="d",
+                prefix="ceph-thread",
+            )

         # Get rados df info
         command = {"prefix": "df", "format": "pretty"}
@@ -145,11 +151,12 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
         except Exception as e:
             logger.out("Failed to set Ceph utilization data: {}".format(e), state="e")

-        logger.out(
-            "Set pool information in zookeeper (primary only)",
-            state="d",
-            prefix="ceph-thread",
-        )
+        if debug:
+            logger.out(
+                "Set pool information in zookeeper (primary only)",
+                state="d",
+                prefix="ceph-thread",
+            )

         # Get pool info
         command = {"prefix": "df", "format": "json"}
@@ -172,11 +179,12 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             rados_pool_df_raw = []

         pool_count = len(ceph_pool_df_raw)
-        logger.out(
-            "Getting info for {} pools".format(pool_count),
-            state="d",
-            prefix="ceph-thread",
-        )
+        if debug:
+            logger.out(
+                "Getting info for {} pools".format(pool_count),
+                state="d",
+                prefix="ceph-thread",
+            )

         for pool_idx in range(0, pool_count):
             try:
                 # Combine all the data for this pool
@@ -187,18 +195,22 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
                 # Ignore any pools that aren't in our pool list
                 if pool["name"] not in pool_list:
-                    logger.out(
-                        "Pool {} not in pool list {}".format(pool["name"], pool_list),
-                        state="d",
-                        prefix="ceph-thread",
-                    )
+                    if debug:
+                        logger.out(
+                            "Pool {} not in pool list {}".format(
+                                pool["name"], pool_list
+                            ),
+                            state="d",
+                            prefix="ceph-thread",
+                        )
                     continue
                 else:
-                    logger.out(
-                        "Parsing data for pool {}".format(pool["name"]),
-                        state="d",
-                        prefix="ceph-thread",
-                    )
+                    if debug:
+                        logger.out(
+                            "Parsing data for pool {}".format(pool["name"]),
+                            state="d",
+                            prefix="ceph-thread",
+                        )

                 # Assemble a useful data structure
                 pool_df = {
@@ -236,7 +248,8 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
     osds_this_node = 0
     if len(osd_list) > 0:
         # Get data from Ceph OSDs
-        logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
+        if debug:
+            logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")

         # Parse the dump data
         osd_dump = dict()
@@ -251,7 +264,8 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             logger.out("Failed to obtain OSD data: {}".format(e), state="w")
             osd_dump_raw = []

-        logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
+        if debug:
+            logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
         for osd in osd_dump_raw:
             osd_dump.update(
                 {
@@ -265,7 +279,8 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             )

         # Parse the df data
-        logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
+        if debug:
+            logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")

         osd_df = dict()
@@ -278,7 +293,8 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             logger.out("Failed to obtain OSD data: {}".format(e), state="w")
             osd_df_raw = []

-        logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
+        if debug:
+            logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
         for osd in osd_df_raw:
             osd_df.update(
                 {
@@ -300,7 +316,8 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             )

         # Parse the status data
-        logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
+        if debug:
+            logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")

         osd_status = dict()
@@ -313,7 +330,8 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             logger.out("Failed to obtain OSD status data: {}".format(e), state="w")
             osd_status_raw = []

-        logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
+        if debug:
+            logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
         for line in osd_status_raw.split("\n"):
             # Strip off colour
@@ -382,7 +400,8 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
             )

         # Merge them together into a single meaningful dict
-        logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
+        if debug:
+            logger.out("Merge OSD data together", state="d", prefix="ceph-thread")

         osd_stats = dict()
@@ -402,7 +421,10 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
     # Upload OSD data for the cluster (primary-only)
     if this_node.coordinator_state == "primary":
-        logger.out("Trigger updates for each OSD", state="d", prefix="ceph-thread")
+        if debug:
+            logger.out(
+                "Trigger updates for each OSD", state="d", prefix="ceph-thread"
+            )
         for osd in osd_list:
             try:
@@ -419,16 +441,20 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
     queue.put(osds_this_node)

-    logger.out("Thread finished", state="d", prefix="ceph-thread")
+    if debug:
+        logger.out("Thread finished", state="d", prefix="ceph-thread")


 # VM stats update function
 def collect_vm_stats(logger, config, zkhandler, this_node, queue):
-    logger.out("Thread starting", state="d", prefix="vm-thread")
+    debug = config["debug"]
+    if debug:
+        logger.out("Thread starting", state="d", prefix="vm-thread")

     # Connect to libvirt
     libvirt_name = "qemu:///system"
-    logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
+    if debug:
+        logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
     try:
         lv_conn = libvirt.open(libvirt_name)
         if lv_conn is None:
@@ -441,11 +467,12 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
     memprov = 0
     vcpualloc = 0

     # Toggle state management of dead VMs to restart them
-    logger.out(
-        "Toggle state management of dead VMs to restart them",
-        state="d",
-        prefix="vm-thread",
-    )
+    if debug:
+        logger.out(
+            "Toggle state management of dead VMs to restart them",
+            state="d",
+            prefix="vm-thread",
+        )

     # Make a copy of the d_domain; if not, and it changes in flight, this can fail
     fixed_d_domain = this_node.d_domain.copy()
     for domain, instance in fixed_d_domain.items():
@@ -491,11 +518,12 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
         domain_name = domain.name()

         # Get all the raw information about the VM
-        logger.out(
-            "Getting general statistics for VM {}".format(domain_name),
-            state="d",
-            prefix="vm-thread",
-        )
+        if debug:
+            logger.out(
+                "Getting general statistics for VM {}".format(domain_name),
+                state="d",
+                prefix="vm-thread",
+            )
         (
             domain_state,
             domain_maxmem,
@@ -509,25 +537,29 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
             domain_memory_stats = domain.memoryStats()
             domain_cpu_stats = domain.getCPUStats(True)[0]
         except Exception as e:
-            try:
-                logger.out(
-                    "Failed getting VM information for {}: {}".format(domain.name(), e),
-                    state="d",
-                    prefix="vm-thread",
-                )
-            except Exception:
-                pass
+            if debug:
+                try:
+                    logger.out(
+                        "Failed getting VM information for {}: {}".format(
+                            domain.name(), e
+                        ),
+                        state="d",
+                        prefix="vm-thread",
+                    )
+                except Exception:
+                    pass
             continue

         # Ensure VM is present in the domain_list
         if domain_uuid not in this_node.domain_list:
             this_node.domain_list.append(domain_uuid)

-        logger.out(
-            "Getting disk statistics for VM {}".format(domain_name),
-            state="d",
-            prefix="vm-thread",
-        )
+        if debug:
+            logger.out(
+                "Getting disk statistics for VM {}".format(domain_name),
+                state="d",
+                prefix="vm-thread",
+            )
         domain_disk_stats = []
         try:
             for disk in tree.findall("devices/disk"):
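
In the VM statistics failure paths the debug message is itself wrapped in try/except, presumably because building it calls domain.name() again, which can fail if the domain disappeared between the original error and the log call. A self-contained illustration of why that inner guard matters (StubLogger and FlakyDomain are placeholders, not PVC or libvirt classes):

    class StubLogger:
        def out(self, message, state=None, prefix=""):
            print("[{}] {}".format(prefix, message))

    class FlakyDomain:
        def name(self):
            # stands in for a libvirt domain that vanished mid-iteration
            raise RuntimeError("domain has gone away")

    def log_vm_failure(logger, debug, domain, e):
        if debug:
            try:
                logger.out(
                    "Failed getting VM information for {}: {}".format(domain.name(), e),
                    state="d",
                    prefix="vm-thread",
                )
            except Exception:
                # a failed debug message must never take down the stats thread
                pass

    log_vm_failure(StubLogger(), True, FlakyDomain(), Exception("original error"))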
@@ -546,21 +578,23 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
                     }
                 )
         except Exception as e:
-            try:
-                logger.out(
-                    "Failed getting disk stats for {}: {}".format(domain.name(), e),
-                    state="d",
-                    prefix="vm-thread",
-                )
-            except Exception:
-                pass
+            if debug:
+                try:
+                    logger.out(
+                        "Failed getting disk stats for {}: {}".format(domain.name(), e),
+                        state="d",
+                        prefix="vm-thread",
+                    )
+                except Exception:
+                    pass
             continue

-        logger.out(
-            "Getting network statistics for VM {}".format(domain_name),
-            state="d",
-            prefix="vm-thread",
-        )
+        if debug:
+            logger.out(
+                "Getting network statistics for VM {}".format(domain_name),
+                state="d",
+                prefix="vm-thread",
+            )
         domain_network_stats = []
         try:
             for interface in tree.findall("devices/interface"):
@@ -585,14 +619,17 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
                     }
                 )
         except Exception as e:
-            try:
-                logger.out(
-                    "Failed getting network stats for {}: {}".format(domain.name(), e),
-                    state="d",
-                    prefix="vm-thread",
-                )
-            except Exception:
-                pass
+            if debug:
+                try:
+                    logger.out(
+                        "Failed getting network stats for {}: {}".format(
+                            domain.name(), e
+                        ),
+                        state="d",
+                        prefix="vm-thread",
+                    )
+                except Exception:
+                    pass
             continue

         # Create the final dictionary
@@ -608,42 +645,48 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
             "net_stats": domain_network_stats,
         }

-        logger.out(
-            "Writing statistics for VM {} to Zookeeper".format(domain_name),
-            state="d",
-            prefix="vm-thread",
-        )
+        if debug:
+            logger.out(
+                "Writing statistics for VM {} to Zookeeper".format(domain_name),
+                state="d",
+                prefix="vm-thread",
+            )

         try:
             zkhandler.write(
                 [(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))]
             )
         except Exception as e:
-            logger.out(
-                "Failed to write domain statistics: {}".format(e),
-                state="d",
-                prefix="vm-thread",
-            )
+            if debug:
+                logger.out(
+                    "Failed to write domain statistics: {}".format(e),
+                    state="d",
+                    prefix="vm-thread",
+                )

     # Close the Libvirt connection
     lv_conn.close()

-    logger.out(
-        f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
-        state="d",
-        prefix="vm-thread",
-    )
+    if debug:
+        logger.out(
+            f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
+            state="d",
+            prefix="vm-thread",
+        )

     queue.put(len(running_domains))
     queue.put(memalloc)
     queue.put(memprov)
     queue.put(vcpualloc)

-    logger.out("Thread finished", state="d", prefix="vm-thread")
+    if debug:
+        logger.out("Thread finished", state="d", prefix="vm-thread")


 # Keepalive update function
 def node_keepalive(logger, config, zkhandler, this_node, netstats):
+    debug = config["debug"]

     # Display node information to the terminal
     if config["log_keepalives"]:
         if this_node.coordinator_state == "primary":
@@ -703,7 +746,10 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
         )

     # Get past state and update if needed
-    logger.out("Get past state and update if needed", state="d", prefix="main-thread")
+    if debug:
+        logger.out(
+            "Get past state and update if needed", state="d", prefix="main-thread"
+        )
     past_state = zkhandler.read(("node.state.daemon", this_node.name))
     if past_state != "run" and past_state != "shutdown":
@@ -713,9 +759,10 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
             this_node.daemon_state = "run"

     # Ensure the primary key is properly set
-    logger.out(
-        "Ensure the primary key is properly set", state="d", prefix="main-thread"
-    )
+    if debug:
+        logger.out(
+            "Ensure the primary key is properly set", state="d", prefix="main-thread"
+        )
     if this_node.coordinator_state == "primary":
         if zkhandler.read("base.config.primary_node") != this_node.name:
             zkhandler.write([("base.config.primary_node", this_node.name)])
@@ -796,7 +843,8 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
     # Set our information in zookeeper
     keepalive_time = int(time.time())
-    logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
+    if debug:
+        logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
     try:
         zkhandler.write(
             [
@@ -884,9 +932,10 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
     # Look for dead nodes and fence them
     if not this_node.maintenance:
-        logger.out(
-            "Look for dead nodes and fence them", state="d", prefix="main-thread"
-        )
+        if debug:
+            logger.out(
+                "Look for dead nodes and fence them", state="d", prefix="main-thread"
+            )
         if config["daemon_mode"] == "coordinator":
             for node_name in zkhandler.children("base.node"):
                 try: