Compare commits
4 Commits
07026efb63
...
83c4c6633d
Author | SHA1 | Date | |
---|---|---|---|
83c4c6633d | |||
2a9bc632fa | |||
b5e4c52387 | |||
b522306f87 |
@ -25,10 +25,10 @@ CELERY_BIN="$( which celery )"
|
|||||||
# app arguments work in a non-backwards-compatible way with Celery 5.
|
# app arguments work in a non-backwards-compatible way with Celery 5.
|
||||||
case "$( cat /etc/debian_version )" in
|
case "$( cat /etc/debian_version )" in
|
||||||
10.*)
|
10.*)
|
||||||
CELERY_ARGS="worker --app pvcapid.flaskapi.celery --concurrency 1 --hostname $(hostname -s) --queues $(hostname -s) --loglevel INFO"
|
CELERY_ARGS="worker --app pvcapid.flaskapi.celery --concurrency 3 --hostname $(hostname -s) --queues $(hostname -s) --loglevel INFO"
|
||||||
;;
|
;;
|
||||||
*)
|
*)
|
||||||
CELERY_ARGS="--app pvcapid.flaskapi.celery worker --concurrency 1 --hostname $(hostname -s) --queues $(hostname -s) --loglevel INFO"
|
CELERY_ARGS="--app pvcapid.flaskapi.celery worker --concurrency 3 --hostname $(hostname -s) --queues $(hostname -s) --loglevel INFO"
|
||||||
;;
|
;;
|
||||||
esac
|
esac
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ def wait_for_celery_task(CLI_CONFIG, task_detail):
|
|||||||
# Wait for the task to start
|
# Wait for the task to start
|
||||||
echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
|
echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
|
||||||
while True:
|
while True:
|
||||||
sleep(0.25)
|
sleep(0.5)
|
||||||
task_status = pvc.lib.common.task_status(
|
task_status = pvc.lib.common.task_status(
|
||||||
CLI_CONFIG, task_id=task_id, is_watching=True
|
CLI_CONFIG, task_id=task_id, is_watching=True
|
||||||
)
|
)
|
||||||
@ -104,7 +104,7 @@ def wait_for_celery_task(CLI_CONFIG, task_detail):
|
|||||||
newline=False,
|
newline=False,
|
||||||
)
|
)
|
||||||
while True:
|
while True:
|
||||||
sleep(0.25)
|
sleep(0.5)
|
||||||
if task_status.get("state") != "RUNNING":
|
if task_status.get("state") != "RUNNING":
|
||||||
break
|
break
|
||||||
if task_status.get("current") > last_task:
|
if task_status.get("current") > last_task:
|
||||||
|
@ -33,10 +33,12 @@ class TaskFailure(Exception):
|
|||||||
def start(celery, msg, current=0, total=1):
|
def start(celery, msg, current=0, total=1):
|
||||||
logger = getLogger(__name__)
|
logger = getLogger(__name__)
|
||||||
logger.info(f"Starting {current}/{total}: {msg}")
|
logger.info(f"Starting {current}/{total}: {msg}")
|
||||||
|
if celery is None:
|
||||||
|
return
|
||||||
celery.update_state(
|
celery.update_state(
|
||||||
state="RUNNING", meta={"current": current, "total": total, "status": msg}
|
state="RUNNING", meta={"current": current, "total": total, "status": msg}
|
||||||
)
|
)
|
||||||
sleep(0.5)
|
sleep(1)
|
||||||
|
|
||||||
|
|
||||||
def fail(celery, msg, current=1, total=1):
|
def fail(celery, msg, current=1, total=1):
|
||||||
@ -64,19 +66,23 @@ def log_err(celery, msg):
|
|||||||
def update(celery, msg, current=1, total=2):
|
def update(celery, msg, current=1, total=2):
|
||||||
logger = getLogger(__name__)
|
logger = getLogger(__name__)
|
||||||
logger.info(f"Task update {current}/{total}: {msg}")
|
logger.info(f"Task update {current}/{total}: {msg}")
|
||||||
|
if celery is None:
|
||||||
|
return
|
||||||
celery.update_state(
|
celery.update_state(
|
||||||
state="RUNNING", meta={"current": current, "total": total, "status": msg}
|
state="RUNNING", meta={"current": current, "total": total, "status": msg}
|
||||||
)
|
)
|
||||||
sleep(0.5)
|
sleep(1)
|
||||||
|
|
||||||
|
|
||||||
def finish(celery, msg, current=2, total=2):
|
def finish(celery, msg, current=2, total=2):
|
||||||
logger = getLogger(__name__)
|
logger = getLogger(__name__)
|
||||||
logger.info(f"Task update {current}/{total}: Finishing up")
|
logger.info(f"Task update {current}/{total}: Finishing up")
|
||||||
|
if celery is None:
|
||||||
|
return
|
||||||
celery.update_state(
|
celery.update_state(
|
||||||
state="RUNNING",
|
state="RUNNING",
|
||||||
meta={"current": current, "total": total, "status": "Finishing up"},
|
meta={"current": current, "total": total, "status": "Finishing up"},
|
||||||
)
|
)
|
||||||
sleep(0.5)
|
sleep(1)
|
||||||
logger.info(f"Success {current}/{total}: {msg}")
|
logger.info(f"Success {current}/{total}: {msg}")
|
||||||
return {"status": msg, "current": current, "total": total}
|
return {"status": msg, "current": current, "total": total}
|
||||||
|
106
node-daemon/plugins/kydb
Normal file
106
node-daemon/plugins/kydb
Normal file
@ -0,0 +1,106 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# kydb.py - PVC Monitoring example plugin for KeyDB/Redis
|
||||||
|
# Part of the Parallel Virtual Cluster (PVC) system
|
||||||
|
#
|
||||||
|
# Copyright (C) 2018-2023 Joshua M. Boniface <joshua@boniface.me>
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, version 3.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
###############################################################################
|
||||||
|
|
||||||
|
# This script provides an example of a PVC monitoring plugin script. It will create
|
||||||
|
# a simple plugin to check the Libvirt daemon instance on the node for operation.
|
||||||
|
|
||||||
|
# This script can thus be used as an example or reference implementation of a
|
||||||
|
# PVC monitoring pluginscript and expanded upon as required.
|
||||||
|
|
||||||
|
# A monitoring plugin script must implement the class "MonitoringPluginScript" which
|
||||||
|
# extends "MonitoringPlugin", providing the 3 functions indicated. Detailed explanation
|
||||||
|
# of the role of each function is provided in context of the example; see the other
|
||||||
|
# examples for more potential uses.
|
||||||
|
|
||||||
|
# WARNING:
|
||||||
|
#
|
||||||
|
# This script will run in the context of the node daemon keepalives as root.
|
||||||
|
# DO NOT install untrusted, unvetted plugins under any circumstances.
|
||||||
|
|
||||||
|
|
||||||
|
# This import is always required here, as MonitoringPlugin is used by the
|
||||||
|
# MonitoringPluginScript class
|
||||||
|
from pvcnoded.objects.MonitoringInstance import MonitoringPlugin
|
||||||
|
|
||||||
|
|
||||||
|
# A monitoring plugin script must always expose its nice name, which must be identical to
|
||||||
|
# the file name
|
||||||
|
PLUGIN_NAME = "kydb"
|
||||||
|
|
||||||
|
|
||||||
|
# The MonitoringPluginScript class must be named as such, and extend MonitoringPlugin.
|
||||||
|
class MonitoringPluginScript(MonitoringPlugin):
|
||||||
|
def setup(self):
|
||||||
|
"""
|
||||||
|
setup(): Perform special setup steps during node daemon startup
|
||||||
|
|
||||||
|
This step is optional and should be used sparingly.
|
||||||
|
|
||||||
|
If you wish for the plugin to not kydb in certain conditions, do any checks here
|
||||||
|
and return a non-None failure message to indicate the error.
|
||||||
|
"""
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
def run(self, coordinator_state=None):
|
||||||
|
"""
|
||||||
|
run(): Perform the check actions and return a PluginResult object
|
||||||
|
|
||||||
|
The {coordinator_state} can be used to check if this is a "primary" coordinator, "secondary" coordinator, or "client" (non-coordinator)
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Run any imports first
|
||||||
|
from redis import Redis
|
||||||
|
|
||||||
|
rd_conn = None
|
||||||
|
|
||||||
|
# Set the health delta to 0 (no change)
|
||||||
|
health_delta = 0
|
||||||
|
# Craft a message that can be used by the clients
|
||||||
|
message = "Successfully connected to Libvirtd on localhost"
|
||||||
|
|
||||||
|
# Check the Zookeeper connection
|
||||||
|
try:
|
||||||
|
rd_conn = Redis(host='localhost', port=6379, decode_responses=True)
|
||||||
|
data = rd_conn.info()
|
||||||
|
except Exception as e:
|
||||||
|
health_delta = 50
|
||||||
|
message = f"Failed to connect to KeyDB/Redis: {e}"
|
||||||
|
finally:
|
||||||
|
del rd_conn
|
||||||
|
|
||||||
|
# Set the health delta in our local PluginResult object
|
||||||
|
self.plugin_result.set_health_delta(health_delta)
|
||||||
|
|
||||||
|
# Set the message in our local PluginResult object
|
||||||
|
self.plugin_result.set_message(message)
|
||||||
|
|
||||||
|
# Return our local PluginResult object
|
||||||
|
return self.plugin_result
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
"""
|
||||||
|
cleanup(): Perform special cleanup steps during node daemon termination
|
||||||
|
|
||||||
|
This step is optional and should be used sparingly.
|
||||||
|
"""
|
||||||
|
|
||||||
|
pass
|
@ -24,8 +24,8 @@ import time
|
|||||||
import libvirt
|
import libvirt
|
||||||
|
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
from xml.etree import ElementTree
|
from xml.etree import ElementTree
|
||||||
|
from json import loads as jloads
|
||||||
|
|
||||||
import daemon_lib.common as common
|
import daemon_lib.common as common
|
||||||
|
|
||||||
@ -283,9 +283,49 @@ class VMInstance(object):
|
|||||||
self.logger.out(
|
self.logger.out(
|
||||||
"Flushing RBD locks", state="i", prefix="Domain {}".format(self.domuuid)
|
"Flushing RBD locks", state="i", prefix="Domain {}".format(self.domuuid)
|
||||||
)
|
)
|
||||||
VMInstance.flush_locks(
|
|
||||||
self.zkhandler, self.logger, self.domuuid, self.this_node
|
rbd_list = self.zkhandler.read(
|
||||||
|
("domain.storage.volumes", self.domuuid)
|
||||||
|
).split(",")
|
||||||
|
|
||||||
|
locks = list()
|
||||||
|
for rbd in rbd_list:
|
||||||
|
retcode, stdout, stderr = common.run_os_command(
|
||||||
|
f"rbd lock list --format json {rbd}"
|
||||||
)
|
)
|
||||||
|
if retcode == 0:
|
||||||
|
_locks = jloads(stdout)
|
||||||
|
for lock in _locks:
|
||||||
|
lock["rbd"] = rbd
|
||||||
|
locks.append(lock)
|
||||||
|
|
||||||
|
for lock in locks:
|
||||||
|
lockid = lock["id"]
|
||||||
|
locker = lock["locker"]
|
||||||
|
owner = lock["address"].split(":")[0]
|
||||||
|
rbd = lock["rbd"]
|
||||||
|
|
||||||
|
if owner == self.this_node.storage_ipaddr:
|
||||||
|
retcode, stdout, stderr = common.run_os_command(
|
||||||
|
f'rbd lock remove {rbd} "{lockid}" "{locker}"'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.logger.out(
|
||||||
|
"RBD lock does not belong to this host (owner {owner}) so freeing this long is dangerous; aborting VM start",
|
||||||
|
state="e",
|
||||||
|
prefix="Domain {}".format(self.domuuid),
|
||||||
|
)
|
||||||
|
self.zkhandler.write(
|
||||||
|
[
|
||||||
|
(("domain.state", self.domuuid), "fail"),
|
||||||
|
(
|
||||||
|
("domain.failed_reason", self.domuuid),
|
||||||
|
f"Could not safely free RBD lock {lockid} ({owner}) on volume {rbd}; stop VM and flush locks manually",
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
if self.zkhandler.read(("domain.state", self.domuuid)) == "fail":
|
if self.zkhandler.read(("domain.state", self.domuuid)) == "fail":
|
||||||
lv_conn.close()
|
lv_conn.close()
|
||||||
self.dom = None
|
self.dom = None
|
||||||
|
@ -23,7 +23,7 @@ import time
|
|||||||
|
|
||||||
import daemon_lib.common as common
|
import daemon_lib.common as common
|
||||||
|
|
||||||
from pvcnoded.objects.VMInstance import VMInstance
|
from daemon_lib.vm import vm_worker_flush_locks
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
@ -121,7 +121,7 @@ def migrateFromFencedNode(zkhandler, node_name, config, logger):
|
|||||||
|
|
||||||
# Migrate a VM after a flush
|
# Migrate a VM after a flush
|
||||||
def fence_migrate_vm(dom_uuid):
|
def fence_migrate_vm(dom_uuid):
|
||||||
VMInstance.flush_locks(zkhandler, logger, dom_uuid)
|
vm_worker_flush_locks(zkhandler, None, dom_uuid, force_unlock=True)
|
||||||
|
|
||||||
target_node = common.findTargetNode(zkhandler, dom_uuid)
|
target_node = common.findTargetNode(zkhandler, dom_uuid)
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user