#!/usr/bin/env python3 # ceph.py - PVC client function library, Ceph cluster fuctions # Part of the Parallel Virtual Cluster (PVC) system # # Copyright (C) 2018-2024 Joshua M. Boniface # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, version 3. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # ############################################################################### import os import re import json import time import math from concurrent.futures import ThreadPoolExecutor from distutils.util import strtobool from json import loads as jloads from re import match, search from uuid import uuid4 from os import path import daemon_lib.vm as vm import daemon_lib.common as common from daemon_lib.celery import start, log_info, log_warn, update, fail, finish # # Supplemental functions # # Verify OSD is valid in cluster def verifyOSD(zkhandler, osd_id): return zkhandler.exists(("osd", osd_id)) # Verify Pool is valid in cluster def verifyPool(zkhandler, name): return zkhandler.exists(("pool", name)) # Verify Volume is valid in cluster def verifyVolume(zkhandler, pool, name): return zkhandler.exists(("volume", f"{pool}/{name}")) # Verify Snapshot is valid in cluster def verifySnapshot(zkhandler, pool, volume, name): return zkhandler.exists(("snapshot", f"{pool}/{volume}/{name}")) # Verify OSD path is valid in cluster def verifyOSDBlock(zkhandler, node, device): for osd in zkhandler.children("base.osd"): osd_node = zkhandler.read(("osd.node", osd)) osd_device = zkhandler.read(("osd.device", osd)) if node == osd_node and device == osd_device: return osd return None # Matrix of human-to-byte values byte_unit_matrix = { "B": 1, "K": 1024, "M": 1024 * 1024, "G": 1024 * 1024 * 1024, "T": 1024 * 1024 * 1024 * 1024, "P": 1024 * 1024 * 1024 * 1024 * 1024, "E": 1024 * 1024 * 1024 * 1024 * 1024 * 1024, "Z": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024, "Y": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024, "R": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024, "Q": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024, } # Matrix of human-to-metric values ops_unit_matrix = { "": 1, "K": 1000, "M": 1000 * 1000, "G": 1000 * 1000 * 1000, "T": 1000 * 1000 * 1000 * 1000, "P": 1000 * 1000 * 1000 * 1000 * 1000, "E": 1000 * 1000 * 1000 * 1000 * 1000 * 1000, "Z": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000, "Y": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000, "R": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000, "Q": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000, } # Format byte sizes to/from human-readable units def format_bytes_tohuman(databytes): datahuman = "" for unit in sorted(byte_unit_matrix, key=byte_unit_matrix.get, reverse=True): new_bytes = int(math.ceil(databytes / byte_unit_matrix[unit])) # Round up if 5 or more digits if new_bytes > 9999: # We can jump down another level continue else: # We're at the end, display with this size datahuman = "{}{}".format(new_bytes, unit) return datahuman def format_bytes_fromhuman(datahuman): if not re.search(r"[A-Za-z]+", datahuman): dataunit = "B" datasize = float(datahuman) else: dataunit = str(re.match(r"[0-9\.]+([A-Za-z])[iBb]*", datahuman).group(1)) datasize = float(re.match(r"([0-9\.]+)[A-Za-z]+", datahuman).group(1)) if byte_unit_matrix.get(dataunit.upper()): databytes = int(datasize * byte_unit_matrix[dataunit.upper()]) return databytes else: return None # Format ops sizes to/from human-readable units def format_ops_tohuman(dataops): datahuman = "" for unit in sorted(ops_unit_matrix, key=ops_unit_matrix.get, reverse=True): new_ops = int(math.ceil(dataops / ops_unit_matrix[unit])) # Round up if 5 or more digits if new_ops > 9999: # We can jump down another level continue else: # We're at the end, display with this size datahuman = "{}{}".format(new_ops, unit) return datahuman def format_ops_fromhuman(datahuman): # Trim off human-readable character dataunit = datahuman[-1] datasize = int(datahuman[:-1]) dataops = datasize * ops_unit_matrix[dataunit.upper()] return "{}".format(dataops) def format_pct_tohuman(datapct): datahuman = "{0:.1f}".format(float(datapct * 100.0)) return datahuman # # Status functions # def get_status(zkhandler): primary_node = zkhandler.read("base.config.primary_node") ceph_status = zkhandler.read("base.storage").rstrip() # Create a data structure for the information status_data = { "type": "status", "primary_node": primary_node, "ceph_data": ceph_status, } return True, status_data def get_health(zkhandler): primary_node = zkhandler.read("base.config.primary_node") ceph_health = zkhandler.read("base.storage.health").rstrip() # Create a data structure for the information status_data = { "type": "health", "primary_node": primary_node, "ceph_data": ceph_health, } return True, status_data def get_util(zkhandler): primary_node = zkhandler.read("base.config.primary_node") ceph_df = zkhandler.read("base.storage.util").rstrip() # Create a data structure for the information status_data = { "type": "utilization", "primary_node": primary_node, "ceph_data": ceph_df, } return True, status_data # # OSD functions # def getClusterOSDList(zkhandler): # Get a list of VNIs by listing the children of /networks return zkhandler.children("base.osd") def getOSDInformation(zkhandler, osd_id): ( osd_fsid, osd_node, osd_device, _osd_is_split, osd_db_device, osd_stats_raw, ) = zkhandler.read_many( [ ("osd.ofsid", osd_id), ("osd.node", osd_id), ("osd.device", osd_id), ("osd.is_split", osd_id), ("osd.db_device", osd_id), ("osd.stats", osd_id), ] ) osd_is_split = bool(strtobool(_osd_is_split)) # Parse the stats data osd_stats = dict(json.loads(osd_stats_raw)) osd_information = { "id": osd_id, "fsid": osd_fsid, "node": osd_node, "device": osd_device, "is_split": osd_is_split, "db_device": osd_db_device, "stats": osd_stats, } return osd_information def in_osd(zkhandler, osd_id): if not verifyOSD(zkhandler, osd_id): return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format( osd_id ) retcode, stdout, stderr = common.run_os_command("ceph osd in {}".format(osd_id)) if retcode: return False, "ERROR: Failed to enable OSD {}: {}".format(osd_id, stderr) return True, "Set OSD {} online.".format(osd_id) def out_osd(zkhandler, osd_id): if not verifyOSD(zkhandler, osd_id): return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format( osd_id ) retcode, stdout, stderr = common.run_os_command("ceph osd out {}".format(osd_id)) if retcode: return False, "ERROR: Failed to disable OSD {}: {}".format(osd_id, stderr) return True, "Set OSD {} offline.".format(osd_id) def set_osd(zkhandler, option): retcode, stdout, stderr = common.run_os_command("ceph osd set {}".format(option)) if retcode: return False, 'ERROR: Failed to set property "{}": {}'.format(option, stderr) return True, 'Set OSD property "{}".'.format(option) def unset_osd(zkhandler, option): retcode, stdout, stderr = common.run_os_command("ceph osd unset {}".format(option)) if retcode: return False, 'ERROR: Failed to unset property "{}": {}'.format(option, stderr) return True, 'Unset OSD property "{}".'.format(option) def get_list_osd(zkhandler, limit=None, is_fuzzy=True): osd_list = [] full_osd_list = zkhandler.children("base.osd") if is_fuzzy and limit: # Implicitly assume fuzzy limits if not re.match(r"\^.*", limit): limit = ".*" + limit if not re.match(r".*\$", limit): limit = limit + ".*" for osd in full_osd_list: if limit: try: if re.fullmatch(limit, osd): osd_list.append(getOSDInformation(zkhandler, osd)) except Exception as e: return False, "Regex Error: {}".format(e) else: osd_list.append(getOSDInformation(zkhandler, osd)) return True, sorted(osd_list, key=lambda x: int(x["id"])) # # Pool functions # def getPoolInformation(zkhandler, pool): # Parse the stats data ( pool_stats_raw, tier, pgs, ) = zkhandler.read_many( [ ("pool.stats", pool), ("pool.tier", pool), ("pool.pgs", pool), ] ) pool_stats = dict(json.loads(pool_stats_raw)) volume_count = len(getCephVolumes(zkhandler, pool)) if tier is None: tier = "default" pool_information = { "name": pool, "volume_count": volume_count, "tier": tier, "pgs": pgs, "stats": pool_stats, } return pool_information def add_pool(zkhandler, name, pgs, replcfg, tier=None): # Prepare the copies/mincopies variables try: copies, mincopies = replcfg.split(",") copies = int(copies.replace("copies=", "")) mincopies = int(mincopies.replace("mincopies=", "")) except Exception: copies = None mincopies = None if not copies or not mincopies: return False, f'ERROR: Replication configuration "{replcfg}" is not valid.' # Prepare the tiers if applicable if tier is not None and tier in ["hdd", "ssd", "nvme"]: crush_rule = f"{tier}_tier" # Create a CRUSH rule for the relevant tier retcode, stdout, stderr = common.run_os_command( f"ceph osd crush rule create-replicated {crush_rule} default host {tier}" ) if retcode: return ( False, f"ERROR: Failed to create CRUSH rule {tier} for pool {name}: {stderr}", ) else: tier = "default" crush_rule = "replicated" # Create the pool retcode, stdout, stderr = common.run_os_command( f"ceph osd pool create {name} {pgs} {pgs} {crush_rule}" ) if retcode: return False, f'ERROR: Failed to create pool "{name}" with {pgs} PGs: {stderr}' # Set the size and minsize retcode, stdout, stderr = common.run_os_command( f"ceph osd pool set {name} size {copies}" ) if retcode: return False, f'ERROR: Failed to set pool "{name}" size of {copies}: {stderr}' retcode, stdout, stderr = common.run_os_command( f"ceph osd pool set {name} min_size {mincopies}" ) if retcode: return ( False, f'ERROR: Failed to set pool "{name}" minimum size of {mincopies}: {stderr}', ) # Enable RBD application retcode, stdout, stderr = common.run_os_command( f"ceph osd pool application enable {name} rbd" ) if retcode: return ( False, f'ERROR: Failed to enable RBD application on pool "{name}" : {stderr}', ) # Add the new pool to Zookeeper zkhandler.write( [ (("pool", name), ""), (("pool.pgs", name), pgs), (("pool.tier", name), tier), (("pool.stats", name), "{}"), (("volume", name), ""), (("snapshot", name), ""), ] ) return True, f'Created RBD pool "{name}" with {pgs} PGs' def remove_pool(zkhandler, name): if not verifyPool(zkhandler, name): return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format( name ) # 1. Remove pool volumes for volume in zkhandler.children(("volume", name)): remove_volume(zkhandler, name, volume) # 2. Remove the pool retcode, stdout, stderr = common.run_os_command( "ceph osd pool rm {pool} {pool} --yes-i-really-really-mean-it".format(pool=name) ) if retcode: return False, 'ERROR: Failed to remove pool "{}": {}'.format(name, stderr) # 3. Delete pool from Zookeeper zkhandler.delete( [ ("pool", name), ("volume", name), ("snapshot", name), ] ) return True, 'Removed RBD pool "{}" and all volumes.'.format(name) def set_pgs_pool(zkhandler, name, pgs): if not verifyPool(zkhandler, name): return False, f'ERROR: No pool with name "{name}" is present in the cluster.' # Validate new PGs count pgs = int(pgs) if (pgs == 0) or (pgs & (pgs - 1) != 0): return ( False, f'ERROR: Invalid PGs number "{pgs}": must be a non-zero power of 2.', ) # Set the new pgs number retcode, stdout, stderr = common.run_os_command( f"ceph osd pool set {name} pg_num {pgs}" ) if retcode: return False, f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}" # Set the new pgps number if increasing current_pgs = int(zkhandler.read(("pool.pgs", name))) if current_pgs >= pgs: retcode, stdout, stderr = common.run_os_command( f"ceph osd pool set {name} pgp_num {pgs}" ) if retcode: return ( False, f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}", ) # Update Zookeeper count zkhandler.write( [ (("pool.pgs", name), pgs), ] ) return True, f'Set PGs count to {pgs} for RBD pool "{name}".' def get_list_pool(zkhandler, limit=None, is_fuzzy=True): full_pool_list = zkhandler.children("base.pool") if is_fuzzy and limit: # Implicitly assume fuzzy limits if not re.match(r"\^.*", limit): limit = ".*" + limit if not re.match(r".*\$", limit): limit = limit + ".*" get_pool_info = dict() for pool in full_pool_list: is_limit_match = False if limit: try: if re.fullmatch(limit, pool): is_limit_match = True except Exception as e: return False, "Regex Error: {}".format(e) else: is_limit_match = True get_pool_info[pool] = True if is_limit_match else False pool_execute_list = [pool for pool in full_pool_list if get_pool_info[pool]] pool_data_list = list() with ThreadPoolExecutor(max_workers=32, thread_name_prefix="pool_list") as executor: futures = [] for pool in pool_execute_list: futures.append(executor.submit(getPoolInformation, zkhandler, pool)) for future in futures: pool_data_list.append(future.result()) return True, sorted(pool_data_list, key=lambda x: int(x["stats"].get("id", 0))) # # Volume functions # def getCephVolumes(zkhandler, pool): volume_list = list() if not pool: pool_list = zkhandler.children("base.pool") else: pool_list = [pool] for pool_name in pool_list: children = zkhandler.children(("volume", pool_name)) if children is None: continue for volume_name in children: volume_list.append("{}/{}".format(pool_name, volume_name)) return volume_list def getVolumeInformation(zkhandler, pool, volume): # Parse the stats data volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}")) volume_stats = dict(json.loads(volume_stats_raw)) # Format the size to something nicer volume_stats["size"] = format_bytes_tohuman(volume_stats["size"]) volume_information = {"name": volume, "pool": pool, "stats": volume_stats} return volume_information def add_volume(zkhandler, pool, name, size, force_flag=False): # 1. Verify the size of the volume pool_information = getPoolInformation(zkhandler, pool) size_bytes = format_bytes_fromhuman(size) if size_bytes is None: return ( False, f"ERROR: Requested volume size '{size}' does not have a valid SI unit", ) pool_total_free_bytes = int(pool_information["stats"]["free_bytes"]) if size_bytes >= pool_total_free_bytes: return ( False, f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the available free space in the pool ('{format_bytes_tohuman(pool_information['stats']['free_bytes'])}')", ) # Check if we're greater than 80% utilization after the create; error if so unless we have the force flag pool_total_bytes = ( int(pool_information["stats"]["used_bytes"]) + pool_total_free_bytes ) pool_safe_total_bytes = int(pool_total_bytes * 0.80) pool_safe_free_bytes = pool_safe_total_bytes - int( pool_information["stats"]["used_bytes"] ) if size_bytes >= pool_safe_free_bytes and not force_flag: return ( False, f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the safe free space in the pool ('{format_bytes_tohuman(pool_safe_free_bytes)}' for 80% full); retry with force to ignore this error", ) # 2. Create the volume retcode, stdout, stderr = common.run_os_command( "rbd create --size {}B {}/{}".format(size_bytes, pool, name) ) if retcode: return False, 'ERROR: Failed to create RBD volume "{}": {}'.format(name, stderr) # 2. Get volume stats retcode, stdout, stderr = common.run_os_command( "rbd info --format json {}/{}".format(pool, name) ) volstats = stdout # 3. Add the new volume to Zookeeper zkhandler.write( [ (("volume", f"{pool}/{name}"), ""), (("volume.stats", f"{pool}/{name}"), volstats), (("snapshot", f"{pool}/{name}"), ""), ] ) return True, 'Created RBD volume "{}" of size "{}" in pool "{}".'.format( name, format_bytes_tohuman(size_bytes), pool ) def clone_volume(zkhandler, pool, name_src, name_new, force_flag=False): # 1. Verify the volume if not verifyVolume(zkhandler, pool, name_src): return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format( name_src, pool ) volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{name_src}")) volume_stats = dict(json.loads(volume_stats_raw)) size_bytes = volume_stats["size"] pool_information = getPoolInformation(zkhandler, pool) pool_total_free_bytes = int(pool_information["stats"]["free_bytes"]) if size_bytes >= pool_total_free_bytes: return ( False, f"ERROR: Clone volume size '{format_bytes_tohuman(size_bytes)}' is greater than the available free space in the pool ('{format_bytes_tohuman(pool_information['stats']['free_bytes'])}')", ) # Check if we're greater than 80% utilization after the create; error if so unless we have the force flag pool_total_bytes = ( int(pool_information["stats"]["used_bytes"]) + pool_total_free_bytes ) pool_safe_total_bytes = int(pool_total_bytes * 0.80) pool_safe_free_bytes = pool_safe_total_bytes - int( pool_information["stats"]["used_bytes"] ) if size_bytes >= pool_safe_free_bytes and not force_flag: return ( False, f"ERROR: Clone volume size '{format_bytes_tohuman(size_bytes)}' is greater than the safe free space in the pool ('{format_bytes_tohuman(pool_safe_free_bytes)}' for 80% full); retry with force to ignore this error", ) # 2. Clone the volume retcode, stdout, stderr = common.run_os_command( "rbd copy {}/{} {}/{}".format(pool, name_src, pool, name_new) ) if retcode: return ( False, 'ERROR: Failed to clone RBD volume "{}" to "{}" in pool "{}": {}'.format( name_src, name_new, pool, stderr ), ) # 3. Get volume stats retcode, stdout, stderr = common.run_os_command( "rbd info --format json {}/{}".format(pool, name_new) ) volstats = stdout # 4. Add the new volume to Zookeeper zkhandler.write( [ (("volume", f"{pool}/{name_new}"), ""), (("volume.stats", f"{pool}/{name_new}"), volstats), (("snapshot", f"{pool}/{name_new}"), ""), ] ) return True, 'Cloned RBD volume "{}" to "{}" in pool "{}"'.format( name_src, name_new, pool ) def resize_volume(zkhandler, pool, name, size, force_flag=False): if not verifyVolume(zkhandler, pool, name): return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format( name, pool ) # 1. Verify the size of the volume pool_information = getPoolInformation(zkhandler, pool) size_bytes = format_bytes_fromhuman(size) if size_bytes is None: return ( False, f"ERROR: Requested volume size '{size}' does not have a valid SI unit", ) pool_total_free_bytes = int(pool_information["stats"]["free_bytes"]) if size_bytes >= pool_total_free_bytes: return ( False, f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the available free space in the pool ('{format_bytes_tohuman(pool_information['stats']['free_bytes'])}')", ) # Check if we're greater than 80% utilization after the create; error if so unless we have the force flag pool_total_bytes = ( int(pool_information["stats"]["used_bytes"]) + pool_total_free_bytes ) pool_safe_total_bytes = int(pool_total_bytes * 0.80) pool_safe_free_bytes = pool_safe_total_bytes - int( pool_information["stats"]["used_bytes"] ) if size_bytes >= pool_safe_free_bytes and not force_flag: return ( False, f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the safe free space in the pool ('{format_bytes_tohuman(pool_safe_free_bytes)}' for 80% full); retry with force to ignore this error", ) # 2. Resize the volume retcode, stdout, stderr = common.run_os_command( "rbd resize --size {} {}/{}".format( format_bytes_tohuman(size_bytes), pool, name ) ) if retcode: return ( False, 'ERROR: Failed to resize RBD volume "{}" to size "{}" in pool "{}": {}'.format( name, format_bytes_tohuman(size_bytes), pool, stderr ), ) # 3a. Determine the node running this VM if applicable active_node = None volume_vm_name = name.split("_")[0] retcode, vm_info = vm.get_info(zkhandler, volume_vm_name) if retcode: for disk in vm_info["disks"]: # This block device is present in this VM so we can continue if disk["name"] == "{}/{}".format(pool, name): active_node = vm_info["node"] volume_id = disk["dev"] # 3b. Perform a live resize in libvirt if the VM is running if active_node is not None and vm_info.get("state", "") == "start": import libvirt # Run the libvirt command against the target host try: dest_lv = "qemu+tcp://{}/system".format(active_node) target_lv_conn = libvirt.open(dest_lv) target_vm_conn = target_lv_conn.lookupByName(vm_info["name"]) if target_vm_conn: target_vm_conn.blockResize( volume_id, size_bytes, libvirt.VIR_DOMAIN_BLOCK_RESIZE_BYTES, ) target_lv_conn.close() except Exception: pass # 4. Get volume stats retcode, stdout, stderr = common.run_os_command( "rbd info --format json {}/{}".format(pool, name) ) volstats = stdout # 5. Update the volume in Zookeeper zkhandler.write( [ (("volume", f"{pool}/{name}"), ""), (("volume.stats", f"{pool}/{name}"), volstats), (("snapshot", f"{pool}/{name}"), ""), ] ) return True, 'Resized RBD volume "{}" to size "{}" in pool "{}".'.format( name, format_bytes_tohuman(size_bytes), pool ) def rename_volume(zkhandler, pool, name, new_name): if not verifyVolume(zkhandler, pool, name): return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format( name, pool ) # 1. Rename the volume retcode, stdout, stderr = common.run_os_command( "rbd rename {}/{} {}".format(pool, name, new_name) ) if retcode: return ( False, 'ERROR: Failed to rename volume "{}" to "{}" in pool "{}": {}'.format( name, new_name, pool, stderr ), ) # 2. Rename the volume in Zookeeper zkhandler.rename( [ (("volume", f"{pool}/{name}"), ("volume", f"{pool}/{new_name}")), (("snapshot", f"{pool}/{name}"), ("snapshot", f"{pool}/{new_name}")), ] ) # 3. Get volume stats retcode, stdout, stderr = common.run_os_command( "rbd info --format json {}/{}".format(pool, new_name) ) volstats = stdout # 4. Update the volume stats in Zookeeper zkhandler.write( [ (("volume.stats", f"{pool}/{new_name}"), volstats), ] ) return True, 'Renamed RBD volume "{}" to "{}" in pool "{}".'.format( name, new_name, pool ) def remove_volume(zkhandler, pool, name): if not verifyVolume(zkhandler, pool, name): return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format( name, pool ) # 1a. Remove PVC-managed volume snapshots for snapshot in zkhandler.children(("snapshot", f"{pool}/{name}")): remove_snapshot(zkhandler, pool, name, snapshot) # 1b. Purge any remaining volume snapshots retcode, stdout, stderr = common.run_os_command( "rbd snap purge {}/{}".format(pool, name) ) if retcode: return ( False, 'ERROR: Failed to purge snapshots from RBD volume "{}" in pool "{}": {}'.format( name, pool, stderr ), ) # 2. Remove the volume retcode, stdout, stderr = common.run_os_command("rbd rm {}/{}".format(pool, name)) if retcode: return False, 'ERROR: Failed to remove RBD volume "{}" in pool "{}": {}'.format( name, pool, stderr ) # 3. Delete volume from Zookeeper zkhandler.delete( [ ("volume", f"{pool}/{name}"), ("snapshot", f"{pool}/{name}"), ] ) return True, 'Removed RBD volume "{}" in pool "{}".'.format(name, pool) def map_volume(zkhandler, pool, name): if not verifyVolume(zkhandler, pool, name): return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format( name, pool ) # 1. Map the volume onto the local system retcode, stdout, stderr = common.run_os_command("rbd map {}/{}".format(pool, name)) if retcode: return False, 'ERROR: Failed to map RBD volume "{}" in pool "{}": {}'.format( name, pool, stderr ) # 2. Calculate the absolute path to the mapped volume mapped_volume = "/dev/rbd/{}/{}".format(pool, name) # 3. Ensure the volume exists if not os.path.exists(mapped_volume): return ( False, 'ERROR: Mapped volume not found at expected location "{}".'.format( mapped_volume ), ) return True, mapped_volume def unmap_volume(zkhandler, pool, name): if not verifyVolume(zkhandler, pool, name): return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format( name, pool ) mapped_volume = "/dev/rbd/{}/{}".format(pool, name) # 1. Ensure the volume exists if not os.path.exists(mapped_volume): return ( False, 'ERROR: Mapped volume not found at expected location "{}".'.format( mapped_volume ), ) # 2. Unap the volume retcode, stdout, stderr = common.run_os_command( "rbd unmap {}".format(mapped_volume) ) if retcode: return False, 'ERROR: Failed to unmap RBD volume at "{}": {}'.format( mapped_volume, stderr ) return True, 'Unmapped RBD volume at "{}".'.format(mapped_volume) def get_list_volume(zkhandler, pool, limit=None, is_fuzzy=True): if pool and not verifyPool(zkhandler, pool): return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format( pool ) full_volume_list = getCephVolumes(zkhandler, pool) if is_fuzzy and limit: # Implicitly assume fuzzy limits if not re.match(r"\^.*", limit): limit = ".*" + limit if not re.match(r".*\$", limit): limit = limit + ".*" get_volume_info = dict() for volume in full_volume_list: pool_name, volume_name = volume.split("/") is_limit_match = False # Check on limit if limit: # Try to match the limit against the volume name try: if re.fullmatch(limit, volume_name): is_limit_match = True except Exception as e: return False, "Regex Error: {}".format(e) else: is_limit_match = True get_volume_info[volume] = True if is_limit_match else False # Obtain our volume data in a thread pool volume_execute_list = [ volume for volume in full_volume_list if get_volume_info[volume] ] volume_data_list = list() with ThreadPoolExecutor( max_workers=32, thread_name_prefix="volume_list" ) as executor: futures = [] for volume in volume_execute_list: pool_name, volume_name = volume.split("/") futures.append( executor.submit(getVolumeInformation, zkhandler, pool_name, volume_name) ) for future in futures: volume_data_list.append(future.result()) return True, sorted(volume_data_list, key=lambda x: str(x["name"])) # # Snapshot functions # def getCephSnapshots(zkhandler, pool, volume): snapshot_list = list() volume_list = list() volume_list = getCephVolumes(zkhandler, pool) if volume: for volume_entry in volume_list: volume_pool, volume_name = volume_entry.split("/") if volume_name == volume: volume_list = ["{}/{}".format(volume_pool, volume_name)] for volume_entry in volume_list: for snapshot_name in zkhandler.children(("snapshot", volume_entry)): snapshot_list.append("{}@{}".format(volume_entry, snapshot_name)) return snapshot_list def add_snapshot(zkhandler, pool, volume, name, zk_only=False): if not verifyVolume(zkhandler, pool, volume): return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format( volume, pool ) # 1. Create the snapshot if not zk_only: retcode, stdout, stderr = common.run_os_command( "rbd snap create {}/{}@{}".format(pool, volume, name) ) if retcode: return ( False, 'ERROR: Failed to create RBD snapshot "{}" of volume "{}" in pool "{}": {}'.format( name, volume, pool, stderr ), ) # 2. Get snapshot stats retcode, stdout, stderr = common.run_os_command( "rbd info --format json {}/{}@{}".format(pool, volume, name) ) snapstats = stdout # 3. Add the snapshot to Zookeeper zkhandler.write( [ (("snapshot", f"{pool}/{volume}/{name}"), ""), (("snapshot.stats", f"{pool}/{volume}/{name}"), snapstats), ] ) # 4. Update the count of snapshots on this volume volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}")) volume_stats = dict(json.loads(volume_stats_raw)) volume_stats["snapshot_count"] = volume_stats["snapshot_count"] + 1 zkhandler.write( [ (("volume.stats", f"{pool}/{volume}"), json.dumps(volume_stats)), ] ) return True, 'Created RBD snapshot "{}" of volume "{}" in pool "{}".'.format( name, volume, pool ) def rename_snapshot(zkhandler, pool, volume, name, new_name): if not verifyVolume(zkhandler, pool, volume): return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format( volume, pool ) if not verifySnapshot(zkhandler, pool, volume, name): return ( False, 'ERROR: No snapshot with name "{}" is present for volume "{}" in pool "{}".'.format( name, volume, pool ), ) # 1. Rename the snapshot retcode, stdout, stderr = common.run_os_command( "rbd snap rename {pool}/{volume}@{name} {pool}/{volume}@{new_name}".format( pool=pool, volume=volume, name=name, new_name=new_name ) ) if retcode: return ( False, 'ERROR: Failed to rename RBD snapshot "{}" to "{}" for volume "{}" in pool "{}": {}'.format( name, new_name, volume, pool, stderr ), ) # 2. Rename the snapshot in ZK zkhandler.rename( [ ( ("snapshot", f"{pool}/{volume}/{name}"), ("snapshot", f"{pool}/{volume}/{new_name}"), ), ] ) return ( True, 'Renamed RBD snapshot "{}" to "{}" for volume "{}" in pool "{}".'.format( name, new_name, volume, pool ), ) def rollback_snapshot(zkhandler, pool, volume, name): if not verifyVolume(zkhandler, pool, volume): return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format( volume, pool ) if not verifySnapshot(zkhandler, pool, volume, name): return ( False, 'ERROR: No snapshot with name "{}" is present for volume "{}" in pool "{}".'.format( name, volume, pool ), ) # 1. Roll back the snapshot retcode, stdout, stderr = common.run_os_command( "rbd snap rollback {}/{}@{}".format(pool, volume, name) ) if retcode: return ( False, 'ERROR: Failed to roll back RBD volume "{}" in pool "{}" to snapshot "{}": {}'.format( volume, pool, name, stderr ), ) return True, 'Rolled back RBD volume "{}" in pool "{}" to snapshot "{}".'.format( volume, pool, name ) def remove_snapshot(zkhandler, pool, volume, name): if not verifyVolume(zkhandler, pool, volume): return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format( volume, pool ) if not verifySnapshot(zkhandler, pool, volume, name): return ( False, 'ERROR: No snapshot with name "{}" is present of volume {} in pool {}.'.format( name, volume, pool ), ) # 1. Remove the snapshot retcode, stdout, stderr = common.run_os_command( "rbd snap rm {}/{}@{}".format(pool, volume, name) ) if retcode: return ( False, 'Failed to remove RBD snapshot "{}" of volume "{}" in pool "{}": {}'.format( name, volume, pool, stderr ), ) # 2. Delete snapshot from Zookeeper zkhandler.delete([("snapshot", f"{pool}/{volume}/{name}")]) # 3. Update the count of snapshots on this volume volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}")) volume_stats = dict(json.loads(volume_stats_raw)) # Format the size to something nicer volume_stats["snapshot_count"] = volume_stats["snapshot_count"] - 1 volume_stats_raw = json.dumps(volume_stats) zkhandler.write([(("volume.stats", f"{pool}/{volume}"), volume_stats_raw)]) return True, 'Removed RBD snapshot "{}" of volume "{}" in pool "{}".'.format( name, volume, pool ) def get_list_snapshot(zkhandler, target_pool, target_volume, limit=None, is_fuzzy=True): snapshot_list = [] full_snapshot_list = getCephSnapshots(zkhandler, target_pool, target_volume) if is_fuzzy and limit: # Implicitly assume fuzzy limits if not re.match(r"\^.*", limit): limit = ".*" + limit if not re.match(r".*\$", limit): limit = limit + ".*" for snapshot in full_snapshot_list: volume, snapshot_name = snapshot.split("@") pool_name, volume_name = volume.split("/") if target_pool and pool_name != target_pool: continue if target_volume and volume_name != target_volume: continue snapshot_stats = json.loads( zkhandler.read( ("snapshot.stats", f"{pool_name}/{volume_name}/{snapshot_name}") ) ) if limit: try: if re.fullmatch(limit, snapshot_name): snapshot_list.append( { "pool": pool_name, "volume": volume_name, "snapshot": snapshot_name, "stats": snapshot_stats, } ) except Exception as e: return False, "Regex Error: {}".format(e) else: snapshot_list.append( { "pool": pool_name, "volume": volume_name, "snapshot": snapshot_name, "stats": snapshot_stats, } ) return True, sorted(snapshot_list, key=lambda x: str(x["snapshot"])) # # Celery worker tasks (must be run on node, outputs log messages to worker) # def osd_worker_helper_find_osds_from_block(device): # Try to query the passed block device directly retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm list --format json {device}" ) if retcode: found_osds = [] else: found_osds = jloads(stdout) return found_osds def osd_worker_add_osd( zkhandler, celery, node, device, weight, ext_db_ratio=None, ext_db_size=None, split_count=None, ): current_stage = 0 total_stages = 5 if split_count is None: _split_count = 1 else: _split_count = split_count total_stages = total_stages + 3 * int(_split_count) if ext_db_ratio is not None or ext_db_size is not None: total_stages = total_stages + 3 * int(_split_count) + 1 start( celery, f"Adding {_split_count} new OSD(s) on device {device} with weight {weight}", current=current_stage, total=total_stages, ) # Handle a detect device if that is passed if match(r"detect:", device): ddevice = common.get_detect_device(device) if ddevice is None: fail( celery, f"Failed to determine block device from detect string {device}", ) return else: log_info( celery, f"Determined block device {ddevice} from detect string {device}" ) device = ddevice if ext_db_size is not None and ext_db_ratio is not None: fail( celery, "Invalid configuration: both an ext_db_size and ext_db_ratio were specified", ) return # Check if device has a partition table; it's not valid if it does retcode, _, _ = common.run_os_command(f"sfdisk --dump {device}") if retcode < 1: fail( celery, f"Device {device} has a partition table and is unsuitable for an OSD", ) return if ext_db_size is not None or ext_db_ratio is not None: ext_db_flag = True else: ext_db_flag = False if split_count is not None: split_flag = f"--osds-per-device {split_count}" is_split = True log_info( celery, f"Creating {split_count} new OSD disks on block device {device}" ) else: split_flag = "" is_split = False log_info(celery, f"Creating 1 new OSD disk on block device {device}") if "nvme" in device: class_flag = "--crush-device-class nvme" else: class_flag = "--crush-device-class ssd" # 1. Zap the block device current_stage += 1 update( celery, f"Zapping block device {device}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm zap --destroy {device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to perform ceph-volume lvm zap on {device}") return # 2. Prepare the OSD(s) current_stage += 1 update( celery, f"Preparing OSD(s) on device {device}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm batch --yes --prepare --bluestore {split_flag} {class_flag} {device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to perform ceph-volume lvm batch on {device}") return # 3. Get the list of created OSDs on the device (initial pass) current_stage += 1 update( celery, f"Querying OSD(s) on device {device}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm list --format json {device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to perform ceph-volume lvm list on {device}") return created_osds = jloads(stdout) # 4. Prepare the WAL and DB devices if ext_db_flag: for created_osd in created_osds: # 4a. Get the OSD FSID and ID from the details osd_details = created_osds[created_osd][0] osd_fsid = osd_details["tags"]["ceph.osd_fsid"] osd_id = osd_details["tags"]["ceph.osd_id"] osd_lv = osd_details["lv_path"] current_stage += 1 update( celery, f"Preparing DB LV for OSD {osd_id}", current=current_stage, total=total_stages, ) # 4b. Prepare the logical volume if ext_db_flag if ext_db_ratio is not None: _, osd_size_bytes, _ = common.run_os_command( f"blockdev --getsize64 {osd_lv}" ) osd_size_bytes = int(osd_size_bytes) osd_db_size_bytes = int(osd_size_bytes * ext_db_ratio) if ext_db_size is not None: osd_db_size_bytes = format_bytes_fromhuman(ext_db_size) osd_db_size_mb = int(osd_db_size_bytes / 1024 / 1024) db_device = f"osd-db/osd-{osd_id}" current_stage += 1 update( celery, f"Preparing Bluestore DB volume for OSD {osd_id} on {db_device}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"lvcreate -L {osd_db_size_mb}M -n osd-{osd_id} --yes osd-db" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run lvcreate on {db_device}") return # 4c. Attach the new DB device to the OSD current_stage += 1 update( celery, f"Attaching Bluestore DB volume to OSD {osd_id}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm new-db --osd-id {osd_id} --osd-fsid {osd_fsid} --target {db_device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail( celery, f"Failed to perform ceph-volume lvm new-db on OSD {osd_id}" ) return # 4d. Get the list of created OSDs on the device (final pass) current_stage += 1 update( celery, f"Requerying OSD(s) on device {device}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm list --format json {device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to perform ceph-volume lvm list on {device}") return created_osds = jloads(stdout) # 5. Activate the OSDs for created_osd in created_osds: # 5a. Get the OSD FSID and ID from the details osd_details = created_osds[created_osd][0] osd_clusterfsid = osd_details["tags"]["ceph.cluster_fsid"] osd_fsid = osd_details["tags"]["ceph.osd_fsid"] osd_id = osd_details["tags"]["ceph.osd_id"] db_device = osd_details["tags"].get("ceph.db_device", "") osd_vg = osd_details["vg_name"] osd_lv = osd_details["lv_name"] # 5b. Add it to the crush map current_stage += 1 update( celery, f"Adding OSD {osd_id} to CRUSH map", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph osd crush add osd.{osd_id} {weight} root=default host={node}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to perform ceph osd crush add for OSD {osd_id}") return # 5c. Activate the OSD current_stage += 1 update( celery, f"Activating OSD {osd_id}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm activate --bluestore {osd_id} {osd_fsid}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to perform ceph osd crush add for OSD {osd_id}") return # 5d. Wait 1 second for it to activate time.sleep(1) # 5e. Verify it started retcode, stdout, stderr = common.run_os_command( f"systemctl status ceph-osd@{osd_id}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to start OSD {osd_id} process") return # 5f. Add the new OSD to PVC current_stage += 1 update( celery, f"Adding OSD {osd_id} to PVC", current=current_stage, total=total_stages, ) zkhandler.write( [ (("osd", osd_id), ""), (("osd.node", osd_id), node), (("osd.device", osd_id), device), (("osd.db_device", osd_id), db_device), (("osd.fsid", osd_id), ""), (("osd.ofsid", osd_id), osd_fsid), (("osd.cfsid", osd_id), osd_clusterfsid), (("osd.lvm", osd_id), ""), (("osd.vg", osd_id), osd_vg), (("osd.lv", osd_id), osd_lv), (("osd.is_split", osd_id), is_split), ( ("osd.stats", osd_id), '{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}', ), ] ) # 6. Wait for OSD to check in current_stage += 1 update( celery, "Waiting for new OSD(s) to report stats", current=current_stage, total=total_stages, ) last_osd = None for osd in created_osds: last_osd = osd while ( jloads( zkhandler.read( ("osd.stats", created_osds[last_osd][0]["tags"]["ceph.osd_id"]) ) )["weight"] == "|" ): time.sleep(1) # 7. Log it current_stage += 1 return finish( celery, f"Successfully created {len(created_osds.keys())} new OSD(s) {','.join(created_osds.keys())} on device {device}", current=current_stage, total=total_stages, ) def osd_worker_replace_osd( zkhandler, celery, node, osd_id, new_device, old_device=None, weight=None, ext_db_ratio=None, ext_db_size=None, ): # Try to determine if any other OSDs shared a block device with this OSD _, osd_list = get_list_osd(zkhandler, None) osd_block = zkhandler.read(("osd.device", osd_id)) all_osds_on_block = [ o for o in osd_list if o["node"] == node and o["device"] == osd_block ] all_osds_on_block_ids = [o["id"] for o in all_osds_on_block] # Set up stages current_stage = 0 total_stages = 3 _split_count = len(all_osds_on_block_ids) total_stages = total_stages + 10 * int(_split_count) if ( ext_db_ratio is not None or ext_db_size is not None or any([True for o in all_osds_on_block if o["db_device"]]) ): total_stages = total_stages + 2 * int(_split_count) start( celery, f"Replacing OSD(s) {','.join(all_osds_on_block_ids)} with device {new_device}", current=current_stage, total=total_stages, ) # Handle a detect device if that is passed if match(r"detect:", new_device): ddevice = common.get_detect_device(new_device) if ddevice is None: fail( celery, f"Failed to determine block device from detect string {new_device}", ) return else: log_info( celery, f"Determined block device {ddevice} from detect string {new_device}", ) new_device = ddevice # Check if device has a partition table; it's not valid if it does retcode, _, _ = common.run_os_command(f"sfdisk --dump {new_device}") if retcode < 1: fail( celery, f"Device {new_device} has a partition table and is unsuitable for an OSD", ) return # Phase 1: Try to determine what we can about the old device real_old_device = None # Determine information from a passed old_device if old_device is not None: found_osds = osd_worker_helper_find_osds_from_block(old_device) if found_osds and osd_id in found_osds.keys(): real_old_device = old_device else: log_warn( celery, f"No OSD(s) found on device {old_device}; falling back to PVC detection", ) # Try to get an old_device from our PVC information if real_old_device is None: found_osds = osd_worker_helper_find_osds_from_block(osd_block) if osd_id in found_osds.keys(): real_old_device = osd_block if real_old_device is None: skip_zap = True log_warn( celery, "No valid old block device found for OSD(s); skipping zap", ) else: skip_zap = False # Determine the weight of the OSD(s) if weight is None: weight = all_osds_on_block[0]["stats"]["weight"] # Take down the OSD(s), but keep it's CRUSH map details and IDs for osd in all_osds_on_block: osd_id = osd["id"] # 1. Set the OSD down and out so it will flush current_stage += 1 update( celery, f"Setting OSD {osd_id} down", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command(f"ceph osd down {osd_id}") log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to set OSD {osd_id} down") return current_stage += 1 update( celery, f"Setting OSD {osd_id} out", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command(f"ceph osd out {osd_id}") log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to set OSD {osd_id} out") return # 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete) current_stage += 1 update( celery, f"Waiting for OSD {osd_id} to be safe to remove", current=current_stage, total=total_stages, ) tcount = 0 while True: retcode, stdout, stderr = common.run_os_command( f"ceph osd safe-to-destroy osd.{osd_id}" ) if int(retcode) in [0, 11]: break else: common.run_os_command(f"ceph osd down {osd_id}") common.run_os_command(f"ceph osd out {osd_id}") time.sleep(1) tcount += 1 if tcount > 60: log_warn( celery, f"Timed out (60s) waiting for OSD {osd_id} to be safe to remove; proceeding anyways", ) break # 3. Stop the OSD process and wait for it to be terminated current_stage += 1 update( celery, f"Stopping OSD {osd_id}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"systemctl stop ceph-osd@{osd_id}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to stop OSD {osd_id}") return time.sleep(5) # 4. Destroy the OSD current_stage += 1 update( celery, f"Destroying OSD {osd_id}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph osd destroy {osd_id} --yes-i-really-mean-it" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to destroy OSD {osd_id}") return current_stage += 1 update( celery, f"Zapping old disk {real_old_device} if possible", current=current_stage, total=total_stages, ) if not skip_zap: # 5. Zap the old disk retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm zap --destroy {real_old_device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: log_warn( celery, f"Failed to zap old disk {real_old_device}; proceeding anyways" ) # 6. Prepare the volume group on the new device current_stage += 1 update( celery, f"Preparing LVM volume group on new disk {new_device}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm zap --destroy {new_device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run ceph-volume lvm zap on new disk {new_device}") return retcode, stdout, stderr = common.run_os_command(f"pvcreate {new_device}") log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run pvcreate on new disk {new_device}") return vg_uuid = str(uuid4()) retcode, stdout, stderr = common.run_os_command( f"vgcreate ceph-{vg_uuid} {new_device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run vgcreate on new disk {new_device}") return # Determine how many OSDs we want on the new device osds_count = len(all_osds_on_block) # Determine the size of the new device _, new_device_size_bytes, _ = common.run_os_command( f"blockdev --getsize64 {new_device}" ) # Calculate the size of each OSD (in MB) based on the default 4M extent size new_osd_size_mb = ( int(int(int(new_device_size_bytes) / osds_count) / 1024 / 1024 / 4) * 4 ) # Calculate the size, if applicable, of the OSD block if we were passed a ratio if ext_db_ratio is not None: osd_new_db_size_mb = int(int(int(new_osd_size_mb * ext_db_ratio) / 4) * 4) elif ext_db_size is not None: osd_new_db_size_mb = int( int(int(format_bytes_fromhuman(ext_db_size)) / 1024 / 1024 / 4) * 4 ) else: if all_osds_on_block[0]["db_device"]: _, new_device_size_bytes, _ = common.run_os_command( f"blockdev --getsize64 {all_osds_on_block[0]['db_device']}" ) osd_new_db_size_mb = int( int(int(new_device_size_bytes) / 1024 / 1024 / 4) * 4 ) else: osd_new_db_size_mb = None for osd in all_osds_on_block: osd_id = osd["id"] osd_fsid = osd["fsid"] current_stage += 1 update( celery, f"Preparing LVM logical volume for OSD {osd_id} on new disk {new_device}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"lvcreate -L {new_osd_size_mb}M -n osd-block-{osd_fsid} ceph-{vg_uuid}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run lvcreate for OSD {osd_id}") return current_stage += 1 update( celery, f"Preparing OSD {osd_id} on new disk {new_device}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm prepare --bluestore --osd-id {osd_id} --osd-fsid {osd_fsid} --data /dev/ceph-{vg_uuid}/osd-block-{osd_fsid}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run ceph-volume lvm prepare for OSD {osd_id}") return for osd in all_osds_on_block: osd_id = osd["id"] osd_fsid = osd["fsid"] if osd["db_device"]: db_device = f"osd-db/osd-{osd_id}" current_stage += 1 update( celery, f"Preparing Bluestore DB volume for OSD {osd_id} on {db_device}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"lvremove --force {db_device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run lvremove on {db_device}") return retcode, stdout, stderr = common.run_os_command( f"lvcreate -L {osd_new_db_size_mb}M -n osd-{osd_id} --yes osd-db" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run lvcreate on {db_device}") return current_stage += 1 update( celery, f"Attaching Bluestore DB volume to OSD {osd_id}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm new-db --osd-id {osd_id} --osd-fsid {osd_fsid} --target {db_device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run ceph-volume lvm new-db for OSD {osd_id}") return current_stage += 1 update( celery, f"Updating OSD {osd_id} in CRUSH map", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph osd crush add osd.{osd_id} {weight} root=default host={node}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run ceph osd crush add for OSD {osd_id}") return current_stage += 1 update( celery, f"Activating OSD {osd_id}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm activate --bluestore {osd_id} {osd_fsid}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run ceph-volume lvm activate for OSD {osd_id}") return # Wait 1 second for it to activate time.sleep(1) # Verify it started retcode, stdout, stderr = common.run_os_command( f"systemctl status ceph-osd@{osd_id}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to start OSD {osd_id} process") return current_stage += 1 update( celery, f"Updating OSD {osd_id} in PVC", current=current_stage, total=total_stages, ) zkhandler.write( [ (("osd.device", osd_id), new_device), (("osd.vg", osd_id), f"ceph-{vg_uuid}"), (("osd.lv", osd_id), f"osd-block-{osd_fsid}"), ] ) # 6. Log it current_stage += 1 return finish( celery, f"Successfully replaced OSD(s) {','.join(all_osds_on_block_ids)} on new disk {new_device}", current=current_stage, total=total_stages, ) def osd_worker_refresh_osd( zkhandler, celery, node, osd_id, device, ext_db_flag, ): # Try to determine if any other OSDs shared a block device with this OSD _, osd_list = get_list_osd(zkhandler, None) osd_block = zkhandler.read(("osd.device", osd_id)) all_osds_on_block = [ o for o in osd_list if o["node"] == node and o["device"] == osd_block ] all_osds_on_block_ids = [o["id"] for o in all_osds_on_block] # Set up stages current_stage = 0 total_stages = 1 _split_count = len(all_osds_on_block_ids) total_stages = total_stages + 3 * int(_split_count) start( celery, f"Refreshing/reimporting OSD(s) {','.join(all_osds_on_block_ids)} on device {device}", current=current_stage, total=total_stages, ) # Handle a detect device if that is passed if match(r"detect:", device): ddevice = common.get_detect_device(device) if ddevice is None: fail( celery, f"Failed to determine block device from detect string {device}", ) return else: log_info( celery, f"Determined block device {ddevice} from detect string {device}", ) device = ddevice retcode, stdout, stderr = common.run_os_command("ceph osd ls") osd_list = stdout.split("\n") if osd_id not in osd_list: fail( celery, f"Could not find OSD {osd_id} in the cluster", ) return found_osds = osd_worker_helper_find_osds_from_block(device) if osd_id not in found_osds.keys(): fail( celery, f"Could not find OSD {osd_id} on device {device}", ) return for osd in found_osds: found_osd = found_osds[osd][0] lv_device = found_osd["lv_path"] _, osd_pvc_information = get_list_osd(zkhandler, osd) osd_information = osd_pvc_information[0] current_stage += 1 update( celery, "Querying for OSD on device", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm list --format json {lv_device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run ceph-volume lvm list for OSD {osd}") return osd_detail = jloads(stdout)[osd][0] osd_fsid = osd_detail["tags"]["ceph.osd_fsid"] if osd_fsid != osd_information["fsid"]: fail( celery, f"OSD {osd} FSID {osd_information['fsid']} does not match volume FSID {osd_fsid}; OSD cannot be imported", ) return dev_flags = f"--data {lv_device}" if ext_db_flag: db_device = "osd-db/osd-{osd}" dev_flags += f" --block.db {db_device}" if not path.exists(f"/dev/{db_device}"): fail( celery, f"OSD Bluestore DB volume {db_device} does not exist; OSD cannot be imported", ) return else: db_device = "" current_stage += 1 update( celery, f"Activating OSD {osd}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm activate --bluestore {osd} {osd_fsid}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to run ceph-volume lvm activate for OSD {osd}") return # Wait 1 second for it to activate time.sleep(1) # Verify it started retcode, stdout, stderr = common.run_os_command( f"systemctl status ceph-osd@{osd}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail(celery, f"Failed to start OSD {osd} process") return current_stage += 1 update( celery, f"Updating OSD {osd} in PVC", current=current_stage, total=total_stages, ) zkhandler.write( [ (("osd.device", osd), device), (("osd.vg", osd), osd_detail["vg_name"]), (("osd.lv", osd), osd_detail["lv_name"]), ] ) # 6. Log it current_stage += 1 return finish( celery, f"Successfully reimported OSD(s) {','.join(all_osds_on_block_ids)} on device {device}", current=current_stage, total=total_stages, ) def osd_worker_remove_osd( zkhandler, celery, node, osd_id, force_flag=False, skip_zap_flag=False ): # Get initial data data_device = zkhandler.read(("osd.device", osd_id)) if zkhandler.exists(("osd.db_device", osd_id)): db_device = zkhandler.read(("osd.db_device", osd_id)) else: db_device = None # Set up stages current_stage = 0 total_stages = 7 if not force_flag: total_stages += 1 if not skip_zap_flag: total_stages += 2 if db_device: total_stages += 1 start( celery, f"Removing OSD {osd_id}", current=current_stage, total=total_stages, ) # Verify the OSD is present retcode, stdout, stderr = common.run_os_command("ceph osd ls") osd_list = stdout.split("\n") if osd_id not in osd_list: if not force_flag: fail( celery, f"OSD {osd_id} not found in Ceph", ) return else: log_warn( celery, f"OSD {osd_id} not found in Ceph; ignoring error due to force flag", ) # 1. Set the OSD down and out so it will flush current_stage += 1 update( celery, f"Setting OSD {osd_id} down", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command(f"ceph osd down {osd_id}") log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: if not force_flag: fail( celery, f"Failed to set OSD {osd_id} down", ) return else: log_warn( celery, f"Failed to set OSD {osd_id} down; ignoring error due to force flag", ) current_stage += 1 update( celery, f"Setting OSD {osd_id} out", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command(f"ceph osd out {osd_id}") log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: if not force_flag: fail( celery, f"Failed to set OSD {osd_id} down", ) return else: log_warn( celery, f"Failed to set OSD {osd_id} down; ignoring error due to force flag", ) # 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete) if not force_flag: current_stage += 1 update( celery, f"Waiting for OSD {osd_id} to be safe to remove", current=current_stage, total=total_stages, ) tcount = 0 while True: retcode, stdout, stderr = common.run_os_command( f"ceph osd safe-to-destroy osd.{osd_id}" ) if int(retcode) in [0, 11]: break else: common.run_os_command(f"ceph osd down {osd_id}") common.run_os_command(f"ceph osd out {osd_id}") time.sleep(1) tcount += 1 if tcount > 60: log_warn( celery, f"Timed out (60s) waiting for OSD {osd_id} to be safe to remove; proceeding anyways", ) break # 3. Stop the OSD process and wait for it to be terminated current_stage += 1 update( celery, f"Stopping OSD {osd_id}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command(f"systemctl stop ceph-osd@{osd_id}") log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: if not force_flag: fail( celery, f"Failed to stop OSD {osd_id} process", ) return else: log_warn( celery, f"Failed to stop OSD {osd_id} process; ignoring error due to force flag", ) time.sleep(5) # 4. Delete OSD from ZK current_stage += 1 update( celery, f"Deleting OSD {osd_id} from PVC", current=current_stage, total=total_stages, ) zkhandler.delete(("osd", osd_id), recursive=True) # 5a. Destroy the OSD from Ceph current_stage += 1 update( celery, f"Destroying OSD {osd_id}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"ceph osd destroy {osd_id} --yes-i-really-mean-it" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: if not force_flag: fail( celery, f"Failed to destroy OSD {osd_id}", ) return else: log_warn( celery, f"Failed to destroy OSD {osd_id}; ignoring error due to force flag", ) time.sleep(2) # 5b. Purge the OSD from Ceph current_stage += 1 update( celery, f"Removing OSD {osd_id} from CRUSH map", current=current_stage, total=total_stages, ) # Remove the OSD from the CRUSH map retcode, stdout, stderr = common.run_os_command(f"ceph osd crush rm osd.{osd_id}") log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: if not force_flag: fail( celery, f"Failed to remove OSD {osd_id} from CRUSH map", ) return else: log_warn( celery, f"Failed to remove OSD {osd_id} from CRUSH map; ignoring error due to force flag", ) # Purge the OSD current_stage += 1 update( celery, f"Purging OSD {osd_id}", current=current_stage, total=total_stages, ) if force_flag: force_arg = "--force" else: force_arg = "" retcode, stdout, stderr = common.run_os_command( f"ceph osd purge {osd_id} {force_arg} --yes-i-really-mean-it" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: if not force_flag: fail( celery, f"Failed to purge OSD {osd_id}", ) return else: log_warn( celery, f"Failed to purge OSD {osd_id}; ignoring error due to force flag", ) # 6. Remove the DB device if db_device: current_stage += 1 update( celery, f"Removing OSD DB logical volume {db_device}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( f"lvremove --yes --force {db_device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: if not force_flag: fail( celery, f"Failed to remove OSD DB logical volume {db_device}", ) return else: log_warn( celery, f"Failed to remove OSD DB logical volume {db_device}; ignoring error due to force flag", ) if not skip_zap_flag: current_stage += 1 update( celery, f"Zapping old disk {data_device} if possible", current=current_stage, total=total_stages, ) # 7. Determine the block devices found_osds = osd_worker_helper_find_osds_from_block(data_device) if osd_id in found_osds.keys(): # Try to determine if any other OSDs shared a block device with this OSD _, osd_list = get_list_osd(zkhandler, None) all_osds_on_block = [ o for o in osd_list if o["node"] == node and o["device"] == data_device ] if len(all_osds_on_block) < 1: log_info( celery, f"Found no peer split OSD(s) on {data_device}; zapping disk", ) retcode, stdout, stderr = common.run_os_command( f"ceph-volume lvm zap --destroy {data_device}" ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: if not force_flag: fail( celery, f"Failed to run ceph-volume lvm zap on device {data_device}", ) return else: log_warn( celery, f"Failed to run ceph-volume lvm zap on device {data_device}; ignoring error due to force flag", ) else: log_warn( celery, f"Found {len(all_osds_on_block)} OSD(s) still remaining on {data_device}; skipping zap", ) else: log_warn( celery, f"Could not find OSD {osd_id} on device {data_device}; skipping zap", ) # 6. Log it current_stage += 1 return finish( celery, f"Successfully removed OSD {osd_id}", current=current_stage, total=total_stages, ) def osd_worker_add_db_vg(zkhandler, celery, device): # Set up stages current_stage = 0 total_stages = 4 start( celery, f"Creating new OSD database volume group on device {device}", current=current_stage, total=total_stages, ) # Check if an existsing volume group exists retcode, stdout, stderr = common.run_os_command("vgdisplay osd-db") if retcode != 5: fail( celery, "Ceph OSD database VG already exists", ) return # Handle a detect device if that is passed if match(r"detect:", device): ddevice = common.get_detect_device(device) if ddevice is None: fail( celery, f"Failed to determine block device from detect string {device}", ) return else: log_info( celery, f"Determined block device {ddevice} from detect string {device}", ) device = ddevice # 1. Create an empty partition table current_stage += 1 update( celery, f"Creating partitions on device {device}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command("sgdisk --clear {}".format(device)) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail( celery, f"Failed to create partition table on device {device}", ) return retcode, stdout, stderr = common.run_os_command( "sgdisk --new 1:: --typecode 1:8e00 {}".format(device) ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail( celery, f"Failed to set partition type to LVM PV on device {device}", ) return # Handle the partition ID portion if search(r"by-path", device) or search(r"by-id", device): # /dev/disk/by-path/pci-0000:03:00.0-scsi-0:1:0:0 -> pci-0000:03:00.0-scsi-0:1:0:0-part1 partition = "{}-part1".format(device) elif search(r"nvme", device): # /dev/nvme0n1 -> nvme0n1p1 partition = "{}p1".format(device) else: # /dev/sda -> sda1 # No other '/dev/disk/by-*' types are valid for raw block devices anyways partition = "{}1".format(device) # 2. Create the PV current_stage += 1 update( celery, f"Creating LVM PV on device {device}", current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( "pvcreate --force {}".format(partition) ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail( celery, f"Failed to create LVM PV on device {device}", ) return # 2. Create the VG (named 'osd-db') current_stage += 1 update( celery, f'Creating LVM VG "osd-db" on device {device}', current=current_stage, total=total_stages, ) retcode, stdout, stderr = common.run_os_command( "vgcreate --force osd-db {}".format(partition) ) log_info(celery, f"stdout: {stdout}") log_info(celery, f"stderr: {stderr}") if retcode: fail( celery, f"Failed to create LVM VG on device {device}", ) return # Log it current_stage += 1 return finish( celery, f"Successfully created new OSD DB volume group on device {device}", current=current_stage, total=total_stages, )