pvc/node-daemon/pvcnoded/util/keepalive.py

915 lines
34 KiB
Python

#!/usr/bin/env python3
# keepalive.py - Utility functions for pvcnoded Keepalives
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import pvcnoded.util.fencing
import daemon_lib.common as common
from apscheduler.schedulers.background import BackgroundScheduler
from rados import Rados
from xml.etree import ElementTree
from queue import Queue
from threading import Thread
from datetime import datetime
import json
import re
import libvirt
import psutil
import os
import time
# State table for pretty stats
libvirt_vm_states = {
0: "NOSTATE",
1: "RUNNING",
2: "BLOCKED",
3: "PAUSED",
4: "SHUTDOWN",
5: "SHUTOFF",
6: "CRASHED",
7: "PMSUSPENDED",
}
def start_keepalive_timer(logger, config, zkhandler, this_node):
keepalive_interval = config["keepalive_interval"]
logger.out(
f"Starting keepalive timer ({keepalive_interval} second interval)", state="s"
)
keepalive_timer = BackgroundScheduler()
keepalive_timer.add_job(
node_keepalive,
args=(logger, config, zkhandler, this_node),
trigger="interval",
seconds=keepalive_interval,
)
keepalive_timer.start()
return keepalive_timer
def stop_keepalive_timer(logger, keepalive_timer):
try:
keepalive_timer.shutdown()
logger.out("Stopping keepalive timer", state="s")
except Exception:
logger.out("Failed to stop keepalive timer", state="w")
# Ceph stats update function
def collect_ceph_stats(logger, config, zkhandler, this_node, queue):
pool_list = zkhandler.children("base.pool")
osd_list = zkhandler.children("base.osd")
debug = config["debug"]
if debug:
logger.out("Thread starting", state="d", prefix="ceph-thread")
# Connect to the Ceph cluster
try:
ceph_conn = Rados(
conffile=config["ceph_config_file"],
conf=dict(keyring=config["ceph_admin_keyring"]),
)
if debug:
logger.out("Connecting to cluster", state="d", prefix="ceph-thread")
ceph_conn.connect(timeout=1)
except Exception as e:
logger.out("Failed to open connection to Ceph cluster: {}".format(e), state="e")
return
if debug:
logger.out("Getting health stats from monitor", state="d", prefix="ceph-thread")
# Get Ceph cluster health for local status output
command = {"prefix": "health", "format": "json"}
try:
health_status = json.loads(
ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1]
)
ceph_health = health_status["status"]
except Exception as e:
logger.out("Failed to obtain Ceph health data: {}".format(e), state="e")
ceph_health = "HEALTH_UNKN"
if ceph_health in ["HEALTH_OK"]:
ceph_health_colour = logger.fmt_green
elif ceph_health in ["HEALTH_UNKN"]:
ceph_health_colour = logger.fmt_cyan
elif ceph_health in ["HEALTH_WARN"]:
ceph_health_colour = logger.fmt_yellow
else:
ceph_health_colour = logger.fmt_red
# Primary-only functions
if this_node.router_state == "primary":
if debug:
logger.out(
"Set ceph health information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
command = {"prefix": "status", "format": "pretty"}
ceph_status = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
try:
zkhandler.write([("base.storage", str(ceph_status))])
except Exception as e:
logger.out("Failed to set Ceph status data: {}".format(e), state="e")
if debug:
logger.out(
"Set ceph rados df information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
# Get rados df info
command = {"prefix": "df", "format": "pretty"}
ceph_df = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1].decode(
"ascii"
)
try:
zkhandler.write([("base.storage.util", str(ceph_df))])
except Exception as e:
logger.out("Failed to set Ceph utilization data: {}".format(e), state="e")
if debug:
logger.out(
"Set pool information in zookeeper (primary only)",
state="d",
prefix="ceph-thread",
)
# Get pool info
command = {"prefix": "df", "format": "json"}
ceph_df_output = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
try:
ceph_pool_df_raw = json.loads(ceph_df_output)["pools"]
except Exception as e:
logger.out("Failed to obtain Pool data (ceph df): {}".format(e), state="w")
ceph_pool_df_raw = []
retcode, stdout, stderr = common.run_os_command(
"rados df --format json", timeout=1
)
try:
rados_pool_df_raw = json.loads(stdout)["pools"]
except Exception as e:
logger.out("Failed to obtain Pool data (rados df): {}".format(e), state="w")
rados_pool_df_raw = []
pool_count = len(ceph_pool_df_raw)
if debug:
logger.out(
"Getting info for {} pools".format(pool_count),
state="d",
prefix="ceph-thread",
)
for pool_idx in range(0, pool_count):
try:
# Combine all the data for this pool
ceph_pool_df = ceph_pool_df_raw[pool_idx]
rados_pool_df = rados_pool_df_raw[pool_idx]
pool = ceph_pool_df
pool.update(rados_pool_df)
# Ignore any pools that aren't in our pool list
if pool["name"] not in pool_list:
if debug:
logger.out(
"Pool {} not in pool list {}".format(
pool["name"], pool_list
),
state="d",
prefix="ceph-thread",
)
continue
else:
if debug:
logger.out(
"Parsing data for pool {}".format(pool["name"]),
state="d",
prefix="ceph-thread",
)
# Assemble a useful data structure
pool_df = {
"id": pool["id"],
"stored_bytes": pool["stats"]["stored"],
"free_bytes": pool["stats"]["max_avail"],
"used_bytes": pool["stats"]["bytes_used"],
"used_percent": pool["stats"]["percent_used"],
"num_objects": pool["stats"]["objects"],
"num_object_clones": pool["num_object_clones"],
"num_object_copies": pool["num_object_copies"],
"num_objects_missing_on_primary": pool[
"num_objects_missing_on_primary"
],
"num_objects_unfound": pool["num_objects_unfound"],
"num_objects_degraded": pool["num_objects_degraded"],
"read_ops": pool["read_ops"],
"read_bytes": pool["read_bytes"],
"write_ops": pool["write_ops"],
"write_bytes": pool["write_bytes"],
}
# Write the pool data to Zookeeper
zkhandler.write(
[(("pool.stats", pool["name"]), str(json.dumps(pool_df)))]
)
except Exception as e:
# One or more of the status commands timed out, just continue
logger.out(
"Failed to format and send pool data: {}".format(e), state="w"
)
pass
# Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs)
osds_this_node = 0
if len(osd_list) > 0:
# Get data from Ceph OSDs
if debug:
logger.out("Get data from Ceph OSDs", state="d", prefix="ceph-thread")
# Parse the dump data
osd_dump = dict()
command = {"prefix": "osd dump", "format": "json"}
osd_dump_output = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
try:
osd_dump_raw = json.loads(osd_dump_output)["osds"]
except Exception as e:
logger.out("Failed to obtain OSD data: {}".format(e), state="w")
osd_dump_raw = []
if debug:
logger.out("Loop through OSD dump", state="d", prefix="ceph-thread")
for osd in osd_dump_raw:
osd_dump.update(
{
str(osd["osd"]): {
"uuid": osd["uuid"],
"up": osd["up"],
"in": osd["in"],
"primary_affinity": osd["primary_affinity"],
}
}
)
# Parse the df data
if debug:
logger.out("Parse the OSD df data", state="d", prefix="ceph-thread")
osd_df = dict()
command = {"prefix": "osd df", "format": "json"}
try:
osd_df_raw = json.loads(
ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[1]
)["nodes"]
except Exception as e:
logger.out("Failed to obtain OSD data: {}".format(e), state="w")
osd_df_raw = []
if debug:
logger.out("Loop through OSD df", state="d", prefix="ceph-thread")
for osd in osd_df_raw:
osd_df.update(
{
str(osd["id"]): {
"utilization": osd["utilization"],
"var": osd["var"],
"pgs": osd["pgs"],
"kb": osd["kb"],
"kb_used": osd["kb_used"],
"kb_used_data": osd["kb_used_data"],
"kb_used_omap": osd["kb_used_omap"],
"kb_used_meta": osd["kb_used_meta"],
"kb_avail": osd["kb_avail"],
"weight": osd["crush_weight"],
"reweight": osd["reweight"],
"class": osd["device_class"],
}
}
)
# Parse the status data
if debug:
logger.out("Parse the OSD status data", state="d", prefix="ceph-thread")
osd_status = dict()
command = {"prefix": "osd status", "format": "pretty"}
try:
osd_status_raw = ceph_conn.mon_command(json.dumps(command), b"", timeout=1)[
1
].decode("ascii")
except Exception as e:
logger.out("Failed to obtain OSD status data: {}".format(e), state="w")
osd_status_raw = []
if debug:
logger.out("Loop through OSD status data", state="d", prefix="ceph-thread")
for line in osd_status_raw.split("\n"):
# Strip off colour
line = re.sub(r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))", "", line)
# Split it for parsing
line = line.split()
if len(line) > 1 and line[1].isdigit():
# This is an OSD line so parse it
osd_id = line[1]
node = line[3].split(".")[0]
used = line[5]
avail = line[7]
wr_ops = line[9]
wr_data = line[11]
rd_ops = line[13]
rd_data = line[15]
state = line[17]
osd_status.update(
{
str(osd_id): {
"node": node,
"used": used,
"avail": avail,
"wr_ops": wr_ops,
"wr_data": wr_data,
"rd_ops": rd_ops,
"rd_data": rd_data,
"state": state,
}
}
)
# Merge them together into a single meaningful dict
if debug:
logger.out("Merge OSD data together", state="d", prefix="ceph-thread")
osd_stats = dict()
for osd in osd_list:
if zkhandler.read(("osd.node", osd)) == config["node_hostname"]:
osds_this_node += 1
try:
this_dump = osd_dump[osd]
this_dump.update(osd_df[osd])
this_dump.update(osd_status[osd])
osd_stats[osd] = this_dump
except KeyError as e:
# One or more of the status commands timed out, just continue
logger.out(
"Failed to parse OSD stats into dictionary: {}".format(e), state="w"
)
# Upload OSD data for the cluster (primary-only)
if this_node.router_state == "primary":
if debug:
logger.out(
"Trigger updates for each OSD", state="d", prefix="ceph-thread"
)
for osd in osd_list:
try:
stats = json.dumps(osd_stats[osd])
zkhandler.write([(("osd.stats", osd), str(stats))])
except KeyError as e:
# One or more of the status commands timed out, just continue
logger.out(
"Failed to upload OSD stats from dictionary: {}".format(e),
state="w",
)
ceph_conn.shutdown()
queue.put(ceph_health_colour)
queue.put(ceph_health)
queue.put(osds_this_node)
if debug:
logger.out("Thread finished", state="d", prefix="ceph-thread")
# VM stats update function
def collect_vm_stats(logger, config, zkhandler, this_node, queue):
debug = config["debug"]
if debug:
logger.out("Thread starting", state="d", prefix="vm-thread")
# Connect to libvirt
libvirt_name = "qemu:///system"
if debug:
logger.out("Connecting to libvirt", state="d", prefix="vm-thread")
try:
lv_conn = libvirt.open(libvirt_name)
if lv_conn is None:
raise Exception
except Exception:
logger.out('Failed to open connection to "{}"'.format(libvirt_name), state="e")
return
memalloc = 0
memprov = 0
vcpualloc = 0
# Toggle state management of dead VMs to restart them
if debug:
logger.out(
"Toggle state management of dead VMs to restart them",
state="d",
prefix="vm-thread",
)
# Make a copy of the d_domain; if not, and it changes in flight, this can fail
fixed_d_domain = this_node.d_domain.copy()
for domain, instance in fixed_d_domain.items():
if domain in this_node.domain_list:
# Add the allocated memory to our memalloc value
memalloc += instance.getmemory()
memprov += instance.getmemory()
vcpualloc += instance.getvcpus()
if instance.getstate() == "start" and instance.getnode() == this_node.name:
if instance.getdom() is not None:
try:
if instance.getdom().state()[0] != libvirt.VIR_DOMAIN_RUNNING:
logger.out(
"VM {} has failed".format(instance.domname),
state="w",
prefix="vm-thread",
)
raise
except Exception:
# Toggle a state "change"
logger.out(
"Resetting state to {} for VM {}".format(
instance.getstate(), instance.domname
),
state="i",
prefix="vm-thread",
)
zkhandler.write(
[(("domain.state", domain), instance.getstate())]
)
elif instance.getnode() == this_node.name:
memprov += instance.getmemory()
# Get list of running domains from Libvirt
running_domains = lv_conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_ACTIVE)
# Get statistics from any running VMs
for domain in running_domains:
try:
# Get basic information about the VM
tree = ElementTree.fromstring(domain.XMLDesc())
domain_uuid = domain.UUIDString()
domain_name = domain.name()
# Get all the raw information about the VM
if debug:
logger.out(
"Getting general statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
(
domain_state,
domain_maxmem,
domain_mem,
domain_vcpus,
domain_cputime,
) = domain.info()
# We can't properly gather stats from a non-running VMs so continue
if domain_state != libvirt.VIR_DOMAIN_RUNNING:
continue
domain_memory_stats = domain.memoryStats()
domain_cpu_stats = domain.getCPUStats(True)[0]
except Exception as e:
if debug:
try:
logger.out(
"Failed getting VM information for {}: {}".format(
domain.name(), e
),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
# Ensure VM is present in the domain_list
if domain_uuid not in this_node.domain_list:
this_node.domain_list.append(domain_uuid)
if debug:
logger.out(
"Getting disk statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
domain_disk_stats = []
try:
for disk in tree.findall("devices/disk"):
disk_name = disk.find("source").get("name")
if not disk_name:
disk_name = disk.find("source").get("file")
disk_stats = domain.blockStats(disk.find("target").get("dev"))
domain_disk_stats.append(
{
"name": disk_name,
"rd_req": disk_stats[0],
"rd_bytes": disk_stats[1],
"wr_req": disk_stats[2],
"wr_bytes": disk_stats[3],
"err": disk_stats[4],
}
)
except Exception as e:
if debug:
try:
logger.out(
"Failed getting disk stats for {}: {}".format(domain.name(), e),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
if debug:
logger.out(
"Getting network statistics for VM {}".format(domain_name),
state="d",
prefix="vm-thread",
)
domain_network_stats = []
try:
for interface in tree.findall("devices/interface"):
interface_type = interface.get("type")
if interface_type not in ["bridge"]:
continue
interface_name = interface.find("target").get("dev")
interface_bridge = interface.find("source").get("bridge")
interface_stats = domain.interfaceStats(interface_name)
domain_network_stats.append(
{
"name": interface_name,
"bridge": interface_bridge,
"rd_bytes": interface_stats[0],
"rd_packets": interface_stats[1],
"rd_errors": interface_stats[2],
"rd_drops": interface_stats[3],
"wr_bytes": interface_stats[4],
"wr_packets": interface_stats[5],
"wr_errors": interface_stats[6],
"wr_drops": interface_stats[7],
}
)
except Exception as e:
if debug:
try:
logger.out(
"Failed getting network stats for {}: {}".format(
domain.name(), e
),
state="d",
prefix="vm-thread",
)
except Exception:
pass
continue
# Create the final dictionary
domain_stats = {
"state": libvirt_vm_states[domain_state],
"maxmem": domain_maxmem,
"livemem": domain_mem,
"cpus": domain_vcpus,
"cputime": domain_cputime,
"mem_stats": domain_memory_stats,
"cpu_stats": domain_cpu_stats,
"disk_stats": domain_disk_stats,
"net_stats": domain_network_stats,
}
if debug:
logger.out(
"Writing statistics for VM {} to Zookeeper".format(domain_name),
state="d",
prefix="vm-thread",
)
try:
zkhandler.write(
[(("domain.stats", domain_uuid), str(json.dumps(domain_stats)))]
)
except Exception as e:
if debug:
logger.out(
"Failed to write domain statistics: {}".format(e),
state="d",
prefix="vm-thread",
)
# Close the Libvirt connection
lv_conn.close()
queue.put(len(running_domains))
queue.put(memalloc)
queue.put(memprov)
queue.put(vcpualloc)
if debug:
logger.out("Thread finished", state="d", prefix="vm-thread")
# Keepalive update function
def node_keepalive(logger, config, zkhandler, this_node):
debug = config["debug"]
if debug:
logger.out("Keepalive starting", state="d", prefix="main-thread")
# Set the migration selector in Zookeeper for clients to read
if config["enable_hypervisor"]:
if this_node.router_state == "primary":
try:
if (
zkhandler.read("base.config.migration_target_selector")
!= config["migration_target_selector"]
):
raise
except Exception:
zkhandler.write(
[
(
"base.config.migration_target_selector",
config["migration_target_selector"],
)
]
)
# Set the upstream IP in Zookeeper for clients to read
if config["enable_networking"]:
if this_node.router_state == "primary":
try:
if (
zkhandler.read("base.config.upstream_ip")
!= config["upstream_floating_ip"]
):
raise
except Exception:
zkhandler.write(
[("base.config.upstream_ip", config["upstream_floating_ip"])]
)
# Get past state and update if needed
if debug:
logger.out(
"Get past state and update if needed", state="d", prefix="main-thread"
)
past_state = zkhandler.read(("node.state.daemon", this_node.name))
if past_state != "run" and past_state != "shutdown":
this_node.daemon_state = "run"
zkhandler.write([(("node.state.daemon", this_node.name), "run")])
else:
this_node.daemon_state = "run"
# Ensure the primary key is properly set
if debug:
logger.out(
"Ensure the primary key is properly set", state="d", prefix="main-thread"
)
if this_node.router_state == "primary":
if zkhandler.read("base.config.primary_node") != this_node.name:
zkhandler.write([("base.config.primary_node", this_node.name)])
# Run VM statistics collection in separate thread for parallelization
if config["enable_hypervisor"]:
vm_thread_queue = Queue()
vm_stats_thread = Thread(
target=collect_vm_stats,
args=(logger, config, zkhandler, this_node, vm_thread_queue),
kwargs={},
)
vm_stats_thread.start()
# Run Ceph status collection in separate thread for parallelization
if config["enable_storage"]:
ceph_thread_queue = Queue()
ceph_stats_thread = Thread(
target=collect_ceph_stats,
args=(logger, config, zkhandler, this_node, ceph_thread_queue),
kwargs={},
)
ceph_stats_thread.start()
# Get node performance statistics
this_node.memtotal = int(psutil.virtual_memory().total / 1024 / 1024)
this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024)
this_node.memfree = int(psutil.virtual_memory().free / 1024 / 1024)
this_node.cpuload = os.getloadavg()[0]
# Join against running threads
if config["enable_hypervisor"]:
vm_stats_thread.join(timeout=config["keepalive_interval"])
if vm_stats_thread.is_alive():
logger.out("VM stats gathering exceeded timeout, continuing", state="w")
if config["enable_storage"]:
ceph_stats_thread.join(timeout=config["keepalive_interval"])
if ceph_stats_thread.is_alive():
logger.out("Ceph stats gathering exceeded timeout, continuing", state="w")
# Get information from thread queues
if config["enable_hypervisor"]:
try:
this_node.domains_count = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.memalloc = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.memprov = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
this_node.vcpualloc = vm_thread_queue.get(
timeout=config["keepalive_interval"]
)
except Exception:
logger.out("VM stats queue get exceeded timeout, continuing", state="w")
else:
this_node.domains_count = 0
this_node.memalloc = 0
this_node.memprov = 0
this_node.vcpualloc = 0
if config["enable_storage"]:
try:
ceph_health_colour = ceph_thread_queue.get(
timeout=config["keepalive_interval"]
)
ceph_health = ceph_thread_queue.get(timeout=config["keepalive_interval"])
osds_this_node = ceph_thread_queue.get(timeout=config["keepalive_interval"])
except Exception:
logger.out("Ceph stats queue get exceeded timeout, continuing", state="w")
ceph_health_colour = logger.fmt_cyan
ceph_health = "UNKNOWN"
osds_this_node = "?"
# Set our information in zookeeper
keepalive_time = int(time.time())
if debug:
logger.out("Set our information in zookeeper", state="d", prefix="main-thread")
try:
zkhandler.write(
[
(("node.memory.total", this_node.name), str(this_node.memtotal)),
(("node.memory.used", this_node.name), str(this_node.memused)),
(("node.memory.free", this_node.name), str(this_node.memfree)),
(("node.memory.allocated", this_node.name), str(this_node.memalloc)),
(("node.memory.provisioned", this_node.name), str(this_node.memprov)),
(("node.vcpu.allocated", this_node.name), str(this_node.vcpualloc)),
(("node.cpu.load", this_node.name), str(this_node.cpuload)),
(
("node.count.provisioned_domains", this_node.name),
str(this_node.domains_count),
),
(
("node.running_domains", this_node.name),
" ".join(this_node.domain_list),
),
(("node.keepalive", this_node.name), str(keepalive_time)),
]
)
except Exception:
logger.out("Failed to set keepalive data", state="e")
# Display node information to the terminal
if config["log_keepalives"]:
if this_node.router_state == "primary":
cst_colour = logger.fmt_green
elif this_node.router_state == "secondary":
cst_colour = logger.fmt_blue
else:
cst_colour = logger.fmt_cyan
logger.out(
"{}{} keepalive @ {}{} [{}{}{}]".format(
logger.fmt_purple,
config["node_hostname"],
datetime.now(),
logger.fmt_end,
logger.fmt_bold + cst_colour,
this_node.router_state,
logger.fmt_end,
),
state="t",
)
if config["log_keepalive_cluster_details"]:
logger.out(
"{bold}Maintenance:{nofmt} {maint} "
"{bold}Active VMs:{nofmt} {domcount} "
"{bold}Networks:{nofmt} {netcount} "
"{bold}Load:{nofmt} {load} "
"{bold}Memory [MiB]: VMs:{nofmt} {allocmem} "
"{bold}Used:{nofmt} {usedmem} "
"{bold}Free:{nofmt} {freemem}".format(
bold=logger.fmt_bold,
nofmt=logger.fmt_end,
maint=this_node.maintenance,
domcount=this_node.domains_count,
netcount=len(zkhandler.children("base.network")),
load=this_node.cpuload,
freemem=this_node.memfree,
usedmem=this_node.memused,
allocmem=this_node.memalloc,
),
state="t",
)
if config["enable_storage"] and config["log_keepalive_storage_details"]:
logger.out(
"{bold}Ceph cluster status:{nofmt} {health_colour}{health}{nofmt} "
"{bold}Total OSDs:{nofmt} {total_osds} "
"{bold}Node OSDs:{nofmt} {node_osds} "
"{bold}Pools:{nofmt} {total_pools} ".format(
bold=logger.fmt_bold,
health_colour=ceph_health_colour,
nofmt=logger.fmt_end,
health=ceph_health,
total_osds=len(zkhandler.children("base.osd")),
node_osds=osds_this_node,
total_pools=len(zkhandler.children("base.pool")),
),
state="t",
)
# Look for dead nodes and fence them
if not this_node.maintenance:
if debug:
logger.out(
"Look for dead nodes and fence them", state="d", prefix="main-thread"
)
if config["daemon_mode"] == "coordinator":
for node_name in zkhandler.children("base.node"):
try:
node_daemon_state = zkhandler.read(("node.state.daemon", node_name))
node_keepalive = int(zkhandler.read(("node.keepalive", node_name)))
except Exception:
node_daemon_state = "unknown"
node_keepalive = 0
# Handle deadtime and fencng if needed
# (A node is considered dead when its keepalive timer is >6*keepalive_interval seconds
# out-of-date while in 'start' state)
node_deadtime = int(time.time()) - (
int(config["keepalive_interval"]) * int(config["fence_intervals"])
)
if node_keepalive < node_deadtime and node_daemon_state == "run":
logger.out(
"Node {} seems dead - starting monitor for fencing".format(
node_name
),
state="w",
)
zk_lock = zkhandler.writelock(("node.state.daemon", node_name))
with zk_lock:
# Ensures that, if we lost the lock race and come out of waiting,
# we won't try to trigger our own fence thread.
if zkhandler.read(("node.state.daemon", node_name)) != "dead":
fence_thread = Thread(
target=pvcnoded.util.fencing.fence_node,
args=(node_name, zkhandler, config, logger),
kwargs={},
)
fence_thread.start()
# Write the updated data after we start the fence thread
zkhandler.write(
[(("node.state.daemon", node_name), "dead")]
)
if debug:
logger.out("Keepalive finished", state="d", prefix="main-thread")