From ce17c60a20ce5bc3603b72ce91a56f2153a46cb2 Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Thu, 9 Nov 2023 14:05:15 -0500 Subject: [PATCH] Port OSD on-node tasks to Celery worker system Adds Celery versions of the osd_add, osd_replace, osd_refresh, osd_remove, and osd_db_vg_add functions. --- api-daemon/pvcapid/flaskapi.py | 203 ++- api-daemon/pvcapid/helper.py | 30 + client-cli/pvc/cli/cli.py | 115 +- client-cli/pvc/cli/helpers.py | 120 +- client-cli/pvc/cli/waiters.py | 118 ++ client-cli/pvc/lib/storage.py | 97 +- client-cli/pvc/lib/vm.py | 2 +- daemon-common/celery.py | 28 +- daemon-common/ceph.py | 1612 +++++++++++++++--- daemon-common/common.py | 73 + daemon-common/vm.py | 82 +- node-daemon/pvcnoded/objects/CephInstance.py | 1282 -------------- 12 files changed, 2039 insertions(+), 1723 deletions(-) diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index 42e0bed6..b828ca70 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -35,6 +35,13 @@ from daemon_lib.vm import ( vm_worker_attach_device, vm_worker_detach_device, ) +from daemon_lib.ceph import ( + osd_worker_add_osd, + osd_worker_replace_osd, + osd_worker_refresh_osd, + osd_worker_remove_osd, + osd_worker_add_db_vg, +) from pvcapid.Daemon import config, strtobool, API_VERSION @@ -237,6 +244,116 @@ def vm_device_detach(self, domain, xml, run_on=None): return run_vm_device_detach(self, domain, xml) +@celery.task(name="osd.add", bind=True, routing_key="run_on") +def osd_add( + self, + device, + weight, + ext_db_ratio=None, + ext_db_size=None, + split_count=None, + run_on=None, +): + @ZKConnection(config) + def run_osd_add( + zkhandler, + self, + run_on, + device, + weight, + ext_db_ratio=None, + ext_db_size=None, + split_count=None, + ): + return osd_worker_add_osd( + zkhandler, + self, + run_on, + device, + weight, + ext_db_ratio, + ext_db_size, + split_count, + ) + + return run_osd_add( + self, run_on, device, weight, ext_db_ratio, 
ext_db_size, split_count + ) + + +@celery.task(name="osd.replace", bind=True, routing_key="run_on") +def osd_replace( + self, + osd_id, + new_device, + old_device=None, + weight=None, + ext_db_ratio=None, + ext_db_size=None, + run_on=None, +): + @ZKConnection(config) + def run_osd_replace( + zkhandler, + self, + run_on, + osd_id, + new_device, + old_device=None, + weight=None, + ext_db_ratio=None, + ext_db_size=None, + ): + return osd_worker_replace_osd( + zkhandler, + self, + run_on, + osd_id, + new_device, + old_device, + weight, + ext_db_ratio, + ext_db_size, + ) + + return run_osd_replace( + self, run_on, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size + ) + + +@celery.task(name="osd.refresh", bind=True, routing_key="run_on") +def osd_refresh(self, osd_id, device, ext_db_flag=False, run_on=None): + @ZKConnection(config) + def run_osd_refresh(zkhandler, self, run_on, osd_id, device, ext_db_flag=False): + return osd_worker_refresh_osd( + zkhandler, self, run_on, osd_id, device, ext_db_flag + ) + + return run_osd_refresh(self, run_on, osd_id, device, ext_db_flag) + + +@celery.task(name="osd.remove", bind=True, routing_key="run_on") +def osd_remove(self, osd_id, force_flag=False, skip_zap_flag=False, run_on=None): + @ZKConnection(config) + def run_osd_remove( + zkhandler, self, run_on, osd_id, force_flag=False, skip_zap_flag=False + ): + return osd_worker_remove_osd( + zkhandler, self, run_on, osd_id, force_flag, skip_zap_flag + ) + + return run_osd_remove(self, run_on, osd_id, force_flag, skip_zap_flag) + + +@celery.task(name="osd.add_db_vg", bind=True, routing_key="run_on") +def osd_add_db_vg(self, device, run_on=None): + @ZKConnection(config) + def run_osd_add_db_vg(zkhandler, self, run_on, device): + return osd_worker_add_db_vg(zkhandler, self, run_on, device) + + return run_osd_add_db_vg(self, run_on, device) + + ########################################################## # API Root/Authentication 
########################################################## @@ -738,7 +855,14 @@ class API_Tasks_Element(Resource): "total": 1, "status": "Pending job start", } - elif task.state != "FAILURE": + elif task.state == "FAILURE": + response = { + "state": task.state, + "current": 1, + "total": 1, + "status": str(task.info), + } + else: response = { "state": task.state, "current": task.info.get("current", 0), @@ -747,13 +871,6 @@ class API_Tasks_Element(Resource): } if "result" in task.info: response["result"] = task.info["result"] - else: - response = { - "state": task.state, - "current": 1, - "total": 1, - "status": str(task.info), - } return response @@ -4374,8 +4491,14 @@ class API_Storage_Ceph_OSDDB_Root(Resource): type: object id: Message """ - return api_helper.ceph_osd_db_vg_add( - reqargs.get("node", None), reqargs.get("device", None) + node = reqargs.get("node", None) + + task = osd_add_db_vg.delay(reqargs.get("device", None), run_on=node) + + return ( + {"task_id": task.id, "run_on": node}, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, ) @@ -4565,13 +4688,21 @@ class API_Storage_Ceph_OSD_Root(Resource): type: object id: Message """ - return api_helper.ceph_osd_add( - reqargs.get("node", None), + node = reqargs.get("node", None) + + task = osd_add.delay( reqargs.get("device", None), reqargs.get("weight", None), reqargs.get("ext_db_ratio", None), reqargs.get("ext_db_size", None), reqargs.get("osd_count", None), + run_on=node, + ) + + return ( + {"task_id": task.id, "run_on": node}, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, ) @@ -4671,13 +4802,26 @@ class API_Storage_Ceph_OSD_Element(Resource): type: object id: Message """ - return api_helper.ceph_osd_replace( + osd_node_detail, retcode = api_helper.ceph_osd_node(osdid) + if retcode == 200: + node = osd_node_detail["node"] + else: + return osd_node_detail, retcode + + task = osd_replace.delay( osdid, reqargs.get("new_device"), 
reqargs.get("old_device", None), reqargs.get("weight", None), reqargs.get("ext_db_ratio", None), reqargs.get("ext_db_size", None), + run_on=node, + ) + + return ( + {"task_id": task.id, "run_on": node}, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, ) @RequestParser( @@ -4715,9 +4859,22 @@ class API_Storage_Ceph_OSD_Element(Resource): type: object id: Message """ - return api_helper.ceph_osd_refresh( + osd_node_detail, retcode = api_helper.ceph_osd_node(osdid) + if retcode == 200: + node = osd_node_detail["node"] + else: + return osd_node_detail, retcode + + task = osd_refresh.delay( osdid, reqargs.get("device", None), + run_on=node, + ) + + return ( + {"task_id": task.id, "run_on": node}, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, ) @RequestParser( @@ -4771,7 +4928,23 @@ class API_Storage_Ceph_OSD_Element(Resource): type: object id: Message """ - return api_helper.ceph_osd_remove(osdid, reqargs.get("force", False)) + osd_node_detail, retcode = api_helper.ceph_osd_node(osdid) + if retcode == 200: + node = osd_node_detail["node"] + else: + return osd_node_detail, retcode + + task = osd_remove.delay( + osdid, + force_flag=reqargs.get("force", False), + run_on=node, + ) + + return ( + {"task_id": task.id, "run_on": node}, + 202, + {"Location": Api.url_for(api, API_Tasks_Element, task_id=task.id)}, + ) api.add_resource(API_Storage_Ceph_OSD_Element, "/storage/ceph/osd/") diff --git a/api-daemon/pvcapid/helper.py b/api-daemon/pvcapid/helper.py index 064d3093..b859336a 100755 --- a/api-daemon/pvcapid/helper.py +++ b/api-daemon/pvcapid/helper.py @@ -1332,6 +1332,36 @@ def ceph_osd_list(zkhandler, limit=None): return retdata, retcode +@pvc_common.Profiler(config) +@ZKConnection(config) +def ceph_osd_node(zkhandler, osd): + """ + Return the current node of OSD OSD. 
+ """ + retflag, retdata = pvc_ceph.get_list_osd(zkhandler, None) + + if retflag: + if retdata: + osd = [o for o in retdata if o["id"] == osd] + if len(osd) < 1: + retcode = 404 + retdata = {"message": "OSD not found."} + else: + retcode = 200 + retdata = { + "id": osd[0]["id"], + "node": osd[0]["node"], + } + else: + retcode = 404 + retdata = {"message": "OSD not found."} + else: + retcode = 400 + retdata = {"message": retdata} + + return retdata, retcode + + @ZKConnection(config) def ceph_osd_state(zkhandler, osd): retflag, retdata = pvc_ceph.get_list_osd(zkhandler, osd) diff --git a/client-cli/pvc/cli/cli.py b/client-cli/pvc/cli/cli.py index 88222db7..d0f8348d 100644 --- a/client-cli/pvc/cli/cli.py +++ b/client-cli/pvc/cli/cli.py @@ -1582,10 +1582,10 @@ def cli_vm_flush_locks(domain, wait_flag): NOTE: This is a task-based command. The "--wait" flag (default) will block and show progress. Specifying the "--no-wait" flag will return immediately with a job ID instead, which can be queried externally later. """ - retcode, retmsg = pvc.lib.vm.vm_locks(CLI_CONFIG, domain, wait_flag=wait_flag) + retcode, retmsg = pvc.lib.vm.vm_locks(CLI_CONFIG, domain, wait_flag) if retcode and wait_flag: - retmsg = wait_for_flush_locks(CLI_CONFIG, retmsg) + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) finish(retcode, retmsg) @@ -3372,10 +3372,18 @@ def cli_storage_osd(): @connection_req @click.argument("node") @click.argument("device") +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress", +) @confirm_opt( "Destroy all data on and create a new OSD database volume group on node {node} device {device}" ) -def cli_storage_osd_create_db_vg(node, device): +def cli_storage_osd_create_db_vg(node, device, wait_flag): """ Create a new Ceph OSD database volume group on node NODE with block device DEVICE. 
@@ -3390,7 +3398,12 @@ def cli_storage_osd_create_db_vg(node, device): A "detect" string is a string in the form "detect:::". Detect strings allow for automatic determination of Linux block device paths from known basic information about disks by leveraging "lsscsi" on the target host. The "NAME" should be some descriptive identifier, for instance the manufacturer (e.g. "INTEL"), the "HUMAN-SIZE" should be the labeled human-readable size of the device (e.g. "480GB", "1.92TB"), and "ID" specifies the Nth 0-indexed device which matches the "NAME" and "HUMAN-SIZE" values (e.g. "2" would match the third device with the corresponding "NAME" and "HUMAN-SIZE"). When matching against sizes, there is +/- 3% flexibility to account for base-1000 vs. base-1024 differences and rounding errors. The "NAME" may contain whitespace but if so the entire detect string should be quoted, and is case-insensitive. More information about detect strings can be found in the manual. """ - retcode, retmsg = pvc.lib.storage.ceph_osd_db_vg_add(CLI_CONFIG, node, device) + retcode, retmsg = pvc.lib.storage.ceph_osd_db_vg_add( + CLI_CONFIG, node, device, wait_flag + ) + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) finish(retcode, retmsg) @@ -3434,8 +3447,18 @@ def cli_storage_osd_create_db_vg(node, device): type=int, help="Split (an NVMe) disk into this many OSDs.", ) +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress", +) @confirm_opt("Destroy all data on and create new OSD(s) on node {node} device {device}") -def cli_storage_osd_add(node, device, weight, ext_db_ratio, ext_db_size, osd_count): +def cli_storage_osd_add( + node, device, weight, ext_db_ratio, ext_db_size, osd_count, wait_flag +): """ Add a new Ceph OSD on node NODE with block device DEVICE. 
@@ -3456,11 +3479,6 @@ def cli_storage_osd_add(node, device, weight, ext_db_ratio, ext_db_size, osd_cou NOTE: This command may take a long time to complete. Observe the node logs of the hosting OSD node for detailed status. """ - echo( - CLI_CONFIG, - "Waiting for node task to complete, this may take some time... ", - newline=False, - ) retcode, retmsg = pvc.lib.storage.ceph_osd_add( CLI_CONFIG, node, @@ -3469,8 +3487,11 @@ def cli_storage_osd_add(node, device, weight, ext_db_ratio, ext_db_size, osd_cou ext_db_ratio, ext_db_size, osd_count, + wait_flag, ) - echo(CLI_CONFIG, "done.") + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) finish(retcode, retmsg) @@ -3509,11 +3530,19 @@ def cli_storage_osd_add(node, device, weight, ext_db_ratio, ext_db_size, osd_cou default=None, help="Create a new external database logical volume for the OSD(s) with this human-unit size; if unset, old ext_db_size is used", ) +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress", +) @confirm_opt( "Destroy all data on and replace OSD {osdid} (and peer split OSDs) with new device {new_device}" ) def cli_storage_osd_replace( - osdid, new_device, old_device, weight, ext_db_ratio, ext_db_size + osdid, new_device, old_device, weight, ext_db_ratio, ext_db_size, wait_flag ): """ Replace the block device of an existing OSD with ID OSDID, and any peer split OSDs with the same block device, with NEW_DEVICE. Use this command to replace a failed or smaller OSD block device with a new one in one command. @@ -3533,15 +3562,19 @@ def cli_storage_osd_replace( NOTE: This command may take a long time to complete. Observe the node logs of the hosting OSD node for detailed status. """ - echo( - CLI_CONFIG, - "Waiting for node task to complete, this may take some time... 
", - newline=False, - ) retcode, retmsg = pvc.lib.storage.ceph_osd_replace( - CLI_CONFIG, osdid, new_device, old_device, weight, ext_db_ratio, ext_db_size + CLI_CONFIG, + osdid, + new_device, + old_device, + weight, + ext_db_ratio, + ext_db_size, + wait_flag, ) - echo(CLI_CONFIG, "done.") + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) finish(retcode, retmsg) @@ -3552,8 +3585,16 @@ def cli_storage_osd_replace( @connection_req @click.argument("osdid") @click.argument("device") -@confirm_opt("Refresh OSD {osdid} on device {device}") -def cli_storage_osd_refresh(osdid, device): +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress", +) +@confirm_opt("Refresh OSD {osdid} (and peer split OSDs) on device {device}") +def cli_storage_osd_refresh(osdid, device, wait_flag): """ Refresh (reimport) the block DEVICE of an existing OSD with ID OSDID. Use this command to reimport a working OSD into a rebuilt/replaced node. @@ -3566,13 +3607,12 @@ def cli_storage_osd_refresh(osdid, device): NOTE: This command may take a long time to complete. Observe the node logs of the hosting OSD node for detailed status. """ - echo( - CLI_CONFIG, - "Waiting for node task to complete, this may take some time... 
", - newline=False, + retcode, retmsg = pvc.lib.storage.ceph_osd_refresh( + CLI_CONFIG, osdid, device, wait_flag ) - retcode, retmsg = pvc.lib.storage.ceph_osd_refresh(CLI_CONFIG, osdid, device) - echo(CLI_CONFIG, "done.") + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) finish(retcode, retmsg) @@ -3590,8 +3630,16 @@ def cli_storage_osd_refresh(osdid, device): default=False, help="Force removal even if steps fail", ) +@click.option( + "--wait/--no-wait", + "wait_flag", + is_flag=True, + default=True, + show_default=True, + help="Wait or don't wait for task to complete, showing progress", +) @confirm_opt("Remove and destroy data on OSD {osdid}") -def cli_storage_osd_remove(osdid, force_flag): +def cli_storage_osd_remove(osdid, force_flag, wait_flag): """ Remove a Ceph OSD with ID OSDID. @@ -3602,13 +3650,12 @@ def cli_storage_osd_remove(osdid, force_flag): NOTE: This command may take a long time to complete. Observe the node logs of the hosting OSD node for detailed status. """ - echo( - CLI_CONFIG, - "Waiting for node task to complete, this may take some time... 
", - newline=False, + retcode, retmsg = pvc.lib.storage.ceph_osd_remove( + CLI_CONFIG, osdid, force_flag, wait_flag ) - retcode, retmsg = pvc.lib.storage.ceph_osd_remove(CLI_CONFIG, osdid, force_flag) - echo(CLI_CONFIG, "done.") + + if retcode and wait_flag: + retmsg = wait_for_celery_task(CLI_CONFIG, retmsg) finish(retcode, retmsg) diff --git a/client-cli/pvc/cli/helpers.py b/client-cli/pvc/cli/helpers.py index 583d7743..d173e084 100644 --- a/client-cli/pvc/cli/helpers.py +++ b/client-cli/pvc/cli/helpers.py @@ -20,7 +20,7 @@ ############################################################################### from click import echo as click_echo -from click import progressbar, confirm +from click import confirm from datetime import datetime from distutils.util import strtobool from getpass import getuser @@ -32,7 +32,6 @@ from socket import gethostname from subprocess import run, PIPE from sys import argv from syslog import syslog, openlog, closelog, LOG_AUTH -from time import sleep from yaml import load as yload from yaml import BaseLoader, SafeLoader @@ -191,123 +190,6 @@ def update_store(store_path, store_data): jdump(store_data, fh, sort_keys=True, indent=4) -def wait_for_flush_locks(CLI_CONFIG, task_detail): - """ - Wait for a flush_locks task to complete - """ - - task_id = task_detail["task_id"] - run_on = task_detail["run_on"] - - echo(CLI_CONFIG, f"Task ID: {task_id} assigned to node {run_on}") - echo(CLI_CONFIG, "") - - # Wait for the task to start - echo(CLI_CONFIG, "Waiting for task to start...", newline=False) - while True: - sleep(0.25) - task_status = pvc.lib.common.task_status( - CLI_CONFIG, task_id=task_id, is_watching=True - ) - if task_status.get("state") != "PENDING": - break - echo(CLI_CONFIG, ".", newline=False) - echo(CLI_CONFIG, " done.") - echo(CLI_CONFIG, "") - - # Start following the task state, updating progress as we go - total_task = task_status.get("total") - with progressbar(length=total_task, show_eta=False) as bar: - last_task = 0 - 
maxlen = 0 - while True: - sleep(0.25) - if task_status.get("state") != "RUNNING": - break - if task_status.get("current") > last_task: - current_task = int(task_status.get("current")) - bar.update(current_task - last_task) - last_task = current_task - # The extensive spaces at the end cause this to overwrite longer previous messages - curlen = len(str(task_status.get("status"))) - if curlen > maxlen: - maxlen = curlen - lendiff = maxlen - curlen - overwrite_whitespace = " " * lendiff - echo( - CLI_CONFIG, - " " + task_status.get("status") + overwrite_whitespace, - newline=False, - ) - task_status = pvc.lib.common.task_status( - CLI_CONFIG, task_id=task_id, is_watching=True - ) - if task_status.get("state") == "SUCCESS": - bar.update(total_task - last_task) - - echo(CLI_CONFIG, "") - retdata = task_status.get("state") + ": " + task_status.get("status") - - return retdata - - -def wait_for_provisioner(CLI_CONFIG, task_id): - """ - Wait for a provisioner task to complete - """ - - echo(CLI_CONFIG, f"Task ID: {task_id}") - echo(CLI_CONFIG, "") - - # Wait for the task to start - echo(CLI_CONFIG, "Waiting for task to start...", newline=False) - while True: - sleep(1) - task_status = pvc.lib.provisioner.task_status( - CLI_CONFIG, task_id, is_watching=True - ) - if task_status.get("state") != "PENDING": - break - echo(CLI_CONFIG, ".", newline=False) - echo(CLI_CONFIG, " done.") - echo(CLI_CONFIG, "") - - # Start following the task state, updating progress as we go - total_task = task_status.get("total") - with progressbar(length=total_task, show_eta=False) as bar: - last_task = 0 - maxlen = 0 - while True: - sleep(1) - if task_status.get("state") != "RUNNING": - break - if task_status.get("current") > last_task: - current_task = int(task_status.get("current")) - bar.update(current_task - last_task) - last_task = current_task - # The extensive spaces at the end cause this to overwrite longer previous messages - curlen = len(str(task_status.get("status"))) - if curlen > 
maxlen: - maxlen = curlen - lendiff = maxlen - curlen - overwrite_whitespace = " " * lendiff - echo( - CLI_CONFIG, - " " + task_status.get("status") + overwrite_whitespace, - newline=False, - ) - task_status = pvc.lib.provisioner.task_status( - CLI_CONFIG, task_id, is_watching=True - ) - if task_status.get("state") == "SUCCESS": - bar.update(total_task - last_task) - - echo(CLI_CONFIG, "") - retdata = task_status.get("state") + ": " + task_status.get("status") - - return retdata - - def get_autobackup_config(CLI_CONFIG, cfgfile): try: config = dict() diff --git a/client-cli/pvc/cli/waiters.py b/client-cli/pvc/cli/waiters.py index 705c19c5..05b38ab4 100644 --- a/client-cli/pvc/cli/waiters.py +++ b/client-cli/pvc/cli/waiters.py @@ -19,6 +19,7 @@ # ############################################################################### +from click import progressbar from time import sleep, time from pvc.cli.helpers import echo @@ -62,3 +63,120 @@ def cli_node_waiter(config, node, state_field, state_value): t_end = time() echo(config, f" done. 
[{int(t_end - t_start)}s]") + + +def wait_for_celery_task(CLI_CONFIG, task_detail): + """ + Wait for a Celery task to complete + """ + + task_id = task_detail["task_id"] + run_on = task_detail["run_on"] + + echo(CLI_CONFIG, f"Task ID: {task_id} assigned to node {run_on}") + echo(CLI_CONFIG, "") + + # Wait for the task to start + echo(CLI_CONFIG, "Waiting for task to start...", newline=False) + while True: + sleep(0.25) + task_status = pvc.lib.common.task_status( + CLI_CONFIG, task_id=task_id, is_watching=True + ) + if task_status.get("state") != "PENDING": + break + echo(CLI_CONFIG, ".", newline=False) + echo(CLI_CONFIG, " done.") + echo(CLI_CONFIG, "") + + # Start following the task state, updating progress as we go + total_task = task_status.get("total") + with progressbar(length=total_task, show_eta=False) as bar: + last_task = 0 + maxlen = 0 + while True: + sleep(0.25) + if task_status.get("state") != "RUNNING": + break + if task_status.get("current") > last_task: + current_task = int(task_status.get("current")) + bar.update(current_task - last_task) + last_task = current_task + # The extensive spaces at the end cause this to overwrite longer previous messages + curlen = len(str(task_status.get("status"))) + if curlen > maxlen: + maxlen = curlen + lendiff = maxlen - curlen + overwrite_whitespace = " " * lendiff + echo( + CLI_CONFIG, + " " + task_status.get("status") + overwrite_whitespace, + newline=False, + ) + task_status = pvc.lib.common.task_status( + CLI_CONFIG, task_id=task_id, is_watching=True + ) + if task_status.get("state") == "SUCCESS": + bar.update(total_task - last_task) + + echo(CLI_CONFIG, "") + retdata = task_status.get("state") + ": " + task_status.get("status") + + return retdata + + +def wait_for_provisioner(CLI_CONFIG, task_id): + """ + Wait for a provisioner task to complete + """ + + echo(CLI_CONFIG, f"Task ID: {task_id}") + echo(CLI_CONFIG, "") + + # Wait for the task to start + echo(CLI_CONFIG, "Waiting for task to start...", 
newline=False) + while True: + sleep(1) + task_status = pvc.lib.provisioner.task_status( + CLI_CONFIG, task_id, is_watching=True + ) + if task_status.get("state") != "PENDING": + break + echo(CLI_CONFIG, ".", newline=False) + echo(CLI_CONFIG, " done.") + echo(CLI_CONFIG, "") + + # Start following the task state, updating progress as we go + total_task = task_status.get("total") + with progressbar(length=total_task, show_eta=False) as bar: + last_task = 0 + maxlen = 0 + while True: + sleep(1) + if task_status.get("state") != "RUNNING": + break + if task_status.get("current") > last_task: + current_task = int(task_status.get("current")) + bar.update(current_task - last_task) + last_task = current_task + # The extensive spaces at the end cause this to overwrite longer previous messages + curlen = len(str(task_status.get("status"))) + if curlen > maxlen: + maxlen = curlen + lendiff = maxlen - curlen + overwrite_whitespace = " " * lendiff + echo( + CLI_CONFIG, + " " + task_status.get("status") + overwrite_whitespace, + newline=False, + ) + task_status = pvc.lib.provisioner.task_status( + CLI_CONFIG, task_id, is_watching=True + ) + if task_status.get("state") == "SUCCESS": + bar.update(total_task - last_task) + + echo(CLI_CONFIG, "") + retdata = task_status.get("state") + ": " + task_status.get("status") + + return retdata diff --git a/client-cli/pvc/lib/storage.py b/client-cli/pvc/lib/storage.py index 33a1e8b5..3e1fa482 100644 --- a/client-cli/pvc/lib/storage.py +++ b/client-cli/pvc/lib/storage.py @@ -164,7 +164,7 @@ def format_raw_output(config, status_data): # # OSD DB VG functions # -def ceph_osd_db_vg_add(config, node, device): +def ceph_osd_db_vg_add(config, node, device, wait_flag): """ Add new Ceph OSD database volume group @@ -175,12 +175,21 @@ def ceph_osd_db_vg_add(config, node, device): params = {"node": node, "device": device} response = call_api(config, "post", "/storage/ceph/osddb", params=params) - if response.status_code == 200: - retstatus = True + if 
response.status_code == 202: + retvalue = True + retjson = response.json() + if not wait_flag: + retdata = ( + f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}" + ) + else: + # Just return the task JSON without formatting + retdata = response.json() else: - retstatus = False + retvalue = False + retdata = response.json().get("message", "") - return retstatus, response.json().get("message", "") + return retvalue, retdata # @@ -231,7 +240,9 @@ def ceph_osd_list(config, limit): return False, response.json().get("message", "") -def ceph_osd_add(config, node, device, weight, ext_db_ratio, ext_db_size, osd_count): +def ceph_osd_add( + config, node, device, weight, ext_db_ratio, ext_db_size, osd_count, wait_flag +): """ Add new Ceph OSD @@ -254,16 +265,25 @@ def ceph_osd_add(config, node, device, weight, ext_db_ratio, ext_db_size, osd_co response = call_api(config, "post", "/storage/ceph/osd", params=params) - if response.status_code == 200: - retstatus = True + if response.status_code == 202: + retvalue = True + retjson = response.json() + if not wait_flag: + retdata = ( + f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}" + ) + else: + # Just return the task JSON without formatting + retdata = response.json() else: - retstatus = False + retvalue = False + retdata = response.json().get("message", "") - return retstatus, response.json().get("message", "") + return retvalue, retdata def ceph_osd_replace( - config, osdid, new_device, old_device, weight, ext_db_ratio, ext_db_size + config, osdid, new_device, old_device, weight, ext_db_ratio, ext_db_size, wait_flag ): """ Replace an existing Ceph OSD with a new device @@ -288,15 +308,24 @@ def ceph_osd_replace( response = call_api(config, "post", f"/storage/ceph/osd/{osdid}", params=params) - if response.status_code == 200: - retstatus = True + if response.status_code == 202: + retvalue = True + retjson = response.json() + if not wait_flag: + retdata = ( + f"Task ID: {retjson['task_id']} 
assigned to node {retjson['run_on']}" + ) + else: + # Just return the task JSON without formatting + retdata = response.json() else: - retstatus = False + retvalue = False + retdata = response.json().get("message", "") - return retstatus, response.json().get("message", "") + return retvalue, retdata -def ceph_osd_refresh(config, osdid, device): +def ceph_osd_refresh(config, osdid, device, wait_flag): """ Refresh (reimport) an existing Ceph OSD with device {device} @@ -309,15 +338,24 @@ def ceph_osd_refresh(config, osdid, device): } response = call_api(config, "put", f"/storage/ceph/osd/{osdid}", params=params) - if response.status_code == 200: - retstatus = True + if response.status_code == 202: + retvalue = True + retjson = response.json() + if not wait_flag: + retdata = ( + f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}" + ) + else: + # Just return the task JSON without formatting + retdata = response.json() else: - retstatus = False + retvalue = False + retdata = response.json().get("message", "") - return retstatus, response.json().get("message", "") + return retvalue, retdata -def ceph_osd_remove(config, osdid, force_flag): +def ceph_osd_remove(config, osdid, force_flag, wait_flag): """ Remove Ceph OSD @@ -330,12 +368,21 @@ def ceph_osd_remove(config, osdid, force_flag): config, "delete", "/storage/ceph/osd/{osdid}".format(osdid=osdid), params=params ) - if response.status_code == 200: - retstatus = True + if response.status_code == 202: + retvalue = True + retjson = response.json() + if not wait_flag: + retdata = ( + f"Task ID: {retjson['task_id']} assigned to node {retjson['run_on']}" + ) + else: + # Just return the task JSON without formatting + retdata = response.json() else: - retstatus = False + retvalue = False + retdata = response.json().get("message", "") - return retstatus, response.json().get("message", "") + return retvalue, retdata def ceph_osd_state(config, osdid, state): diff --git a/client-cli/pvc/lib/vm.py 
b/client-cli/pvc/lib/vm.py index d3fa3b98..4fe21a49 100644 --- a/client-cli/pvc/lib/vm.py +++ b/client-cli/pvc/lib/vm.py @@ -415,7 +415,7 @@ def vm_node(config, vm, target_node, action, force=False, wait=False, force_live return retstatus, response.json().get("message", "") -def vm_locks(config, vm, wait_flag=False): +def vm_locks(config, vm, wait_flag): """ Flush RBD locks of (stopped) VM diff --git a/daemon-common/celery.py b/daemon-common/celery.py index a4be1e46..547c11ef 100644 --- a/daemon-common/celery.py +++ b/daemon-common/celery.py @@ -30,25 +30,40 @@ class TaskFailure(Exception): pass -def start(celery, msg, current=1, total=2): +def start(celery, msg, current=0, total=1): logger = getLogger(__name__) - logger.info(f"Starting: {msg}") + logger.info(f"Starting {current}/{total}: {msg}") celery.update_state( state="RUNNING", meta={"current": current, "total": total, "status": msg} ) sleep(0.5) -def fail(celery, msg, current=1, total=2): +def fail(celery, msg, current=1, total=1): logger = getLogger(__name__) logger.error(msg) sys.tracebacklimit = 0 raise TaskFailure(msg) +def log_info(celery, msg): + logger = getLogger(__name__) + logger.info(f"Task log: {msg}") + + +def log_warn(celery, msg): + logger = getLogger(__name__) + logger.warning(f"Task log: {msg}") + + +def log_err(celery, msg): + logger = getLogger(__name__) + logger.error(f"Task log: {msg}") + + def update(celery, msg, current=1, total=2): logger = getLogger(__name__) - logger.info(f"Task update: {msg}") + logger.info(f"Task update {current}/{total}: {msg}") celery.update_state( state="RUNNING", meta={"current": current, "total": total, "status": msg} ) @@ -57,10 +72,11 @@ def update(celery, msg, current=1, total=2): def finish(celery, msg, current=2, total=2): logger = getLogger(__name__) + logger.info(f"Task update {current}/{total}: Finishing up") celery.update_state( state="RUNNING", meta={"current": current, "total": total, "status": "Finishing up"}, ) - sleep(0.25) - 
logger.info(f"Success: {msg}") + sleep(0.5) + logger.info(f"Success {current}/{total}: {msg}") return {"status": msg, "current": current, "total": total} diff --git a/daemon-common/ceph.py b/daemon-common/ceph.py index 7a435b65..ec4c9793 100644 --- a/daemon-common/ceph.py +++ b/daemon-common/ceph.py @@ -27,10 +27,16 @@ import math from concurrent.futures import ThreadPoolExecutor from distutils.util import strtobool +from json import loads as jloads +from re import match, search +from uuid import uuid4 +from os import path import daemon_lib.vm as vm import daemon_lib.common as common +from daemon_lib.celery import start, log_info, log_warn, update, fail, finish + # # Supplemental functions @@ -231,233 +237,6 @@ def getOSDInformation(zkhandler, osd_id): return osd_information -# OSD DB VG actions use the /cmd/ceph pipe -# These actions must occur on the specific node they reference -def add_osd_db_vg(zkhandler, node, device): - # Verify the target node exists - if not common.verifyNode(zkhandler, node): - return False, 'ERROR: No node named "{}" is present in the cluster.'.format( - node - ) - - # Tell the cluster to create a new OSD for the host - add_osd_db_vg_string = "db_vg_add {},{}".format(node, device) - zkhandler.write([("base.cmd.ceph", add_osd_db_vg_string)]) - # Wait 1/2 second for the cluster to get the message and start working - time.sleep(0.5) - # Acquire a read lock, so we get the return exclusively - with zkhandler.readlock("base.cmd.ceph"): - try: - result = zkhandler.read("base.cmd.ceph").split()[0] - if result == "success-db_vg_add": - message = 'Created new OSD database VG at "{}" on node "{}".'.format( - device, node - ) - success = True - else: - message = "ERROR: Failed to create new OSD database VG; check node logs for details." - success = False - except Exception: - message = "ERROR: Command ignored by node." 
- success = False - - # Acquire a write lock to ensure things go smoothly - with zkhandler.writelock("base.cmd.ceph"): - time.sleep(0.5) - zkhandler.write([("base.cmd.ceph", "")]) - - return success, message - - -# OSD actions use the /cmd/ceph pipe -# These actions must occur on the specific node they reference -def add_osd( - zkhandler, - node, - device, - weight, - ext_db_ratio=None, - ext_db_size=None, - split_count=None, -): - # Verify that options are valid - if ext_db_ratio is not None and ext_db_size is not None: - return ( - False, - "ERROR: Both an ext_db_ratio and ext_db_size were specified; choose only one.", - ) - - # Verify the target node exists - if not common.verifyNode(zkhandler, node): - return False, 'ERROR: No node named "{}" is present in the cluster.'.format( - node - ) - - # Verify target block device isn't in use - block_osd = verifyOSDBlock(zkhandler, node, device) - if block_osd: - return ( - False, - 'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format( - device, node, block_osd - ), - ) - - # Tell the cluster to create a new OSD for the host - add_osd_string = "osd_add {},{},{},{},{},{}".format( - node, device, weight, ext_db_ratio, ext_db_size, split_count - ) - zkhandler.write([("base.cmd.ceph", add_osd_string)]) - # Wait 1/2 second for the cluster to get the message and start working - time.sleep(0.5) - # Acquire a read lock, so we get the return exclusively - with zkhandler.readlock("base.cmd.ceph"): - try: - result = zkhandler.read("base.cmd.ceph").split()[0] - if result == "success-osd_add": - message = f'Created {split_count} new OSD(s) on node "{node}" block device "{device}"' - success = True - else: - message = "ERROR: Failed to create OSD(s); check node logs for details." - success = False - except Exception: - message = "ERROR: Command ignored by node." 
- success = False - - # Acquire a write lock to ensure things go smoothly - with zkhandler.writelock("base.cmd.ceph"): - time.sleep(0.5) - zkhandler.write([("base.cmd.ceph", "")]) - - return success, message - - -def replace_osd( - zkhandler, - osd_id, - new_device, - old_device=None, - weight=None, - ext_db_ratio=None, - ext_db_size=None, -): - # Get current OSD information - osd_information = getOSDInformation(zkhandler, osd_id) - node = osd_information["node"] - - # Verify target block device isn't in use - block_osd = verifyOSDBlock(zkhandler, node, new_device) - if block_osd and block_osd != osd_id: - return ( - False, - 'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format( - new_device, node, block_osd - ), - ) - - # Tell the cluster to create a new OSD for the host - replace_osd_string = "osd_replace {},{},{},{},{},{},{}".format( - node, osd_id, new_device, old_device, weight, ext_db_ratio, ext_db_size - ) - zkhandler.write([("base.cmd.ceph", replace_osd_string)]) - # Wait 1/2 second for the cluster to get the message and start working - time.sleep(0.5) - # Acquire a read lock, so we get the return exclusively - with zkhandler.readlock("base.cmd.ceph"): - try: - result = zkhandler.read("base.cmd.ceph").split()[0] - if result == "success-osd_replace": - message = 'Replaced OSD {} with block device "{}" on node "{}".'.format( - osd_id, new_device, node - ) - success = True - else: - message = "ERROR: Failed to replace OSD; check node logs for details." - success = False - except Exception: - message = "ERROR: Command ignored by node." 
- success = False - - # Acquire a write lock to ensure things go smoothly - with zkhandler.writelock("base.cmd.ceph"): - time.sleep(0.5) - zkhandler.write([("base.cmd.ceph", "")]) - - return success, message - - -def refresh_osd(zkhandler, osd_id, device): - # Get current OSD information - osd_information = getOSDInformation(zkhandler, osd_id) - node = osd_information["node"] - ext_db_flag = True if osd_information["db_device"] else False - - # Tell the cluster to create a new OSD for the host - refresh_osd_string = "osd_refresh {},{},{},{}".format( - node, osd_id, device, ext_db_flag - ) - zkhandler.write([("base.cmd.ceph", refresh_osd_string)]) - # Wait 1/2 second for the cluster to get the message and start working - time.sleep(0.5) - # Acquire a read lock, so we get the return exclusively - with zkhandler.readlock("base.cmd.ceph"): - try: - result = zkhandler.read("base.cmd.ceph").split()[0] - if result == "success-osd_refresh": - message = ( - 'Refreshed OSD {} with block device "{}" on node "{}".'.format( - osd_id, device, node - ) - ) - success = True - else: - message = "ERROR: Failed to refresh OSD; check node logs for details." - success = False - except Exception: - message = "ERROR: Command ignored by node." 
- success = False - - # Acquire a write lock to ensure things go smoothly - with zkhandler.writelock("base.cmd.ceph"): - time.sleep(0.5) - zkhandler.write([("base.cmd.ceph", "")]) - - return success, message - - -def remove_osd(zkhandler, osd_id, force_flag): - if not verifyOSD(zkhandler, osd_id): - return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format( - osd_id - ) - - # Tell the cluster to remove an OSD - remove_osd_string = "osd_remove {},{}".format(osd_id, str(force_flag)) - zkhandler.write([("base.cmd.ceph", remove_osd_string)]) - # Wait 1/2 second for the cluster to get the message and start working - time.sleep(0.5) - # Acquire a read lock, so we get the return exclusively - with zkhandler.readlock("base.cmd.ceph"): - try: - result = zkhandler.read("base.cmd.ceph").split()[0] - if result == "success-osd_remove": - message = 'Removed OSD "{}" from the cluster.'.format(osd_id) - success = True - else: - message = "ERROR: Failed to remove OSD; check node logs for details." - success = False - except Exception: - success = False - message = "ERROR Command ignored by node." 
- - # Acquire a write lock to ensure things go smoothly - with zkhandler.writelock("base.cmd.ceph"): - time.sleep(0.5) - zkhandler.write([("base.cmd.ceph", "")]) - - return success, message - - def in_osd(zkhandler, osd_id): if not verifyOSD(zkhandler, osd_id): return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format( @@ -1298,3 +1077,1382 @@ def get_list_snapshot(zkhandler, pool, volume, limit, is_fuzzy=True): ) return True, sorted(snapshot_list, key=lambda x: str(x["snapshot"])) + + +# +# Celery worker tasks (must be run on node, outputs log messages to worker) +# +def osd_worker_helper_find_osds_from_block(device): + # Try to query the passed block device directly + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm list --format json {device}" + ) + if retcode: + found_osds = [] + else: + found_osds = jloads(stdout) + + return found_osds + + +def osd_worker_add_osd( + zkhandler, + celery, + node, + device, + weight, + ext_db_ratio=None, + ext_db_size=None, + split_count=None, +): + current_stage = 0 + total_stages = 4 + if split_count is None: + _split_count = 1 + else: + _split_count = split_count + total_stages = total_stages + 3 * int(_split_count) + if ext_db_ratio is not None or ext_db_size is not None: + total_stages = total_stages + 3 * int(_split_count) + 1 + + start( + celery, + f"Adding {_split_count} new OSD(s) on device {device} with weight {weight}", + current=current_stage, + total=total_stages, + ) + + # Handle a detect device if that is passed + if match(r"detect:", device): + ddevice = common.get_detect_device(device) + if ddevice is None: + fail( + celery, + f"Failed to determine block device from detect string {device}", + ) + return + else: + log_info( + celery, f"Determined block device {ddevice} from detect string {device}" + ) + device = ddevice + + if ext_db_size is not None and ext_db_ratio is not None: + fail( + celery, + "Invalid configuration: both an ext_db_size and ext_db_ratio were specified", 
+ ) + return + + # Check if device has a partition table; it's not valid if it does + retcode, _, _ = common.run_os_command(f"sfdisk --dump {device}") + if retcode < 1: + fail( + celery, + f"Device {device} has a partition table and is unsuitable for an OSD", + ) + return + + if ext_db_size is not None or ext_db_ratio is not None: + ext_db_flag = True + else: + ext_db_flag = False + + if split_count is not None: + split_flag = f"--osds-per-device {split_count}" + is_split = True + log_info( + celery, f"Creating {split_count} new OSD disks on block device {device}" + ) + else: + split_flag = "" + is_split = False + log_info(celery, f"Creating 1 new OSD disk on block device {device}") + + if "nvme" in device: + class_flag = "--crush-device-class nvme" + else: + class_flag = "--crush-device-class ssd" + + # 1. Zap the block device + current_stage += 1 + update( + celery, + f"Zapping block device {device}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm zap --destroy {device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to perform ceph-volume lvm zap on {device}") + return + + # 2. Prepare the OSD(s) + current_stage += 1 + update( + celery, + f"Preparing OSD(s) on device {device}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm batch --yes --prepare --bluestore {split_flag} {class_flag} {device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to perform ceph-volume lvm batch on {device}") + return + + # 3. 
Get the list of created OSDs on the device (initial pass) + current_stage += 1 + update( + celery, + f"Querying OSD(s) on device {device}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm list --format json {device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to perform ceph-volume lvm list on {device}") + return + + created_osds = jloads(stdout) + + # 4. Prepare the WAL and DB devices + if ext_db_flag: + for created_osd in created_osds: + # 4a. Get the OSD FSID and ID from the details + osd_details = created_osds[created_osd][0] + osd_fsid = osd_details["tags"]["ceph.osd_fsid"] + osd_id = osd_details["tags"]["ceph.osd_id"] + osd_lv = osd_details["lv_path"] + + current_stage += 1 + update( + celery, + f"Preparing DB LV for OSD {osd_id}", + current=current_stage, + total=total_stages, + ) + + # 4b. Prepare the logical volume if ext_db_flag + if ext_db_ratio is not None: + _, osd_size_bytes, _ = common.run_os_command( + f"blockdev --getsize64 {osd_lv}" + ) + osd_size_bytes = int(osd_size_bytes) + osd_db_size_bytes = int(osd_size_bytes * ext_db_ratio) + if ext_db_size is not None: + osd_db_size_bytes = format_bytes_fromhuman(ext_db_size) + osd_db_size_mb = int(osd_db_size_bytes / 1024 / 1024) + + db_device = f"osd-db/osd-{osd_id}" + + current_stage += 1 + update( + celery, + f"Preparing Bluestore DB volume for OSD {osd_id} on {db_device}", + current=current_stage, + total=total_stages, + ) + + retcode, stdout, stderr = common.run_os_command( + f"lvcreate -L {osd_db_size_mb}M -n osd-{osd_id} --yes osd-db" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run lvcreate on {db_device}") + return + + # 4c. 
Attach the new DB device to the OSD + current_stage += 1 + update( + celery, + f"Attaching Bluestore DB volume to OSD {osd_id}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm new-db --osd-id {osd_id} --osd-fsid {osd_fsid} --target {db_device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail( + celery, f"Failed to perform ceph-volume lvm new-db on OSD {osd_id}" + ) + return + + # 4d. Get the list of created OSDs on the device (final pass) + current_stage += 1 + update( + celery, + f"Requerying OSD(s) on device {device}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm list --format json {device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to perform ceph-volume lvm list on {device}") + return + + created_osds = jloads(stdout) + + # 5. Activate the OSDs + for created_osd in created_osds: + # 5a. Get the OSD FSID and ID from the details + osd_details = created_osds[created_osd][0] + osd_clusterfsid = osd_details["tags"]["ceph.cluster_fsid"] + osd_fsid = osd_details["tags"]["ceph.osd_fsid"] + osd_id = osd_details["tags"]["ceph.osd_id"] + db_device = osd_details["tags"].get("ceph.db_device", "") + osd_vg = osd_details["vg_name"] + osd_lv = osd_details["lv_name"] + + # 5b. Add it to the crush map + current_stage += 1 + update( + celery, + f"Adding OSD {osd_id} to CRUSH map", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph osd crush add osd.{osd_id} {weight} root=default host={node}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to perform ceph osd crush add for OSD {osd_id}") + return + + # 5c. 
Activate the OSD + current_stage += 1 + update( + celery, + f"Activating OSD {osd_id}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm activate --bluestore {osd_id} {osd_fsid}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to perform ceph osd crush add for OSD {osd_id}") + return + + # 5d. Wait 1 second for it to activate + time.sleep(1) + + # 5e. Verify it started + retcode, stdout, stderr = common.run_os_command( + f"systemctl status ceph-osd@{osd_id}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to start OSD {osd_id} process") + return + + # 5f. Add the new OSD to PVC + current_stage += 1 + update( + celery, + f"Adding OSD {osd_id} to PVC", + current=current_stage, + total=total_stages, + ) + zkhandler.write( + [ + (("osd", osd_id), ""), + (("osd.node", osd_id), node), + (("osd.device", osd_id), device), + (("osd.db_device", osd_id), db_device), + (("osd.fsid", osd_id), ""), + (("osd.ofsid", osd_id), osd_fsid), + (("osd.cfsid", osd_id), osd_clusterfsid), + (("osd.lvm", osd_id), ""), + (("osd.vg", osd_id), osd_vg), + (("osd.lv", osd_id), osd_lv), + (("osd.is_split", osd_id), is_split), + ( + ("osd.stats", osd_id), + '{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}', + ), + ] + ) + + # 6. 
Log it + current_stage += 1 + return finish( + celery, + f"Successfully created {len(created_osds.keys())} new OSD(s) {','.join(created_osds.keys())} on device {device}", + current=current_stage, + total=total_stages, + ) + + +def osd_worker_replace_osd( + zkhandler, + celery, + node, + osd_id, + new_device, + old_device=None, + weight=None, + ext_db_ratio=None, + ext_db_size=None, +): + # Try to determine if any other OSDs shared a block device with this OSD + _, osd_list = get_list_osd(zkhandler, None) + osd_block = zkhandler.read(("osd.device", osd_id)) + all_osds_on_block = [ + o for o in osd_list if o["node"] == node and o["device"] == osd_block + ] + all_osds_on_block_ids = [o["id"] for o in all_osds_on_block] + + # Set up stages + current_stage = 0 + total_stages = 3 + _split_count = len(all_osds_on_block_ids) + total_stages = total_stages + 10 * int(_split_count) + if ( + ext_db_ratio is not None + or ext_db_size is not None + or any([True for o in all_osds_on_block if o["db_device"]]) + ): + total_stages = total_stages + 2 * int(_split_count) + + start( + celery, + f"Replacing OSD(s) {','.join(all_osds_on_block_ids)} with device {new_device}", + current=current_stage, + total=total_stages, + ) + + # Handle a detect device if that is passed + if match(r"detect:", new_device): + ddevice = common.get_detect_device(new_device) + if ddevice is None: + fail( + celery, + f"Failed to determine block device from detect string {new_device}", + ) + return + else: + log_info( + celery, + f"Determined block device {ddevice} from detect string {new_device}", + ) + new_device = ddevice + + # Check if device has a partition table; it's not valid if it does + retcode, _, _ = common.run_os_command(f"sfdisk --dump {new_device}") + if retcode < 1: + fail( + celery, + f"Device {new_device} has a partition table and is unsuitable for an OSD", + ) + return + + # Phase 1: Try to determine what we can about the old device + real_old_device = None + + # Determine information from a 
passed old_device + if old_device is not None: + found_osds = osd_worker_helper_find_osds_from_block(old_device) + if found_osds and osd_id in found_osds.keys(): + real_old_device = old_device + else: + log_warn( + celery, + f"No OSD(s) found on device {old_device}; falling back to PVC detection", + ) + + # Try to get an old_device from our PVC information + if real_old_device is None: + found_osds = osd_worker_helper_find_osds_from_block(osd_block) + + if osd_id in found_osds.keys(): + real_old_device = osd_block + + if real_old_device is None: + skip_zap = True + log_warn( + celery, + "No valid old block device found for OSD(s); skipping zap", + ) + else: + skip_zap = False + + # Determine the weight of the OSD(s) + if weight is None: + weight = all_osds_on_block[0]["stats"]["weight"] + + # Take down the OSD(s), but keep it's CRUSH map details and IDs + for osd in all_osds_on_block: + osd_id = osd["id"] + + # 1. Set the OSD down and out so it will flush + current_stage += 1 + update( + celery, + f"Setting OSD {osd_id} down", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command(f"ceph osd down {osd_id}") + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to set OSD {osd_id} down") + return + + current_stage += 1 + update( + celery, + f"Setting OSD {osd_id} out", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command(f"ceph osd out {osd_id}") + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to set OSD {osd_id} out") + return + + # 2. 
Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete) + current_stage += 1 + update( + celery, + f"Waiting for OSD {osd_id} to be safe to remove", + current=current_stage, + total=total_stages, + ) + tcount = 0 + while True: + retcode, stdout, stderr = common.run_os_command( + f"ceph osd safe-to-destroy osd.{osd_id}" + ) + if int(retcode) in [0, 11]: + break + else: + common.run_os_command(f"ceph osd down {osd_id}") + common.run_os_command(f"ceph osd out {osd_id}") + time.sleep(1) + tcount += 1 + if tcount > 60: + log_warn( + celery, + f"Timed out (60s) waiting for OSD {osd_id} to be safe to remove; proceeding anyways", + ) + break + + # 3. Stop the OSD process and wait for it to be terminated + current_stage += 1 + update( + celery, + f"Stopping OSD {osd_id}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"systemctl stop ceph-osd@{osd_id}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to stop OSD {osd_id}") + return + time.sleep(5) + + # 4. Destroy the OSD + current_stage += 1 + update( + celery, + f"Destroying OSD {osd_id}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph osd destroy {osd_id} --yes-i-really-mean-it" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to destroy OSD {osd_id}") + return + + current_stage += 1 + update( + celery, + f"Zapping old disk {real_old_device} if possible", + current=current_stage, + total=total_stages, + ) + if not skip_zap: + # 5. 
Zap the old disk + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm zap --destroy {real_old_device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + log_warn( + celery, f"Failed to zap old disk {real_old_device}; proceeding anyways" + ) + + # 6. Prepare the volume group on the new device + current_stage += 1 + update( + celery, + f"Preparing LVM volume group on new disk {new_device}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm zap --destroy {new_device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run ceph-volume lvm zap on new disk {new_device}") + return + + retcode, stdout, stderr = common.run_os_command(f"pvcreate {new_device}") + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run pvcreate on new disk {new_device}") + return + + vg_uuid = str(uuid4()) + retcode, stdout, stderr = common.run_os_command( + f"vgcreate ceph-{vg_uuid} {new_device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run vgcreate on new disk {new_device}") + return + + # Determine how many OSDs we want on the new device + osds_count = len(all_osds_on_block) + + # Determine the size of the new device + _, new_device_size_bytes, _ = common.run_os_command( + f"blockdev --getsize64 {new_device}" + ) + + # Calculate the size of each OSD (in MB) based on the default 4M extent size + new_osd_size_mb = ( + int(int(int(new_device_size_bytes) / osds_count) / 1024 / 1024 / 4) * 4 + ) + + # Calculate the size, if applicable, of the OSD block if we were passed a ratio + if ext_db_ratio is not None: + osd_new_db_size_mb = int(int(int(new_osd_size_mb * ext_db_ratio) / 4) * 4) + elif ext_db_size is not None: + osd_new_db_size_mb = 
int( + int(int(format_bytes_fromhuman(ext_db_size)) / 1024 / 1024 / 4) * 4 + ) + else: + if all_osds_on_block[0]["db_device"]: + _, new_device_size_bytes, _ = common.run_os_command( + f"blockdev --getsize64 {all_osds_on_block[0]['db_device']}" + ) + osd_new_db_size_mb = int( + int(int(new_device_size_bytes) / 1024 / 1024 / 4) * 4 + ) + else: + osd_new_db_size_mb = None + + for osd in all_osds_on_block: + osd_id = osd["id"] + osd_fsid = osd["fsid"] + + current_stage += 1 + update( + celery, + f"Preparing LVM logical volume for OSD {osd_id} on new disk {new_device}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"lvcreate -L {new_osd_size_mb}M -n osd-block-{osd_fsid} ceph-{vg_uuid}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run lvcreate for OSD {osd_id}") + return + + current_stage += 1 + update( + celery, + f"Preparing OSD {osd_id} on new disk {new_device}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm prepare --bluestore --osd-id {osd_id} --osd-fsid {osd_fsid} --data /dev/ceph-{vg_uuid}/osd-block-{osd_fsid}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run ceph-volume lvm prepare for OSD {osd_id}") + return + + for osd in all_osds_on_block: + osd_id = osd["id"] + osd_fsid = osd["fsid"] + + if osd["db_device"]: + db_device = f"osd-db/osd-{osd_id}" + + current_stage += 1 + update( + celery, + f"Preparing Bluestore DB volume for OSD {osd_id} on {db_device}", + current=current_stage, + total=total_stages, + ) + + retcode, stdout, stderr = common.run_os_command( + f"lvremove --force {db_device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run lvremove on {db_device}") + return + + retcode, stdout, 
stderr = common.run_os_command( + f"lvcreate -L {osd_new_db_size_mb}M -n osd-{osd_id} --yes osd-db" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run lvcreate on {db_device}") + return + + current_stage += 1 + update( + celery, + f"Attaching Bluestore DB volume to OSD {osd_id}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm new-db --osd-id {osd_id} --osd-fsid {osd_fsid} --target {db_device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run ceph-volume lvm new-db for OSD {osd_id}") + return + + current_stage += 1 + update( + celery, + f"Updating OSD {osd_id} in CRUSH map", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph osd crush add osd.{osd_id} {weight} root=default host={node}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run ceph osd crush add for OSD {osd_id}") + return + + current_stage += 1 + update( + celery, + f"Activating OSD {osd_id}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm activate --bluestore {osd_id} {osd_fsid}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run ceph-volume lvm activate for OSD {osd_id}") + return + + # Wait 1 second for it to activate + time.sleep(1) + + # Verify it started + retcode, stdout, stderr = common.run_os_command( + f"systemctl status ceph-osd@{osd_id}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to start OSD {osd_id} process") + return + + current_stage += 1 + update( + celery, + f"Updating OSD {osd_id} in PVC", + 
current=current_stage, + total=total_stages, + ) + zkhandler.write( + [ + (("osd.device", osd_id), new_device), + (("osd.vg", osd_id), f"ceph-{vg_uuid}"), + (("osd.lv", osd_id), f"osd-block-{osd_fsid}"), + ] + ) + + # 6. Log it + current_stage += 1 + return finish( + celery, + f"Successfully replaced OSD(s) {','.join(all_osds_on_block_ids)} on new disk {new_device}", + current=current_stage, + total=total_stages, + ) + + +def osd_worker_refresh_osd( + zkhandler, + celery, + node, + osd_id, + device, + ext_db_flag, +): + # Try to determine if any other OSDs shared a block device with this OSD + _, osd_list = get_list_osd(zkhandler, None) + osd_block = zkhandler.read(("osd.device", osd_id)) + all_osds_on_block = [ + o for o in osd_list if o["node"] == node and o["device"] == osd_block + ] + all_osds_on_block_ids = [o["id"] for o in all_osds_on_block] + + # Set up stages + current_stage = 0 + total_stages = 1 + _split_count = len(all_osds_on_block_ids) + total_stages = total_stages + 3 * int(_split_count) + + start( + celery, + f"Refreshing/reimporting OSD(s) {','.join(all_osds_on_block_ids)} on device {device}", + current=current_stage, + total=total_stages, + ) + + # Handle a detect device if that is passed + if match(r"detect:", device): + ddevice = common.get_detect_device(device) + if ddevice is None: + fail( + celery, + f"Failed to determine block device from detect string {device}", + ) + return + else: + log_info( + celery, + f"Determined block device {ddevice} from detect string {device}", + ) + device = ddevice + + retcode, stdout, stderr = common.run_os_command("ceph osd ls") + osd_list = stdout.split("\n") + if osd_id not in osd_list: + fail( + celery, + f"Could not find OSD {osd_id} in the cluster", + ) + return + + found_osds = osd_worker_helper_find_osds_from_block(device) + if osd_id not in found_osds.keys(): + fail( + celery, + f"Could not find OSD {osd_id} on device {device}", + ) + return + + for osd in found_osds: + found_osd = found_osds[osd][0] + 
lv_device = found_osd["lv_path"] + + _, osd_pvc_information = get_list_osd(zkhandler, osd) + osd_information = osd_pvc_information[0] + + current_stage += 1 + update( + celery, + "Querying for OSD on device", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm list --format json {lv_device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run ceph-volume lvm list for OSD {osd}") + return + + osd_detail = jloads(stdout)[osd][0] + + osd_fsid = osd_detail["tags"]["ceph.osd_fsid"] + if osd_fsid != osd_information["fsid"]: + fail( + celery, + f"OSD {osd} FSID {osd_information['fsid']} does not match volume FSID {osd_fsid}; OSD cannot be imported", + ) + return + + dev_flags = f"--data {lv_device}" + + if ext_db_flag: + db_device = "osd-db/osd-{osd}" + dev_flags += f" --block.db {db_device}" + + if not path.exists(f"/dev/{db_device}"): + fail( + celery, + f"OSD Bluestore DB volume {db_device} does not exist; OSD cannot be imported", + ) + return + else: + db_device = "" + + current_stage += 1 + update( + celery, + f"Activating OSD {osd}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm activate --bluestore {osd} {osd_fsid}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to run ceph-volume lvm activate for OSD {osd}") + return + + # Wait 1 second for it to activate + time.sleep(1) + + # Verify it started + retcode, stdout, stderr = common.run_os_command( + f"systemctl status ceph-osd@{osd}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail(celery, f"Failed to start OSD {osd} process") + return + + current_stage += 1 + update( + celery, + f"Updating OSD {osd} in PVC", + current=current_stage, + total=total_stages, + ) + 
zkhandler.write( + [ + (("osd.device", osd), device), + (("osd.vg", osd), osd_detail["vg_name"]), + (("osd.lv", osd), osd_detail["lv_name"]), + ] + ) + + # 6. Log it + current_stage += 1 + return finish( + celery, + f"Successfully reimported OSD(s) {','.join(all_osds_on_block_ids)} on device {device}", + current=current_stage, + total=total_stages, + ) + + +def osd_worker_remove_osd( + zkhandler, celery, node, osd_id, force_flag=False, skip_zap_flag=False +): + # Get initial data + data_device = zkhandler.read(("osd.device", osd_id)) + if zkhandler.exists(("osd.db_device", osd_id)): + db_device = zkhandler.read(("osd.db_device", osd_id)) + else: + db_device = None + + # Set up stages + current_stage = 0 + total_stages = 7 + if not force_flag: + total_stages += 1 + if not skip_zap_flag: + total_stages += 2 + if db_device is not None: + total_stages += 1 + + start( + celery, + f"Removing OSD {osd_id}", + current=current_stage, + total=total_stages, + ) + + # Verify the OSD is present + retcode, stdout, stderr = common.run_os_command("ceph osd ls") + osd_list = stdout.split("\n") + if osd_id not in osd_list: + if not force_flag: + fail( + celery, + f"OSD {osd_id} not found in Ceph", + ) + return + else: + log_warn( + celery, + f"OSD {osd_id} not found in Ceph; ignoring error due to force flag", + ) + + # 1. 
Set the OSD down and out so it will flush + current_stage += 1 + update( + celery, + f"Setting OSD {osd_id} down", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command(f"ceph osd down {osd_id}") + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + if not force_flag: + fail( + celery, + f"Failed to set OSD {osd_id} down", + ) + return + else: + log_warn( + celery, + f"Failed to set OSD {osd_id} down; ignoring error due to force flag", + ) + + current_stage += 1 + update( + celery, + f"Setting OSD {osd_id} out", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command(f"ceph osd out {osd_id}") + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + if not force_flag: + fail( + celery, + f"Failed to set OSD {osd_id} down", + ) + return + else: + log_warn( + celery, + f"Failed to set OSD {osd_id} down; ignoring error due to force flag", + ) + + # 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete) + if not force_flag: + current_stage += 1 + update( + celery, + f"Waiting for OSD {osd_id} to be safe to remove", + current=current_stage, + total=total_stages, + ) + tcount = 0 + while True: + retcode, stdout, stderr = common.run_os_command( + f"ceph osd safe-to-destroy osd.{osd_id}" + ) + if int(retcode) in [0, 11]: + break + else: + common.run_os_command(f"ceph osd down {osd_id}") + common.run_os_command(f"ceph osd out {osd_id}") + time.sleep(1) + tcount += 1 + if tcount > 60: + log_warn( + celery, + f"Timed out (60s) waiting for OSD {osd_id} to be safe to remove; proceeding anyways", + ) + break + + # 3. 
Stop the OSD process and wait for it to be terminated + current_stage += 1 + update( + celery, + f"Stopping OSD {osd_id}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command(f"systemctl stop ceph-osd@{osd_id}") + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + if not force_flag: + fail( + celery, + f"Failed to stop OSD {osd_id} process", + ) + return + else: + log_warn( + celery, + f"Failed to stop OSD {osd_id} process; ignoring error due to force flag", + ) + time.sleep(5) + + # 4. Delete OSD from ZK + current_stage += 1 + update( + celery, + f"Deleting OSD {osd_id} from PVC", + current=current_stage, + total=total_stages, + ) + + zkhandler.delete(("osd", osd_id), recursive=True) + + # 5a. Destroy the OSD from Ceph + current_stage += 1 + update( + celery, + f"Destroying OSD {osd_id}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph osd destroy {osd_id} --yes-i-really-mean-it" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + if not force_flag: + fail( + celery, + f"Failed to destroy OSD {osd_id}", + ) + return + else: + log_warn( + celery, + f"Failed to destroy OSD {osd_id}; ignoring error due to force flag", + ) + time.sleep(2) + + # 5b. 
Purge the OSD from Ceph + current_stage += 1 + update( + celery, + f"Removing OSD {osd_id} from CRUSH map", + current=current_stage, + total=total_stages, + ) + + # Remove the OSD from the CRUSH map + retcode, stdout, stderr = common.run_os_command(f"ceph osd crush rm osd.{osd_id}") + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + if not force_flag: + fail( + celery, + f"Failed to remove OSD {osd_id} from CRUSH map", + ) + return + else: + log_warn( + celery, + f"Failed to remove OSD {osd_id} from CRUSH map; ignoring error due to force flag", + ) + + # Purge the OSD + current_stage += 1 + update( + celery, + f"Purging OSD {osd_id}", + current=current_stage, + total=total_stages, + ) + + if force_flag: + force_arg = "--force" + else: + force_arg = "" + + retcode, stdout, stderr = common.run_os_command( + f"ceph osd purge {osd_id} {force_arg} --yes-i-really-mean-it" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + if not force_flag: + fail( + celery, + f"Failed to purge OSD {osd_id}", + ) + return + else: + log_warn( + celery, + f"Failed to purge OSD {osd_id}; ignoring error due to force flag", + ) + + # 6. 
Remove the DB device + if db_device is not None: + current_stage += 1 + update( + celery, + f"Removing OSD DB logical volume {db_device}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + f"lvremove --yes --force {db_device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + if not force_flag: + fail( + celery, + f"Failed to remove OSD DB logical volume {db_device}", + ) + return + else: + log_warn( + celery, + f"Failed to remove OSD DB logical volume {db_device}; ignoring error due to force flag", + ) + + if not skip_zap_flag: + current_stage += 1 + update( + celery, + f"Zapping old disk {data_device} if possible", + current=current_stage, + total=total_stages, + ) + + # 7. Determine the block devices + found_osds = osd_worker_helper_find_osds_from_block(data_device) + if osd_id in found_osds.keys(): + # Try to determine if any other OSDs shared a block device with this OSD + _, osd_list = get_list_osd(zkhandler, None) + all_osds_on_block = [ + o for o in osd_list if o["node"] == node and o["device"] == data_device + ] + + if len(all_osds_on_block) < 1: + log_info( + celery, + f"Found no peer split OSD(s) on {data_device}; zapping disk", + ) + retcode, stdout, stderr = common.run_os_command( + f"ceph-volume lvm zap --destroy {data_device}" + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + if not force_flag: + fail( + celery, + f"Failed to run ceph-volume lvm zap on device {data_device}", + ) + return + else: + log_warn( + celery, + f"Failed to run ceph-volume lvm zap on device {data_device}; ignoring error due to force flag", + ) + else: + log_warn( + celery, + f"Found {len(all_osds_on_block)} OSD(s) still remaining on {data_device}; skipping zap", + ) + else: + log_warn( + celery, + f"Could not find OSD {osd_id} on device {data_device}; skipping zap", + ) + + # 6. 
Log it + current_stage += 1 + return finish( + celery, + f"Successfully removed OSD {osd_id}", + current=current_stage, + total=total_stages, + ) + + +def osd_worker_add_db_vg(zkhandler, celery, device): + # Set up stages + current_stage = 0 + total_stages = 4 + + start( + celery, + f"Creating new OSD database volume group on device {device}", + current=current_stage, + total=total_stages, + ) + # Check if an existsing volume group exists + retcode, stdout, stderr = common.run_os_command("vgdisplay osd-db") + if retcode != 5: + fail( + celery, + "Ceph OSD database VG already exists", + ) + return + + # Handle a detect device if that is passed + if match(r"detect:", device): + ddevice = common.get_detect_device(device) + if ddevice is None: + fail( + celery, + f"Failed to determine block device from detect string {device}", + ) + return + else: + log_info( + celery, + f"Determined block device {ddevice} from detect string {device}", + ) + device = ddevice + + # 1. Create an empty partition table + current_stage += 1 + update( + celery, + f"Creating partitions on device {device}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command("sgdisk --clear {}".format(device)) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail( + celery, + f"Failed to create partition table on device {device}", + ) + return + + retcode, stdout, stderr = common.run_os_command( + "sgdisk --new 1:: --typecode 1:8e00 {}".format(device) + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail( + celery, + f"Failed to set partition type to LVM PV on device {device}", + ) + return + + # Handle the partition ID portion + if search(r"by-path", device) or search(r"by-id", device): + # /dev/disk/by-path/pci-0000:03:00.0-scsi-0:1:0:0 -> pci-0000:03:00.0-scsi-0:1:0:0-part1 + partition = "{}-part1".format(device) + elif search(r"nvme", device): + # /dev/nvme0n1 
-> nvme0n1p1 + partition = "{}p1".format(device) + else: + # /dev/sda -> sda1 + # No other '/dev/disk/by-*' types are valid for raw block devices anyways + partition = "{}1".format(device) + + # 2. Create the PV + current_stage += 1 + update( + celery, + f"Creating LVM PV on device {device}", + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + "pvcreate --force {}".format(partition) + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail( + celery, + f"Failed to create LVM PV on device {device}", + ) + return + + # 2. Create the VG (named 'osd-db') + current_stage += 1 + update( + celery, + f'Creating LVM VG "osd-db" on device {device}', + current=current_stage, + total=total_stages, + ) + retcode, stdout, stderr = common.run_os_command( + "vgcreate --force osd-db {}".format(partition) + ) + log_info(celery, f"stdout: {stdout}") + log_info(celery, f"stderr: {stderr}") + if retcode: + fail( + celery, + f"Failed to create LVM VG on device {device}", + ) + return + + # Log it + current_stage += 1 + return finish( + celery, + f"Successfully created new OSD DB volume group on device {device}", + current=current_stage, + total=total_stages, + ) diff --git a/daemon-common/common.py b/daemon-common/common.py index 253a11f2..657e6f7c 100644 --- a/daemon-common/common.py +++ b/daemon-common/common.py @@ -27,6 +27,7 @@ import signal from json import loads from re import match as re_match from re import split as re_split +from re import sub as re_sub from distutils.util import strtobool from threading import Thread from shlex import split as shlex_split @@ -869,3 +870,75 @@ def sortInterfaceNames(interface_names): return [atoi(c) for c in re_split(r"(\d+)", text)] return sorted(interface_names, key=natural_keys) + + +# +# Parse a "detect" device into a real block device name +# +def get_detect_device(detect_string): + """ + Parses a "detect:" string into a normalized block 
device path using lsscsi. + + A detect string is formatted "detect:::", where + NAME is some unique identifier in lsscsi, SIZE is a human-readable + size value to within +/- 3% of the real size of the device, and + ID is the Nth (0-indexed) matching entry of that NAME and SIZE. + """ + _, name, size, idd = detect_string.split(":") + if _ != "detect": + return None + + retcode, stdout, stderr = run_os_command("lsscsi -s") + if retcode: + print(f"Failed to run lsscsi: {stderr}") + return None + + # Get valid lines + lsscsi_lines_raw = stdout.split("\n") + lsscsi_lines = list() + for line in lsscsi_lines_raw: + if not line: + continue + split_line = line.split() + if split_line[1] != "disk": + continue + lsscsi_lines.append(line) + + # Handle size determination (+/- 3%) + lsscsi_sizes = set() + for line in lsscsi_lines: + lsscsi_sizes.add(split_line[-1]) + for l_size in lsscsi_sizes: + b_size = float(re_sub(r"\D.", "", size)) + t_size = float(re_sub(r"\D.", "", l_size)) + + plusthreepct = t_size * 1.03 + minusthreepct = t_size * 0.97 + + if b_size > minusthreepct and b_size < plusthreepct: + size = l_size + break + + blockdev = None + matches = list() + for idx, line in enumerate(lsscsi_lines): + # Skip non-disk entries + if line.split()[1] != "disk": + continue + # Skip if name is not contained in the line (case-insensitive) + if name.lower() not in line.lower(): + continue + # Skip if the size does not match + if size != line.split()[-1]: + continue + # Get our blockdev and append to the list + matches.append(line.split()[-2]) + + blockdev = None + # Find the blockdev at index {idd} + for idx, _blockdev in enumerate(matches): + if int(idx) == int(idd): + blockdev = _blockdev + break + + return blockdev diff --git a/daemon-common/vm.py b/daemon-common/vm.py index 15b7d4cd..b74d834c 100644 --- a/daemon-common/vm.py +++ b/daemon-common/vm.py @@ -1782,11 +1782,14 @@ def vm_worker_helper_getdom(tuuid): def vm_worker_flush_locks(zkhandler, celery, domain, 
force_unlock=False):
+    current_stage = 0
+    total_stages = 3
+
     start(
         celery,
         f"Flushing RBD locks for VM {domain} [forced={force_unlock}]",
-        current=1,
-        total=4,
+        current=current_stage,
+        total=total_stages,
     )
 
     dom_uuid = getDomainUUID(zkhandler, domain)
@@ -1803,7 +1806,13 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
     # Get the list of RBD images
     rbd_list = zkhandler.read(("domain.storage.volumes", dom_uuid)).split(",")
 
-    update(celery, f"Obtaining RBD locks for VM {domain}", current=2, total=4)
+    current_stage += 1
+    update(
+        celery,
+        f"Obtaining RBD locks for VM {domain}",
+        current=current_stage,
+        total=total_stages,
+    )
 
     # Prepare a list of locks
     rbd_locks = list()
@@ -1825,14 +1834,23 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
         try:
             lock_list = jloads(lock_list_stdout)
         except Exception as e:
-            fail(celery, f"Failed to parse JSON lock list for volume {rbd}: {e}")
+            fail(
+                celery,
+                f"Failed to parse JSON lock list for volume {rbd}: {e}",
+            )
             return
 
         if lock_list:
             for lock in lock_list:
                 rbd_locks.append({"rbd": rbd, "lock": lock})
 
-    update(celery, f"Freeing RBD locks for VM {domain}", current=3, total=4)
+    current_stage += 1
+    update(
+        celery,
+        f"Freeing RBD locks for VM {domain}",
+        current=current_stage,
+        total=total_stages,
+    )
 
     for _lock in rbd_locks:
         rbd = _lock["rbd"]
@@ -1850,18 +1868,28 @@ def vm_worker_flush_locks(zkhandler, celery, domain, force_unlock=False):
             fail(
                 celery,
                 f"Failed to free RBD lock {lock['id']} on volume {rbd}: {lock_remove_stderr}",
-                current=3,
-                total=4,
             )
             return
 
+    current_stage += 1
     return finish(
-        celery, f"Successfully flushed RBD locks for VM {domain}", current=4, total=4
+        celery,
+        f"Successfully flushed RBD locks for VM {domain}",
+        current=current_stage,
+        total=total_stages,
    )
 
 
 def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
-    start(celery, f"Hot-attaching XML device to VM {domain}")
+    current_stage = 0
+    total_stages = 1
+
+    start(
+        celery,
+        
f"Hot-attaching XML device to VM {domain}",
+        current=current_stage,
+        total=total_stages,
+    )
 
     dom_uuid = getDomainUUID(zkhandler, domain)
 
@@ -1875,7 +1903,10 @@ def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
 
     dom = vm_worker_helper_getdom(dom_uuid)
     if dom is None:
-        fail(celery, f"Failed to find Libvirt object for VM {domain}")
+        fail(
+            celery,
+            f"Failed to find Libvirt object for VM {domain}",
+        )
         return
 
     try:
@@ -1884,11 +1915,25 @@ def vm_worker_attach_device(zkhandler, celery, domain, xml_spec):
         fail(celery, e)
         return
 
-    return finish(celery, f"Successfully hot-attached XML device to VM {domain}")
+    current_stage += 1
+    return finish(
+        celery,
+        f"Successfully hot-attached XML device to VM {domain}",
+        current=current_stage,
+        total=total_stages,
+    )
 
 
 def vm_worker_detach_device(zkhandler, celery, domain, xml_spec):
-    start(celery, f"Hot-detaching XML device from VM {domain}")
+    current_stage = 0
+    total_stages = 1
+
+    start(
+        celery,
+        f"Hot-detaching XML device from VM {domain}",
+        current=current_stage,
+        total=total_stages,
+    )
 
     dom_uuid = getDomainUUID(zkhandler, domain)
 
@@ -1902,7 +1947,10 @@ def vm_worker_detach_device(zkhandler, celery, domain, xml_spec):
 
     dom = vm_worker_helper_getdom(dom_uuid)
     if dom is None:
-        fail(celery, f"Failed to find Libvirt object for VM {domain}")
+        fail(
+            celery,
+            f"Failed to find Libvirt object for VM {domain}",
+        )
         return
 
     try:
@@ -1911,4 +1959,10 @@ def vm_worker_detach_device(zkhandler, celery, domain, xml_spec):
         fail(celery, e)
         return
 
-    return finish(celery, f"Successfully hot-detached XML device from VM {domain}")
+    current_stage += 1
+    return finish(
+        celery,
+        f"Successfully hot-detached XML device from VM {domain}",
+        current=current_stage,
+        total=total_stages,
+    )
diff --git a/node-daemon/pvcnoded/objects/CephInstance.py b/node-daemon/pvcnoded/objects/CephInstance.py
index 3a873d41..234ff63a 100644
--- a/node-daemon/pvcnoded/objects/CephInstance.py
+++ 
b/node-daemon/pvcnoded/objects/CephInstance.py @@ -19,86 +19,9 @@ # ############################################################################### -import time import json import daemon_lib.common as common -from daemon_lib.ceph import format_bytes_fromhuman, get_list_osd - -from distutils.util import strtobool -from re import search, match, sub -from os import path -from uuid import uuid4 -from json import loads as jloads - - -def get_detect_device(detect_string): - """ - Parses a "detect:" string into a normalized block device path using lsscsi. - - A detect string is formatted "detect:::", where - NAME is some unique identifier in lsscsi, SIZE is a human-readable - size value to within +/- 3% of the real size of the device, and - ID is the Nth (0-indexed) matching entry of that NAME and SIZE. - """ - _, name, size, idd = detect_string.split(":") - if _ != "detect": - return None - - retcode, stdout, stderr = common.run_os_command("lsscsi -s") - if retcode: - print(f"Failed to run lsscsi: {stderr}") - return None - - # Get valid lines - lsscsi_lines_raw = stdout.split("\n") - lsscsi_lines = list() - for line in lsscsi_lines_raw: - if not line: - continue - split_line = line.split() - if split_line[1] != "disk": - continue - lsscsi_lines.append(line) - - # Handle size determination (+/- 3%) - lsscsi_sizes = set() - for line in lsscsi_lines: - lsscsi_sizes.add(split_line[-1]) - for l_size in lsscsi_sizes: - b_size = float(sub(r"\D.", "", size)) - t_size = float(sub(r"\D.", "", l_size)) - - plusthreepct = t_size * 1.03 - minusthreepct = t_size * 0.97 - - if b_size > minusthreepct and b_size < plusthreepct: - size = l_size - break - - blockdev = None - matches = list() - for idx, line in enumerate(lsscsi_lines): - # Skip non-disk entries - if line.split()[1] != "disk": - continue - # Skip if name is not contained in the line (case-insensitive) - if name.lower() not in line.lower(): - continue - # Skip if the size does not match - if size != line.split()[-1]: - 
continue - # Get our blockdev and append to the list - matches.append(line.split()[-2]) - - blockdev = None - # Find the blockdev at index {idd} - for idx, _blockdev in enumerate(matches): - if int(idx) == int(idd): - blockdev = _blockdev - break - - return blockdev class CephOSDInstance(object): @@ -262,1045 +185,6 @@ class CephOSDInstance(object): self.vg = osd_vg self.lv = osd_lv - @staticmethod - def find_osds_from_block(logger, device): - # Try to query the passed block device directly - logger.out(f"Querying for OSD(s) on disk {device}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm list --format json {device}" - ) - if retcode: - found_osds = [] - else: - found_osds = jloads(stdout) - - return found_osds - - @staticmethod - def add_osd( - zkhandler, - logger, - node, - device, - weight, - ext_db_ratio=None, - ext_db_size=None, - split_count=None, - ): - # Handle a detect device if that is passed - if match(r"detect:", device): - ddevice = get_detect_device(device) - if ddevice is None: - logger.out( - f"Failed to determine block device from detect string {device}", - state="e", - ) - return False - else: - logger.out( - f"Determined block device {ddevice} from detect string {device}", - state="i", - ) - device = ddevice - - if ext_db_size is not None and ext_db_ratio is not None: - logger.out( - "Invalid configuration: both an ext_db_size and ext_db_ratio were specified", - state="e", - ) - return False - - # Check if device has a partition table; it's not valid if it does - retcode, _, _ = common.run_os_command(f"sfdisk --dump {device}") - if retcode < 1: - logger.out( - f"Device {device} has a partition table and is unsuitable for an OSD", - state="e", - ) - return False - - if ext_db_size is not None or ext_db_ratio is not None: - ext_db_flag = True - else: - ext_db_flag = False - - if split_count is not None: - split_flag = f"--osds-per-device {split_count}" - is_split = True - logger.out( - f"Creating {split_count} 
new OSD disks on block device {device}", - state="i", - ) - else: - split_flag = "" - is_split = False - logger.out(f"Creating 1 new OSD disk on block device {device}", state="i") - - if "nvme" in device: - class_flag = "--crush-device-class nvme" - else: - class_flag = "--crush-device-class ssd" - - # 1. Zap the block device - logger.out(f"Zapping disk {device}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm zap --destroy {device}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm zap", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # 2. Prepare the OSD(s) - logger.out(f"Preparing OSD(s) on disk {device}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm batch --yes --prepare --bluestore {split_flag} {class_flag} {device}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm batch", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - logger.out( - f"Successfully prepared {split_count} OSDs on disk {device}", state="o" - ) - - # 3. Get the list of created OSDs on the device (initial pass) - logger.out(f"Querying OSD(s) on disk {device}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm list --format json {device}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm list", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - created_osds = jloads(stdout) - - # 4. Prepare the WAL and DB devices - if ext_db_flag: - for created_osd in created_osds: - # 4a. Get the OSD FSID and ID from the details - osd_details = created_osds[created_osd][0] - osd_fsid = osd_details["tags"]["ceph.osd_fsid"] - osd_id = osd_details["tags"]["ceph.osd_id"] - osd_lv = osd_details["lv_path"] - logger.out(f"Creating Bluestore DB volume for OSD {osd_id}", state="i") - - # 4b. 
Prepare the logical volume if ext_db_flag - if ext_db_ratio is not None: - _, osd_size_bytes, _ = common.run_os_command( - f"blockdev --getsize64 {osd_lv}" - ) - osd_size_bytes = int(osd_size_bytes) - osd_db_size_bytes = int(osd_size_bytes * ext_db_ratio) - if ext_db_size is not None: - osd_db_size_bytes = format_bytes_fromhuman(ext_db_size) - - result = CephOSDInstance.create_osd_db_lv( - zkhandler, logger, osd_id, osd_db_size_bytes - ) - if not result: - raise Exception - db_device = f"osd-db/osd-{osd_id}" - - # 4c. Attach the new DB device to the OSD - logger.out(f"Attaching Bluestore DB volume to OSD {osd_id}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm new-db --osd-id {osd_id} --osd-fsid {osd_fsid} --target {db_device}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm new-db", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # 4d. Get the list of created OSDs on the device (final pass) - logger.out(f"Requerying OSD(s) on disk {device}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm list --format json {device}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm list", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - created_osds = jloads(stdout) - - # 5. Activate the OSDs - logger.out(f"Activating OSD(s) on disk {device}", state="i") - for created_osd in created_osds: - # 5a. Get the OSD FSID and ID from the details - osd_details = created_osds[created_osd][0] - osd_clusterfsid = osd_details["tags"]["ceph.cluster_fsid"] - osd_fsid = osd_details["tags"]["ceph.osd_fsid"] - osd_id = osd_details["tags"]["ceph.osd_id"] - db_device = osd_details["tags"].get("ceph.db_device", "") - osd_vg = osd_details["vg_name"] - osd_lv = osd_details["lv_name"] - - # 5b. 
Add it to the crush map - logger.out(f"Adding OSD {osd_id} to CRUSH map", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph osd crush add osd.{osd_id} {weight} root=default host={node}" - ) - if retcode: - logger.out("Failed: ceph osd crush add", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # 5c. Activate the OSD - logger.out(f"Activating OSD {osd_id}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm activate --bluestore {osd_id} {osd_fsid}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm activate", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # 5d. Wait 1 second for it to activate - time.sleep(1) - - # 5e. Verify it started - retcode, stdout, stderr = common.run_os_command( - f"systemctl status ceph-osd@{osd_id}" - ) - if retcode: - logger.out(f"Failed: OSD {osd_id} unit is not active", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # 5f. Add the new OSD to PVC - logger.out(f"Adding OSD {osd_id} to PVC", state="i") - zkhandler.write( - [ - (("osd", osd_id), ""), - (("osd.node", osd_id), node), - (("osd.device", osd_id), device), - (("osd.db_device", osd_id), db_device), - (("osd.fsid", osd_id), ""), - (("osd.ofsid", osd_id), osd_fsid), - (("osd.cfsid", osd_id), osd_clusterfsid), - (("osd.lvm", osd_id), ""), - (("osd.vg", osd_id), osd_vg), - (("osd.lv", osd_id), osd_lv), - (("osd.is_split", osd_id), is_split), - ( - ("osd.stats", osd_id), - '{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}', - ), - ] - ) - - # 6. 
Log it - logger.out( - f"Successfully created {split_count} new OSD(s) {','.join(created_osds.keys())} on disk {device}", - state="o", - ) - return True - - @staticmethod - def replace_osd( - zkhandler, - logger, - node, - osd_id, - new_device, - old_device=None, - weight=None, - ext_db_ratio=None, - ext_db_size=None, - ): - # Handle a detect device if that is passed - if match(r"detect:", new_device): - ddevice = get_detect_device(new_device) - if ddevice is None: - logger.out( - f"Failed to determine block device from detect string {new_device}", - state="e", - ) - return False - else: - logger.out( - f"Determined block device {ddevice} from detect string {new_device}", - state="i", - ) - new_device = ddevice - - # Check if device has a partition table; it's not valid if it does - retcode, _, _ = common.run_os_command(f"sfdisk --dump {new_device}") - if retcode < 1: - logger.out( - f"Device {new_device} has a partition table and is unsuitable for an OSD", - state="e", - ) - return False - - # Phase 1: Try to determine what we can about the old device - real_old_device = None - osd_block = zkhandler.read(("osd.device", osd_id)) - - # Determine information from a passed old_device - if old_device is not None: - found_osds = CephOSDInstance.find_osds_from_block(logger, old_device) - if found_osds and osd_id in found_osds.keys(): - real_old_device = old_device - else: - logger.out( - f"No OSD(s) found on disk {old_device}; falling back to PVC detection", - state="w", - ) - - # Try to get an old_device from our PVC information - if real_old_device is None: - found_osds = CephOSDInstance.find_osds_from_block(logger, osd_block) - - if osd_id in found_osds.keys(): - real_old_device = osd_block - - if real_old_device is None: - skip_zap = True - logger.out( - "No valid old block device found for OSD(s); skipping zap", state="w" - ) - else: - skip_zap = False - - # Try to determine if any other OSDs shared a block device with this OSD - _, osd_list = 
get_list_osd(zkhandler, None) - all_osds_on_block = [ - o for o in osd_list if o["node"] == node and o["device"] == osd_block - ] - - # Determine the weight of the OSD(s) - if weight is None: - weight = all_osds_on_block[0]["stats"]["weight"] - - # Take down the OSD(s), but keep it's CRUSH map details and IDs - try: - for osd in all_osds_on_block: - osd_id = osd["id"] - - # 1. Set the OSD down and out so it will flush - logger.out(f"Setting down OSD {osd_id}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph osd down {osd_id}" - ) - if retcode: - logger.out("Failed: ceph osd down", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - logger.out(f"Setting out OSD {osd_id}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph osd out {osd_id}" - ) - if retcode: - logger.out("Failed: ceph osd out", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete) - logger.out(f"Waiting for OSD {osd_id} to be safe to remove", state="i") - tcount = 0 - while True: - retcode, stdout, stderr = common.run_os_command( - f"ceph osd safe-to-destroy osd.{osd_id}" - ) - if int(retcode) in [0, 11]: - break - else: - common.run_os_command(f"ceph osd down {osd_id}") - common.run_os_command(f"ceph osd out {osd_id}") - time.sleep(1) - tcount += 1 - if tcount > 60: - logger.out( - f"Timed out (60s) waiting for OSD {osd_id} to be safe to remove; proceeding", - state="w", - ) - break - - # 3. Stop the OSD process and wait for it to be terminated - logger.out(f"Stopping OSD {osd_id}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"systemctl stop ceph-osd@{osd_id}" - ) - if retcode: - logger.out("Failed: systemctl stop", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - time.sleep(5) - - # 4. 
Destroy the OSD - logger.out(f"Destroying OSD {osd_id}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph osd destroy {osd_id} --yes-i-really-mean-it" - ) - if retcode: - logger.out("Failed: ceph osd destroy", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - - if not skip_zap: - logger.out(f"Zapping disk {real_old_device}", state="i") - # 5. Zap the old disk - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm zap --destroy {real_old_device}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm zap", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # 6. Prepare the volume group on the new device - logger.out(f"Preparing LVM volume group on disk {new_device}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm zap --destroy {new_device}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm zap", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - retcode, stdout, stderr = common.run_os_command(f"pvcreate {new_device}") - if retcode: - logger.out("Failed: pvcreate", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - vg_uuid = str(uuid4()) - retcode, stdout, stderr = common.run_os_command( - f"vgcreate ceph-{vg_uuid} {new_device}" - ) - if retcode: - logger.out("Failed: vgcreate", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # Determine how many OSDs we want on the new device - osds_count = len(all_osds_on_block) - - # Determine the size of the new device - _, new_device_size_bytes, _ = common.run_os_command( - f"blockdev --getsize64 {new_device}" - ) - - # Calculate the size of each OSD (in MB) based on the default 4M extent size - new_osd_size_mb = ( - int(int(int(new_device_size_bytes) / osds_count) / 1024 / 1024 / 4) * 4 - ) - - # Calculate the size, if applicable, of the 
OSD block if we were passed a ratio - if ext_db_ratio is not None: - osd_new_db_size_mb = int( - int(int(new_osd_size_mb * ext_db_ratio) / 4) * 4 - ) - elif ext_db_size is not None: - osd_new_db_size_mb = int( - int(int(format_bytes_fromhuman(ext_db_size)) / 1024 / 1024 / 4) * 4 - ) - else: - _, new_device_size_bytes, _ = common.run_os_command( - f"blockdev --getsize64 {all_osds_on_block[0]['db_device']}" - ) - osd_new_db_size_mb = int( - int(int(new_device_size_bytes) / 1024 / 1024 / 4) * 4 - ) - - for osd in all_osds_on_block: - osd_id = osd["id"] - osd_fsid = osd["fsid"] - - logger.out( - f"Preparing LVM logical volume on disk {new_device} for OSD {osd_id}", - state="i", - ) - retcode, stdout, stderr = common.run_os_command( - f"lvcreate -L {new_osd_size_mb}M -n osd-block-{osd_fsid} ceph-{vg_uuid}" - ) - if retcode: - logger.out("Failed: lvcreate", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - logger.out(f"Preparing OSD {osd_id} on disk {new_device}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm prepare --bluestore --osd-id {osd_id} --osd-fsid {osd_fsid} --data /dev/ceph-{vg_uuid}/osd-block-{osd_fsid}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm prepare", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - for osd in all_osds_on_block: - osd_id = osd["id"] - osd_fsid = osd["fsid"] - - if osd["db_device"]: - db_device = f"osd-db/osd-{osd_id}" - - logger.out( - f"Destroying old Bluestore DB volume for OSD {osd_id}", - state="i", - ) - retcode, stdout, stderr = common.run_os_command( - f"lvremove --force {db_device}" - ) - - logger.out( - f"Creating new Bluestore DB volume for OSD {osd_id}", state="i" - ) - retcode, stdout, stderr = common.run_os_command( - f"lvcreate -L {osd_new_db_size_mb}M -n osd-{osd_id} --yes osd-db" - ) - if retcode: - logger.out("Failed: lvcreate", state="e") - logger.out(stdout, state="d") - 
logger.out(stderr, state="d") - raise Exception - - logger.out( - f"Attaching old Bluestore DB volume to OSD {osd_id}", state="i" - ) - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm new-db --osd-id {osd_id} --osd-fsid {osd_fsid} --target {db_device}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm new-db", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - logger.out(f"Adding OSD {osd_id} to CRUSH map", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph osd crush add osd.{osd_id} {weight} root=default host={node}" - ) - if retcode: - logger.out("Failed: ceph osd crush add", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - logger.out(f"Activating OSD {osd_id}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm activate --bluestore {osd_id} {osd_fsid}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm activate", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # Wait 1 second for it to activate - time.sleep(1) - - # Verify it started - retcode, stdout, stderr = common.run_os_command( - f"systemctl status ceph-osd@{osd_id}" - ) - if retcode: - logger.out(f"Failed: OSD {osd_id} unit is not active", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - logger.out(f"Updating OSD {osd_id} details in PVC", state="i") - zkhandler.write( - [ - (("osd.device", osd_id), new_device), - (("osd.vg", osd_id), f"ceph-{vg_uuid}"), - (("osd.lv", osd_id), f"osd-block-{osd_fsid}"), - ] - ) - - # Log it - logger.out( - f"Successfully replaced OSDs {','.join([o['id'] for o in all_osds_on_block])} on new disk {new_device}", - state="o", - ) - return True - except Exception as e: - # Log it - logger.out( - f"Failed to replace OSD(s) on new disk {new_device}: {e}", state="e" - ) - return False - - @staticmethod - def 
refresh_osd(zkhandler, logger, node, osd_id, device, ext_db_flag): - # Handle a detect device if that is passed - if match(r"detect:", device): - ddevice = get_detect_device(device) - if ddevice is None: - logger.out( - f"Failed to determine block device from detect string {device}", - state="e", - ) - return False - else: - logger.out( - f"Determined block device {ddevice} from detect string {device}", - state="i", - ) - device = ddevice - - retcode, stdout, stderr = common.run_os_command("ceph osd ls") - osd_list = stdout.split("\n") - if osd_id not in osd_list: - logger.out(f"Could not find OSD {osd_id} in the cluster", state="e") - return False - - found_osds = CephOSDInstance.find_osds_from_block(logger, device) - if osd_id not in found_osds.keys(): - logger.out(f"Could not find OSD {osd_id} on device {device}", state="e") - return False - - logger.out( - f"Refreshing OSD {osd_id} disk on block device {device}", - state="i", - ) - try: - for osd in found_osds: - found_osd = found_osds[osd] - lv_device = found_osd["lv_path"] - - _, osd_pvc_information = get_list_osd(zkhandler, osd_id) - osd_information = osd_pvc_information[0] - - logger.out(f"Querying OSD on device {lv_device}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm list --format json {lv_device}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm list", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - osd_detail = jloads(stdout)[osd_id][0] - - osd_fsid = osd_detail["tags"]["ceph.osd_fsid"] - if osd_fsid != osd_information["fsid"]: - logger.out( - f"OSD {osd_id} FSID {osd_information['fsid']} does not match volume FSID {osd_fsid}; OSD cannot be imported", - state="e", - ) - - dev_flags = f"--data {lv_device}" - - if ext_db_flag: - db_device = "osd-db/osd-{osd_id}" - dev_flags += f" --block.db {db_device}" - - if not path.exists(f"/dev/{db_device}"): - logger.out( - f"OSD Bluestore DB volume {db_device} does not 
exist; OSD cannot be imported", - state="e", - ) - return - else: - db_device = "" - - logger.out(f"Activating OSD {osd_id}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm activate --bluestore {osd_id} {osd_fsid}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm activate", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # Wait 1 second for it to activate - time.sleep(1) - - # Verify it started - retcode, stdout, stderr = common.run_os_command( - f"systemctl status ceph-osd@{osd_id}" - ) - if retcode: - logger.out(f"Failed: OSD {osd_id} unit is not active", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - logger.out(f"Updating OSD {osd_id} details in PVC", state="i") - zkhandler.write( - [ - (("osd.device", osd_id), device), - (("osd.vg", osd_id), osd_detail["vg_name"]), - (("osd.lv", osd_id), osd_detail["lv_name"]), - ] - ) - - logger.out( - f"Successfully reimported OSD {osd_id} on {device}", state="o" - ) - - return True - except Exception as e: - # Log it - logger.out(f"Failed to refresh OSD {osd_id} disk: {e}", state="e") - return False - - @staticmethod - def remove_osd( - zkhandler, logger, node, osd_id, force_flag=False, skip_zap_flag=False - ): - logger.out(f"Removing OSD {osd_id}", state="i") - try: - # Verify the OSD is present - retcode, stdout, stderr = common.run_os_command("ceph osd ls") - osd_list = stdout.split("\n") - if osd_id not in osd_list: - logger.out(f"Could not find OSD {osd_id} in the cluster", state="e") - if force_flag: - logger.out("Ignoring error due to force flag", state="i") - else: - return True - - # 1. 
Set the OSD down and out so it will flush - logger.out(f"Setting down OSD {osd_id}", state="i") - retcode, stdout, stderr = common.run_os_command(f"ceph osd down {osd_id}") - if retcode: - logger.out("Failed: ceph osd down", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - if force_flag: - logger.out("Ignoring error due to force flag", state="i") - else: - raise Exception - - logger.out(f"Setting out OSD {osd_id}", state="i") - retcode, stdout, stderr = common.run_os_command(f"ceph osd out {osd_id}") - if retcode: - logger.out("Failed: ceph osd out", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - if force_flag: - logger.out("Ignoring error due to force flag", state="i") - else: - raise Exception - - # 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete) - if not force_flag: - logger.out(f"Waiting for OSD {osd_id} to be safe to remove", state="i") - tcount = 0 - while True: - retcode, stdout, stderr = common.run_os_command( - f"ceph osd safe-to-destroy osd.{osd_id}" - ) - if int(retcode) in [0, 11]: - break - else: - common.run_os_command(f"ceph osd down {osd_id}") - common.run_os_command(f"ceph osd out {osd_id}") - time.sleep(1) - tcount += 1 - if tcount > 60: - logger.out( - f"Timed out (60s) waiting for OSD {osd_id} to be safe to remove; proceeding", - state="w", - ) - break - - # 3. Stop the OSD process and wait for it to be terminated - logger.out(f"Stopping OSD {osd_id}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"systemctl stop ceph-osd@{osd_id}" - ) - if retcode: - logger.out("Failed: systemctl stop", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - if force_flag: - logger.out("Ignoring error due to force flag", state="i") - else: - raise Exception - time.sleep(5) - - # 4. 
Delete OSD from ZK - data_device = zkhandler.read(("osd.device", osd_id)) - if zkhandler.exists(("osd.db_device", osd_id)): - db_device = zkhandler.read(("osd.db_device", osd_id)) - else: - db_device = None - - logger.out(f"Deleting OSD {osd_id} from PVC", state="i") - zkhandler.delete(("osd", osd_id), recursive=True) - - # 5a. Destroy the OSD from Ceph - logger.out(f"Destroying OSD {osd_id}", state="i") - retcode, stdout, stderr = common.run_os_command( - f"ceph osd destroy {osd_id} --yes-i-really-mean-it" - ) - if retcode: - logger.out("Failed: ceph osd destroy", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - time.sleep(2) - - # 5b. Purge the OSD from Ceph - logger.out(f"Purging OSD {osd_id}", state="i") - if force_flag: - force_arg = "--force" - else: - force_arg = "" - - # Remove the OSD from the CRUSH map - retcode, stdout, stderr = common.run_os_command( - f"ceph osd crush rm osd.{osd_id}" - ) - if retcode: - logger.out("Failed: ceph osd crush rm", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - if force_flag: - logger.out("Ignoring error due to force flag", state="i") - else: - raise Exception - # Purge the OSD - retcode, stdout, stderr = common.run_os_command( - f"ceph osd purge {osd_id} {force_arg} --yes-i-really-mean-it" - ) - if retcode: - logger.out("Failed: ceph osd purge", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - if force_flag: - logger.out("Ignoring error due to force flag", state="i") - else: - raise Exception - - # 6. Remove the DB device - if db_device is not None: - logger.out( - f'Removing OSD DB logical volume "{db_device}"', - state="i", - ) - retcode, stdout, stderr = common.run_os_command( - f"lvremove --yes --force {db_device}" - ) - - if not skip_zap_flag: - # 7. 
Determine the block devices - logger.out( - f"Getting disk info for OSD {osd_id} device {data_device}", - state="i", - ) - found_osds = CephOSDInstance.find_osds_from_block(logger, data_device) - if osd_id in found_osds.keys(): - # Try to determine if any other OSDs shared a block device with this OSD - _, osd_list = get_list_osd(zkhandler, None) - all_osds_on_block = [ - o - for o in osd_list - if o["node"] == node and o["device"] == data_device - ] - - if len(all_osds_on_block) < 1: - logger.out( - f"Found no peer split OSDs on {data_device}; zapping disk", - state="i", - ) - retcode, stdout, stderr = common.run_os_command( - f"ceph-volume lvm zap --destroy {data_device}" - ) - if retcode: - logger.out("Failed: ceph-volume lvm zap", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - else: - logger.out( - f"Found {len(all_osds_on_block)} OSD(s) still remaining on {data_device}; skipping zap", - state="w", - ) - else: - logger.out( - f"Could not find OSD {osd_id} on device {data_device}; skipping zap", - state="w", - ) - - # Log it - logger.out(f"Successfully removed OSD {osd_id}", state="o") - return True - except Exception as e: - # Log it - logger.out(f"Failed to remove OSD {osd_id}: {e}", state="e") - return False - - @staticmethod - def add_db_vg(zkhandler, logger, device): - # Check if an existsing volume group exists - retcode, stdout, stderr = common.run_os_command("vgdisplay osd-db") - if retcode != 5: - logger.out('Ceph OSD database VG "osd-db" already exists', state="e") - return False - - # Handle a detect device if that is passed - if match(r"detect:", device): - ddevice = get_detect_device(device) - if ddevice is None: - logger.out( - f"Failed to determine block device from detect string {device}", - state="e", - ) - return False - else: - logger.out( - f"Determined block device {ddevice} from detect string {device}", - state="i", - ) - device = ddevice - - logger.out( - "Creating new OSD database volume 
group on block device {}".format(device), - state="i", - ) - try: - # 1. Create an empty partition table - logger.out( - "Creating partitions on block device {}".format(device), state="i" - ) - retcode, stdout, stderr = common.run_os_command( - "sgdisk --clear {}".format(device) - ) - if retcode: - logger.out("Failed: sgdisk create partition table", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - retcode, stdout, stderr = common.run_os_command( - "sgdisk --new 1:: --typecode 1:8e00 {}".format(device) - ) - if retcode: - logger.out("Failed: sgdisk create pv partition", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # Handle the partition ID portion - if search(r"by-path", device) or search(r"by-id", device): - # /dev/disk/by-path/pci-0000:03:00.0-scsi-0:1:0:0 -> pci-0000:03:00.0-scsi-0:1:0:0-part1 - partition = "{}-part1".format(device) - elif search(r"nvme", device): - # /dev/nvme0n1 -> nvme0n1p1 - partition = "{}p1".format(device) - else: - # /dev/sda -> sda1 - # No other '/dev/disk/by-*' types are valid for raw block devices anyways - partition = "{}1".format(device) - - # 2. Create the PV - logger.out("Creating PV on block device {}".format(partition), state="i") - retcode, stdout, stderr = common.run_os_command( - "pvcreate --force {}".format(partition) - ) - if retcode: - logger.out("Failed: pv creation", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # 2. 
Create the VG (named 'osd-db') - logger.out( - 'Creating VG "osd-db" on block device {}'.format(partition), state="i" - ) - retcode, stdout, stderr = common.run_os_command( - "vgcreate --force osd-db {}".format(partition) - ) - if retcode: - logger.out("Failed: vg creation", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # Log it - logger.out( - "Created new OSD database volume group on block device {}".format( - device - ), - state="o", - ) - return True - except Exception as e: - # Log it - logger.out( - "Failed to create OSD database volume group: {}".format(e), state="e" - ) - return False - - @staticmethod - def create_osd_db_lv(zkhandler, logger, osd_id, osd_db_size_bytes): - logger.out( - "Creating new OSD database logical volume for OSD ID {}".format(osd_id), - state="i", - ) - try: - # 0. Check if an existsing logical volume exists - retcode, stdout, stderr = common.run_os_command( - "lvdisplay osd-db/osd{}".format(osd_id) - ) - if retcode != 5: - logger.out( - 'Ceph OSD database LV "osd-db/osd{}" already exists'.format(osd_id), - state="e", - ) - return False - - # 1. Determine LV sizing - osd_db_size_m = int(osd_db_size_bytes / 1024 / 1024) - - # 2. 
Create the LV - logger.out( - f'Creating DB LV "osd-db/osd-{osd_id}" of size {osd_db_size_m}M', - state="i", - ) - retcode, stdout, stderr = common.run_os_command( - "lvcreate --yes --name osd-{} --size {} osd-db".format( - osd_id, osd_db_size_m - ) - ) - if retcode: - logger.out("Failed: db lv creation", state="e") - logger.out(stdout, state="d") - logger.out(stderr, state="d") - raise Exception - - # Log it - logger.out( - 'Created new OSD database logical volume "osd-db/osd-{}"'.format( - osd_id - ), - state="o", - ) - return True - except Exception as e: - # Log it - logger.out( - "Failed to create OSD database logical volume: {}".format(e), state="e" - ) - return False - class CephPoolInstance(object): def __init__(self, zkhandler, logger, this_node, name): @@ -1400,169 +284,3 @@ class CephSnapshotInstance(object): if data and data != self.stats: self.stats = json.loads(data) - - -# Primary command function -# This command pipe is only used for OSD adds and removes -def ceph_command(zkhandler, logger, this_node, data, d_osd): - # Get the command and args; the * + join ensures arguments with spaces (e.g. 
detect strings) are recombined right - command, *args = data.split() - args = " ".join(args) - - # Adding a new OSD - if command == "osd_add": - ( - node, - device, - weight, - ext_db_ratio, - ext_db_size, - split_count, - ) = args.split(",") - try: - ext_db_ratio = float(ext_db_ratio) - except Exception: - ext_db_ratio = None - try: - split_count = int(split_count) - except Exception: - split_count = None - - if node == this_node.name: - # Lock the command queue - zk_lock = zkhandler.writelock("base.cmd.ceph") - with zk_lock: - # Add the OSD - result = CephOSDInstance.add_osd( - zkhandler, - logger, - node, - device, - weight, - ext_db_ratio, - ext_db_size, - split_count, - ) - # Command succeeded - if result: - # Update the command queue - zkhandler.write([("base.cmd.ceph", "success-{}".format(data))]) - # Command failed - else: - # Update the command queue - zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))]) - # Wait 1 seconds before we free the lock, to ensure the client hits the lock - time.sleep(1) - - # Replacing an OSD - if command == "osd_replace": - ( - node, - osd_id, - new_device, - old_device, - weight, - ext_db_ratio, - ext_db_size, - ) = args.split(",") - old_device = None if old_device == "None" else old_device - weight = None if weight == "None" else weight - ext_db_ratio = None if ext_db_ratio == "None" else ext_db_ratio - ext_db_size = None if ext_db_size == "None" else ext_db_size - if node == this_node.name: - # Lock the command queue - zk_lock = zkhandler.writelock("base.cmd.ceph") - with zk_lock: - # Add the OSD - result = CephOSDInstance.replace_osd( - zkhandler, - logger, - node, - osd_id, - new_device, - old_device, - weight, - ext_db_ratio, - ext_db_size, - ) - # Command succeeded - if result: - # Update the command queue - zkhandler.write([("base.cmd.ceph", "success-{}".format(data))]) - # Command failed - else: - # Update the command queue - zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))]) - # Wait 1 seconds 
before we free the lock, to ensure the client hits the lock - time.sleep(1) - - # Refreshing an OSD - if command == "osd_refresh": - node, osd_id, device, ext_db_flag = args.split(",") - ext_db_flag = bool(strtobool(ext_db_flag)) - if node == this_node.name: - # Lock the command queue - zk_lock = zkhandler.writelock("base.cmd.ceph") - with zk_lock: - # Add the OSD - result = CephOSDInstance.refresh_osd( - zkhandler, logger, node, osd_id, device, ext_db_flag - ) - # Command succeeded - if result: - # Update the command queue - zkhandler.write([("base.cmd.ceph", "success-{}".format(data))]) - # Command failed - else: - # Update the command queue - zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))]) - # Wait 1 seconds before we free the lock, to ensure the client hits the lock - time.sleep(1) - - # Removing an OSD - elif command == "osd_remove": - osd_id, force = args.split(",") - force_flag = bool(strtobool(force)) - - # Verify osd_id is in the list - if osd_id not in d_osd.keys(): - return - - if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: - # Lock the command queue - zk_lock = zkhandler.writelock("base.cmd.ceph") - with zk_lock: - # Remove the OSD - result = CephOSDInstance.remove_osd( - zkhandler, logger, this_node.name, osd_id, force_flag - ) - # Command succeeded - if result: - # Update the command queue - zkhandler.write([("base.cmd.ceph", "success-{}".format(data))]) - # Command failed - else: - # Update the command queue - zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))]) - # Wait 1 seconds before we free the lock, to ensure the client hits the lock - time.sleep(1) - - # Adding a new DB VG - elif command == "db_vg_add": - node, device = args.split(",") - if node == this_node.name: - # Lock the command queue - zk_lock = zkhandler.writelock("base.cmd.ceph") - with zk_lock: - # Add the VG - result = CephOSDInstance.add_db_vg(zkhandler, logger, device) - # Command succeeded - if result: - # Update the command queue - 
zkhandler.write([("base.cmd.ceph", "success-{}".format(data))]) - # Command failed - else: - # Update the command queue - zkhandler.write([("base.cmd.ceph", "failure={}".format(data))]) - # Wait 1 seconds before we free the lock, to ensure the client hits the lock - time.sleep(1)