Compare commits

...

4 Commits

Author SHA1 Message Date
83c4c6633d Readd RBD lock detection and clearing on startup
This is still needed due to the nature of the locks and how they are
freed on startup, and to preserve the lock=fail behaviour on VM startup.

Also fixes the fencing lock flush to directly use the client library
outside of Celery. I don't like this hack but it seems prudent until we
move fencing to the workers as well.
2023-11-10 01:33:48 -05:00
2a9bc632fa Add node monitoring plugin for KeyDB/Redis 2023-11-10 00:56:46 -05:00
b5e4c52387 Increase worker concurrency to 3 2023-11-10 00:39:42 -05:00
b522306f87 Increase Celery wait times
It's a bit inefficient, but provides nicer output and a bit of settling
time between each stage.
2023-11-09 23:54:05 -05:00
6 changed files with 165 additions and 13 deletions

View File

@@ -25,10 +25,10 @@ CELERY_BIN="$( which celery )"
# app arguments work in a non-backwards-compatible way with Celery 5.
case "$( cat /etc/debian_version )" in
10.*)
CELERY_ARGS="worker --app pvcapid.flaskapi.celery --concurrency 1 --hostname $(hostname -s) --queues $(hostname -s) --loglevel INFO"
CELERY_ARGS="worker --app pvcapid.flaskapi.celery --concurrency 3 --hostname $(hostname -s) --queues $(hostname -s) --loglevel INFO"
;;
*)
CELERY_ARGS="--app pvcapid.flaskapi.celery worker --concurrency 1 --hostname $(hostname -s) --queues $(hostname -s) --loglevel INFO"
CELERY_ARGS="--app pvcapid.flaskapi.celery worker --concurrency 3 --hostname $(hostname -s) --queues $(hostname -s) --loglevel INFO"
;;
esac

View File

@@ -79,7 +79,7 @@ def wait_for_celery_task(CLI_CONFIG, task_detail):
# Wait for the task to start
echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
while True:
sleep(0.25)
sleep(0.5)
task_status = pvc.lib.common.task_status(
CLI_CONFIG, task_id=task_id, is_watching=True
)
@@ -104,7 +104,7 @@ def wait_for_celery_task(CLI_CONFIG, task_detail):
newline=False,
)
while True:
sleep(0.25)
sleep(0.5)
if task_status.get("state") != "RUNNING":
break
if task_status.get("current") > last_task:
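Condensed, the polling pattern these hunks adjust looks like the sketch below; get_status is a generic stand-in for the pvc.lib.common.task_status() call, not the CLI's real interface. The longer 0.5-second interval is what provides the settling time mentioned in the commit message.

# Generic sketch of the wait loop shown above; get_status is a stand-in callable.
from time import sleep

def wait_until_not_running(get_status, interval=0.5):
    while True:
        sleep(interval)  # settling time between polls (raised from 0.25s)
        status = get_status()
        if status.get("state") != "RUNNING":
            return status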

View File

@@ -33,10 +33,12 @@ class TaskFailure(Exception):
def start(celery, msg, current=0, total=1):
logger = getLogger(__name__)
logger.info(f"Starting {current}/{total}: {msg}")
if celery is None:
return
celery.update_state(
state="RUNNING", meta={"current": current, "total": total, "status": msg}
)
sleep(0.5)
sleep(1)
def fail(celery, msg, current=1, total=1):
@@ -64,19 +66,23 @@ def log_err(celery, msg):
def update(celery, msg, current=1, total=2):
logger = getLogger(__name__)
logger.info(f"Task update {current}/{total}: {msg}")
if celery is None:
return
celery.update_state(
state="RUNNING", meta={"current": current, "total": total, "status": msg}
)
sleep(0.5)
sleep(1)
def finish(celery, msg, current=2, total=2):
logger = getLogger(__name__)
logger.info(f"Task update {current}/{total}: Finishing up")
if celery is None:
return
celery.update_state(
state="RUNNING",
meta={"current": current, "total": total, "status": "Finishing up"},
)
sleep(0.5)
sleep(1)
logger.info(f"Success {current}/{total}: {msg}")
return {"status": msg, "current": current, "total": total}
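The new "if celery is None: return" guards above are what allow these progress helpers to be called outside a Celery worker, which the fencing change at the end of this compare relies on. A minimal sketch of that direct-call pattern, assuming the helpers are importable as daemon_lib.celery (the module path is an assumption, not shown in this diff):

# Sketch only: the import path below is assumed, not confirmed by this diff.
from daemon_lib.celery import start, update, finish

def run_steps_without_celery(prefix):
    # With celery=None each helper just logs its message and returns early,
    # skipping celery.update_state(), so no worker context is required.
    start(None, f"{prefix}: beginning", current=0, total=2)
    update(None, f"{prefix}: working", current=1, total=2)
    finish(None, f"{prefix}: done", current=2, total=2)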

106 node-daemon/plugins/kydb Normal file
View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
# kydb.py - PVC Monitoring example plugin for KeyDB/Redis
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2023 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
# This script provides an example of a PVC monitoring plugin script. It will create
# a simple plugin to check the KeyDB/Redis instance on the node for operation.
# This script can thus be used as an example or reference implementation of a
# PVC monitoring plugin script and expanded upon as required.
# A monitoring plugin script must implement the class "MonitoringPluginScript", which
# extends "MonitoringPlugin", providing the 3 functions indicated. A detailed explanation
# of the role of each function is provided in the context of the example; see the other
# examples for more potential uses.
# WARNING:
#
# This script will run in the context of the node daemon keepalives as root.
# DO NOT install untrusted, unvetted plugins under any circumstances.
# This import is always required here, as MonitoringPlugin is used by the
# MonitoringPluginScript class
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
# A monitoring plugin script must always expose its nice name, which must be identical to
# the file name
PLUGIN_NAME = "kydb"
# The MonitoringPluginScript class must be named as such, and extend MonitoringPlugin.
class MonitoringPluginScript(MonitoringPlugin):
    def setup(self):
        """
        setup(): Perform special setup steps during node daemon startup

        This step is optional and should be used sparingly.

        If you wish for the plugin to not load in certain conditions, do any checks here
        and return a non-None failure message to indicate the error.
        """

        pass

    def run(self, coordinator_state=None):
        """
        run(): Perform the check actions and return a PluginResult object

        The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
        """

        # Run any imports first
        from redis import Redis

        rd_conn = None

        # Set the health delta to 0 (no change)
        health_delta = 0
        # Craft a message that can be used by the clients
        message = "Successfully connected to KeyDB/Redis on localhost"

        # Check the KeyDB/Redis connection
        try:
            rd_conn = Redis(host='localhost', port=6379, decode_responses=True)
            data = rd_conn.info()
        except Exception as e:
            health_delta = 50
            message = f"Failed to connect to KeyDB/Redis: {e}"
        finally:
            del rd_conn

        # Set the health delta in our local PluginResult object
        self.plugin_result.set_health_delta(health_delta)

        # Set the message in our local PluginResult object
        self.plugin_result.set_message(message)

        # Return our local PluginResult object
        return self.plugin_result

    def cleanup(self):
        """
        cleanup(): Perform special cleanup steps during node daemon termination

        This step is optional and should be used sparingly.
        """

        pass
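For quick manual verification, the same check logic can be exercised outside the node daemon. The sketch below is illustrative only (the helper name is made up); it assumes the Python redis client library used by the plugin is installed locally:

#!/usr/bin/env python3
# Standalone smoke test of the plugin's check logic; illustrative only and
# not part of the plugin. Assumes the Python "redis" package is available.
from redis import Redis

def check_keydb(host="localhost", port=6379):
    health_delta = 0
    message = f"Successfully connected to KeyDB/Redis on {host}"
    rd_conn = None
    try:
        rd_conn = Redis(host=host, port=port, decode_responses=True)
        rd_conn.info()  # raises on connection failure, as in the plugin
    except Exception as e:
        health_delta = 50
        message = f"Failed to connect to KeyDB/Redis: {e}"
    finally:
        del rd_conn
    return health_delta, message

if __name__ == "__main__":
    print(check_keydb())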

View File

@@ -24,8 +24,8 @@ import time
import libvirt
from threading import Thread
from xml.etree import ElementTree
from json import loads as jloads
import daemon_lib.common as common
@@ -283,9 +283,49 @@ class VMInstance(object):
self.logger.out(
"Flushing RBD locks", state="i", prefix="Domain {}".format(self.domuuid)
)
VMInstance.flush_locks(
self.zkhandler, self.logger, self.domuuid, self.this_node
)
rbd_list = self.zkhandler.read(
("domain.storage.volumes", self.domuuid)
).split(",")
locks = list()
for rbd in rbd_list:
retcode, stdout, stderr = common.run_os_command(
f"rbd lock list --format json {rbd}"
)
if retcode == 0:
_locks = jloads(stdout)
for lock in _locks:
lock["rbd"] = rbd
locks.append(lock)
for lock in locks:
lockid = lock["id"]
locker = lock["locker"]
owner = lock["address"].split(":")[0]
rbd = lock["rbd"]
if owner == self.this_node.storage_ipaddr:
retcode, stdout, stderr = common.run_os_command(
f'rbd lock remove {rbd} "{lockid}" "{locker}"'
)
else:
self.logger.out(
"RBD lock does not belong to this host (owner {owner}) so freeing this long is dangerous; aborting VM start",
state="e",
prefix="Domain {}".format(self.domuuid),
)
self.zkhandler.write(
[
(("domain.state", self.domuuid), "fail"),
(
("domain.failed_reason", self.domuuid),
f"Could not safely free RBD lock {lockid} ({owner}) on volume {rbd}; stop VM and flush locks manually",
),
]
)
break
if self.zkhandler.read(("domain.state", self.domuuid)) == "fail":
lv_conn.close()
self.dom = None
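The lock-clearing loop above consumes the JSON emitted by rbd lock list --format json. The snippet below only illustrates the shape the code expects (keys taken from what the loop reads); it is not verbatim Ceph output, which varies by release:

# Illustrative only: each lock entry is assumed to expose "id", "locker" and
# "address", since those are the keys the loop above reads.
from json import loads as jloads

sample_stdout = '[{"id": "auto 139997222635776", "locker": "client.414123", "address": "10.100.0.1:0/3158372444"}]'

for lock in jloads(sample_stdout):
    owner = lock["address"].split(":")[0]  # storage-network IP of the lock holder
    print(lock["id"], lock["locker"], owner)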

View File

@@ -23,7 +23,7 @@ import time
import daemon_lib.common as common
from pvcnoded.objects.VMInstance import VMInstance
from daemon_lib.vm import vm_worker_flush_locks
#
@@ -121,7 +121,7 @@ def migrateFromFencedNode(zkhandler, node_name, config, logger):
# Migrate a VM after a flush
def fence_migrate_vm(dom_uuid):
VMInstance.flush_locks(zkhandler, logger, dom_uuid)
vm_worker_flush_locks(zkhandler, None, dom_uuid, force_unlock=True)
target_node = common.findTargetNode(zkhandler, dom_uuid)