Port OSD on-node tasks to Celery worker system

Adds Celery versions of the osd_add, osd_replace, osd_refresh,
osd_remove, and osd_db_vg_add functions.
This commit is contained in:
2023-11-09 14:05:15 -05:00
parent 89681d54b9
commit 42ce2f99fa
11 changed files with 1949 additions and 1699 deletions

View File

@ -32,7 +32,7 @@ class TaskFailure(Exception):
def start(celery, msg, current=1, total=2):
logger = getLogger(__name__)
logger.info(f"Starting: {msg}")
logger.info(f"Starting {current}/{total}: {msg}")
celery.update_state(
state="RUNNING", meta={"current": current, "total": total, "status": msg}
)
@ -46,9 +46,24 @@ def fail(celery, msg, current=1, total=2):
raise TaskFailure(msg)
def log_info(celery, msg):
logger = getLogger(__name__)
logger.info(f"Task log: {msg}")
def log_warn(celery, msg):
logger = getLogger(__name__)
logger.warning(f"Task log: {msg}")
def log_err(celery, msg):
logger = getLogger(__name__)
logger.error(f"Task log: {msg}")
def update(celery, msg, current=1, total=2):
logger = getLogger(__name__)
logger.info(f"Task update: {msg}")
logger.info(f"Task update {current}/{total}: {msg}")
celery.update_state(
state="RUNNING", meta={"current": current, "total": total, "status": msg}
)
@ -57,10 +72,11 @@ def update(celery, msg, current=1, total=2):
def finish(celery, msg, current=2, total=2):
logger = getLogger(__name__)
logger.info(f"Task update {current}/{total}: Finishing up")
celery.update_state(
state="RUNNING",
meta={"current": current, "total": total, "status": "Finishing up"},
)
sleep(0.25)
logger.info(f"Success: {msg}")
sleep(0.5)
logger.info(f"Success {current}/{total}: {msg}")
return {"status": msg, "current": current, "total": total}

File diff suppressed because it is too large Load Diff

View File

@ -27,6 +27,7 @@ import signal
from json import loads
from re import match as re_match
from re import split as re_split
from re import sub as re_sub
from distutils.util import strtobool
from threading import Thread
from shlex import split as shlex_split
@ -869,3 +870,75 @@ def sortInterfaceNames(interface_names):
return [atoi(c) for c in re_split(r"(\d+)", text)]
return sorted(interface_names, key=natural_keys)
#
# Parse a "detect" device into a real block device name
#
def get_detect_device(detect_string):
"""
Parses a "detect:" string into a normalized block device path using lsscsi.
A detect string is formatted "detect:<NAME>:<SIZE>:<ID>", where
NAME is some unique identifier in lsscsi, SIZE is a human-readable
size value to within +/- 3% of the real size of the device, and
ID is the Nth (0-indexed) matching entry of that NAME and SIZE.
"""
_, name, size, idd = detect_string.split(":")
if _ != "detect":
return None
retcode, stdout, stderr = run_os_command("lsscsi -s")
if retcode:
print(f"Failed to run lsscsi: {stderr}")
return None
# Get valid lines
lsscsi_lines_raw = stdout.split("\n")
lsscsi_lines = list()
for line in lsscsi_lines_raw:
if not line:
continue
split_line = line.split()
if split_line[1] != "disk":
continue
lsscsi_lines.append(line)
# Handle size determination (+/- 3%)
lsscsi_sizes = set()
for line in lsscsi_lines:
lsscsi_sizes.add(split_line[-1])
for l_size in lsscsi_sizes:
b_size = float(re_sub(r"\D.", "", size))
t_size = float(re_sub(r"\D.", "", l_size))
plusthreepct = t_size * 1.03
minusthreepct = t_size * 0.97
if b_size > minusthreepct and b_size < plusthreepct:
size = l_size
break
blockdev = None
matches = list()
for idx, line in enumerate(lsscsi_lines):
# Skip non-disk entries
if line.split()[1] != "disk":
continue
# Skip if name is not contained in the line (case-insensitive)
if name.lower() not in line.lower():
continue
# Skip if the size does not match
if size != line.split()[-1]:
continue
# Get our blockdev and append to the list
matches.append(line.split()[-2])
blockdev = None
# Find the blockdev at index {idd}
for idx, _blockdev in enumerate(matches):
if int(idx) == int(idd):
blockdev = _blockdev
break
return blockdev