Move debug condition handling to Logger
Avoids many dozens of conditionals sprinkled throughout the code by centralizing this check into the main Logger instance.
This commit is contained in:
		@@ -115,6 +115,10 @@ class Logger(object):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    # Output function
 | 
					    # Output function
 | 
				
			||||||
    def out(self, message, state=None, prefix=""):
 | 
					    def out(self, message, state=None, prefix=""):
 | 
				
			||||||
 | 
					        # Only handle d-state (debug) messages if we're in debug mode
 | 
				
			||||||
 | 
					        if state in ["d"] and not self.config["debug"]:
 | 
				
			||||||
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Get the date
 | 
					        # Get the date
 | 
				
			||||||
        if self.config["log_dates"]:
 | 
					        if self.config["log_dates"]:
 | 
				
			||||||
            date = "{} ".format(datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f"))
 | 
					            date = "{} ".format(datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f"))
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -157,9 +157,6 @@ class MonitoringPlugin(object):
 | 
				
			|||||||
            "w": warning
 | 
					            "w": warning
 | 
				
			||||||
            "e": error
 | 
					            "e": error
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        if state == "d" and not self.config["debug"]:
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        self.logger.out(message, state=state, prefix=self.plugin_name)
 | 
					        self.logger.out(message, state=state, prefix=self.plugin_name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    #
 | 
					    #
 | 
				
			||||||
@@ -523,11 +520,10 @@ class MonitoringInstance(object):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
            entries = fault_data["entries"]()
 | 
					            entries = fault_data["entries"]()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if self.config["debug"]:
 | 
					            self.logger.out(
 | 
				
			||||||
                self.logger.out(
 | 
					                f"Entries for fault check {fault_type}: {dumps(entries)}",
 | 
				
			||||||
                    f"Entries for fault check {fault_type}: {dumps(entries)}",
 | 
					                state="d",
 | 
				
			||||||
                    state="d",
 | 
					            )
 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            for _entry in entries:
 | 
					            for _entry in entries:
 | 
				
			||||||
                entry = _entry["entry"]
 | 
					                entry = _entry["entry"]
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -333,12 +333,11 @@ class AXFRDaemonInstance(object):
 | 
				
			|||||||
                    z = dns.zone.from_xfr(axfr)
 | 
					                    z = dns.zone.from_xfr(axfr)
 | 
				
			||||||
                    records_raw = [z[n].to_text(n) for n in z.nodes.keys()]
 | 
					                    records_raw = [z[n].to_text(n) for n in z.nodes.keys()]
 | 
				
			||||||
                except Exception as e:
 | 
					                except Exception as e:
 | 
				
			||||||
                    if self.config["debug"]:
 | 
					                    self.logger.out(
 | 
				
			||||||
                        self.logger.out(
 | 
					                        "{} {} ({})".format(e, dnsmasq_ip, domain),
 | 
				
			||||||
                            "{} {} ({})".format(e, dnsmasq_ip, domain),
 | 
					                        state="d",
 | 
				
			||||||
                            state="d",
 | 
					                        prefix="dns-aggregator",
 | 
				
			||||||
                            prefix="dns-aggregator",
 | 
					                    )
 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # Fix the formatting because it's useless
 | 
					                # Fix the formatting because it's useless
 | 
				
			||||||
@@ -370,12 +369,11 @@ class AXFRDaemonInstance(object):
 | 
				
			|||||||
                        "SELECT * FROM records WHERE domain_id=%s", (domain_id,)
 | 
					                        "SELECT * FROM records WHERE domain_id=%s", (domain_id,)
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
                    results = list(sql_curs.fetchall())
 | 
					                    results = list(sql_curs.fetchall())
 | 
				
			||||||
                    if self.config["debug"]:
 | 
					                    self.logger.out(
 | 
				
			||||||
                        self.logger.out(
 | 
					                        "SQL query results: {}".format(results),
 | 
				
			||||||
                            "SQL query results: {}".format(results),
 | 
					                        state="d",
 | 
				
			||||||
                            state="d",
 | 
					                        prefix="dns-aggregator",
 | 
				
			||||||
                            prefix="dns-aggregator",
 | 
					                    )
 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                except Exception as e:
 | 
					                except Exception as e:
 | 
				
			||||||
                    self.logger.out(
 | 
					                    self.logger.out(
 | 
				
			||||||
                        "ERROR: Failed to obtain DNS records from database: {}".format(
 | 
					                        "ERROR: Failed to obtain DNS records from database: {}".format(
 | 
				
			||||||
@@ -388,12 +386,11 @@ class AXFRDaemonInstance(object):
 | 
				
			|||||||
                records_old = list()
 | 
					                records_old = list()
 | 
				
			||||||
                records_old_ids = list()
 | 
					                records_old_ids = list()
 | 
				
			||||||
                if not results:
 | 
					                if not results:
 | 
				
			||||||
                    if self.config["debug"]:
 | 
					                    self.logger.out(
 | 
				
			||||||
                        self.logger.out(
 | 
					                        "No results found, skipping.",
 | 
				
			||||||
                            "No results found, skipping.",
 | 
					                        state="d",
 | 
				
			||||||
                            state="d",
 | 
					                        prefix="dns-aggregator",
 | 
				
			||||||
                            prefix="dns-aggregator",
 | 
					                    )
 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
                for record in results:
 | 
					                for record in results:
 | 
				
			||||||
                    # Skip the non-A
 | 
					                    # Skip the non-A
 | 
				
			||||||
@@ -404,23 +401,21 @@ class AXFRDaemonInstance(object):
 | 
				
			|||||||
                    r_data = record[4]
 | 
					                    r_data = record[4]
 | 
				
			||||||
                    # Assemble a list element in the same format as the AXFR data
 | 
					                    # Assemble a list element in the same format as the AXFR data
 | 
				
			||||||
                    entry = "{} {} IN {} {}".format(r_name, r_ttl, r_type, r_data)
 | 
					                    entry = "{} {} IN {} {}".format(r_name, r_ttl, r_type, r_data)
 | 
				
			||||||
                    if self.config["debug"]:
 | 
					                    self.logger.out(
 | 
				
			||||||
                        self.logger.out(
 | 
					                        "Found record: {}".format(entry),
 | 
				
			||||||
                            "Found record: {}".format(entry),
 | 
					                        state="d",
 | 
				
			||||||
                            state="d",
 | 
					                        prefix="dns-aggregator",
 | 
				
			||||||
                            prefix="dns-aggregator",
 | 
					                    )
 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # Skip non-A or AAAA records
 | 
					                    # Skip non-A or AAAA records
 | 
				
			||||||
                    if r_type != "A" and r_type != "AAAA":
 | 
					                    if r_type != "A" and r_type != "AAAA":
 | 
				
			||||||
                        if self.config["debug"]:
 | 
					                        self.logger.out(
 | 
				
			||||||
                            self.logger.out(
 | 
					                            'Skipping record {}, not A or AAAA: "{}"'.format(
 | 
				
			||||||
                                'Skipping record {}, not A or AAAA: "{}"'.format(
 | 
					                                entry, r_type
 | 
				
			||||||
                                    entry, r_type
 | 
					                            ),
 | 
				
			||||||
                                ),
 | 
					                            state="d",
 | 
				
			||||||
                                state="d",
 | 
					                            prefix="dns-aggregator",
 | 
				
			||||||
                                prefix="dns-aggregator",
 | 
					                        )
 | 
				
			||||||
                            )
 | 
					 | 
				
			||||||
                        continue
 | 
					                        continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    records_old.append(entry)
 | 
					                    records_old.append(entry)
 | 
				
			||||||
@@ -429,17 +424,16 @@ class AXFRDaemonInstance(object):
 | 
				
			|||||||
                records_new.sort()
 | 
					                records_new.sort()
 | 
				
			||||||
                records_old.sort()
 | 
					                records_old.sort()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if self.config["debug"]:
 | 
					                self.logger.out(
 | 
				
			||||||
                    self.logger.out(
 | 
					                    "New: {}".format(records_new),
 | 
				
			||||||
                        "New: {}".format(records_new),
 | 
					                    state="d",
 | 
				
			||||||
                        state="d",
 | 
					                    prefix="dns-aggregator",
 | 
				
			||||||
                        prefix="dns-aggregator",
 | 
					                )
 | 
				
			||||||
                    )
 | 
					                self.logger.out(
 | 
				
			||||||
                    self.logger.out(
 | 
					                    "Old: {}".format(records_old),
 | 
				
			||||||
                        "Old: {}".format(records_old),
 | 
					                    state="d",
 | 
				
			||||||
                        state="d",
 | 
					                    prefix="dns-aggregator",
 | 
				
			||||||
                        prefix="dns-aggregator",
 | 
					                )
 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # Find the differences between the lists
 | 
					                # Find the differences between the lists
 | 
				
			||||||
                # Basic check one: are they completely equal
 | 
					                # Basic check one: are they completely equal
 | 
				
			||||||
@@ -450,17 +444,16 @@ class AXFRDaemonInstance(object):
 | 
				
			|||||||
                    in_new_not_in_old = in_new - in_old
 | 
					                    in_new_not_in_old = in_new - in_old
 | 
				
			||||||
                    in_old_not_in_new = in_old - in_new
 | 
					                    in_old_not_in_new = in_old - in_new
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    if self.config["debug"]:
 | 
					                    self.logger.out(
 | 
				
			||||||
                        self.logger.out(
 | 
					                        "New but not old: {}".format(in_new_not_in_old),
 | 
				
			||||||
                            "New but not old: {}".format(in_new_not_in_old),
 | 
					                        state="d",
 | 
				
			||||||
                            state="d",
 | 
					                        prefix="dns-aggregator",
 | 
				
			||||||
                            prefix="dns-aggregator",
 | 
					                    )
 | 
				
			||||||
                        )
 | 
					                    self.logger.out(
 | 
				
			||||||
                        self.logger.out(
 | 
					                        "Old but not new: {}".format(in_old_not_in_new),
 | 
				
			||||||
                            "Old but not new: {}".format(in_old_not_in_new),
 | 
					                        state="d",
 | 
				
			||||||
                            state="d",
 | 
					                        prefix="dns-aggregator",
 | 
				
			||||||
                            prefix="dns-aggregator",
 | 
					                    )
 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # Go through the old list
 | 
					                    # Go through the old list
 | 
				
			||||||
                    remove_records = list()  # list of database IDs
 | 
					                    remove_records = list()  # list of database IDs
 | 
				
			||||||
@@ -487,12 +480,11 @@ class AXFRDaemonInstance(object):
 | 
				
			|||||||
                    if len(remove_records) > 0:
 | 
					                    if len(remove_records) > 0:
 | 
				
			||||||
                        # Remove the invalid old records
 | 
					                        # Remove the invalid old records
 | 
				
			||||||
                        for record_id in remove_records:
 | 
					                        for record_id in remove_records:
 | 
				
			||||||
                            if self.config["debug"]:
 | 
					                            self.logger.out(
 | 
				
			||||||
                                self.logger.out(
 | 
					                                "Removing record: {}".format(record_id),
 | 
				
			||||||
                                    "Removing record: {}".format(record_id),
 | 
					                                state="d",
 | 
				
			||||||
                                    state="d",
 | 
					                                prefix="dns-aggregator",
 | 
				
			||||||
                                    prefix="dns-aggregator",
 | 
					                            )
 | 
				
			||||||
                                )
 | 
					 | 
				
			||||||
                            sql_curs.execute(
 | 
					                            sql_curs.execute(
 | 
				
			||||||
                                "DELETE FROM records WHERE id=%s", (record_id,)
 | 
					                                "DELETE FROM records WHERE id=%s", (record_id,)
 | 
				
			||||||
                            )
 | 
					                            )
 | 
				
			||||||
@@ -507,12 +499,11 @@ class AXFRDaemonInstance(object):
 | 
				
			|||||||
                            r_ttl = record[1]
 | 
					                            r_ttl = record[1]
 | 
				
			||||||
                            r_type = record[3]
 | 
					                            r_type = record[3]
 | 
				
			||||||
                            r_data = record[4]
 | 
					                            r_data = record[4]
 | 
				
			||||||
                            if self.config["debug"]:
 | 
					                            self.logger.out(
 | 
				
			||||||
                                self.logger.out(
 | 
					                                "Add record: {}".format(name),
 | 
				
			||||||
                                    "Add record: {}".format(name),
 | 
					                                state="d",
 | 
				
			||||||
                                    state="d",
 | 
					                                prefix="dns-aggregator",
 | 
				
			||||||
                                    prefix="dns-aggregator",
 | 
					                            )
 | 
				
			||||||
                                )
 | 
					 | 
				
			||||||
                            try:
 | 
					                            try:
 | 
				
			||||||
                                sql_curs.execute(
 | 
					                                sql_curs.execute(
 | 
				
			||||||
                                    "INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)",
 | 
					                                    "INSERT INTO records (domain_id, name, ttl, type, prio, content) VALUES (%s, %s, %s, %s, %s, %s)",
 | 
				
			||||||
@@ -520,23 +511,21 @@ class AXFRDaemonInstance(object):
 | 
				
			|||||||
                                )
 | 
					                                )
 | 
				
			||||||
                                changed = True
 | 
					                                changed = True
 | 
				
			||||||
                            except psycopg2.IntegrityError as e:
 | 
					                            except psycopg2.IntegrityError as e:
 | 
				
			||||||
                                if self.config["debug"]:
 | 
					                                self.logger.out(
 | 
				
			||||||
                                    self.logger.out(
 | 
					                                    "Failed to add record due to {}: {}".format(
 | 
				
			||||||
                                        "Failed to add record due to {}: {}".format(
 | 
					                                        e, name
 | 
				
			||||||
                                            e, name
 | 
					                                    ),
 | 
				
			||||||
                                        ),
 | 
					                                    state="d",
 | 
				
			||||||
                                        state="d",
 | 
					                                    prefix="dns-aggregator",
 | 
				
			||||||
                                        prefix="dns-aggregator",
 | 
					                                )
 | 
				
			||||||
                                    )
 | 
					 | 
				
			||||||
                            except psycopg2.errors.InFailedSqlTransaction as e:
 | 
					                            except psycopg2.errors.InFailedSqlTransaction as e:
 | 
				
			||||||
                                if self.config["debug"]:
 | 
					                                self.logger.out(
 | 
				
			||||||
                                    self.logger.out(
 | 
					                                    "Failed to add record due to {}: {}".format(
 | 
				
			||||||
                                        "Failed to add record due to {}: {}".format(
 | 
					                                        e, name
 | 
				
			||||||
                                            e, name
 | 
					                                    ),
 | 
				
			||||||
                                        ),
 | 
					                                    state="d",
 | 
				
			||||||
                                        state="d",
 | 
					                                    prefix="dns-aggregator",
 | 
				
			||||||
                                        prefix="dns-aggregator",
 | 
					                                )
 | 
				
			||||||
                                    )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    if changed:
 | 
					                    if changed:
 | 
				
			||||||
                        # Increase SOA serial
 | 
					                        # Increase SOA serial
 | 
				
			||||||
@@ -548,24 +537,22 @@ class AXFRDaemonInstance(object):
 | 
				
			|||||||
                        current_serial = int(soa_record[2])
 | 
					                        current_serial = int(soa_record[2])
 | 
				
			||||||
                        new_serial = current_serial + 1
 | 
					                        new_serial = current_serial + 1
 | 
				
			||||||
                        soa_record[2] = str(new_serial)
 | 
					                        soa_record[2] = str(new_serial)
 | 
				
			||||||
                        if self.config["debug"]:
 | 
					                        self.logger.out(
 | 
				
			||||||
                            self.logger.out(
 | 
					                            "Records changed; bumping SOA: {}".format(new_serial),
 | 
				
			||||||
                                "Records changed; bumping SOA: {}".format(new_serial),
 | 
					                            state="d",
 | 
				
			||||||
                                state="d",
 | 
					                            prefix="dns-aggregator",
 | 
				
			||||||
                                prefix="dns-aggregator",
 | 
					                        )
 | 
				
			||||||
                            )
 | 
					 | 
				
			||||||
                        sql_curs.execute(
 | 
					                        sql_curs.execute(
 | 
				
			||||||
                            "UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'",
 | 
					                            "UPDATE records SET content=%s WHERE domain_id=%s AND type='SOA'",
 | 
				
			||||||
                            (" ".join(soa_record), domain_id),
 | 
					                            (" ".join(soa_record), domain_id),
 | 
				
			||||||
                        )
 | 
					                        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        # Commit all the previous changes
 | 
					                        # Commit all the previous changes
 | 
				
			||||||
                        if self.config["debug"]:
 | 
					                        self.logger.out(
 | 
				
			||||||
                            self.logger.out(
 | 
					                            "Committing database changes and reloading PDNS",
 | 
				
			||||||
                                "Committing database changes and reloading PDNS",
 | 
					                            state="d",
 | 
				
			||||||
                                state="d",
 | 
					                            prefix="dns-aggregator",
 | 
				
			||||||
                                prefix="dns-aggregator",
 | 
					                        )
 | 
				
			||||||
                            )
 | 
					 | 
				
			||||||
                        try:
 | 
					                        try:
 | 
				
			||||||
                            self.sql_conn.commit()
 | 
					                            self.sql_conn.commit()
 | 
				
			||||||
                        except Exception as e:
 | 
					                        except Exception as e:
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -80,9 +80,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
    pool_list = zkhandler.children("base.pool")
 | 
					    pool_list = zkhandler.children("base.pool")
 | 
				
			||||||
    osd_list = zkhandler.children("base.osd")
 | 
					    osd_list = zkhandler.children("base.osd")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    debug = config["debug"]
 | 
					    logger.out("Thread starting", state="d", prefix="ceph-thread")
 | 
				
			||||||
    if debug:
 | 
					 | 
				
			||||||
        logger.out("Thread starting", state="d", prefix="ceph-thread")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Connect to the Ceph cluster
 | 
					    # Connect to the Ceph cluster
 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
@@ -90,8 +88,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            conffile=config["ceph_config_file"],
 | 
					            conffile=config["ceph_config_file"],
 | 
				
			||||||
            conf=dict(keyring=config["ceph_admin_keyring"]),
 | 
					            conf=dict(keyring=config["ceph_admin_keyring"]),
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
        if debug:
 | 
					        logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
 | 
				
			||||||
            logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
 | 
					 | 
				
			||||||
        ceph_conn.connect(timeout=1)
 | 
					        ceph_conn.connect(timeout=1)
 | 
				
			||||||
    except Exception as e:
 | 
					    except Exception as e:
 | 
				
			||||||
        logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
 | 
					        logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
 | 
				
			||||||
@@ -100,12 +97,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
    # Primary-only functions
 | 
					    # Primary-only functions
 | 
				
			||||||
    if this_node.coordinator_state == "primary":
 | 
					    if this_node.coordinator_state == "primary":
 | 
				
			||||||
        # Get Ceph status information (pretty)
 | 
					        # Get Ceph status information (pretty)
 | 
				
			||||||
        if debug:
 | 
					        logger.out(
 | 
				
			||||||
            logger.out(
 | 
					            "Set Ceph status information in zookeeper (primary only)",
 | 
				
			||||||
                "Set Ceph status information in zookeeper (primary only)",
 | 
					            state="d",
 | 
				
			||||||
                state="d",
 | 
					            prefix="ceph-thread",
 | 
				
			||||||
                prefix="ceph-thread",
 | 
					        )
 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        command = {"prefix": "status", "format": "pretty"}
 | 
					        command = {"prefix": "status", "format": "pretty"}
 | 
				
			||||||
        ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
 | 
					        ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
 | 
				
			||||||
@@ -117,12 +113,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            logger.out("Failed to set Ceph status data: {}".format(e), state="e")
 | 
					            logger.out("Failed to set Ceph status data: {}".format(e), state="e")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Get Ceph health information (JSON)
 | 
					        # Get Ceph health information (JSON)
 | 
				
			||||||
        if debug:
 | 
					        logger.out(
 | 
				
			||||||
            logger.out(
 | 
					            "Set Ceph health information in zookeeper (primary only)",
 | 
				
			||||||
                "Set Ceph health information in zookeeper (primary only)",
 | 
					            state="d",
 | 
				
			||||||
                state="d",
 | 
					            prefix="ceph-thread",
 | 
				
			||||||
                prefix="ceph-thread",
 | 
					        )
 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        command = {"prefix": "health", "format": "json"}
 | 
					        command = {"prefix": "health", "format": "json"}
 | 
				
			||||||
        ceph_health = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
 | 
					        ceph_health = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
 | 
				
			||||||
@@ -134,12 +129,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            logger.out("Failed to set Ceph health data: {}".format(e), state="e")
 | 
					            logger.out("Failed to set Ceph health data: {}".format(e), state="e")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Get Ceph df information (pretty)
 | 
					        # Get Ceph df information (pretty)
 | 
				
			||||||
        if debug:
 | 
					        logger.out(
 | 
				
			||||||
            logger.out(
 | 
					            "Set Ceph rados df information in zookeeper (primary only)",
 | 
				
			||||||
                "Set Ceph rados df information in zookeeper (primary only)",
 | 
					            state="d",
 | 
				
			||||||
                state="d",
 | 
					            prefix="ceph-thread",
 | 
				
			||||||
                prefix="ceph-thread",
 | 
					        )
 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Get rados df info
 | 
					        # Get rados df info
 | 
				
			||||||
        command = {"prefix": "df", "format": "pretty"}
 | 
					        command = {"prefix": "df", "format": "pretty"}
 | 
				
			||||||
@@ -151,12 +145,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
        except Exception as e:
 | 
					        except Exception as e:
 | 
				
			||||||
            logger.out("Failed to set Ceph utilization data: {}".format(e), state="e")
 | 
					            logger.out("Failed to set Ceph utilization data: {}".format(e), state="e")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if debug:
 | 
					        logger.out(
 | 
				
			||||||
            logger.out(
 | 
					            "Set pool information in zookeeper (primary only)",
 | 
				
			||||||
                "Set pool information in zookeeper (primary only)",
 | 
					            state="d",
 | 
				
			||||||
                state="d",
 | 
					            prefix="ceph-thread",
 | 
				
			||||||
                prefix="ceph-thread",
 | 
					        )
 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Get pool info
 | 
					        # Get pool info
 | 
				
			||||||
        command = {"prefix": "df", "format": "json"}
 | 
					        command = {"prefix": "df", "format": "json"}
 | 
				
			||||||
@@ -179,12 +172,11 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            rados_pool_df_raw = []
 | 
					            rados_pool_df_raw = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        pool_count = len(ceph_pool_df_raw)
 | 
					        pool_count = len(ceph_pool_df_raw)
 | 
				
			||||||
        if debug:
 | 
					        logger.out(
 | 
				
			||||||
            logger.out(
 | 
					            "Getting info for {} pools".format(pool_count),
 | 
				
			||||||
                "Getting info for {} pools".format(pool_count),
 | 
					            state="d",
 | 
				
			||||||
                state="d",
 | 
					            prefix="ceph-thread",
 | 
				
			||||||
                prefix="ceph-thread",
 | 
					        )
 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
        for pool_idx in range(0, pool_count):
 | 
					        for pool_idx in range(0, pool_count):
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                # Combine all the data for this pool
 | 
					                # Combine all the data for this pool
 | 
				
			||||||
@@ -195,22 +187,18 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
                # Ignore any pools that aren't in our pool list
 | 
					                # Ignore any pools that aren't in our pool list
 | 
				
			||||||
                if pool["name"] not in pool_list:
 | 
					                if pool["name"] not in pool_list:
 | 
				
			||||||
                    if debug:
 | 
					                    logger.out(
 | 
				
			||||||
                        logger.out(
 | 
					                        "Pool {} not in pool list {}".format(pool["name"], pool_list),
 | 
				
			||||||
                            "Pool {} not in pool list {}".format(
 | 
					                        state="d",
 | 
				
			||||||
                                pool["name"], pool_list
 | 
					                        prefix="ceph-thread",
 | 
				
			||||||
                            ),
 | 
					                    )
 | 
				
			||||||
                            state="d",
 | 
					 | 
				
			||||||
                            prefix="ceph-thread",
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
                    if debug:
 | 
					                    logger.out(
 | 
				
			||||||
                        logger.out(
 | 
					                        "Parsing data for pool {}".format(pool["name"]),
 | 
				
			||||||
                            "Parsing data for pool {}".format(pool["name"]),
 | 
					                        state="d",
 | 
				
			||||||
                            state="d",
 | 
					                        prefix="ceph-thread",
 | 
				
			||||||
                            prefix="ceph-thread",
 | 
					                    )
 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # Assemble a useful data structure
 | 
					                # Assemble a useful data structure
 | 
				
			||||||
                pool_df = {
 | 
					                pool_df = {
 | 
				
			||||||
@@ -248,8 +236,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
    osds_this_node = 0
 | 
					    osds_this_node = 0
 | 
				
			||||||
    if len(osd_list) > 0:
 | 
					    if len(osd_list) > 0:
 | 
				
			||||||
        # Get data from Ceph OSDs
 | 
					        # Get data from Ceph OSDs
 | 
				
			||||||
        if debug:
 | 
					        logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
 | 
				
			||||||
            logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Parse the dump data
 | 
					        # Parse the dump data
 | 
				
			||||||
        osd_dump = dict()
 | 
					        osd_dump = dict()
 | 
				
			||||||
@@ -264,8 +251,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            logger.out("Failed to obtain OSD data: {}".format(e), state="w")
 | 
					            logger.out("Failed to obtain OSD data: {}".format(e), state="w")
 | 
				
			||||||
            osd_dump_raw = []
 | 
					            osd_dump_raw = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if debug:
 | 
					        logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
 | 
				
			||||||
            logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
 | 
					 | 
				
			||||||
        for osd in osd_dump_raw:
 | 
					        for osd in osd_dump_raw:
 | 
				
			||||||
            osd_dump.update(
 | 
					            osd_dump.update(
 | 
				
			||||||
                {
 | 
					                {
 | 
				
			||||||
@@ -279,8 +265,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Parse the df data
 | 
					        # Parse the df data
 | 
				
			||||||
        if debug:
 | 
					        logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
 | 
				
			||||||
            logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        osd_df = dict()
 | 
					        osd_df = dict()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -293,8 +278,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            logger.out("Failed to obtain OSD data: {}".format(e), state="w")
 | 
					            logger.out("Failed to obtain OSD data: {}".format(e), state="w")
 | 
				
			||||||
            osd_df_raw = []
 | 
					            osd_df_raw = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if debug:
 | 
					        logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
 | 
				
			||||||
            logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
 | 
					 | 
				
			||||||
        for osd in osd_df_raw:
 | 
					        for osd in osd_df_raw:
 | 
				
			||||||
            osd_df.update(
 | 
					            osd_df.update(
 | 
				
			||||||
                {
 | 
					                {
 | 
				
			||||||
@@ -316,8 +300,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Parse the status data
 | 
					        # Parse the status data
 | 
				
			||||||
        if debug:
 | 
					        logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
 | 
				
			||||||
            logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        osd_status = dict()
 | 
					        osd_status = dict()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -330,8 +313,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            logger.out("Failed to obtain OSD status data: {}".format(e), state="w")
 | 
					            logger.out("Failed to obtain OSD status data: {}".format(e), state="w")
 | 
				
			||||||
            osd_status_raw = []
 | 
					            osd_status_raw = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if debug:
 | 
					        logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
 | 
				
			||||||
            logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for line in osd_status_raw.split("\n"):
 | 
					        for line in osd_status_raw.split("\n"):
 | 
				
			||||||
            # Strip off colour
 | 
					            # Strip off colour
 | 
				
			||||||
@@ -400,8 +382,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Merge them together into a single meaningful dict
 | 
					        # Merge them together into a single meaningful dict
 | 
				
			||||||
        if debug:
 | 
					        logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
 | 
				
			||||||
            logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        osd_stats = dict()
 | 
					        osd_stats = dict()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -421,10 +402,7 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        # Upload OSD data for the cluster (primary-only)
 | 
					        # Upload OSD data for the cluster (primary-only)
 | 
				
			||||||
        if this_node.coordinator_state == "primary":
 | 
					        if this_node.coordinator_state == "primary":
 | 
				
			||||||
            if debug:
 | 
					            logger.out("Trigger updates for each OSD", state="d", prefix="ceph-thread")
 | 
				
			||||||
                logger.out(
 | 
					 | 
				
			||||||
                    "Trigger updates for each OSD", state="d", prefix="ceph-thread"
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            for osd in osd_list:
 | 
					            for osd in osd_list:
 | 
				
			||||||
                try:
 | 
					                try:
 | 
				
			||||||
@@ -441,20 +419,16 @@ def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    queue.put(osds_this_node)
 | 
					    queue.put(osds_this_node)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if debug:
 | 
					    logger.out("Thread finished", state="d", prefix="ceph-thread")
 | 
				
			||||||
        logger.out("Thread finished", state="d", prefix="ceph-thread")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# VM stats update function
 | 
					# VM stats update function
 | 
				
			||||||
def collect_vm_stats(logger, config, zkhandler, this_node, queue):
 | 
					def collect_vm_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			||||||
    debug = config["debug"]
 | 
					    logger.out("Thread starting", state="d", prefix="vm-thread")
 | 
				
			||||||
    if debug:
 | 
					 | 
				
			||||||
        logger.out("Thread starting", state="d", prefix="vm-thread")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Connect to libvirt
 | 
					    # Connect to libvirt
 | 
				
			||||||
    libvirt_name = "qemu:///system"
 | 
					    libvirt_name = "qemu:///system"
 | 
				
			||||||
    if debug:
 | 
					    logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
 | 
				
			||||||
        logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
 | 
					 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
        lv_conn = libvirt.open(libvirt_name)
 | 
					        lv_conn = libvirt.open(libvirt_name)
 | 
				
			||||||
        if lv_conn is None:
 | 
					        if lv_conn is None:
 | 
				
			||||||
@@ -467,12 +441,11 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
    memprov = 0
 | 
					    memprov = 0
 | 
				
			||||||
    vcpualloc = 0
 | 
					    vcpualloc = 0
 | 
				
			||||||
    # Toggle state management of dead VMs to restart them
 | 
					    # Toggle state management of dead VMs to restart them
 | 
				
			||||||
    if debug:
 | 
					    logger.out(
 | 
				
			||||||
        logger.out(
 | 
					        "Toggle state management of dead VMs to restart them",
 | 
				
			||||||
            "Toggle state management of dead VMs to restart them",
 | 
					        state="d",
 | 
				
			||||||
            state="d",
 | 
					        prefix="vm-thread",
 | 
				
			||||||
            prefix="vm-thread",
 | 
					    )
 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
    # Make a copy of the d_domain; if not, and it changes in flight, this can fail
 | 
					    # Make a copy of the d_domain; if not, and it changes in flight, this can fail
 | 
				
			||||||
    fixed_d_domain = this_node.d_domain.copy()
 | 
					    fixed_d_domain = this_node.d_domain.copy()
 | 
				
			||||||
    for domain, instance in fixed_d_domain.items():
 | 
					    for domain, instance in fixed_d_domain.items():
 | 
				
			||||||
@@ -518,12 +491,11 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            domain_name = domain.name()
 | 
					            domain_name = domain.name()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # Get all the raw information about the VM
 | 
					            # Get all the raw information about the VM
 | 
				
			||||||
            if debug:
 | 
					            logger.out(
 | 
				
			||||||
                logger.out(
 | 
					                "Getting general statistics for VM {}".format(domain_name),
 | 
				
			||||||
                    "Getting general statistics for VM {}".format(domain_name),
 | 
					                state="d",
 | 
				
			||||||
                    state="d",
 | 
					                prefix="vm-thread",
 | 
				
			||||||
                    prefix="vm-thread",
 | 
					            )
 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
            (
 | 
					            (
 | 
				
			||||||
                domain_state,
 | 
					                domain_state,
 | 
				
			||||||
                domain_maxmem,
 | 
					                domain_maxmem,
 | 
				
			||||||
@@ -537,29 +509,25 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            domain_memory_stats = domain.memoryStats()
 | 
					            domain_memory_stats = domain.memoryStats()
 | 
				
			||||||
            domain_cpu_stats = domain.getCPUStats(True)[0]
 | 
					            domain_cpu_stats = domain.getCPUStats(True)[0]
 | 
				
			||||||
        except Exception as e:
 | 
					        except Exception as e:
 | 
				
			||||||
            if debug:
 | 
					            try:
 | 
				
			||||||
                try:
 | 
					                logger.out(
 | 
				
			||||||
                    logger.out(
 | 
					                    "Failed getting VM information for {}: {}".format(domain.name(), e),
 | 
				
			||||||
                        "Failed getting VM information for {}: {}".format(
 | 
					                    state="d",
 | 
				
			||||||
                            domain.name(), e
 | 
					                    prefix="vm-thread",
 | 
				
			||||||
                        ),
 | 
					                )
 | 
				
			||||||
                        state="d",
 | 
					            except Exception:
 | 
				
			||||||
                        prefix="vm-thread",
 | 
					                pass
 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    pass
 | 
					 | 
				
			||||||
            continue
 | 
					            continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Ensure VM is present in the domain_list
 | 
					        # Ensure VM is present in the domain_list
 | 
				
			||||||
        if domain_uuid not in this_node.domain_list:
 | 
					        if domain_uuid not in this_node.domain_list:
 | 
				
			||||||
            this_node.domain_list.append(domain_uuid)
 | 
					            this_node.domain_list.append(domain_uuid)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if debug:
 | 
					        logger.out(
 | 
				
			||||||
            logger.out(
 | 
					            "Getting disk statistics for VM {}".format(domain_name),
 | 
				
			||||||
                "Getting disk statistics for VM {}".format(domain_name),
 | 
					            state="d",
 | 
				
			||||||
                state="d",
 | 
					            prefix="vm-thread",
 | 
				
			||||||
                prefix="vm-thread",
 | 
					        )
 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
        domain_disk_stats = []
 | 
					        domain_disk_stats = []
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            for disk in tree.findall("devices/disk"):
 | 
					            for disk in tree.findall("devices/disk"):
 | 
				
			||||||
@@ -578,23 +546,21 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
                    }
 | 
					                    }
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
        except Exception as e:
 | 
					        except Exception as e:
 | 
				
			||||||
            if debug:
 | 
					            try:
 | 
				
			||||||
                try:
 | 
					                logger.out(
 | 
				
			||||||
                    logger.out(
 | 
					                    "Failed getting disk stats for {}: {}".format(domain.name(), e),
 | 
				
			||||||
                        "Failed getting disk stats for {}: {}".format(domain.name(), e),
 | 
					                    state="d",
 | 
				
			||||||
                        state="d",
 | 
					                    prefix="vm-thread",
 | 
				
			||||||
                        prefix="vm-thread",
 | 
					                )
 | 
				
			||||||
                    )
 | 
					            except Exception:
 | 
				
			||||||
                except Exception:
 | 
					                pass
 | 
				
			||||||
                    pass
 | 
					 | 
				
			||||||
            continue
 | 
					            continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if debug:
 | 
					        logger.out(
 | 
				
			||||||
            logger.out(
 | 
					            "Getting network statistics for VM {}".format(domain_name),
 | 
				
			||||||
                "Getting network statistics for VM {}".format(domain_name),
 | 
					            state="d",
 | 
				
			||||||
                state="d",
 | 
					            prefix="vm-thread",
 | 
				
			||||||
                prefix="vm-thread",
 | 
					        )
 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
        domain_network_stats = []
 | 
					        domain_network_stats = []
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            for interface in tree.findall("devices/interface"):
 | 
					            for interface in tree.findall("devices/interface"):
 | 
				
			||||||
@@ -619,17 +585,14 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
                    }
 | 
					                    }
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
        except Exception as e:
 | 
					        except Exception as e:
 | 
				
			||||||
            if debug:
 | 
					            try:
 | 
				
			||||||
                try:
 | 
					                logger.out(
 | 
				
			||||||
                    logger.out(
 | 
					                    "Failed getting network stats for {}: {}".format(domain.name(), e),
 | 
				
			||||||
                        "Failed getting network stats for {}: {}".format(
 | 
					                    state="d",
 | 
				
			||||||
                            domain.name(), e
 | 
					                    prefix="vm-thread",
 | 
				
			||||||
                        ),
 | 
					                )
 | 
				
			||||||
                        state="d",
 | 
					            except Exception:
 | 
				
			||||||
                        prefix="vm-thread",
 | 
					                pass
 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    pass
 | 
					 | 
				
			||||||
            continue
 | 
					            continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Create the final dictionary
 | 
					        # Create the final dictionary
 | 
				
			||||||
@@ -645,48 +608,42 @@ def collect_vm_stats(logger, config, zkhandler, this_node, queue):
 | 
				
			|||||||
            "net_stats": domain_network_stats,
 | 
					            "net_stats": domain_network_stats,
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if debug:
 | 
					        logger.out(
 | 
				
			||||||
            logger.out(
 | 
					            "Writing statistics for VM {} to Zookeeper".format(domain_name),
 | 
				
			||||||
                "Writing statistics for VM {} to Zookeeper".format(domain_name),
 | 
					            state="d",
 | 
				
			||||||
                state="d",
 | 
					            prefix="vm-thread",
 | 
				
			||||||
                prefix="vm-thread",
 | 
					        )
 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            zkhandler.write(
 | 
					            zkhandler.write(
 | 
				
			||||||
                [(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))]
 | 
					                [(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))]
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
        except Exception as e:
 | 
					        except Exception as e:
 | 
				
			||||||
            if debug:
 | 
					            logger.out(
 | 
				
			||||||
                logger.out(
 | 
					                "Failed to write domain statistics: {}".format(e),
 | 
				
			||||||
                    "Failed to write domain statistics: {}".format(e),
 | 
					                state="d",
 | 
				
			||||||
                    state="d",
 | 
					                prefix="vm-thread",
 | 
				
			||||||
                    prefix="vm-thread",
 | 
					            )
 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Close the Libvirt connection
 | 
					    # Close the Libvirt connection
 | 
				
			||||||
    lv_conn.close()
 | 
					    lv_conn.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if debug:
 | 
					    logger.out(
 | 
				
			||||||
        logger.out(
 | 
					        f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
 | 
				
			||||||
            f"VM stats: doms: {len(running_domains)}; memalloc: {memalloc}; memprov: {memprov}; vcpualloc: {vcpualloc}",
 | 
					        state="d",
 | 
				
			||||||
            state="d",
 | 
					        prefix="vm-thread",
 | 
				
			||||||
            prefix="vm-thread",
 | 
					    )
 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    queue.put(len(running_domains))
 | 
					    queue.put(len(running_domains))
 | 
				
			||||||
    queue.put(memalloc)
 | 
					    queue.put(memalloc)
 | 
				
			||||||
    queue.put(memprov)
 | 
					    queue.put(memprov)
 | 
				
			||||||
    queue.put(vcpualloc)
 | 
					    queue.put(vcpualloc)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if debug:
 | 
					    logger.out("Thread finished", state="d", prefix="vm-thread")
 | 
				
			||||||
        logger.out("Thread finished", state="d", prefix="vm-thread")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Keepalive update function
 | 
					# Keepalive update function
 | 
				
			||||||
def node_keepalive(logger, config, zkhandler, this_node, netstats):
 | 
					def node_keepalive(logger, config, zkhandler, this_node, netstats):
 | 
				
			||||||
    debug = config["debug"]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # Display node information to the terminal
 | 
					    # Display node information to the terminal
 | 
				
			||||||
    if config["log_keepalives"]:
 | 
					    if config["log_keepalives"]:
 | 
				
			||||||
        if this_node.coordinator_state == "primary":
 | 
					        if this_node.coordinator_state == "primary":
 | 
				
			||||||
@@ -746,10 +703,7 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
 | 
				
			|||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Get past state and update if needed
 | 
					    # Get past state and update if needed
 | 
				
			||||||
    if debug:
 | 
					    logger.out("Get past state and update if needed", state="d", prefix="main-thread")
 | 
				
			||||||
        logger.out(
 | 
					 | 
				
			||||||
            "Get past state and update if needed", state="d", prefix="main-thread"
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    past_state = zkhandler.read(("node.state.daemon", this_node.name))
 | 
					    past_state = zkhandler.read(("node.state.daemon", this_node.name))
 | 
				
			||||||
    if past_state != "run" and past_state != "shutdown":
 | 
					    if past_state != "run" and past_state != "shutdown":
 | 
				
			||||||
@@ -759,10 +713,9 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
 | 
				
			|||||||
        this_node.daemon_state = "run"
 | 
					        this_node.daemon_state = "run"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Ensure the primary key is properly set
 | 
					    # Ensure the primary key is properly set
 | 
				
			||||||
    if debug:
 | 
					    logger.out(
 | 
				
			||||||
        logger.out(
 | 
					        "Ensure the primary key is properly set", state="d", prefix="main-thread"
 | 
				
			||||||
            "Ensure the primary key is properly set", state="d", prefix="main-thread"
 | 
					    )
 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
    if this_node.coordinator_state == "primary":
 | 
					    if this_node.coordinator_state == "primary":
 | 
				
			||||||
        if zkhandler.read("base.config.primary_node") != this_node.name:
 | 
					        if zkhandler.read("base.config.primary_node") != this_node.name:
 | 
				
			||||||
            zkhandler.write([("base.config.primary_node", this_node.name)])
 | 
					            zkhandler.write([("base.config.primary_node", this_node.name)])
 | 
				
			||||||
@@ -843,8 +796,7 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    # Set our information in zookeeper
 | 
					    # Set our information in zookeeper
 | 
				
			||||||
    keepalive_time = int(time.time())
 | 
					    keepalive_time = int(time.time())
 | 
				
			||||||
    if debug:
 | 
					    logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
 | 
				
			||||||
        logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
 | 
					 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
        zkhandler.write(
 | 
					        zkhandler.write(
 | 
				
			||||||
            [
 | 
					            [
 | 
				
			||||||
@@ -932,10 +884,9 @@ def node_keepalive(logger, config, zkhandler, this_node, netstats):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    # Look for dead nodes and fence them
 | 
					    # Look for dead nodes and fence them
 | 
				
			||||||
    if not this_node.maintenance:
 | 
					    if not this_node.maintenance:
 | 
				
			||||||
        if debug:
 | 
					        logger.out(
 | 
				
			||||||
            logger.out(
 | 
					            "Look for dead nodes and fence them", state="d", prefix="main-thread"
 | 
				
			||||||
                "Look for dead nodes and fence them", state="d", prefix="main-thread"
 | 
					        )
 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
        if config["daemon_mode"] == "coordinator":
 | 
					        if config["daemon_mode"] == "coordinator":
 | 
				
			||||||
            for node_name in zkhandler.children("base.node"):
 | 
					            for node_name in zkhandler.children("base.node"):
 | 
				
			||||||
                try:
 | 
					                try:
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user