Port OSD on-node tasks to Celery worker system
Adds Celery versions of the osd_add, osd_replace, osd_refresh, osd_remove, and osd_db_vg_add functions.
This commit is contained in:
@ -30,25 +30,40 @@ class TaskFailure(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def start(celery, msg, current=0, total=1):
    """
    Announce the start of a task and publish its initial progress.

    Logs the starting message, then reports a "RUNNING" state through
    celery.update_state() with the current/total stage counters and the
    message as the status text.

    celery: object exposing update_state(state=..., meta=...)
    msg: human-readable status message
    current: stage counter to report (defaults to 0, i.e. nothing done yet)
    total: total number of stages to report
    """
    logger = getLogger(__name__)
    logger.info(f"Starting {current}/{total}: {msg}")
    celery.update_state(
        state="RUNNING", meta={"current": current, "total": total, "status": msg}
    )
    # Brief pause after publishing the state; presumably so that pollers can
    # observe the initial state before the next update -- TODO confirm
    sleep(0.5)
|
||||
|
||||
|
||||
def fail(celery, msg, current=1, total=1):
    """
    Log a task failure and abort the task by raising TaskFailure.

    celery, current and total are accepted for signature parity with the
    other task helpers but are not used here.

    msg: error message; logged at error level and used as the exception text

    Raises TaskFailure unconditionally; this function never returns.
    """
    logger = getLogger(__name__)
    logger.error(msg)
    # NOTE: this is a process-wide setting, not scoped to this task; it
    # suppresses traceback frames for every later uncaught exception too
    sys.tracebacklimit = 0
    raise TaskFailure(msg)
|
||||
|
||||
|
||||
def log_info(celery, msg):
    """
    Write an informational task message to the module logger.

    celery is accepted for signature parity with the other task helpers
    but is not used; no task state is updated.
    """
    logger = getLogger(__name__)
    logger.info(f"Task log: {msg}")
|
||||
|
||||
|
||||
def log_warn(celery, msg):
    """
    Write a warning-level task message to the module logger.

    celery is accepted for signature parity with the other task helpers
    but is not used; no task state is updated.
    """
    logger = getLogger(__name__)
    logger.warning(f"Task log: {msg}")
|
||||
|
||||
|
||||
def log_err(celery, msg):
    """
    Write an error-level task message to the module logger.

    Unlike fail(), this only logs; it does not raise or stop the task.
    celery is accepted for signature parity with the other task helpers
    but is not used.
    """
    logger = getLogger(__name__)
    logger.error(f"Task log: {msg}")
|
||||
|
||||
|
||||
def update(celery, msg, current=1, total=2):
|
||||
logger = getLogger(__name__)
|
||||
logger.info(f"Task update: {msg}")
|
||||
logger.info(f"Task update {current}/{total}: {msg}")
|
||||
celery.update_state(
|
||||
state="RUNNING", meta={"current": current, "total": total, "status": msg}
|
||||
)
|
||||
@ -57,10 +72,11 @@ def update(celery, msg, current=1, total=2):
|
||||
|
||||
def finish(celery, msg, current=2, total=2):
    """
    Publish the final progress update of a task and build its result.

    Reports a "RUNNING" state with the status "Finishing up" through
    celery.update_state(), pauses briefly, logs the success message and
    returns the result dictionary.

    celery: object exposing update_state(state=..., meta=...)
    msg: human-readable success message, returned as the "status" value
    current: stage counter to report
    total: total number of stages to report

    Returns a dict with the keys "status", "current" and "total".
    """
    logger = getLogger(__name__)
    logger.info(f"Task update {current}/{total}: Finishing up")
    celery.update_state(
        state="RUNNING",
        meta={"current": current, "total": total, "status": "Finishing up"},
    )
    # Brief pause after publishing the state; presumably so that pollers can
    # observe the final "RUNNING" update before the task returns -- TODO confirm
    sleep(0.5)
    logger.info(f"Success {current}/{total}: {msg}")
    return {"status": msg, "current": current, "total": total}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -27,6 +27,7 @@ import signal
|
||||
from json import loads
|
||||
from re import match as re_match
|
||||
from re import split as re_split
|
||||
from re import sub as re_sub
|
||||
from distutils.util import strtobool
|
||||
from threading import Thread
|
||||
from shlex import split as shlex_split
|
||||
@ -869,3 +870,75 @@ def sortInterfaceNames(interface_names):
|
||||
return [atoi(c) for c in re_split(r"(\d+)", text)]
|
||||
|
||||
return sorted(interface_names, key=natural_keys)
|
||||
|
||||
|
||||
#
|
||||
# Parse a "detect" device into a real block device name
|
||||
#
|
||||
def _parse_device_size(size_string):
    """
    Split a human-readable size such as "1.92TB" into (value, unit_prefix).

    The value is returned as a float and the unit is reduced to its first
    letter, upper-cased ("T" for "TB", "" when no unit is given). Returns
    None when the string is not a decimal number optionally followed by
    a unit (for example "-").
    """
    parsed = re_match(
        r"^(\d+(?:\.\d+)?)\s*([A-Za-z]?)[A-Za-z]*$", size_string.strip()
    )
    if parsed is None:
        return None
    return float(parsed.group(1)), parsed.group(2).upper()


def _device_size_matches(requested, reported):
    """
    Check whether a reported device size satisfies a requested size.

    An identical string always matches. Otherwise both sizes must parse,
    the unit prefixes must agree (a request without a unit matches any
    unit), and the requested value must lie strictly within +/- 3% of the
    reported value.
    """
    if requested == reported:
        return True

    wanted = _parse_device_size(requested)
    actual = _parse_device_size(reported)
    if wanted is None or actual is None:
        return False

    wanted_value, wanted_unit = wanted
    actual_value, actual_unit = actual
    if wanted_unit and wanted_unit != actual_unit:
        return False

    return actual_value * 0.97 < wanted_value < actual_value * 1.03


def get_detect_device(detect_string):
    """
    Parses a "detect:" string into a normalized block device path using lsscsi.

    A detect string is formatted "detect:<NAME>:<SIZE>:<ID>", where
    NAME is some unique identifier in lsscsi, SIZE is a human-readable
    size value to within +/- 3% of the real size of the device, and
    ID is the Nth (0-indexed) matching entry of that NAME and SIZE.

    Returns the matching device as printed in the second-to-last column of
    "lsscsi -s", or None if the detect string is malformed, lsscsi fails,
    or there is no ID'th matching disk.
    """
    fields = detect_string.split(":")
    if len(fields) != 4 or fields[0] != "detect":
        return None
    _, name, size, idd = fields

    # Validate the index up front so that a bad ID is reported as "no
    # device" rather than as an exception after the matches are collected
    try:
        index = int(idd)
    except ValueError:
        return None

    retcode, stdout, stderr = run_os_command("lsscsi -s")
    if retcode:
        print(f"Failed to run lsscsi: {stderr}")
        return None

    matches = list()
    for line in stdout.split("\n"):
        columns = line.split()
        # Skip blank/short lines and non-disk entries
        if len(columns) < 2 or columns[1] != "disk":
            continue
        # Skip if name is not contained in the line (case-insensitive)
        if name.lower() not in line.lower():
            continue
        # Skip if this line's own size (last column) does not match
        if not _device_size_matches(size, columns[-1]):
            continue
        # The block device is the column just before the size
        matches.append(columns[-2])

    # Find the blockdev at index {idd}; negative indexes never match
    if 0 <= index < len(matches):
        return matches[index]
    return None
|
||||
|
@ -1782,11 +1782,14 @@ def vm_worker_helper_getdom(tuuid):
|
||||
|
||||
|
||||
def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
|
||||
current_stage = 0
|
||||
total_stages = 3
|
||||
|
||||
start(
|
||||
celery,
|
||||
f"Flushing RBD locks for VM {domain} [forced={force_unlock}]",
|
||||
current=1,
|
||||
total=4,
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
dom_uuid = getDomainUUID(zkhandler, domain)
|
||||
@ -1803,7 +1806,13 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
|
||||
# Get the list of RBD images
|
||||
rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",")
|
||||
|
||||
update(celery, f"Obtaining RBD locks for VM {domain}", current=2, total=4)
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
f"Obtaining RBD locks for VM {domain}",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
# Prepare a list of locks
|
||||
rbd_locks = list()
|
||||
@ -1825,14 +1834,23 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
|
||||
try:
|
||||
lock_list = jloads(lock_list_stdout)
|
||||
except Exception as e:
|
||||
fail(celery, f"Failed to parse JSON lock list for volume {rbd}: {e}")
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to parse JSON lock list for volume {rbd}: {e}",
|
||||
)
|
||||
return
|
||||
|
||||
if lock_list:
|
||||
for lock in lock_list:
|
||||
rbd_locks.append({"rbd": rbd, "lock": lock})
|
||||
|
||||
update(celery, f"Freeing RBD locks for VM {domain}", current=3, total=4)
|
||||
current_stage += 1
|
||||
update(
|
||||
celery,
|
||||
f"Freeing RBD locks for VM {domain}",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
for _lock in rbd_locks:
|
||||
rbd = _lock["rbd"]
|
||||
@ -1850,18 +1868,28 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to free RBD lock {lock['id']} on volume {rbd}: {lock_remove_stderr}",
|
||||
current=3,
|
||||
total=4,
|
||||
)
|
||||
return
|
||||
|
||||
current_stage += 1
|
||||
return finish(
|
||||
celery, f"Successfully flushed RBD locks for VM {domain}", current=4, total=4
|
||||
celery,
|
||||
f"Successfully flushed RBD locks for VM {domain}",
|
||||
current=4,
|
||||
total=4,
|
||||
)
|
||||
|
||||
|
||||
def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
|
||||
start(celery, f"Hot-attaching XML device to VM {domain}")
|
||||
current_stage = 0
|
||||
total_stages = 1
|
||||
|
||||
start(
|
||||
celery,
|
||||
f"Hot-attaching XML device to VM {domain}",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
dom_uuid = getDomainUUID(zkhandler, domain)
|
||||
|
||||
@ -1875,7 +1903,10 @@ def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
|
||||
|
||||
dom = vm_worker_helper_getdom(dom_uuid)
|
||||
if dom is None:
|
||||
fail(celery, f"Failed to find Libvirt object for VM {domain}")
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to find Libvirt object for VM {domain}",
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
@ -1884,11 +1915,25 @@ def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
|
||||
fail(celery, e)
|
||||
return
|
||||
|
||||
return finish(celery, f"Successfully hot-attached XML device to VM {domain}")
|
||||
current_stage += 1
|
||||
return finish(
|
||||
celery,
|
||||
f"Successfully hot-attached XML device to VM {domain}",
|
||||
current=current_stage,
|
||||
total=total_stages,
|
||||
)
|
||||
|
||||
|
||||
def vm_worker_detach_device(zkhandler, celery, domain, xml_spec):
|
||||
start(celery, f"Hot-detaching XML device from VM {domain}")
|
||||
current_stage = 0
|
||||
total_stages = 1
|
||||
|
||||
start(
|
||||
celery,
|
||||
f"Hot-detaching XML device from VM {domain}",
|
||||
current=current_stage,
|
||||
total_stages=total_stages,
|
||||
)
|
||||
|
||||
dom_uuid = getDomainUUID(zkhandler, domain)
|
||||
|
||||
@ -1902,7 +1947,10 @@ def vm_worker_detach_device(zkhandler, celery, domain, xml_spec):
|
||||
|
||||
dom = vm_worker_helper_getdom(dom_uuid)
|
||||
if dom is None:
|
||||
fail(celery, f"Failed to find Libvirt object for VM {domain}")
|
||||
fail(
|
||||
celery,
|
||||
f"Failed to find Libvirt object for VM {domain}",
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
@ -1911,4 +1959,10 @@ def vm_worker_detach_device(zkhandler, celery, domain, xml_spec):
|
||||
fail(celery, e)
|
||||
return
|
||||
|
||||
return finish(celery, f"Successfully hot-detached XML device from VM {domain}")
|
||||
current_stage += 1
|
||||
return finish(
|
||||
celery,
|
||||
f"Successfully hot-detached XML device from VM {domain}",
|
||||
current=current_stage,
|
||||
total_stages=total_stages,
|
||||
)
|
||||
|
Reference in New Issue
Block a user