Port OSD on-node tasks to Celery worker system

Adds Celery versions of the osd_add, osd_replace, osd_refresh,
osd_remove, and osd_db_vg_add functions.
This commit is contained in:
2023-11-09 14:05:15 -05:00
parent 89681d54b9
commit 87757c3c59
12 changed files with 2034 additions and 1722 deletions

View File

@ -30,25 +30,40 @@ class TaskFailure(Exception):
pass
def start(celery, msg, current=1, total=2):
def start(celery, msg, current=0, total=1):
logger = getLogger(__name__)
logger.info(f"Starting: {msg}")
logger.info(f"Starting {current}/{total}: {msg}")
celery.update_state(
state="RUNNING", meta={"current": current, "total": total, "status": msg}
)
sleep(0.5)
def fail(celery, msg, current=1, total=2):
def fail(celery, msg, current=1, total=1):
logger = getLogger(__name__)
logger.error(msg)
sys.tracebacklimit = 0
raise TaskFailure(msg)
def log_info(celery, msg):
logger = getLogger(__name__)
logger.info(f"Task log: {msg}")
def log_warn(celery, msg):
logger = getLogger(__name__)
logger.warning(f"Task log: {msg}")
def log_err(celery, msg):
logger = getLogger(__name__)
logger.error(f"Task log: {msg}")
def update(celery, msg, current=1, total=2):
logger = getLogger(__name__)
logger.info(f"Task update: {msg}")
logger.info(f"Task update {current}/{total}: {msg}")
celery.update_state(
state="RUNNING", meta={"current": current, "total": total, "status": msg}
)
@ -57,10 +72,11 @@ def update(celery, msg, current=1, total=2):
def finish(celery, msg, current=2, total=2):
logger = getLogger(__name__)
logger.info(f"Task update {current}/{total}: Finishing up")
celery.update_state(
state="RUNNING",
meta={"current": current, "total": total, "status": "Finishing up"},
)
sleep(0.25)
logger.info(f"Success: {msg}")
sleep(0.5)
logger.info(f"Success {current}/{total}: {msg}")
return {"status": msg, "current": current, "total": total}

File diff suppressed because it is too large Load Diff

View File

@ -27,6 +27,7 @@ import signal
from json import loads
from re import match as re_match
from re import split as re_split
from re import sub as re_sub
from distutils.util import strtobool
from threading import Thread
from shlex import split as shlex_split
@ -869,3 +870,75 @@ def sortInterfaceNames(interface_names):
return [atoi(c) for c in re_split(r"(\d+)", text)]
return sorted(interface_names, key=natural_keys)
#
# Parse a "detect" device into a real block device name
#
def get_detect_device(detect_string):
"""
Parses a "detect:" string into a normalized block device path using lsscsi.
A detect string is formatted "detect:<NAME>:<SIZE>:<ID>", where
NAME is some unique identifier in lsscsi, SIZE is a human-readable
size value to within +/- 3% of the real size of the device, and
ID is the Nth (0-indexed) matching entry of that NAME and SIZE.
"""
_, name, size, idd = detect_string.split(":")
if _ != "detect":
return None
retcode, stdout, stderr = run_os_command("lsscsi -s")
if retcode:
print(f"Failed to run lsscsi: {stderr}")
return None
# Get valid lines
lsscsi_lines_raw = stdout.split("\n")
lsscsi_lines = list()
for line in lsscsi_lines_raw:
if not line:
continue
split_line = line.split()
if split_line[1] != "disk":
continue
lsscsi_lines.append(line)
# Handle size determination (+/- 3%)
lsscsi_sizes = set()
for line in lsscsi_lines:
lsscsi_sizes.add(split_line[-1])
for l_size in lsscsi_sizes:
b_size = float(re_sub(r"\D.", "", size))
t_size = float(re_sub(r"\D.", "", l_size))
plusthreepct = t_size * 1.03
minusthreepct = t_size * 0.97
if b_size > minusthreepct and b_size < plusthreepct:
size = l_size
break
blockdev = None
matches = list()
for idx, line in enumerate(lsscsi_lines):
# Skip non-disk entries
if line.split()[1] != "disk":
continue
# Skip if name is not contained in the line (case-insensitive)
if name.lower() not in line.lower():
continue
# Skip if the size does not match
if size != line.split()[-1]:
continue
# Get our blockdev and append to the list
matches.append(line.split()[-2])
blockdev = None
# Find the blockdev at index {idd}
for idx, _blockdev in enumerate(matches):
if int(idx) == int(idd):
blockdev = _blockdev
break
return blockdev

View File

@ -1782,11 +1782,14 @@ def vm_worker_helper_getdom(tuuid):
def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
current_stage = 0
total_stages = 3
start(
celery,
f"Flushing RBD locks for VM {domain} [forced={force_unlock}]",
current=1,
total=4,
current=current_stage,
total=total_stages,
)
dom_uuid = getDomainUUID(zkhandler, domain)
@ -1803,7 +1806,13 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
# Get the list of RBD images
rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",")
update(celery, f"Obtaining RBD locks for VM {domain}", current=2, total=4)
current_stage += 1
update(
celery,
f"Obtaining RBD locks for VM {domain}",
current=current_stage,
total=total_stages,
)
# Prepare a list of locks
rbd_locks = list()
@ -1825,14 +1834,23 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
try:
lock_list = jloads(lock_list_stdout)
except Exception as e:
fail(celery, f"Failed to parse JSON lock list for volume {rbd}: {e}")
fail(
celery,
f"Failed to parse JSON lock list for volume {rbd}: {e}",
)
return
if lock_list:
for lock in lock_list:
rbd_locks.append({"rbd": rbd, "lock": lock})
update(celery, f"Freeing RBD locks for VM {domain}", current=3, total=4)
current_stage += 1
update(
celery,
f"Freeing RBD locks for VM {domain}",
current=current_stage,
total=total_stages,
)
for _lock in rbd_locks:
rbd = _lock["rbd"]
@ -1850,18 +1868,28 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
fail(
celery,
f"Failed to free RBD lock {lock['id']} on volume {rbd}: {lock_remove_stderr}",
current=3,
total=4,
)
return
current_stage += 1
return finish(
celery, f"Successfully flushed RBD locks for VM {domain}", current=4, total=4
celery,
f"Successfully flushed RBD locks for VM {domain}",
current=4,
total=4,
)
def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
start(celery, f"Hot-attaching XML device to VM {domain}")
current_stage = 0
total_stages = 1
start(
celery,
f"Hot-attaching XML device to VM {domain}",
current=current_stage,
total=total_stages,
)
dom_uuid = getDomainUUID(zkhandler, domain)
@ -1875,7 +1903,10 @@ def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
dom = vm_worker_helper_getdom(dom_uuid)
if dom is None:
fail(celery, f"Failed to find Libvirt object for VM {domain}")
fail(
celery,
f"Failed to find Libvirt object for VM {domain}",
)
return
try:
@ -1884,11 +1915,25 @@ def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
fail(celery, e)
return
return finish(celery, f"Successfully hot-attached XML device to VM {domain}")
current_stage += 1
return finish(
celery,
f"Successfully hot-attached XML device to VM {domain}",
current=current_stage,
total=total_stages,
)
def vm_worker_detach_device(zkhandler, celery, domain, xml_spec):
start(celery, f"Hot-detaching XML device from VM {domain}")
current_stage = 0
total_stages = 1
start(
celery,
f"Hot-detaching XML device from VM {domain}",
current=current_stage,
total_stages=total_stages,
)
dom_uuid = getDomainUUID(zkhandler, domain)
@ -1902,7 +1947,10 @@ def vm_worker_detach_device(zkhandler, celery, domain, xml_spec):
dom = vm_worker_helper_getdom(dom_uuid)
if dom is None:
fail(celery, f"Failed to find Libvirt object for VM {domain}")
fail(
celery,
f"Failed to find Libvirt object for VM {domain}",
)
return
try:
@ -1911,4 +1959,10 @@ def vm_worker_detach_device(zkhandler, celery, domain, xml_spec):
fail(celery, e)
return
return finish(celery, f"Successfully hot-detached XML device from VM {domain}")
current_stage += 1
return finish(
celery,
f"Successfully hot-detached XML device from VM {domain}",
current=current_stage,
total_stages=total_stages,
)