Port OSD on-node tasks to Celery worker system

Adds Celery versions of the osd_add, osd_replace, osd_refresh,
osd_remove, and osd_db_vg_add functions.
This commit is contained in:
Joshua Boniface 2023-11-09 14:05:15 -05:00
parent 89681d54b9
commit ce17c60a20
12 changed files with 2039 additions and 1723 deletions

View File

@ -35,6 +35,13 @@ from daemon_lib.vm import (
vm_worker_attach_device,
vm_worker_detach_device,
)
from daemon_lib.ceph import (
osd_worker_add_osd,
osd_worker_replace_osd,
osd_worker_refresh_osd,
osd_worker_remove_osd,
osd_worker_add_db_vg,
)
from pvcapid.Daemon import config, strtobool, API_VERSION
@ -237,6 +244,116 @@ def vm_device_detach(self, domain, xml, run_on=None):
return run_vm_device_detach(self, domain, xml)
@celery.task(name="osd.add", bind=True, routing_key="run_on")
def osd_add(
self,
device,
weight,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_add(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio=None,
ext_db_size=None,
split_count=None,
):
return osd_worker_add_osd(
zkhandler,
self,
run_on,
device,
weight,
ext_db_ratio,
ext_db_size,
split_count,
)
return run_osd_add(
self, run_on, device, weight, ext_db_ratio, ext_db_size, split_count
)
@celery.task(name="osd.replace", bind=True, routing_key="run_on")
def osd_replace(
self,
osd_id,
new_device,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
run_on=None,
):
@ZKConnection(config)
def run_osd_replace(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device=None,
weight=None,
ext_db_ratio=None,
ext_db_size=None,
):
return osd_worker_replace_osd(
zkhandler,
self,
run_on,
osd_id,
new_device,
old_device,
weight,
ext_db_ratio,
ext_db_size,
)
return run_osd_replace(
self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size
)
@celery.task(name="osd.refresh", bind=True, routing_key="run_on")
def osd_refresh(self, osd_id, device, ext_db_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False):
return osd_worker_refresh_osd(
zkhandler, self, run_on, osd_id, device, ext_db_flag
)
return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag)
@celery.task(name="osd.remove", bind=True, routing_key="run_on")
def osd_remove(self, osd_id, force_flag=False, skip_zap_flag=False, run_on=None):
@ZKConnection(config)
def run_osd_remove(
zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False
):
return osd_worker_remove_osd(
zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag
)
return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag)
@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on")
def osd_add_db_vg(self, device, run_on=None):
@ZKConnection(config)
def run_osd_add_db_vg(zkhandler, self, run_on, device):
return osd_worker_add_db_vg(zkhandler, self, run_on, device)
return run_osd_add_db_vg(self, run_on, device)
##########################################################
# API Root/Authentication
##########################################################
@ -738,7 +855,14 @@ class API_Tasks_Element(Resource):
"total": 1,
"status": "Pending job start",
}
elif task.state != "FAILURE":
elif task.state == "FAILURE":
response = {
"state": task.state,
"current": 1,
"total": 1,
"status": str(task.info),
}
else:
response = {
"state": task.state,
"current": task.info.get("current", 0),
@ -747,13 +871,6 @@ class API_Tasks_Element(Resource):
}
if "result" in task.info:
response["result"] = task.info["result"]
else:
response = {
"state": task.state,
"current": 1,
"total": 1,
"status": str(task.info),
}
return response
@ -4374,8 +4491,14 @@ class API_Storage_Ceph_OSDDB_Root(Resource):
type: object
id: Message
"""
return api_helper.ceph_osd_db_vg_add(
reqargs.get("node", None), reqargs.get("device", None)
node = reqargs.get("node", None)
task = osd_add_db_vg.delay(reqargs.get("device", None), run_on=node)
return (
{"task_id": task.id, "run_on": node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@ -4565,13 +4688,21 @@ class API_Storage_Ceph_OSD_Root(Resource):
type: object
id: Message
"""
return api_helper.ceph_osd_add(
reqargs.get("node", None),
node = reqargs.get("node", None)
task = osd_add.delay(
reqargs.get("device", None),
reqargs.get("weight", None),
reqargs.get("ext_db_ratio", None),
reqargs.get("ext_db_size", None),
reqargs.get("osd_count", None),
run_on=node,
)
return (
{"task_id": task.id, "run_on": node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@ -4671,13 +4802,26 @@ class API_Storage_Ceph_OSD_Element(Resource):
type: object
id: Message
"""
return api_helper.ceph_osd_replace(
osd_node_detail, retcode = api_helper.ceph_osd_node(osdid)
if retcode == 200:
node = osd_node_detail["node"]
else:
return osd_node_detail, retcode
task = osd_replace.delay(
osdid,
reqargs.get("new_device"),
reqargs.get("old_device", None),
reqargs.get("weight", None),
reqargs.get("ext_db_ratio", None),
reqargs.get("ext_db_size", None),
run_on=node,
)
return (
{"task_id": task.id, "run_on": node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@RequestParser(
@ -4715,9 +4859,22 @@ class API_Storage_Ceph_OSD_Element(Resource):
type: object
id: Message
"""
return api_helper.ceph_osd_refresh(
osd_node_detail, retcode = api_helper.ceph_osd_node(osdid)
if retcode == 200:
node = osd_node_detail["node"]
else:
return osd_node_detail, retcode
task = osd_refresh.delay(
osdid,
reqargs.get("device", None),
run_on=node,
)
return (
{"task_id": task.id, "run_on": node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
@RequestParser(
@ -4771,7 +4928,23 @@ class API_Storage_Ceph_OSD_Element(Resource):
type: object
id: Message
"""
return api_helper.ceph_osd_remove(osdid, reqargs.get("force", False))
osd_node_detail, retcode = api_helper.ceph_osd_node(osdid)
if retcode == 200:
node = osd_node_detail["node"]
else:
return osd_node_detail, retcode
task = osd_remove.delay(
osdid,
force_flag=reqargs.get("force", False),
run_on=node,
)
return (
{"task_id": task.id, "run_on": node},
202,
{"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)},
)
api.add_resource(API_Storage_Ceph_OSD_Element, "/storage/ceph/osd/<osdid>")

View File

@ -1332,6 +1332,36 @@ def ceph_osd_list(zkhandler, limit=None):
return retdata, retcode
@pvc_common.Profiler(config)
@ZKConnection(config)
def ceph_osd_node(zkhandler, osd):
"""
Return the current node of OSD OSD.
"""
retflag, retdata = pvc_ceph.get_list_osd(zkhandler, None)
if retflag:
if retdata:
osd = [o for o in retdata if o["id"] == osd]
if len(osd) < 1:
retcode = 404
retdata = {"message": "OSD not found."}
else:
retcode = 200
retdata = {
"id": osd[0]["id"],
"node": osd[0]["node"],
}
else:
retcode = 404
retdata = {"message": "OSD not found."}
else:
retcode = 400
retdata = {"message": retdata}
return retdata, retcode
@ZKConnection(config)
def ceph_osd_state(zkhandler, osd):
retflag, retdata = pvc_ceph.get_list_osd(zkhandler, osd)

View File

@ -1582,10 +1582,10 @@ def cli_vm_flush_locks(domain, wait_flag):
NOTE: This is a task-based command. The "--wait" flag (default) will block and show progress. Specifying the "--no-wait" flag will return immediately with a job ID instead, which can be queried externally later.
"""
retcode, retmsg = pvc.lib.vm.vm_locks(CLI_CONFIG, domain, wait_flag=wait_flag)
retcode, retmsg = pvc.lib.vm.vm_locks(CLI_CONFIG, domain, wait_flag)
if retcode and wait_flag:
retmsg = wait_for_flush_locks(CLI_CONFIG, retmsg)
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
finish(retcode, retmsg)
@ -3372,10 +3372,18 @@ def cli_storage_osd():
@connection_req
@click.argument("node")
@click.argument("device")
@click.option(
"--wait/--no-wait",
"wait_flag",
is_flag=True,
default=True,
show_default=True,
help="Wait or don't wait for task to complete, showing progress",
)
@confirm_opt(
"Destroy all data on and create a new OSD database volume group on node {node} device {device}"
)
def cli_storage_osd_create_db_vg(node, device):
def cli_storage_osd_create_db_vg(node, device, wait_flag):
"""
Create a new Ceph OSD database volume group on node NODE with block device DEVICE.
@ -3390,7 +3398,12 @@ def cli_storage_osd_create_db_vg(node, device):
A "detect" string is a string in the form "detect:<NAME>:<HUMAN-SIZE>:<ID>". Detect strings allow for automatic determination of Linux block device paths from known basic information about disks by leveraging "lsscsi" on the target host. The "NAME" should be some descriptive identifier, for instance the manufacturer (e.g. "INTEL"), the "HUMAN-SIZE" should be the labeled human-readable size of the device (e.g. "480GB", "1.92TB"), and "ID" specifies the Nth 0-indexed device which matches the "NAME" and "HUMAN-SIZE" values (e.g. "2" would match the third device with the corresponding "NAME" and "HUMAN-SIZE"). When matching against sizes, there is +/- 3% flexibility to account for base-1000 vs. base-1024 differences and rounding errors. The "NAME" may contain whitespace but if so the entire detect string should be quoted, and is case-insensitive. More information about detect strings can be found in the manual.
"""
retcode, retmsg = pvc.lib.storage.ceph_osd_db_vg_add(CLI_CONFIG, node, device)
retcode, retmsg = pvc.lib.storage.ceph_osd_db_vg_add(
CLI_CONFIG, node, device, wait_flag
)
if retcode and wait_flag:
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
finish(retcode, retmsg)
@ -3434,8 +3447,18 @@ def cli_storage_osd_create_db_vg(node, device):
type=int,
help="Split (an NVMe) disk into this many OSDs.",
)
@click.option(
"--wait/--no-wait",
"wait_flag",
is_flag=True,
default=True,
show_default=True,
help="Wait or don't wait for task to complete, showing progress",
)
@confirm_opt("Destroy all data on and create new OSD(s) on node {node} device {device}")
def cli_storage_osd_add(node, device, weight, ext_db_ratio, ext_db_size, osd_count):
def cli_storage_osd_add(
node, device, weight, ext_db_ratio, ext_db_size, osd_count, wait_flag
):
"""
Add a new Ceph OSD on node NODE with block device DEVICE.
@ -3456,11 +3479,6 @@ def cli_storage_osd_add(node, device, weight, ext_db_ratio, ext_db_size, osd_cou
NOTE: This command may take a long time to complete. Observe the node logs of the hosting OSD node for detailed status.
"""
echo(
CLI_CONFIG,
"Waiting for node task to complete, this may take some time... ",
newline=False,
)
retcode, retmsg = pvc.lib.storage.ceph_osd_add(
CLI_CONFIG,
node,
@ -3469,8 +3487,11 @@ def cli_storage_osd_add(node, device, weight, ext_db_ratio, ext_db_size, osd_cou
ext_db_ratio,
ext_db_size,
osd_count,
wait_flag,
)
echo(CLI_CONFIG, "done.")
if retcode and wait_flag:
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
finish(retcode, retmsg)
@ -3509,11 +3530,19 @@ def cli_storage_osd_add(node, device, weight, ext_db_ratio, ext_db_size, osd_cou
default=None,
help="Create a new external database logical volume for the OSD(s) with this human-unit size; if unset, old ext_db_size is used",
)
@click.option(
"--wait/--no-wait",
"wait_flag",
is_flag=True,
default=True,
show_default=True,
help="Wait or don't wait for task to complete, showing progress",
)
@confirm_opt(
"Destroy all data on and replace OSD {osdid} (and peer split OSDs) with new device {new_device}"
)
def cli_storage_osd_replace(
osdid, new_device, old_device, weight, ext_db_ratio, ext_db_size
osdid, new_device, old_device, weight, ext_db_ratio, ext_db_size, wait_flag
):
"""
Replace the block device of an existing OSD with ID OSDID, and any peer split OSDs with the same block device, with NEW_DEVICE. Use this command to replace a failed or smaller OSD block device with a new one in one command.
@ -3533,15 +3562,19 @@ def cli_storage_osd_replace(
NOTE: This command may take a long time to complete. Observe the node logs of the hosting OSD node for detailed status.
"""
echo(
CLI_CONFIG,
"Waiting for node task to complete, this may take some time... ",
newline=False,
)
retcode, retmsg = pvc.lib.storage.ceph_osd_replace(
CLI_CONFIG, osdid, new_device, old_device, weight, ext_db_ratio, ext_db_size
CLI_CONFIG,
osdid,
new_device,
old_device,
weight,
ext_db_ratio,
ext_db_size,
wait_flag,
)
echo(CLI_CONFIG, "done.")
if retcode and wait_flag:
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
finish(retcode, retmsg)
@ -3552,8 +3585,16 @@ def cli_storage_osd_replace(
@connection_req
@click.argument("osdid")
@click.argument("device")
@confirm_opt("Refresh OSD {osdid} on device {device}")
def cli_storage_osd_refresh(osdid, device):
@click.option(
"--wait/--no-wait",
"wait_flag",
is_flag=True,
default=True,
show_default=True,
help="Wait or don't wait for task to complete, showing progress",
)
@confirm_opt("Refresh OSD {osdid} (and peer split OSDs) on device {device}")
def cli_storage_osd_refresh(osdid, device, wait_flag):
"""
Refresh (reimport) the block DEVICE of an existing OSD with ID OSDID. Use this command to reimport a working OSD into a rebuilt/replaced node.
@ -3566,13 +3607,12 @@ def cli_storage_osd_refresh(osdid, device):
NOTE: This command may take a long time to complete. Observe the node logs of the hosting OSD node for detailed status.
"""
echo(
CLI_CONFIG,
"Waiting for node task to complete, this may take some time... ",
newline=False,
retcode, retmsg = pvc.lib.storage.ceph_osd_refresh(
CLI_CONFIG, osdid, device, wait_flag
)
retcode, retmsg = pvc.lib.storage.ceph_osd_refresh(CLI_CONFIG, osdid, device)
echo(CLI_CONFIG, "done.")
if retcode and wait_flag:
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
finish(retcode, retmsg)
@ -3590,8 +3630,16 @@ def cli_storage_osd_refresh(osdid, device):
default=False,
help="Force removal even if steps fail",
)
@click.option(
"--wait/--no-wait",
"wait_flag",
is_flag=True,
default=True,
show_default=True,
help="Wait or don't wait for task to complete, showing progress",
)
@confirm_opt("Remove and destroy data on OSD {osdid}")
def cli_storage_osd_remove(osdid, force_flag):
def cli_storage_osd_remove(osdid, force_flag, wait_flag):
"""
Remove a Ceph OSD with ID OSDID.
@ -3602,13 +3650,12 @@ def cli_storage_osd_remove(osdid, force_flag):
NOTE: This command may take a long time to complete. Observe the node logs of the hosting OSD node for detailed status.
"""
echo(
CLI_CONFIG,
"Waiting for node task to complete, this may take some time... ",
newline=False,
retcode, retmsg = pvc.lib.storage.ceph_osd_remove(
CLI_CONFIG, osdid, force_flag, wait_flag
)
retcode, retmsg = pvc.lib.storage.ceph_osd_remove(CLI_CONFIG, osdid, force_flag)
echo(CLI_CONFIG, "done.")
if retcode and wait_flag:
retmsg = wait_for_celery_task(CLI_CONFIG, retmsg)
finish(retcode, retmsg)

View File

@ -20,7 +20,7 @@
###############################################################################
from click import echo as click_echo
from click import progressbar, confirm
from click import confirm
from datetime import datetime
from distutils.util import strtobool
from getpass import getuser
@ -32,7 +32,6 @@ from socket import gethostname
from subprocess import run, PIPE
from sys import argv
from syslog import syslog, openlog, closelog, LOG_AUTH
from time import sleep
from yaml import load as yload
from yaml import BaseLoader, SafeLoader
@ -191,123 +190,6 @@ def update_store(store_path, store_data):
jdump(store_data, fh, sort_keys=True, indent=4)
def wait_for_flush_locks(CLI_CONFIG, task_detail):
"""
Wait for a flush_locks task to complete
"""
task_id = task_detail["task_id"]
run_on = task_detail["run_on"]
echo(CLI_CONFIG, f"Task ID: {task_id} assigned to node {run_on}")
echo(CLI_CONFIG, "")
# Wait for the task to start
echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
while True:
sleep(0.25)
task_status = pvc.lib.common.task_status(
CLI_CONFIG, task_id=task_id, is_watching=True
)
if task_status.get("state") != "PENDING":
break
echo(CLI_CONFIG, ".", newline=False)
echo(CLI_CONFIG, " done.")
echo(CLI_CONFIG, "")
# Start following the task state, updating progress as we go
total_task = task_status.get("total")
with progressbar(length=total_task, show_eta=False) as bar:
last_task = 0
maxlen = 0
while True:
sleep(0.25)
if task_status.get("state") != "RUNNING":
break
if task_status.get("current") > last_task:
current_task = int(task_status.get("current"))
bar.update(current_task - last_task)
last_task = current_task
# The extensive spaces at the end cause this to overwrite longer previous messages
curlen = len(str(task_status.get("status")))
if curlen > maxlen:
maxlen = curlen
lendiff = maxlen - curlen
overwrite_whitespace = " " * lendiff
echo(
CLI_CONFIG,
" " + task_status.get("status") + overwrite_whitespace,
newline=False,
)
task_status = pvc.lib.common.task_status(
CLI_CONFIG, task_id=task_id, is_watching=True
)
if task_status.get("state") == "SUCCESS":
bar.update(total_task - last_task)
echo(CLI_CONFIG, "")
retdata = task_status.get("state") + ": " + task_status.get("status")
return retdata
def wait_for_provisioner(CLI_CONFIG, task_id):
"""
Wait for a provisioner task to complete
"""
echo(CLI_CONFIG, f"Task ID: {task_id}")
echo(CLI_CONFIG, "")
# Wait for the task to start
echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
while True:
sleep(1)
task_status = pvc.lib.provisioner.task_status(
CLI_CONFIG, task_id, is_watching=True
)
if task_status.get("state") != "PENDING":
break
echo(CLI_CONFIG, ".", newline=False)
echo(CLI_CONFIG, " done.")
echo(CLI_CONFIG, "")
# Start following the task state, updating progress as we go
total_task = task_status.get("total")
with progressbar(length=total_task, show_eta=False) as bar:
last_task = 0
maxlen = 0
while True:
sleep(1)
if task_status.get("state") != "RUNNING":
break
if task_status.get("current") > last_task:
current_task = int(task_status.get("current"))
bar.update(current_task - last_task)
last_task = current_task
# The extensive spaces at the end cause this to overwrite longer previous messages
curlen = len(str(task_status.get("status")))
if curlen > maxlen:
maxlen = curlen
lendiff = maxlen - curlen
overwrite_whitespace = " " * lendiff
echo(
CLI_CONFIG,
" " + task_status.get("status") + overwrite_whitespace,
newline=False,
)
task_status = pvc.lib.provisioner.task_status(
CLI_CONFIG, task_id, is_watching=True
)
if task_status.get("state") == "SUCCESS":
bar.update(total_task - last_task)
echo(CLI_CONFIG, "")
retdata = task_status.get("state") + ": " + task_status.get("status")
return retdata
def get_autobackup_config(CLI_CONFIG, cfgfile):
try:
config = dict()

View File

@ -19,6 +19,7 @@
#
###############################################################################
from click import progressbar
from time import sleep, time
from pvc.cli.helpers import echo
@ -62,3 +63,120 @@ def cli_node_waiter(config, node, state_field, state_value):
t_end = time()
echo(config, f" done. [{int(t_end - t_start)}s]")
def wait_for_celery_task(CLI_CONFIG, task_detail):
"""
Wait for a Celery task to complete
"""
task_id = task_detail["task_id"]
run_on = task_detail["run_on"]
echo(CLI_CONFIG, f"Task ID: {task_id} assigned to node {run_on}")
echo(CLI_CONFIG, "")
# Wait for the task to start
echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
while True:
sleep(0.25)
task_status = pvc.lib.common.task_status(
CLI_CONFIG, task_id=task_id, is_watching=True
)
if task_status.get("state") != "PENDING":
break
echo(CLI_CONFIG, ".", newline=False)
echo(CLI_CONFIG, " done.")
echo(CLI_CONFIG, "")
# Start following the task state, updating progress as we go
total_task = task_status.get("total")
with progressbar(length=total_task, show_eta=False) as bar:
last_task = 0
maxlen = 0
while True:
sleep(0.25)
if task_status.get("state") != "RUNNING":
break
if task_status.get("current") > last_task:
current_task = int(task_status.get("current"))
bar.update(current_task - last_task)
last_task = current_task
# The extensive spaces at the end cause this to overwrite longer previous messages
curlen = len(str(task_status.get("status")))
if curlen > maxlen:
maxlen = curlen
lendiff = maxlen - curlen
overwrite_whitespace = " " * lendiff
echo(
CLI_CONFIG,
" " + task_status.get("status") + overwrite_whitespace,
newline=False,
)
task_status = pvc.lib.common.task_status(
CLI_CONFIG, task_id=task_id, is_watching=True
)
if task_status.get("state") == "SUCCESS":
bar.update(total_task - last_task)
echo(CLI_CONFIG, "")
retdata = task_status.get("state") + ": " + task_status.get("status")
return retdata
def wait_for_provisioner(CLI_CONFIG, task_id):
"""
Wait for a provisioner task to complete
"""
echo(CLI_CONFIG, f"Task ID: {task_id}")
echo(CLI_CONFIG, "")
# Wait for the task to start
echo(CLI_CONFIG, "Waiting for task to start...", newline=False)
while True:
sleep(1)
task_status = pvc.lib.provisioner.task_status(
CLI_CONFIG, task_id, is_watching=True
)
if task_status.get("state") != "PENDING":
break
echo(CLI_CONFIG, ".", newline=False)
echo(CLI_CONFIG, " done.")
echo(CLI_CONFIG, "")
# Start following the task state, updating progress as we go
total_task = task_status.get("total")
with progressbar(length=total_task, show_eta=False) as bar:
last_task = 0
maxlen = 0
while True:
sleep(1)
if task_status.get("state") != "RUNNING":
break
if task_status.get("current") > last_task:
current_task = int(task_status.get("current"))
bar.update(current_task - last_task)
last_task = current_task
# The extensive spaces at the end cause this to overwrite longer previous messages
curlen = len(str(task_status.get("status")))
if curlen > maxlen:
maxlen = curlen
lendiff = maxlen - curlen
overwrite_whitespace = " " * lendiff
echo(
CLI_CONFIG,
" " + task_status.get("status") + overwrite_whitespace,
newline=False,
)
task_status = pvc.lib.provisioner.task_status(
CLI_CONFIG, task_id, is_watching=True
)
if task_status.get("state") == "SUCCESS":
bar.update(total_task - last_task)
echo(CLI_CONFIG, "")
retdata = task_status.get("state") + ": " + task_status.get("status")
return retdata

View File

@ -164,7 +164,7 @@ def format_raw_output(config, status_data):
#
# OSD DB VG functions
#
def ceph_osd_db_vg_add(config, node, device):
def ceph_osd_db_vg_add(config, node, device, wait_flag):
"""
Add new Ceph OSD database volume group
@ -175,12 +175,21 @@ def ceph_osd_db_vg_add(config, node, device):
params = {"node": node, "device": device}
response = call_api(config, "post", "/storage/ceph/osddb", params=params)
if response.status_code == 200:
retstatus = True
if response.status_code == 202:
retvalue = True
retjson = response.json()
if not wait_flag:
retdata = (
f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
)
else:
# Just return the task JSON without formatting
retdata = response.json()
else:
retstatus = False
retvalue = False
retdata = response.json().get("message", "")
return retstatus, response.json().get("message", "")
return retvalue, retdata
#
@ -231,7 +240,9 @@ def ceph_osd_list(config, limit):
return False, response.json().get("message", "")
def ceph_osd_add(config, node, device, weight, ext_db_ratio, ext_db_size, osd_count):
def ceph_osd_add(
config, node, device, weight, ext_db_ratio, ext_db_size, osd_count, wait_flag
):
"""
Add new Ceph OSD
@ -254,16 +265,25 @@ def ceph_osd_add(config, node, device, weight, ext_db_ratio, ext_db_size, osd_co
response = call_api(config, "post", "/storage/ceph/osd", params=params)
if response.status_code == 200:
retstatus = True
if response.status_code == 202:
retvalue = True
retjson = response.json()
if not wait_flag:
retdata = (
f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
)
else:
# Just return the task JSON without formatting
retdata = response.json()
else:
retstatus = False
retvalue = False
retdata = response.json().get("message", "")
return retstatus, response.json().get("message", "")
return retvalue, retdata
def ceph_osd_replace(
config, osdid, new_device, old_device, weight, ext_db_ratio, ext_db_size
config, osdid, new_device, old_device, weight, ext_db_ratio, ext_db_size, wait_flag
):
"""
Replace an existing Ceph OSD with a new device
@ -288,15 +308,24 @@ def ceph_osd_replace(
response = call_api(config, "post", f"/storage/ceph/osd/{osdid}", params=params)
if response.status_code == 200:
retstatus = True
if response.status_code == 202:
retvalue = True
retjson = response.json()
if not wait_flag:
retdata = (
f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
)
else:
# Just return the task JSON without formatting
retdata = response.json()
else:
retstatus = False
retvalue = False
retdata = response.json().get("message", "")
return retstatus, response.json().get("message", "")
return retvalue, retdata
def ceph_osd_refresh(config, osdid, device):
def ceph_osd_refresh(config, osdid, device, wait_flag):
"""
Refresh (reimport) an existing Ceph OSD with device {device}
@ -309,15 +338,24 @@ def ceph_osd_refresh(config, osdid, device):
}
response = call_api(config, "put", f"/storage/ceph/osd/{osdid}", params=params)
if response.status_code == 200:
retstatus = True
if response.status_code == 202:
retvalue = True
retjson = response.json()
if not wait_flag:
retdata = (
f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
)
else:
# Just return the task JSON without formatting
retdata = response.json()
else:
retstatus = False
retvalue = False
retdata = response.json().get("message", "")
return retstatus, response.json().get("message", "")
return retvalue, retdata
def ceph_osd_remove(config, osdid, force_flag):
def ceph_osd_remove(config, osdid, force_flag, wait_flag):
"""
Remove Ceph OSD
@ -330,12 +368,21 @@ def ceph_osd_remove(config, osdid, force_flag):
config, "delete", "/storage/ceph/osd/{osdid}".format(osdid=osdid), params=params
)
if response.status_code == 200:
retstatus = True
if response.status_code == 202:
retvalue = True
retjson = response.json()
if not wait_flag:
retdata = (
f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}"
)
else:
# Just return the task JSON without formatting
retdata = response.json()
else:
retstatus = False
retvalue = False
retdata = response.json().get("message", "")
return retstatus, response.json().get("message", "")
return retvalue, retdata
def ceph_osd_state(config, osdid, state):

View File

@ -415,7 +415,7 @@ def vm_node(config, vm, target_node, action, force=False, wait=False, force_live
return retstatus, response.json().get("message", "")
def vm_locks(config, vm, wait_flag=False):
def vm_locks(config, vm, wait_flag):
"""
Flush RBD locks of (stopped) VM

View File

@ -30,25 +30,40 @@ class TaskFailure(Exception):
pass
def start(celery, msg, current=1, total=2):
def start(celery, msg, current=0, total=1):
logger = getLogger(__name__)
logger.info(f"Starting: {msg}")
logger.info(f"Starting {current}/{total}: {msg}")
celery.update_state(
state="RUNNING", meta={"current": current, "total": total, "status": msg}
)
sleep(0.5)
def fail(celery, msg, current=1, total=2):
def fail(celery, msg, current=1, total=1):
logger = getLogger(__name__)
logger.error(msg)
sys.tracebacklimit = 0
raise TaskFailure(msg)
def log_info(celery, msg):
logger = getLogger(__name__)
logger.info(f"Task log: {msg}")
def log_warn(celery, msg):
logger = getLogger(__name__)
logger.warning(f"Task log: {msg}")
def log_err(celery, msg):
logger = getLogger(__name__)
logger.error(f"Task log: {msg}")
def update(celery, msg, current=1, total=2):
logger = getLogger(__name__)
logger.info(f"Task update: {msg}")
logger.info(f"Task update {current}/{total}: {msg}")
celery.update_state(
state="RUNNING", meta={"current": current, "total": total, "status": msg}
)
@ -57,10 +72,11 @@ def update(celery, msg, current=1, total=2):
def finish(celery, msg, current=2, total=2):
logger = getLogger(__name__)
logger.info(f"Task update {current}/{total}: Finishing up")
celery.update_state(
state="RUNNING",
meta={"current": current, "total": total, "status": "Finishing up"},
)
sleep(0.25)
logger.info(f"Success: {msg}")
sleep(0.5)
logger.info(f"Success {current}/{total}: {msg}")
return {"status": msg, "current": current, "total": total}

File diff suppressed because it is too large Load Diff

View File

@ -27,6 +27,7 @@ import signal
from json import loads
from re import match as re_match
from re import split as re_split
from re import sub as re_sub
from distutils.util import strtobool
from threading import Thread
from shlex import split as shlex_split
@ -869,3 +870,75 @@ def sortInterfaceNames(interface_names):
return [atoi(c) for c in re_split(r"(\d+)", text)]
return sorted(interface_names, key=natural_keys)
#
# Parse a "detect" device into a real block device name
#
def get_detect_device(detect_string):
"""
Parses a "detect:" string into a normalized block device path using lsscsi.
A detect string is formatted "detect:<NAME>:<SIZE>:<ID>", where
NAME is some unique identifier in lsscsi, SIZE is a human-readable
size value to within +/- 3% of the real size of the device, and
ID is the Nth (0-indexed) matching entry of that NAME and SIZE.
"""
_, name, size, idd = detect_string.split(":")
if _ != "detect":
return None
retcode, stdout, stderr = run_os_command("lsscsi -s")
if retcode:
print(f"Failed to run lsscsi: {stderr}")
return None
# Get valid lines
lsscsi_lines_raw = stdout.split("\n")
lsscsi_lines = list()
for line in lsscsi_lines_raw:
if not line:
continue
split_line = line.split()
if split_line[1] != "disk":
continue
lsscsi_lines.append(line)
# Handle size determination (+/- 3%)
lsscsi_sizes = set()
for line in lsscsi_lines:
lsscsi_sizes.add(split_line[-1])
for l_size in lsscsi_sizes:
b_size = float(re_sub(r"\D.", "", size))
t_size = float(re_sub(r"\D.", "", l_size))
plusthreepct = t_size * 1.03
minusthreepct = t_size * 0.97
if b_size > minusthreepct and b_size < plusthreepct:
size = l_size
break
blockdev = None
matches = list()
for idx, line in enumerate(lsscsi_lines):
# Skip non-disk entries
if line.split()[1] != "disk":
continue
# Skip if name is not contained in the line (case-insensitive)
if name.lower() not in line.lower():
continue
# Skip if the size does not match
if size != line.split()[-1]:
continue
# Get our blockdev and append to the list
matches.append(line.split()[-2])
blockdev = None
# Find the blockdev at index {idd}
for idx, _blockdev in enumerate(matches):
if int(idx) == int(idd):
blockdev = _blockdev
break
return blockdev

View File

@ -1782,11 +1782,14 @@ def vm_worker_helper_getdom(tuuid):
def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
current_stage = 0
total_stages = 3
start(
celery,
f"Flushing RBD locks for VM {domain} [forced={force_unlock}]",
current=1,
total=4,
current=current_stage,
total=total_stages,
)
dom_uuid = getDomainUUID(zkhandler, domain)
@ -1803,7 +1806,13 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
# Get the list of RBD images
rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",")
update(celery, f"Obtaining RBD locks for VM {domain}", current=2, total=4)
current_stage += 1
update(
celery,
f"Obtaining RBD locks for VM {domain}",
current=current_stage,
total=total_stages,
)
# Prepare a list of locks
rbd_locks = list()
@ -1825,14 +1834,23 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
try:
lock_list = jloads(lock_list_stdout)
except Exception as e:
fail(celery, f"Failed to parse JSON lock list for volume {rbd}: {e}")
fail(
celery,
f"Failed to parse JSON lock list for volume {rbd}: {e}",
)
return
if lock_list:
for lock in lock_list:
rbd_locks.append({"rbd": rbd, "lock": lock})
update(celery, f"Freeing RBD locks for VM {domain}", current=3, total=4)
current_stage += 1
update(
celery,
f"Freeing RBD locks for VM {domain}",
current=current_stage,
total=total_stages,
)
for _lock in rbd_locks:
rbd = _lock["rbd"]
@ -1850,18 +1868,28 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
fail(
celery,
f"Failed to free RBD lock {lock['id']} on volume {rbd}: {lock_remove_stderr}",
current=3,
total=4,
)
return
current_stage += 1
return finish(
celery, f"Successfully flushed RBD locks for VM {domain}", current=4, total=4
celery,
f"Successfully flushed RBD locks for VM {domain}",
current=4,
total=4,
)
def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
start(celery, f"Hot-attaching XML device to VM {domain}")
current_stage = 0
total_stages = 1
start(
celery,
f"Hot-attaching XML device to VM {domain}",
current=current_stage,
total=total_stages,
)
dom_uuid = getDomainUUID(zkhandler, domain)
@ -1875,7 +1903,10 @@ def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
dom = vm_worker_helper_getdom(dom_uuid)
if dom is None:
fail(celery, f"Failed to find Libvirt object for VM {domain}")
fail(
celery,
f"Failed to find Libvirt object for VM {domain}",
)
return
try:
@ -1884,11 +1915,25 @@ def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
fail(celery, e)
return
return finish(celery, f"Successfully hot-attached XML device to VM {domain}")
current_stage += 1
return finish(
celery,
f"Successfully hot-attached XML device to VM {domain}",
current=current_stage,
total=total_stages,
)
def vm_worker_detach_device(zkhandler, celery, domain, xml_spec):
start(celery, f"Hot-detaching XML device from VM {domain}")
current_stage = 0
total_stages = 1
start(
celery,
f"Hot-detaching XML device from VM {domain}",
current=current_stage,
total_stages=total_stages,
)
dom_uuid = getDomainUUID(zkhandler, domain)
@ -1902,7 +1947,10 @@ def vm_worker_detach_device(zkhandler, celery, domain, xml_spec):
dom = vm_worker_helper_getdom(dom_uuid)
if dom is None:
fail(celery, f"Failed to find Libvirt object for VM {domain}")
fail(
celery,
f"Failed to find Libvirt object for VM {domain}",
)
return
try:
@ -1911,4 +1959,10 @@ def vm_worker_detach_device(zkhandler, celery, domain, xml_spec):
fail(celery, e)
return
return finish(celery, f"Successfully hot-detached XML device from VM {domain}")
current_stage += 1
return finish(
celery,
f"Successfully hot-detached XML device from VM {domain}",
current=current_stage,
total_stages=total_stages,
)

File diff suppressed because it is too large Load Diff