Port OSD on-node tasks to Celery worker system
Adds Celery versions of the osd_add, osd_replace, osd_refresh, osd_remove, and osd_db_vg_add functions.
This commit is contained in:
@ -32,7 +32,7 @@ class TaskFailure(Exception):
|
||||
|
||||
def start(celery, msg, current=1, total=2):
|
||||
logger = getLogger(__name__)
|
||||
logger.info(f"Starting: {msg}")
|
||||
logger.info(f"Starting {current}/{total}: {msg}")
|
||||
celery.update_state(
|
||||
state="RUNNING", meta={"current": current, "total": total, "status": msg}
|
||||
)
|
||||
@ -46,9 +46,24 @@ def fail(celery, msg, current=1, total=2):
|
||||
raise TaskFailure(msg)
|
||||
|
||||
|
||||
def log_info(celery, msg):
|
||||
logger = getLogger(__name__)
|
||||
logger.info(f"Task log: {msg}")
|
||||
|
||||
|
||||
def log_warn(celery, msg):
|
||||
logger = getLogger(__name__)
|
||||
logger.warning(f"Task log: {msg}")
|
||||
|
||||
|
||||
def log_err(celery, msg):
|
||||
logger = getLogger(__name__)
|
||||
logger.error(f"Task log: {msg}")
|
||||
|
||||
|
||||
def update(celery, msg, current=1, total=2):
|
||||
logger = getLogger(__name__)
|
||||
logger.info(f"Task update: {msg}")
|
||||
logger.info(f"Task update {current}/{total}: {msg}")
|
||||
celery.update_state(
|
||||
state="RUNNING", meta={"current": current, "total": total, "status": msg}
|
||||
)
|
||||
@ -57,10 +72,11 @@ def update(celery, msg, current=1, total=2):
|
||||
|
||||
def finish(celery, msg, current=2, total=2):
|
||||
logger = getLogger(__name__)
|
||||
logger.info(f"Task update {current}/{total}: Finishing up")
|
||||
celery.update_state(
|
||||
state="RUNNING",
|
||||
meta={"current": current, "total": total, "status": "Finishing up"},
|
||||
)
|
||||
sleep(0.25)
|
||||
logger.info(f"Success: {msg}")
|
||||
sleep(0.5)
|
||||
logger.info(f"Success {current}/{total}: {msg}")
|
||||
return {"status": msg, "current": current, "total": total}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -27,6 +27,7 @@ import signal
|
||||
from json import loads
|
||||
from re import match as re_match
|
||||
from re import split as re_split
|
||||
from re import sub as re_sub
|
||||
from distutils.util import strtobool
|
||||
from threading import Thread
|
||||
from shlex import split as shlex_split
|
||||
@ -869,3 +870,75 @@ def sortInterfaceNames(interface_names):
|
||||
return [atoi(c) for c in re_split(r"(\d+)", text)]
|
||||
|
||||
return sorted(interface_names, key=natural_keys)
|
||||
|
||||
|
||||
#
|
||||
# Parse a "detect" device into a real block device name
|
||||
#
|
||||
def get_detect_device(detect_string):
|
||||
"""
|
||||
Parses a "detect:" string into a normalized block device path using lsscsi.
|
||||
|
||||
A detect string is formatted "detect:<NAME>:<SIZE>:<ID>", where
|
||||
NAME is some unique identifier in lsscsi, SIZE is a human-readable
|
||||
size value to within +/- 3% of the real size of the device, and
|
||||
ID is the Nth (0-indexed) matching entry of that NAME and SIZE.
|
||||
"""
|
||||
_, name, size, idd = detect_string.split(":")
|
||||
if _ != "detect":
|
||||
return None
|
||||
|
||||
retcode, stdout, stderr = run_os_command("lsscsi -s")
|
||||
if retcode:
|
||||
print(f"Failed to run lsscsi: {stderr}")
|
||||
return None
|
||||
|
||||
# Get valid lines
|
||||
lsscsi_lines_raw = stdout.split("\n")
|
||||
lsscsi_lines = list()
|
||||
for line in lsscsi_lines_raw:
|
||||
if not line:
|
||||
continue
|
||||
split_line = line.split()
|
||||
if split_line[1] != "disk":
|
||||
continue
|
||||
lsscsi_lines.append(line)
|
||||
|
||||
# Handle size determination (+/- 3%)
|
||||
lsscsi_sizes = set()
|
||||
for line in lsscsi_lines:
|
||||
lsscsi_sizes.add(split_line[-1])
|
||||
for l_size in lsscsi_sizes:
|
||||
b_size = float(re_sub(r"\D.", "", size))
|
||||
t_size = float(re_sub(r"\D.", "", l_size))
|
||||
|
||||
plusthreepct = t_size * 1.03
|
||||
minusthreepct = t_size * 0.97
|
||||
|
||||
if b_size > minusthreepct and b_size < plusthreepct:
|
||||
size = l_size
|
||||
break
|
||||
|
||||
blockdev = None
|
||||
matches = list()
|
||||
for idx, line in enumerate(lsscsi_lines):
|
||||
# Skip non-disk entries
|
||||
if line.split()[1] != "disk":
|
||||
continue
|
||||
# Skip if name is not contained in the line (case-insensitive)
|
||||
if name.lower() not in line.lower():
|
||||
continue
|
||||
# Skip if the size does not match
|
||||
if size != line.split()[-1]:
|
||||
continue
|
||||
# Get our blockdev and append to the list
|
||||
matches.append(line.split()[-2])
|
||||
|
||||
blockdev = None
|
||||
# Find the blockdev at index {idd}
|
||||
for idx, _blockdev in enumerate(matches):
|
||||
if int(idx) == int(idd):
|
||||
blockdev = _blockdev
|
||||
break
|
||||
|
||||
return blockdev
|
||||
|
Reference in New Issue
Block a user