From 7a40c7a55bbc7407318d666dbe26b963165eb8eb Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Fri, 6 May 2022 15:31:58 -0400 Subject: [PATCH] Add support for replacing/refreshing OSDs Adds commands to both replace an OSD disk, and refresh (reimport) an existing OSD disk on a new node. This handles the cases where an OSD disk should be replaced (either due to upgrades or failures) or where a node is rebuilt in-place and an existing OSD must be re-imported to it. This should avoid the need to do a full remove/add sequence for either case. Also cleans up some aspects of OSD removal that are identical between methods (e.g. using safe-to-destroy and sleeping after stopping) and fixes a bug if an OSD does not truly exist when the daemon starts up. --- api-daemon/pvcapid/flaskapi.py | 96 ++++ api-daemon/pvcapid/helper.py | 32 ++ client-cli/pvc/cli_lib/ceph.py | 40 ++ client-cli/pvc/pvc.py | 70 +++ daemon-common/ceph.py | 99 +++- docs/manuals/swagger.json | 74 +++ node-daemon/pvcnoded/objects/CephInstance.py | 479 +++++++++++++++++-- 7 files changed, 853 insertions(+), 37 deletions(-) diff --git a/api-daemon/pvcapid/flaskapi.py b/api-daemon/pvcapid/flaskapi.py index c94d8bf1..fbb43db3 100755 --- a/api-daemon/pvcapid/flaskapi.py +++ b/api-daemon/pvcapid/flaskapi.py @@ -4097,6 +4097,102 @@ class API_Storage_Ceph_OSD_Element(Resource): """ return api_helper.ceph_osd_list(osdid) + @RequestParser( + [ + { + "name": "device", + "required": True, + "helptext": "A valid device or detect string must be specified.", + }, + { + "name": "weight", + "required": True, + "helptext": "An OSD weight must be specified.", + }, + { + "name": "yes-i-really-mean-it", + "required": True, + "helptext": "Please confirm that 'yes-i-really-mean-it'.", + }, + ] + ) + @Authenticator + def post(self, osdid, reqargs): + """ + Replace a Ceph OSD in the cluster + Note: This task may take up to 30s to complete and return + --- + tags: + - storage / ceph + parameters: + - in: query + name: device + type: string + required: true + description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) or detect string ("detect:NAME:SIZE:ID") to replace the OSD onto + - in: query + name: weight + type: number + required: true + description: The Ceph CRUSH weight for the replaced OSD + responses: + 200: + description: OK + schema: + type: object + id: Message + 400: + description: Bad request + schema: + type: object + id: Message + """ + return api_helper.ceph_osd_replace( + osdid, + reqargs.get("device", None), + reqargs.get("weight", None), + ) + + @RequestParser( + [ + { + "name": "device", + "required": True, + "helptext": "A valid device or detect string must be specified.", + }, + ] + ) + @Authenticator + def put(self, osdid, reqargs): + """ + Refresh (reimport) a Ceph OSD in the cluster + Note: This task may take up to 30s to complete and return + --- + tags: + - storage / ceph + parameters: + - in: query + name: device + type: string + required: true + description: The block device (e.g. "/dev/sdb", "/dev/disk/by-path/...", etc.) 
or detect string ("detect:NAME:SIZE:ID") that the OSD should be using + responses: + 200: + description: OK + schema: + type: object + id: Message + 400: + description: Bad request + schema: + type: object + id: Message + """ + return api_helper.ceph_osd_refresh( + osdid, + reqargs.get("device", None), + ) + @RequestParser( [ { diff --git a/api-daemon/pvcapid/helper.py b/api-daemon/pvcapid/helper.py index 0975c331..7d5d21a3 100755 --- a/api-daemon/pvcapid/helper.py +++ b/api-daemon/pvcapid/helper.py @@ -1301,6 +1301,38 @@ def ceph_osd_add(zkhandler, node, device, weight, ext_db_flag=False, ext_db_rati return output, retcode +@ZKConnection(config) +def ceph_osd_replace(zkhandler, osd_id, device, weight): + """ + Replace a Ceph OSD in the PVC Ceph storage cluster. + """ + retflag, retdata = pvc_ceph.replace_osd(zkhandler, osd_id, device, weight) + + if retflag: + retcode = 200 + else: + retcode = 400 + + output = {"message": retdata.replace('"', "'")} + return output, retcode + + +@ZKConnection(config) +def ceph_osd_refresh(zkhandler, osd_id, device): + """ + Refresh (reimport) a Ceph OSD in the PVC Ceph storage cluster. + """ + retflag, retdata = pvc_ceph.refresh_osd(zkhandler, osd_id, device) + + if retflag: + retcode = 200 + else: + retcode = 400 + + output = {"message": retdata.replace('"', "'")} + return output, retcode + + @ZKConnection(config) def ceph_osd_remove(zkhandler, osd_id, force_flag): """ diff --git a/client-cli/pvc/cli_lib/ceph.py b/client-cli/pvc/cli_lib/ceph.py index 2558e0a4..c65ebd8b 100644 --- a/client-cli/pvc/cli_lib/ceph.py +++ b/client-cli/pvc/cli_lib/ceph.py @@ -255,6 +255,46 @@ def ceph_osd_add(config, node, device, weight, ext_db_flag, ext_db_ratio): return retstatus, response.json().get("message", "") +def ceph_osd_replace(config, osdid, device, weight): + """ + Replace an existing Ceph OSD with a new device + + API endpoint: POST /api/v1/storage/ceph/osd/{osdid} + API arguments: device={device}, weight={weight} + API schema: {"message":"{data}"} + """ + params = {"device": device, "weight": weight, "yes-i-really-mean-it": "yes"} + response = call_api(config, "post", f"/storage/ceph/osd/{osdid}", params=params) + + if response.status_code == 200: + retstatus = True + else: + retstatus = False + + return retstatus, response.json().get("message", "") + + +def ceph_osd_refresh(config, osdid, device): + """ + Refresh (reimport) an existing Ceph OSD with device {device} + + API endpoint: PUT /api/v1/storage/ceph/osd/{osdid} + API arguments: device={device} + API schema: {"message":"{data}"} + """ + params = { + "device": device, + } + response = call_api(config, "put", f"/storage/ceph/osd/{osdid}", params=params) + + if response.status_code == 200: + retstatus = True + else: + retstatus = False + + return retstatus, response.json().get("message", "") + + def ceph_osd_remove(config, osdid, force_flag): """ Remove Ceph OSD diff --git a/client-cli/pvc/pvc.py b/client-cli/pvc/pvc.py index 83b447d3..85bf4703 100755 --- a/client-cli/pvc/pvc.py +++ b/client-cli/pvc/pvc.py @@ -3371,6 +3371,74 @@ def ceph_osd_add(node, device, weight, ext_db_flag, ext_db_ratio, confirm_flag): cleanup(retcode, retmsg) +############################################################################### +# pvc storage osd replace +############################################################################### +@click.command(name="replace", short_help="Replace OSD block device.") +@click.argument("osdid") +@click.argument("device") +@click.option( + "-w", + "--weight", + "weight", + default=1.0, + 
show_default=True,
+    help="New weight of the OSD within the CRUSH map.",
+)
+@click.option(
+    "-y",
+    "--yes",
+    "confirm_flag",
+    is_flag=True,
+    default=False,
+    help="Confirm the replacement",
+)
+@cluster_req
+def ceph_osd_replace(osdid, device, weight, confirm_flag):
+    """
+    Replace the block device of an existing OSD with ID OSDID with DEVICE. Use this command to replace a failed or smaller OSD block device with a new one.
+
+    DEVICE must be a valid raw block device (e.g. '/dev/sda', '/dev/nvme0n1', '/dev/disk/by-path/...', '/dev/disk/by-id/...') or a "detect" string. Using partitions is not supported. A "detect" string is a string in the form "detect:NAME:SIZE:ID". For details, see 'pvc storage osd add --help'.
+
+    The weight of an OSD should reflect the ratio of the OSD to other OSDs in the storage cluster. For details, see 'pvc storage osd add --help'. Note that the current weight must be explicitly specified if it differs from the default.
+
+    Existing IDs, external DB devices, etc. of the OSD will be preserved; data will be lost and rebuilt from the remaining healthy OSDs.
+    """
+    if not confirm_flag and not config["unsafe"]:
+        try:
+            click.confirm(
+                "Replace OSD {} with block device {}".format(osdid, device),
+                prompt_suffix="? ",
+                abort=True,
+            )
+        except Exception:
+            exit(0)
+
+    retcode, retmsg = pvc_ceph.ceph_osd_replace(config, osdid, device, weight)
+    cleanup(retcode, retmsg)
+
+
+###############################################################################
+# pvc storage osd refresh
+###############################################################################
+@click.command(name="refresh", short_help="Refresh (reimport) OSD device.")
+@click.argument("osdid")
+@click.argument("device")
+@cluster_req
+def ceph_osd_refresh(osdid, device):
+    """
+    Refresh (reimport) the block DEVICE of an existing OSD with ID OSDID. Use this command to reimport a working OSD into a rebuilt/replaced node.
+
+    DEVICE must be a valid raw block device (e.g. '/dev/sda', '/dev/nvme0n1', '/dev/disk/by-path/...', '/dev/disk/by-id/...') or a "detect" string. Using partitions is not supported. A "detect" string is a string in the form "detect:NAME:SIZE:ID". For details, see 'pvc storage osd add --help'.
+
+    Existing data, IDs, weights, etc. of the OSD will be preserved.
+
+    NOTE: If the OSD had an external DB device, this is not automatically handled at this time. It is best to remove and re-add the OSD instead.
+ """ + retcode, retmsg = pvc_ceph.ceph_osd_refresh(config, osdid, device) + cleanup(retcode, retmsg) + + ############################################################################### # pvc storage osd remove ############################################################################### @@ -5872,6 +5940,8 @@ ceph_benchmark.add_command(ceph_benchmark_list) ceph_osd.add_command(ceph_osd_create_db_vg) ceph_osd.add_command(ceph_osd_add) +ceph_osd.add_command(ceph_osd_replace) +ceph_osd.add_command(ceph_osd_refresh) ceph_osd.add_command(ceph_osd_remove) ceph_osd.add_command(ceph_osd_in) ceph_osd.add_command(ceph_osd_out) diff --git a/daemon-common/ceph.py b/daemon-common/ceph.py index 79752bbc..e1a50838 100644 --- a/daemon-common/ceph.py +++ b/daemon-common/ceph.py @@ -236,7 +236,7 @@ def add_osd_db_vg(zkhandler, node, device): return success, message -# OSD addition and removal uses the /cmd/ceph pipe +# OSD actions use the /cmd/ceph pipe # These actions must occur on the specific node they reference def add_osd(zkhandler, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05): # Verify the target node exists @@ -288,6 +288,103 @@ def add_osd(zkhandler, node, device, weight, ext_db_flag=False, ext_db_ratio=0.0 return success, message +def replace_osd(zkhandler, osd_id, new_device, weight): + # Get current OSD information + osd_information = getOSDInformation(zkhandler, osd_id) + node = osd_information["node"] + old_device = osd_information["device"] + ext_db_flag = True if osd_information["db_device"] else False + + # Verify target block device isn't in use + block_osd = verifyOSDBlock(zkhandler, node, new_device) + if block_osd and block_osd != osd_id: + return ( + False, + 'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format( + new_device, node, block_osd + ), + ) + + # Tell the cluster to create a new OSD for the host + replace_osd_string = "osd_replace {},{},{},{},{},{}".format( + node, osd_id, old_device, new_device, weight, ext_db_flag + ) + zkhandler.write([("base.cmd.ceph", replace_osd_string)]) + # Wait 1/2 second for the cluster to get the message and start working + time.sleep(0.5) + # Acquire a read lock, so we get the return exclusively + with zkhandler.readlock("base.cmd.ceph"): + try: + result = zkhandler.read("base.cmd.ceph").split()[0] + if result == "success-osd_replace": + message = 'Replaced OSD {} with block device "{}" on node "{}".'.format( + osd_id, new_device, node + ) + success = True + else: + message = "ERROR: Failed to replace OSD; check node logs for details." + success = False + except Exception: + message = "ERROR: Command ignored by node." 
+ success = False + + # Acquire a write lock to ensure things go smoothly + with zkhandler.writelock("base.cmd.ceph"): + time.sleep(0.5) + zkhandler.write([("base.cmd.ceph", "")]) + + return success, message + + +def refresh_osd(zkhandler, osd_id, device): + # Get current OSD information + osd_information = getOSDInformation(zkhandler, osd_id) + node = osd_information["node"] + ext_db_flag = True if osd_information["db_device"] else False + + # Verify target block device isn't in use + block_osd = verifyOSDBlock(zkhandler, node, device) + if not block_osd or block_osd != osd_id: + return ( + False, + 'ERROR: Block device "{}" on node "{}" is not used by OSD "{}"; use replace instead'.format( + device, node, osd_id + ), + ) + + # Tell the cluster to create a new OSD for the host + refresh_osd_string = "osd_refresh {},{},{},{}".format( + node, osd_id, device, ext_db_flag + ) + zkhandler.write([("base.cmd.ceph", refresh_osd_string)]) + # Wait 1/2 second for the cluster to get the message and start working + time.sleep(0.5) + # Acquire a read lock, so we get the return exclusively + with zkhandler.readlock("base.cmd.ceph"): + try: + result = zkhandler.read("base.cmd.ceph").split()[0] + if result == "success-osd_refresh": + message = ( + 'Refreshed OSD {} with block device "{}" on node "{}".'.format( + osd_id, device, node + ) + ) + success = True + else: + message = "ERROR: Failed to refresh OSD; check node logs for details." + success = False + except Exception: + message = "ERROR: Command ignored by node." + success = False + + # Acquire a write lock to ensure things go smoothly + with zkhandler.writelock("base.cmd.ceph"): + time.sleep(0.5) + zkhandler.write([("base.cmd.ceph", "")]) + + return success, message + + def remove_osd(zkhandler, osd_id, force_flag): if not verifyOSD(zkhandler, osd_id): return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format( diff --git a/docs/manuals/swagger.json b/docs/manuals/swagger.json index 601a60ac..055ded6d 100644 --- a/docs/manuals/swagger.json +++ b/docs/manuals/swagger.json @@ -5094,6 +5094,13 @@ "delete": { "description": "Note: This task may take up to 30s to complete and return
Warning: This operation may have unintended consequences for the storage cluster; ensure the cluster can support removing the OSD before proceeding",
                 "parameters": [
+                    {
+                        "description": "Force removal even if some step(s) fail",
+                        "in": "query",
+                        "name": "force",
+                        "required": false,
+                        "type": "boolean"
+                    },
                     {
                         "description": "A confirmation string to ensure that the API consumer really means it",
                         "in": "query",
                         "name": "yes-i-really-mean-it",
@@ -5141,6 +5148,73 @@
                 "tags": [
                     "storage / ceph"
                 ]
+            },
+            "post": {
+                "description": "Note: This task may take up to 30s to complete and return",
+                "parameters": [
+                    {
+                        "description": "The block device (e.g. \"/dev/sdb\", \"/dev/disk/by-path/...\", etc.) or detect string (\"detect:NAME:SIZE:ID\") to replace the OSD onto",
+                        "in": "query",
+                        "name": "device",
+                        "required": true,
+                        "type": "string"
+                    },
+                    {
+                        "description": "The Ceph CRUSH weight for the replaced OSD",
+                        "in": "query",
+                        "name": "weight",
+                        "required": true,
+                        "type": "number"
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "description": "OK",
+                        "schema": {
+                            "$ref": "#/definitions/Message"
+                        }
+                    },
+                    "400": {
+                        "description": "Bad request",
+                        "schema": {
+                            "$ref": "#/definitions/Message"
+                        }
+                    }
+                },
+                "summary": "Replace a Ceph OSD in the cluster",
+                "tags": [
+                    "storage / ceph"
+                ]
+            },
+            "put": {
+                "description": "Note: This task may take up to 30s to complete and return",
+                "parameters": [
+                    {
+                        "description": "The block device (e.g. \"/dev/sdb\", \"/dev/disk/by-path/...\", etc.) or detect string (\"detect:NAME:SIZE:ID\") that the OSD should be using",
+                        "in": "query",
+                        "name": "device",
+                        "required": true,
+                        "type": "string"
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "description": "OK",
+                        "schema": {
+                            "$ref": "#/definitions/Message"
+                        }
+                    },
+                    "400": {
+                        "description": "Bad request",
+                        "schema": {
+                            "$ref": "#/definitions/Message"
+                        }
+                    }
+                },
+                "summary": "Refresh (reimport) a Ceph OSD in the cluster",
+                "tags": [
+                    "storage / ceph"
+                ]
+            }
         },
         "/api/v1/storage/ceph/osd/{osdid}/state": {
diff --git a/node-daemon/pvcnoded/objects/CephInstance.py b/node-daemon/pvcnoded/objects/CephInstance.py
index 2f605ad6..8f770755 100644
--- a/node-daemon/pvcnoded/objects/CephInstance.py
+++ b/node-daemon/pvcnoded/objects/CephInstance.py
@@ -21,7 +21,6 @@
 import time
 import json
 
-import psutil
 
 import daemon_lib.common as common
 
@@ -217,6 +216,10 @@ class CephOSDInstance(object):
         retcode, stdout, stderr = common.run_os_command(
             f"ceph-volume lvm list {find_device}"
         )
+        osd_blockdev = None
+        osd_fsid = None
+        osd_clusterfsid = None
+        osd_device = None
         for line in stdout.split("\n"):
             if "block device" in line:
                 osd_blockdev = line.split()[-1]
@@ -227,7 +230,7 @@
             if "devices" in line:
                 osd_device = line.split()[-1]
 
-        if not osd_fsid:
+        if not osd_blockdev or not osd_fsid or not osd_clusterfsid or not osd_device:
             self.logger.out(
                 f"Failed to find updated OSD information via ceph-volume for {find_device}",
                 state="e",
             )
@@ -404,6 +407,7 @@ class CephOSDInstance(object):
                 print(stdout)
                 print(stderr)
                 raise Exception
+            time.sleep(0.5)
 
             # 6. Verify it started
@@ -447,6 +451,381 @@ class CephOSDInstance(object):
             logger.out("Failed to create new OSD disk: {}".format(e), state="e")
             return False
 
+    @staticmethod
+    def replace_osd(
+        zkhandler,
+        logger,
+        node,
+        osd_id,
+        old_device,
+        new_device,
+        weight,
+        ext_db_flag=False,
+    ):
+        # Handle a detect device if that is passed
+        if match(r"detect:", new_device):
+            ddevice = get_detect_device(new_device)
+            if ddevice is None:
+                logger.out(
+                    f"Failed to determine block device from detect string {new_device}",
+                    state="e",
+                )
+                return False
+            else:
+                logger.out(
+                    f"Determined block device {ddevice} from detect string {new_device}",
+                    state="i",
+                )
+                new_device = ddevice
+
+        # We are ready to replace the OSD disk on this node
+        logger.out(
+            "Replacing OSD {} disk with block device {}".format(osd_id, new_device),
+            state="i",
+        )
+        try:
+            # Verify the OSD is present
+            retcode, stdout, stderr = common.run_os_command("ceph osd ls")
+            osd_list = stdout.split("\n")
+            if osd_id not in osd_list:
+                logger.out(
+                    "Could not find OSD {} in the cluster".format(osd_id), state="e"
+                )
+                return True
+
+            # 1. Set the OSD down and out so it will flush
+            logger.out("Setting down OSD disk with ID {}".format(osd_id), state="i")
+            retcode, stdout, stderr = common.run_os_command(
+                "ceph osd down {}".format(osd_id)
+            )
+            if retcode:
+                print("ceph osd down")
+                print(stdout)
+                print(stderr)
+                raise Exception
+
+            logger.out("Setting out OSD disk with ID {}".format(osd_id), state="i")
+            retcode, stdout, stderr = common.run_os_command(
+                "ceph osd out {}".format(osd_id)
+            )
+            if retcode:
+                print("ceph osd out")
+                print(stdout)
+                print(stderr)
+                raise Exception
+
+            # 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
+            logger.out(f"Waiting for OSD {osd_id} to be safe to remove", state="i")
+            while True:
+                retcode, stdout, stderr = common.run_os_command(
+                    f"ceph osd safe-to-destroy osd.{osd_id}"
+                )
+                if retcode in [0, 11]:
+                    # Code 0 = success
+                    # Code 11 = "Error EAGAIN: OSD(s) 5 have no reported stats, and not all PGs are active+clean; we cannot draw any conclusions." which means all PGs have been remapped but backfill is still occurring
+                    break
+                else:
+                    time.sleep(5)
+
+            # 3. Stop the OSD process
+            logger.out("Stopping OSD disk with ID {}".format(osd_id), state="i")
+            retcode, stdout, stderr = common.run_os_command(
+                "systemctl stop ceph-osd@{}".format(osd_id)
+            )
+            if retcode:
+                print("systemctl stop")
+                print(stdout)
+                print(stderr)
+                raise Exception
+            time.sleep(2)
+
+            # 4. Destroy the OSD
+            logger.out(f"Destroying OSD with ID {osd_id}", state="i")
+            retcode, stdout, stderr = common.run_os_command(
+                f"ceph osd destroy {osd_id} --yes-i-really-mean-it"
+            )
+            if retcode:
+                print("ceph osd destroy")
+                print(stdout)
+                print(stderr)
+                raise Exception
+
+            # 5. Adjust the weight
+            logger.out(
+                "Adjusting weight of OSD disk with ID {} in CRUSH map".format(osd_id),
+                state="i",
+            )
+            retcode, stdout, stderr = common.run_os_command(
+                "ceph osd crush reweight osd.{osdid} {weight}".format(
+                    osdid=osd_id, weight=weight
+                )
+            )
+            if retcode:
+                print("ceph osd crush reweight")
+                print(stdout)
+                print(stderr)
+                raise Exception
+
+            # 6a.
Zap the new disk to ensure it is ready to go + logger.out("Zapping disk {}".format(new_device), state="i") + retcode, stdout, stderr = common.run_os_command( + "ceph-volume lvm zap --destroy {}".format(new_device) + ) + if retcode: + print("ceph-volume lvm zap") + print(stdout) + print(stderr) + raise Exception + + dev_flags = "--data {}".format(new_device) + + # 6b. Prepare the logical volume if ext_db_flag + if ext_db_flag: + db_device = "osd-db/osd-{}".format(osd_id) + dev_flags += " --block.db {}".format(db_device) + else: + db_device = "" + + # 6c. Replace the OSD + logger.out( + "Preparing LVM for replaced OSD {} disk on {}".format( + osd_id, new_device + ), + state="i", + ) + retcode, stdout, stderr = common.run_os_command( + "ceph-volume lvm prepare --osd-id {osdid} --bluestore {devices}".format( + osdid=osd_id, devices=dev_flags + ) + ) + if retcode: + print("ceph-volume lvm prepare") + print(stdout) + print(stderr) + raise Exception + + # 7a. Get OSD information + logger.out( + "Getting OSD information for ID {} on {}".format(osd_id, new_device), + state="i", + ) + retcode, stdout, stderr = common.run_os_command( + "ceph-volume lvm list {device}".format(device=new_device) + ) + for line in stdout.split("\n"): + if "block device" in line: + osd_blockdev = line.split()[-1] + if "osd fsid" in line: + osd_fsid = line.split()[-1] + if "cluster fsid" in line: + osd_clusterfsid = line.split()[-1] + if "devices" in line: + osd_device = line.split()[-1] + + if not osd_fsid: + print("ceph-volume lvm list") + print("Could not find OSD information in data:") + print(stdout) + print(stderr) + raise Exception + + # Split OSD blockdev into VG and LV components + # osd_blockdev = /dev/ceph-/osd-block- + _, _, osd_vg, osd_lv = osd_blockdev.split("/") + + # Reset whatever we were given to Ceph's /dev/xdX naming + if new_device != osd_device: + new_device = osd_device + + # 7b. Activate the OSD + logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i") + retcode, stdout, stderr = common.run_os_command( + "ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format( + osdid=osd_id, osdfsid=osd_fsid + ) + ) + if retcode: + print("ceph-volume lvm activate") + print(stdout) + print(stderr) + raise Exception + + time.sleep(0.5) + + # 8. Verify it started + retcode, stdout, stderr = common.run_os_command( + "systemctl status ceph-osd@{osdid}".format(osdid=osd_id) + ) + if retcode: + print("systemctl status") + print(stdout) + print(stderr) + raise Exception + + # 9. 
Update Zookeeper information + logger.out( + "Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i" + ) + zkhandler.write( + [ + (("osd", osd_id), ""), + (("osd.node", osd_id), node), + (("osd.device", osd_id), new_device), + (("osd.db_device", osd_id), db_device), + (("osd.fsid", osd_id), ""), + (("osd.ofsid", osd_id), osd_fsid), + (("osd.cfsid", osd_id), osd_clusterfsid), + (("osd.lvm", osd_id), ""), + (("osd.vg", osd_id), osd_vg), + (("osd.lv", osd_id), osd_lv), + ( + ("osd.stats", osd_id), + '{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}', + ), + ] + ) + + # Log it + logger.out( + "Replaced OSD {} disk with device {}".format(osd_id, new_device), + state="o", + ) + return True + except Exception as e: + # Log it + logger.out("Failed to replace OSD {} disk: {}".format(osd_id, e), state="e") + return False + + @staticmethod + def refresh_osd(zkhandler, logger, node, osd_id, device, ext_db_flag): + # Handle a detect device if that is passed + if match(r"detect:", device): + ddevice = get_detect_device(device) + if ddevice is None: + logger.out( + f"Failed to determine block device from detect string {device}", + state="e", + ) + return False + else: + logger.out( + f"Determined block device {ddevice} from detect string {device}", + state="i", + ) + device = ddevice + + # We are ready to create a new OSD on this node + logger.out( + "Refreshing OSD {} disk on block device {}".format(osd_id, device), + state="i", + ) + try: + # 1. Verify the OSD is present + retcode, stdout, stderr = common.run_os_command("ceph osd ls") + osd_list = stdout.split("\n") + if osd_id not in osd_list: + logger.out( + "Could not find OSD {} in the cluster".format(osd_id), state="e" + ) + return True + + dev_flags = "--data {}".format(device) + + if ext_db_flag: + db_device = "osd-db/osd-{}".format(osd_id) + dev_flags += " --block.db {}".format(db_device) + else: + db_device = "" + + # 2. Get OSD information + logger.out( + "Getting OSD information for ID {} on {}".format(osd_id, device), + state="i", + ) + retcode, stdout, stderr = common.run_os_command( + "ceph-volume lvm list {device}".format(device=device) + ) + for line in stdout.split("\n"): + if "block device" in line: + osd_blockdev = line.split()[-1] + if "osd fsid" in line: + osd_fsid = line.split()[-1] + if "cluster fsid" in line: + osd_clusterfsid = line.split()[-1] + if "devices" in line: + osd_device = line.split()[-1] + + if not osd_fsid: + print("ceph-volume lvm list") + print("Could not find OSD information in data:") + print(stdout) + print(stderr) + raise Exception + + # Split OSD blockdev into VG and LV components + # osd_blockdev = /dev/ceph-/osd-block- + _, _, osd_vg, osd_lv = osd_blockdev.split("/") + + # Reset whatever we were given to Ceph's /dev/xdX naming + if device != osd_device: + device = osd_device + + # 3. Activate the OSD + logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i") + retcode, stdout, stderr = common.run_os_command( + "ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format( + osdid=osd_id, osdfsid=osd_fsid + ) + ) + if retcode: + print("ceph-volume lvm activate") + print(stdout) + print(stderr) + raise Exception + + time.sleep(0.5) + + # 4. 
Verify it started
+            retcode, stdout, stderr = common.run_os_command(
+                "systemctl status ceph-osd@{osdid}".format(osdid=osd_id)
+            )
+            if retcode:
+                print("systemctl status")
+                print(stdout)
+                print(stderr)
+                raise Exception
+
+            # 5. Update Zookeeper information
+            logger.out(
+                "Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i"
+            )
+            zkhandler.write(
+                [
+                    (("osd", osd_id), ""),
+                    (("osd.node", osd_id), node),
+                    (("osd.device", osd_id), device),
+                    (("osd.db_device", osd_id), db_device),
+                    (("osd.fsid", osd_id), ""),
+                    (("osd.ofsid", osd_id), osd_fsid),
+                    (("osd.cfsid", osd_id), osd_clusterfsid),
+                    (("osd.lvm", osd_id), ""),
+                    (("osd.vg", osd_id), osd_vg),
+                    (("osd.lv", osd_id), osd_lv),
+                    (
+                        ("osd.stats", osd_id),
+                        '{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
+                    ),
+                ]
+            )
+
+            # Log it
+            logger.out("Refreshed OSD {} disk on {}".format(osd_id, device), state="o")
+            return True
+        except Exception as e:
+            # Log it
+            logger.out("Failed to refresh OSD {} disk: {}".format(osd_id, e), state="e")
+            return False
+
     @staticmethod
     def remove_osd(zkhandler, logger, osd_id, osd_obj, force_flag):
         logger.out("Removing OSD disk {}".format(osd_id), state="i")
@@ -490,29 +869,16 @@ class CephOSDInstance(object):
                 else:
                     raise Exception
 
-            # 2. Wait for the OSD to flush
-            logger.out("Flushing OSD disk with ID {}".format(osd_id), state="i")
-            osd_string = str()
+            # 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
+            logger.out(f"Waiting for OSD {osd_id} to be safe to remove", state="i")
             while True:
-                try:
-                    retcode, stdout, stderr = common.run_os_command(
-                        "ceph pg dump osds --format json"
-                    )
-                    dump_string = json.loads(stdout)
-                    for osd in dump_string:
-                        if str(osd["osd"]) == osd_id:
-                            osd_string = osd
-                    num_pgs = osd_string["num_pgs"]
-                    if num_pgs > 0:
-                        time.sleep(5)
-                    else:
-                        if force_flag:
-                            logger.out("Ignoring error due to force flag", state="i")
-                        else:
-                            raise Exception
-
-                except Exception:
+                retcode, stdout, stderr = common.run_os_command(
+                    f"ceph osd safe-to-destroy osd.{osd_id}"
+                )
+                if int(retcode) in [0, 11]:
                     break
+                else:
+                    time.sleep(5)
 
             # 3. Stop the OSD process and wait for it to be terminated
             logger.out("Stopping OSD disk with ID {}".format(osd_id), state="i")
@@ -527,19 +893,7 @@ class CephOSDInstance(object):
                     logger.out("Ignoring error due to force flag", state="i")
                 else:
                     raise Exception
-
-            # FIXME: There has to be a better way to do this /shrug
-            while True:
-                is_osd_up = False
-                # Find if there is a process named ceph-osd with arg '--id {id}'
-                for p in psutil.process_iter(attrs=["name", "cmdline"]):
-                    if "ceph-osd" == p.info["name"] and "--id {}".format(
-                        osd_id
-                    ) in " ".join(p.info["cmdline"]):
-                        is_osd_up = True
-                # If there isn't, continue
-                if not is_osd_up:
-                    break
+            time.sleep(2)
 
             # 4.
Determine the block devices osd_vg = zkhandler.read(("osd.vg", osd_id)) @@ -912,6 +1266,59 @@ def ceph_command(zkhandler, logger, this_node, data, d_osd): # Wait 1 seconds before we free the lock, to ensure the client hits the lock time.sleep(1) + # Replacing an OSD + if command == "osd_replace": + node, osd_id, old_device, new_device, weight, ext_db_flag = args.split(",") + ext_db_flag = bool(strtobool(ext_db_flag)) + if node == this_node.name: + # Lock the command queue + zk_lock = zkhandler.writelock("base.cmd.ceph") + with zk_lock: + # Add the OSD + result = CephOSDInstance.replace_osd( + zkhandler, + logger, + node, + osd_id, + old_device, + new_device, + weight, + ext_db_flag, + ) + # Command succeeded + if result: + # Update the command queue + zkhandler.write([("base.cmd.ceph", "success-{}".format(data))]) + # Command failed + else: + # Update the command queue + zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))]) + # Wait 1 seconds before we free the lock, to ensure the client hits the lock + time.sleep(1) + + # Refreshing an OSD + if command == "osd_refresh": + node, osd_id, device, ext_db_flag = args.split(",") + ext_db_flag = bool(strtobool(ext_db_flag)) + if node == this_node.name: + # Lock the command queue + zk_lock = zkhandler.writelock("base.cmd.ceph") + with zk_lock: + # Add the OSD + result = CephOSDInstance.refresh_osd( + zkhandler, logger, node, osd_id, device, ext_db_flag + ) + # Command succeeded + if result: + # Update the command queue + zkhandler.write([("base.cmd.ceph", "success-{}".format(data))]) + # Command failed + else: + # Update the command queue + zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))]) + # Wait 1 seconds before we free the lock, to ensure the client hits the lock + time.sleep(1) + # Removing an OSD elif command == "osd_remove": osd_id, force = args.split(",")
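For reviewers and testers, a minimal sketch of exercising the two new endpoints added above directly over HTTP. The API address, port, API-key header value, OSD ID, device path, and weight below are hypothetical placeholders; the new `pvc storage osd replace <osdid> <device> --weight <weight> --yes` and `pvc storage osd refresh <osdid> <device>` CLI commands in this patch wrap these same calls.

import requests

API_BASE = "http://pvc.local:7370/api/v1"     # hypothetical API address and port
HEADERS = {"X-Api-Key": "example-secret-key"}  # hypothetical authentication key

# Replace OSD 5 onto a new block device: POST /storage/ceph/osd/<osdid>
resp = requests.post(
    f"{API_BASE}/storage/ceph/osd/5",
    headers=HEADERS,
    params={"device": "/dev/sdb", "weight": 1.0, "yes-i-really-mean-it": "yes"},
)
print(resp.status_code, resp.json().get("message"))

# Refresh (reimport) OSD 5 from its existing device: PUT /storage/ceph/osd/<osdid>
resp = requests.put(
    f"{API_BASE}/storage/ceph/osd/5",
    headers=HEADERS,
    params={"device": "/dev/sdb"},
)
print(resp.status_code, resp.json().get("message"))

Both calls return a Message object; a 200 indicates the node accepted and completed the command, while a 400 indicates it was rejected or failed (check node logs for details).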