#!/usr/bin/env python3

# ceph.py - PVC client function library, Ceph cluster functions
# Part of the Parallel Virtual Cluster (PVC) system
#
#    Copyright (C) 2018-2022 Joshua M. Boniface
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, version 3.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

import os
import re
import json
import time
import math

from concurrent.futures import ThreadPoolExecutor

import daemon_lib.vm as vm
import daemon_lib.common as common


#
# Supplemental functions
#


# Verify OSD is valid in cluster
def verifyOSD(zkhandler, osd_id):
    """Return whether an entry for OSD ``osd_id`` exists in Zookeeper."""
    return zkhandler.exists(("osd", osd_id))


# Verify Pool is valid in cluster
def verifyPool(zkhandler, name):
    """Return whether an entry for pool ``name`` exists in Zookeeper."""
    return zkhandler.exists(("pool", name))


# Verify Volume is valid in cluster
def verifyVolume(zkhandler, pool, name):
    """Return whether an entry for volume ``pool``/``name`` exists in Zookeeper."""
    return zkhandler.exists(("volume", f"{pool}/{name}"))


# Verify Snapshot is valid in cluster
def verifySnapshot(zkhandler, pool, volume, name):
    """Return whether an entry for snapshot ``pool``/``volume``/``name`` exists."""
    return zkhandler.exists(("snapshot", f"{pool}/{volume}/{name}"))


# Find the OSD (if any) using a given block device on a given node
def verifyOSDBlock(zkhandler, node, device):
    """Return the ID of the OSD recorded on ``node`` with ``device``, else None."""
    for osd in zkhandler.children("base.osd"):
        osd_node = zkhandler.read(("osd.node", osd))
        osd_device = zkhandler.read(("osd.device", osd))
        if node == osd_node and device == osd_device:
            return osd
    return None


# Matrix of human-to-byte values
byte_unit_matrix = {
    "B": 1,
    "K": 1024,
    "M": 1024**2,
    "G": 1024**3,
    "T": 1024**4,
    "P": 1024**5,
    "E": 1024**6,
    "Z": 1024**7,
    "Y": 1024**8,
    "R": 1024**9,
    "Q": 1024**10,
}

# Matrix of human-to-metric values
ops_unit_matrix = {
    "": 1,
    "K": 1000,
    "M": 1000**2,
    "G": 1000**3,
    "T": 1000**4,
    "P": 1000**5,
    "E": 1000**6,
    "Z": 1000**7,
    "Y": 1000**8,
    "R": 1000**9,
    "Q": 1000**10,
}


# Format byte sizes to/from human-readable units
def format_bytes_tohuman(databytes):
    """Format a byte count using the smallest unit that needs at most 4 digits.

    The value is rounded UP to a whole number of the chosen unit, so the
    result can be larger than the input. Returns "" if even the largest
    unit would need 5 or more digits.
    """
    for unit in sorted(byte_unit_matrix, key=byte_unit_matrix.get):
        new_bytes = int(math.ceil(databytes / byte_unit_matrix[unit]))
        # 5 or more digits: move up to the next larger unit
        if new_bytes <= 9999:
            return "{}{}".format(new_bytes, unit)
    return ""


def format_bytes_fromhuman(datahuman):
    """Parse a human-readable size (e.g. "10G", "512MiB", "4096") into bytes.

    A bare integer is taken as bytes. Only the first letter after the digits
    selects the unit, and it is matched case-insensitively. Returns None when
    the value cannot be parsed or the unit is unknown, so callers can report
    the problem instead of crashing.
    """
    datahuman = str(datahuman).strip()

    if not re.search(r"[A-Za-z]+", datahuman):
        dataunit = "B"
        try:
            datasize = int(datahuman)
        except ValueError:
            return None
    else:
        parsed = re.match(r"([0-9]+)([A-Za-z])", datahuman)
        if parsed is None:
            return None
        datasize = int(parsed.group(1))
        dataunit = parsed.group(2).upper()

    if dataunit not in byte_unit_matrix:
        return None
    return datasize * byte_unit_matrix[dataunit]


# Format ops sizes to/from human-readable units
def format_ops_tohuman(dataops):
    """Format an operation count using the smallest unit needing at most 4 digits.

    Works like format_bytes_tohuman but with powers of 1000; counts below
    10000 are returned with no unit suffix.
    """
    for unit in sorted(ops_unit_matrix, key=ops_unit_matrix.get):
        new_ops = int(math.ceil(dataops / ops_unit_matrix[unit]))
        # 5 or more digits: move up to the next larger unit
        if new_ops <= 9999:
            return "{}{}".format(new_ops, unit)
    return ""


def format_ops_fromhuman(datahuman):
    """Parse a human-readable ops value (e.g. "12K", "100") into a string count.

    A value ending in a digit has no unit suffix and is returned unscaled;
    this matches what format_ops_tohuman emits for small counts. Raises
    KeyError for an unknown unit and ValueError for a non-integer size.
    """
    datahuman = str(datahuman)
    if datahuman[-1:].isdigit():
        dataunit = ""
        datasize = int(datahuman)
    else:
        # Trim off human-readable character
        dataunit = datahuman[-1:].upper()
        datasize = int(datahuman[:-1])
    dataops = datasize * ops_unit_matrix[dataunit]
    return "{}".format(dataops)


def format_pct_tohuman(datapct):
    """Format a fraction (e.g. 0.25) as a percentage string with one decimal."""
    datahuman = "{0:.1f}".format(float(datapct * 100.0))
    return datahuman


# Compile a list filter, applying the implicit fuzzy-match wrapping
def _compile_limit(limit, is_fuzzy):
    """Return a compiled regex for ``limit``, or None when no limit is set.

    With ``is_fuzzy`` set, ".*" is prepended unless the limit starts with "^"
    and appended unless it contains "$". Raises re.error on an invalid regex.
    """
    if not limit:
        return None
    if is_fuzzy:
        # Implicitly assume fuzzy limits
        if not re.match(r"\^.*", limit):
            limit = ".*" + limit
        if not re.match(r".*\$", limit):
            limit = limit + ".*"
    return re.compile(limit)


# Send a command through the /cmd/ceph pipe and collect its result
def _run_ceph_command(
    zkhandler, command, expected_result, success_message, failure_message
):
    """Write ``command`` to the "base.cmd.ceph" key and interpret the reply.

    The first whitespace-separated token read back from the key is compared
    against ``expected_result``. The key is cleared afterwards in all cases.
    Returns a ``(success, message)`` tuple.
    """
    zkhandler.write([("base.cmd.ceph", command)])

    # Wait 1/2 second for the cluster to get the message and start working
    time.sleep(0.5)

    # Acquire a read lock, so we get the return exclusively
    with zkhandler.readlock("base.cmd.ceph"):
        try:
            result = zkhandler.read("base.cmd.ceph").split()[0]
        except Exception:
            # An empty or missing value means nothing responded to the command
            success = False
            message = "ERROR: Command ignored by node."
        else:
            success = result == expected_result
            message = success_message if success else failure_message

    # Acquire a write lock to ensure things go smoothly
    with zkhandler.writelock("base.cmd.ceph"):
        time.sleep(0.5)
        zkhandler.write([("base.cmd.ceph", "")])

    return success, message


# Adjust the snapshot count stored in a volume's stats
def _adjust_snapshot_count(zkhandler, pool, volume, delta):
    """Add ``delta`` to the "snapshot_count" stat of ``pool``/``volume``.

    A missing count is treated as 0 and the result never goes below 0.
    """
    stats_key = ("volume.stats", f"{pool}/{volume}")
    volume_stats = dict(json.loads(zkhandler.read(stats_key)))
    current_count = volume_stats.get("snapshot_count", 0)
    volume_stats["snapshot_count"] = max(0, current_count + delta)
    zkhandler.write([(stats_key, json.dumps(volume_stats))])


#
# Status functions
#
def get_status(zkhandler):
    """Return ``(True, data)`` with the stored Ceph status text."""
    primary_node = zkhandler.read("base.config.primary_node")
    ceph_status = zkhandler.read("base.storage").rstrip()

    # Create a data structure for the information
    status_data = {
        "type": "status",
        "primary_node": primary_node,
        "ceph_data": ceph_status,
    }
    return True, status_data


def get_health(zkhandler):
    """Return ``(True, data)`` with the stored Ceph health text."""
    primary_node = zkhandler.read("base.config.primary_node")
    ceph_health = zkhandler.read("base.storage.health").rstrip()

    # Create a data structure for the information
    status_data = {
        "type": "health",
        "primary_node": primary_node,
        "ceph_data": ceph_health,
    }
    return True, status_data


def get_util(zkhandler):
    """Return ``(True, data)`` with the stored Ceph utilization text."""
    primary_node = zkhandler.read("base.config.primary_node")
    ceph_df = zkhandler.read("base.storage.util").rstrip()

    # Create a data structure for the information
    status_data = {
        "type": "utilization",
        "primary_node": primary_node,
        "ceph_data": ceph_df,
    }
    return True, status_data


#
# OSD functions
#
def getClusterOSDList(zkhandler):
    """Return the list of OSD IDs (children of the "base.osd" key)."""
    return zkhandler.children("base.osd")


def getOSDInformation(zkhandler, osd_id):
    """Return a dict with the node, devices and parsed stats of one OSD."""
    # Get the devices
    osd_node = zkhandler.read(("osd.node", osd_id))
    osd_device = zkhandler.read(("osd.device", osd_id))
    osd_db_device = zkhandler.read(("osd.db_device", osd_id))
    # Parse the stats data
    osd_stats_raw = zkhandler.read(("osd.stats", osd_id))
    osd_stats = dict(json.loads(osd_stats_raw))

    osd_information = {
        "id": osd_id,
        "node": osd_node,
        "device": osd_device,
        "db_device": osd_db_device,
        "stats": osd_stats,
    }
    return osd_information


# OSD DB VG actions use the /cmd/ceph pipe
# These actions must occur on the specific node they reference
def add_osd_db_vg(zkhandler, node, device):
    """Request creation of an OSD database VG on ``device`` of ``node``.

    Returns a ``(success, message)`` tuple.
    """
    # Verify the target node exists
    if not common.verifyNode(zkhandler, node):
        return False, 'ERROR: No node named "{}" is present in the cluster.'.format(
            node
        )

    # Tell the cluster to create a new OSD database VG for the host
    return _run_ceph_command(
        zkhandler,
        "db_vg_add {},{}".format(node, device),
        "success-db_vg_add",
        'Created new OSD database VG at "{}" on node "{}".'.format(device, node),
        "ERROR: Failed to create new OSD database VG; check node logs for details.",
    )


# OSD actions use the /cmd/ceph pipe
# These actions must occur on the specific node they reference
def add_osd(zkhandler, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05):
    """Request creation of a new OSD on ``device`` of ``node``.

    Refuses if the device is already recorded as used by an OSD on that node.
    Returns a ``(success, message)`` tuple.
    """
    # Verify the target node exists
    if not common.verifyNode(zkhandler, node):
        return False, 'ERROR: No node named "{}" is present in the cluster.'.format(
            node
        )

    # Verify target block device isn't in use
    block_osd = verifyOSDBlock(zkhandler, node, device)
    if block_osd:
        return (
            False,
            'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format(
                device, node, block_osd
            ),
        )

    # Tell the cluster to create a new OSD for the host
    return _run_ceph_command(
        zkhandler,
        "osd_add {},{},{},{},{}".format(
            node, device, weight, ext_db_flag, ext_db_ratio
        ),
        "success-osd_add",
        'Created new OSD with block device "{}" on node "{}".'.format(device, node),
        "ERROR: Failed to create new OSD; check node logs for details.",
    )


def replace_osd(zkhandler, osd_id, new_device, weight):
    """Request replacement of the block device backing OSD ``osd_id``.

    Refuses if ``new_device`` is recorded as used by a different OSD on the
    same node. Returns a ``(success, message)`` tuple.
    """
    if not verifyOSD(zkhandler, osd_id):
        return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
            osd_id
        )

    # Get current OSD information
    osd_information = getOSDInformation(zkhandler, osd_id)
    node = osd_information["node"]
    old_device = osd_information["device"]
    ext_db_flag = bool(osd_information["db_device"])

    # Verify target block device isn't in use by another OSD
    block_osd = verifyOSDBlock(zkhandler, node, new_device)
    if block_osd and block_osd != osd_id:
        return (
            False,
            'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format(
                new_device, node, block_osd
            ),
        )

    # Tell the cluster to replace the OSD's block device
    return _run_ceph_command(
        zkhandler,
        "osd_replace {},{},{},{},{},{}".format(
            node, osd_id, old_device, new_device, weight, ext_db_flag
        ),
        "success-osd_replace",
        'Replaced OSD {} with block device "{}" on node "{}".'.format(
            osd_id, new_device, node
        ),
        "ERROR: Failed to replace OSD; check node logs for details.",
    )


def refresh_osd(zkhandler, osd_id, device):
    """Request a refresh of OSD ``osd_id`` on its existing ``device``.

    Refuses unless ``device`` is recorded as the device of this very OSD.
    Returns a ``(success, message)`` tuple.
    """
    if not verifyOSD(zkhandler, osd_id):
        return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
            osd_id
        )

    # Get current OSD information
    osd_information = getOSDInformation(zkhandler, osd_id)
    node = osd_information["node"]
    ext_db_flag = bool(osd_information["db_device"])

    # Verify target block device is the one used by this OSD
    block_osd = verifyOSDBlock(zkhandler, node, device)
    if not block_osd or block_osd != osd_id:
        return (
            False,
            'ERROR: Block device "{}" on node "{}" is not used by OSD "{}"; use replace instead'.format(
                device, node, osd_id
            ),
        )

    # Tell the cluster to refresh the OSD
    return _run_ceph_command(
        zkhandler,
        "osd_refresh {},{},{},{}".format(node, osd_id, device, ext_db_flag),
        "success-osd_refresh",
        'Refreshed OSD {} with block device "{}" on node "{}".'.format(
            osd_id, device, node
        ),
        "ERROR: Failed to refresh OSD; check node logs for details.",
    )


def remove_osd(zkhandler, osd_id, force_flag):
    """Request removal of OSD ``osd_id``; returns ``(success, message)``."""
    if not verifyOSD(zkhandler, osd_id):
        return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
            osd_id
        )

    # Tell the cluster to remove an OSD
    return _run_ceph_command(
        zkhandler,
        "osd_remove {},{}".format(osd_id, str(force_flag)),
        "success-osd_remove",
        'Removed OSD "{}" from the cluster.'.format(osd_id),
        "ERROR: Failed to remove OSD; check node logs for details.",
    )


def in_osd(zkhandler, osd_id):
    """Run ``ceph osd in`` for ``osd_id``; returns ``(success, message)``."""
    if not verifyOSD(zkhandler, osd_id):
        return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
            osd_id
        )

    retcode, stdout, stderr = common.run_os_command("ceph osd in {}".format(osd_id))
    if retcode:
        return False, "ERROR: Failed to enable OSD {}: {}".format(osd_id, stderr)

    return True, "Set OSD {} online.".format(osd_id)


def out_osd(zkhandler, osd_id):
    """Run ``ceph osd out`` for ``osd_id``; returns ``(success, message)``."""
    if not verifyOSD(zkhandler, osd_id):
        return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
            osd_id
        )

    retcode, stdout, stderr = common.run_os_command("ceph osd out {}".format(osd_id))
    if retcode:
        return False, "ERROR: Failed to disable OSD {}: {}".format(osd_id, stderr)

    return True, "Set OSD {} offline.".format(osd_id)


def set_osd(zkhandler, option):
    """Run ``ceph osd set <option>``; returns ``(success, message)``."""
    retcode, stdout, stderr = common.run_os_command("ceph osd set {}".format(option))
    if retcode:
        return False, 'ERROR: Failed to set property "{}": {}'.format(option, stderr)

    return True, 'Set OSD property "{}".'.format(option)


def unset_osd(zkhandler, option):
    """Run ``ceph osd unset <option>``; returns ``(success, message)``."""
    retcode, stdout, stderr = common.run_os_command("ceph osd unset {}".format(option))
    if retcode:
        return False, 'ERROR: Failed to unset property "{}": {}'.format(option, stderr)

    return True, 'Unset OSD property "{}".'.format(option)


def get_list_osd(zkhandler, limit, is_fuzzy=True):
    """Return ``(True, osds)`` for OSDs whose ID matches ``limit``, sorted by ID.

    ``limit`` is a regex (fuzzy-wrapped when ``is_fuzzy``); a falsy limit
    lists everything. An invalid regex yields ``(False, "Regex Error: ...")``.
    """
    try:
        pattern = _compile_limit(limit, is_fuzzy)
    except (re.error, TypeError) as e:
        return False, "Regex Error: {}".format(e)

    osd_list = [
        getOSDInformation(zkhandler, osd)
        for osd in zkhandler.children("base.osd")
        if pattern is None or pattern.fullmatch(osd)
    ]

    return True, sorted(osd_list, key=lambda x: int(x["id"]))


#
# Pool functions
#
def getPoolInformation(zkhandler, pool):
    """Return a dict with the volume count, tier, PGs and parsed stats of a pool."""
    # Parse the stats data
    pool_stats_raw = zkhandler.read(("pool.stats", pool))
    pool_stats = dict(json.loads(pool_stats_raw))
    volume_count = len(getCephVolumes(zkhandler, pool))
    tier = zkhandler.read(("pool.tier", pool))
    if tier is None:
        tier = "default"
    pgs = zkhandler.read(("pool.pgs", pool))

    pool_information = {
        "name": pool,
        "volume_count": volume_count,
        "tier": tier,
        "pgs": pgs,
        "stats": pool_stats,
    }
    return pool_information


def add_pool(zkhandler, name, pgs, replcfg, tier=None):
    """Create an RBD pool and record it in Zookeeper.

    ``replcfg`` has the form "copies=N,mincopies=M". A ``tier`` of "hdd",
    "ssd" or "nvme" selects a per-tier CRUSH rule; anything else falls back
    to the "default" tier. Returns a ``(success, message)`` tuple.
    """
    # Prepare the copies/mincopies variables
    try:
        copies, mincopies = replcfg.split(",")
        copies = int(copies.replace("copies=", ""))
        mincopies = int(mincopies.replace("mincopies=", ""))
    except Exception:
        copies = None
        mincopies = None
    if not copies or not mincopies:
        return False, f'ERROR: Replication configuration "{replcfg}" is not valid.'

    # Prepare the tiers if applicable
    if tier is not None and tier in ["hdd", "ssd", "nvme"]:
        crush_rule = f"{tier}_tier"
        # Create a CRUSH rule for the relevant tier
        retcode, stdout, stderr = common.run_os_command(
            f"ceph osd crush rule create-replicated {crush_rule} default host {tier}"
        )
        if retcode:
            return (
                False,
                f"ERROR: Failed to create CRUSH rule {tier} for pool {name}: {stderr}",
            )
    else:
        tier = "default"
        crush_rule = "replicated"

    # Create the pool
    retcode, stdout, stderr = common.run_os_command(
        f"ceph osd pool create {name} {pgs} {pgs} {crush_rule}"
    )
    if retcode:
        return False, f'ERROR: Failed to create pool "{name}" with {pgs} PGs: {stderr}'

    # Set the size and minsize
    retcode, stdout, stderr = common.run_os_command(
        f"ceph osd pool set {name} size {copies}"
    )
    if retcode:
        return False, f'ERROR: Failed to set pool "{name}" size of {copies}: {stderr}'

    retcode, stdout, stderr = common.run_os_command(
        f"ceph osd pool set {name} min_size {mincopies}"
    )
    if retcode:
        return (
            False,
            f'ERROR: Failed to set pool "{name}" minimum size of {mincopies}: {stderr}',
        )

    # Enable RBD application
    retcode, stdout, stderr = common.run_os_command(
        f"ceph osd pool application enable {name} rbd"
    )
    if retcode:
        return (
            False,
            f'ERROR: Failed to enable RBD application on pool "{name}" : {stderr}',
        )

    # Add the new pool to Zookeeper
    zkhandler.write(
        [
            (("pool", name), ""),
            (("pool.pgs", name), pgs),
            (("pool.tier", name), tier),
            (("pool.stats", name), "{}"),
            (("volume", name), ""),
            (("snapshot", name), ""),
        ]
    )

    return True, f'Created RBD pool "{name}" with {pgs} PGs'


def remove_pool(zkhandler, name):
    """Remove a pool and all of its volumes; returns ``(success, message)``."""
    if not verifyPool(zkhandler, name):
        return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
            name
        )

    # 1. Remove pool volumes (best-effort; individual failures are not checked)
    for volume in zkhandler.children(("volume", name)):
        remove_volume(zkhandler, name, volume)

    # 2. Remove the pool
    retcode, stdout, stderr = common.run_os_command(
        "ceph osd pool rm {pool} {pool} --yes-i-really-really-mean-it".format(pool=name)
    )
    if retcode:
        return False, 'ERROR: Failed to remove pool "{}": {}'.format(name, stderr)

    # 3. Delete pool from Zookeeper
    zkhandler.delete(
        [
            ("pool", name),
            ("volume", name),
            ("snapshot", name),
        ]
    )

    return True, 'Removed RBD pool "{}" and all volumes.'.format(name)


def set_pgs_pool(zkhandler, name, pgs):
    """Change the placement-group count of a pool.

    ``pgs`` must be a positive power of 2. Returns ``(success, message)``.
    """
    if not verifyPool(zkhandler, name):
        return False, f'ERROR: No pool with name "{name}" is present in the cluster.'

    # Validate new PGs count
    try:
        pgs = int(pgs)
    except (TypeError, ValueError):
        pgs_valid = False
    else:
        pgs_valid = pgs > 0 and pgs & (pgs - 1) == 0
    if not pgs_valid:
        return (
            False,
            f'ERROR: Invalid PGs number "{pgs}": must be a non-zero power of 2.',
        )

    # Set the new pgs number
    retcode, stdout, stderr = common.run_os_command(
        f"ceph osd pool set {name} pg_num {pgs}"
    )
    if retcode:
        return False, f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}"

    # Set the new pgps number
    # NOTE(review): the original comment said "if increasing", but the
    # condition below is true when the count stays the same or decreases;
    # confirm which is intended before changing it.
    current_pgs = int(zkhandler.read(("pool.pgs", name)))
    if current_pgs >= pgs:
        retcode, stdout, stderr = common.run_os_command(
            f"ceph osd pool set {name} pgp_num {pgs}"
        )
        if retcode:
            return (
                False,
                f"ERROR: Failed to set pgp_num on pool {name} to {pgs}: {stderr}",
            )

    # Update Zookeeper count
    zkhandler.write(
        [
            (("pool.pgs", name), pgs),
        ]
    )

    return True, f'Set PGs count to {pgs} for RBD pool "{name}".'


def get_list_pool(zkhandler, limit, is_fuzzy=True):
    """Return ``(True, pools)`` for pools matching ``limit``, sorted by stats ID.

    ``limit`` is a regex (fuzzy-wrapped when ``is_fuzzy``); a falsy limit
    lists everything. An invalid regex yields ``(False, "Regex Error: ...")``.
    """
    try:
        pattern = _compile_limit(limit, is_fuzzy)
    except (re.error, TypeError) as e:
        return False, "Regex Error: {}".format(e)

    pool_execute_list = [
        pool
        for pool in zkhandler.children("base.pool")
        if pattern is None or pattern.fullmatch(pool)
    ]

    # Obtain our pool data in a thread pool
    with ThreadPoolExecutor(max_workers=32, thread_name_prefix="pool_list") as executor:
        futures = [
            executor.submit(getPoolInformation, zkhandler, pool)
            for pool in pool_execute_list
        ]
        pool_data_list = [future.result() for future in futures]

    return True, sorted(pool_data_list, key=lambda x: int(x["stats"].get("id", 0)))


#
# Volume functions
#
def getCephVolumes(zkhandler, pool):
    """Return "pool/volume" names for ``pool``, or for all pools if it is falsy."""
    if not pool:
        pool_list = zkhandler.children("base.pool")
    else:
        pool_list = [pool]

    return [
        "{}/{}".format(pool_name, volume_name)
        for pool_name in pool_list
        for volume_name in zkhandler.children(("volume", pool_name))
    ]


def getVolumeInformation(zkhandler, pool, volume):
    """Return a dict with the name, pool and parsed stats of one volume."""
    # Parse the stats data
    volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
    volume_stats = dict(json.loads(volume_stats_raw))
    # Format the size to something nicer
    volume_stats["size"] = format_bytes_tohuman(volume_stats["size"])

    volume_information = {"name": volume, "pool": pool, "stats": volume_stats}
    return volume_information


def add_volume(zkhandler, pool, name, size):
    """Create an RBD volume of ``size`` in ``pool`` and record it in Zookeeper.

    ``size`` is a human-readable size (see format_bytes_fromhuman) and must
    be smaller than the pool's free space. Returns ``(success, message)``.
    """
    if not verifyPool(zkhandler, pool):
        return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
            pool
        )

    # 1. Verify the size of the volume
    pool_information = getPoolInformation(zkhandler, pool)
    size_bytes = format_bytes_fromhuman(size)
    if size_bytes is None:
        return (
            False,
            f"ERROR: Requested volume size '{size}' does not have a valid SI unit",
        )

    free_bytes = int(pool_information["stats"]["free_bytes"])
    if size_bytes >= free_bytes:
        size_human = format_bytes_tohuman(size_bytes)
        free_human = format_bytes_tohuman(free_bytes)
        return (
            False,
            f"ERROR: Requested volume size '{size_human}' is greater than the available free space in the pool ('{free_human}')",
        )

    # 2. Create the volume
    retcode, stdout, stderr = common.run_os_command(
        "rbd create --size {}B {}/{}".format(size_bytes, pool, name)
    )
    if retcode:
        return False, 'ERROR: Failed to create RBD volume "{}": {}'.format(name, stderr)

    # 3. Get volume stats
    retcode, stdout, stderr = common.run_os_command(
        "rbd info --format json {}/{}".format(pool, name)
    )
    volstats = stdout

    # 4. Add the new volume to Zookeeper
    zkhandler.write(
        [
            (("volume", f"{pool}/{name}"), ""),
            (("volume.stats", f"{pool}/{name}"), volstats),
            (("snapshot", f"{pool}/{name}"), ""),
        ]
    )

    return True, 'Created RBD volume "{}" of size "{}" in pool "{}".'.format(
        name, format_bytes_tohuman(size_bytes), pool
    )


def clone_volume(zkhandler, pool, name_src, name_new):
    """Copy volume ``name_src`` to ``name_new`` within ``pool``.

    Returns a ``(success, message)`` tuple.
    """
    if not verifyVolume(zkhandler, pool, name_src):
        return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
            name_src, pool
        )

    # 1. Clone the volume
    retcode, stdout, stderr = common.run_os_command(
        "rbd copy {}/{} {}/{}".format(pool, name_src, pool, name_new)
    )
    if retcode:
        return (
            False,
            'ERROR: Failed to clone RBD volume "{}" to "{}" in pool "{}": {}'.format(
                name_src, name_new, pool, stderr
            ),
        )

    # 2. Get volume stats
    retcode, stdout, stderr = common.run_os_command(
        "rbd info --format json {}/{}".format(pool, name_new)
    )
    volstats = stdout

    # 3. Add the new volume to Zookeeper
    zkhandler.write(
        [
            (("volume", f"{pool}/{name_new}"), ""),
            (("volume.stats", f"{pool}/{name_new}"), volstats),
            (("snapshot", f"{pool}/{name_new}"), ""),
        ]
    )

    return True, 'Cloned RBD volume "{}" to "{}" in pool "{}"'.format(
        name_src, name_new, pool
    )


def resize_volume(zkhandler, pool, name, size):
    """Resize an RBD volume and, if its VM is running, the live block device.

    The VM is looked up from the part of the volume name before the first
    "_". The live resize through libvirt is best-effort: failures there do
    not fail the operation. Returns a ``(success, message)`` tuple.
    """
    if not verifyVolume(zkhandler, pool, name):
        return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
            name, pool
        )

    # 1. Verify the size of the volume
    # NOTE(review): this compares the full new size (not the growth) against
    # the pool's free space; confirm that is the intended limit.
    pool_information = getPoolInformation(zkhandler, pool)
    size_bytes = format_bytes_fromhuman(size)
    if size_bytes is None:
        return (
            False,
            f"ERROR: Requested volume size '{size}' does not have a valid SI unit",
        )

    free_bytes = int(pool_information["stats"]["free_bytes"])
    if size_bytes >= free_bytes:
        size_human = format_bytes_tohuman(size_bytes)
        free_human = format_bytes_tohuman(free_bytes)
        return (
            False,
            f"ERROR: Requested volume size '{size_human}' is greater than the available free space in the pool ('{free_human}')",
        )

    # 2. Resize the volume
    # Pass the exact byte count: the human-readable form is rounded up and
    # would disagree with the byte-exact live resize performed below.
    retcode, stdout, stderr = common.run_os_command(
        "rbd resize --size {}B {}/{}".format(size_bytes, pool, name)
    )
    if retcode:
        return (
            False,
            'ERROR: Failed to resize RBD volume "{}" to size "{}" in pool "{}": {}'.format(
                name, format_bytes_tohuman(size_bytes), pool, stderr
            ),
        )

    # 3a. Determine the node running this VM if applicable
    active_node = None
    volume_id = None
    volume_vm_name = name.split("_")[0]
    retcode, vm_info = vm.get_info(zkhandler, volume_vm_name)
    if retcode:
        for disk in vm_info["disks"]:
            # This block device is present in this VM so we can continue
            if disk["name"] == "{}/{}".format(pool, name):
                active_node = vm_info["node"]
                volume_id = disk["dev"]

    # 3b. Perform a live resize in libvirt if the VM is running
    if active_node is not None and vm_info.get("state", "") == "start":
        import libvirt

        # Run the libvirt command against the target host
        target_lv_conn = None
        try:
            dest_lv = "qemu+tcp://{}/system".format(active_node)
            target_lv_conn = libvirt.open(dest_lv)
            target_vm_conn = target_lv_conn.lookupByName(vm_info["name"])
            if target_vm_conn:
                target_vm_conn.blockResize(
                    volume_id,
                    size_bytes,
                    libvirt.VIR_DOMAIN_BLOCK_RESIZE_BYTES,
                )
        except Exception:
            # Deliberately best-effort: the RBD volume is already resized
            pass
        finally:
            # Always release the connection, even if the resize failed
            if target_lv_conn is not None:
                try:
                    target_lv_conn.close()
                except Exception:
                    pass

    # 4. Get volume stats
    retcode, stdout, stderr = common.run_os_command(
        "rbd info --format json {}/{}".format(pool, name)
    )
    volstats = stdout

    # 5. Update the volume in Zookeeper
    zkhandler.write(
        [
            (("volume", f"{pool}/{name}"), ""),
            (("volume.stats", f"{pool}/{name}"), volstats),
            (("snapshot", f"{pool}/{name}"), ""),
        ]
    )

    return True, 'Resized RBD volume "{}" to size "{}" in pool "{}".'.format(
        name, format_bytes_tohuman(size_bytes), pool
    )


def rename_volume(zkhandler, pool, name, new_name):
    """Rename an RBD volume within ``pool``; returns ``(success, message)``."""
    if not verifyVolume(zkhandler, pool, name):
        return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
            name, pool
        )

    # 1. Rename the volume
    retcode, stdout, stderr = common.run_os_command(
        "rbd rename {}/{} {}".format(pool, name, new_name)
    )
    if retcode:
        return (
            False,
            'ERROR: Failed to rename volume "{}" to "{}" in pool "{}": {}'.format(
                name, new_name, pool, stderr
            ),
        )

    # 2. Rename the volume in Zookeeper
    zkhandler.rename(
        [
            (("volume", f"{pool}/{name}"), ("volume", f"{pool}/{new_name}")),
            (("snapshot", f"{pool}/{name}"), ("snapshot", f"{pool}/{new_name}")),
        ]
    )

    # 3. Get volume stats
    retcode, stdout, stderr = common.run_os_command(
        "rbd info --format json {}/{}".format(pool, new_name)
    )
    volstats = stdout

    # 4. Update the volume stats in Zookeeper
    zkhandler.write(
        [
            (("volume.stats", f"{pool}/{new_name}"), volstats),
        ]
    )

    return True, 'Renamed RBD volume "{}" to "{}" in pool "{}".'.format(
        name, new_name, pool
    )


def remove_volume(zkhandler, pool, name):
    """Remove an RBD volume and its snapshots; returns ``(success, message)``."""
    if not verifyVolume(zkhandler, pool, name):
        return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
            name, pool
        )

    # 1. Remove volume snapshots (best-effort; individual failures are not checked)
    for snapshot in zkhandler.children(("snapshot", f"{pool}/{name}")):
        remove_snapshot(zkhandler, pool, name, snapshot)

    # 2. Remove the volume
    retcode, stdout, stderr = common.run_os_command("rbd rm {}/{}".format(pool, name))
    if retcode:
        return False, 'ERROR: Failed to remove RBD volume "{}" in pool "{}": {}'.format(
            name, pool, stderr
        )

    # 3. Delete volume from Zookeeper
    zkhandler.delete(
        [
            ("volume", f"{pool}/{name}"),
            ("snapshot", f"{pool}/{name}"),
        ]
    )

    return True, 'Removed RBD volume "{}" in pool "{}".'.format(name, pool)


def map_volume(zkhandler, pool, name):
    """Map an RBD volume on the local system.

    Returns ``(True, device_path)`` on success or ``(False, message)``.
    """
    if not verifyVolume(zkhandler, pool, name):
        return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
            name, pool
        )

    # 1. Map the volume onto the local system
    retcode, stdout, stderr = common.run_os_command("rbd map {}/{}".format(pool, name))
    if retcode:
        return False, 'ERROR: Failed to map RBD volume "{}" in pool "{}": {}'.format(
            name, pool, stderr
        )

    # 2. Calculate the absolute path to the mapped volume
    mapped_volume = "/dev/rbd/{}/{}".format(pool, name)

    # 3. Ensure the volume exists
    if not os.path.exists(mapped_volume):
        return (
            False,
            'ERROR: Mapped volume not found at expected location "{}".'.format(
                mapped_volume
            ),
        )

    return True, mapped_volume


def unmap_volume(zkhandler, pool, name):
    """Unmap an RBD volume from the local system; returns ``(success, message)``."""
    if not verifyVolume(zkhandler, pool, name):
        return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
            name, pool
        )

    mapped_volume = "/dev/rbd/{}/{}".format(pool, name)

    # 1. Ensure the volume exists
    if not os.path.exists(mapped_volume):
        return (
            False,
            'ERROR: Mapped volume not found at expected location "{}".'.format(
                mapped_volume
            ),
        )

    # 2. Unmap the volume
    retcode, stdout, stderr = common.run_os_command(
        "rbd unmap {}".format(mapped_volume)
    )
    if retcode:
        return False, 'ERROR: Failed to unmap RBD volume at "{}": {}'.format(
            mapped_volume, stderr
        )

    return True, 'Unmapped RBD volume at "{}".'.format(mapped_volume)


def get_list_volume(zkhandler, pool, limit, is_fuzzy=True):
    """Return ``(True, volumes)`` for volumes matching ``limit``, sorted by name.

    A falsy ``pool`` lists volumes from every pool. ``limit`` is a regex
    matched against the volume name (fuzzy-wrapped when ``is_fuzzy``). An
    invalid regex yields ``(False, "Regex Error: ...")``.
    """
    if pool and not verifyPool(zkhandler, pool):
        return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
            pool
        )

    try:
        pattern = _compile_limit(limit, is_fuzzy)
    except (re.error, TypeError) as e:
        return False, "Regex Error: {}".format(e)

    # Select the (pool, volume) pairs whose volume name matches the limit
    volume_execute_list = []
    for volume in getCephVolumes(zkhandler, pool):
        pool_name, volume_name = volume.split("/")
        if pattern is None or pattern.fullmatch(volume_name):
            volume_execute_list.append((pool_name, volume_name))

    # Obtain our volume data in a thread pool
    with ThreadPoolExecutor(
        max_workers=32, thread_name_prefix="volume_list"
    ) as executor:
        futures = [
            executor.submit(getVolumeInformation, zkhandler, pool_name, volume_name)
            for pool_name, volume_name in volume_execute_list
        ]
        volume_data_list = [future.result() for future in futures]

    return True, sorted(volume_data_list, key=lambda x: str(x["name"]))


#
# Snapshot functions
#
def getCephSnapshots(zkhandler, pool, volume):
    """Return "pool/volume@snapshot" names, optionally filtered.

    A falsy ``pool`` covers every pool. When ``volume`` is given, only
    volumes with exactly that name are considered; if none match, the result
    is empty.
    """
    volume_list = getCephVolumes(zkhandler, pool)
    if volume:
        volume_list = [
            volume_entry
            for volume_entry in volume_list
            if volume_entry.split("/")[1] == volume
        ]

    return [
        "{}@{}".format(volume_entry, snapshot_name)
        for volume_entry in volume_list
        for snapshot_name in zkhandler.children(("snapshot", volume_entry))
    ]


def add_snapshot(zkhandler, pool, volume, name):
    """Create snapshot ``name`` of a volume; returns ``(success, message)``."""
    if not verifyVolume(zkhandler, pool, volume):
        return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
            volume, pool
        )

    # 1. Create the snapshot
    retcode, stdout, stderr = common.run_os_command(
        "rbd snap create {}/{}@{}".format(pool, volume, name)
    )
    if retcode:
        return (
            False,
            'ERROR: Failed to create RBD snapshot "{}" of volume "{}" in pool "{}": {}'.format(
                name, volume, pool, stderr
            ),
        )

    # 2. Add the snapshot to Zookeeper
    zkhandler.write(
        [
            (("snapshot", f"{pool}/{volume}/{name}"), ""),
            (("snapshot.stats", f"{pool}/{volume}/{name}"), "{}"),
        ]
    )

    # 3. Update the count of snapshots on this volume
    _adjust_snapshot_count(zkhandler, pool, volume, 1)

    return True, 'Created RBD snapshot "{}" of volume "{}" in pool "{}".'.format(
        name, volume, pool
    )


def rename_snapshot(zkhandler, pool, volume, name, new_name):
    """Rename a snapshot of a volume; returns ``(success, message)``."""
    if not verifyVolume(zkhandler, pool, volume):
        return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
            volume, pool
        )
    if not verifySnapshot(zkhandler, pool, volume, name):
        return (
            False,
            'ERROR: No snapshot with name "{}" is present for volume "{}" in pool "{}".'.format(
                name, volume, pool
            ),
        )

    # 1. Rename the snapshot
    retcode, stdout, stderr = common.run_os_command(
        "rbd snap rename {pool}/{volume}@{name} {pool}/{volume}@{new_name}".format(
            pool=pool, volume=volume, name=name, new_name=new_name
        )
    )
    if retcode:
        return (
            False,
            'ERROR: Failed to rename RBD snapshot "{}" to "{}" for volume "{}" in pool "{}": {}'.format(
                name, new_name, volume, pool, stderr
            ),
        )

    # 2. Rename the snapshot in ZK
    zkhandler.rename(
        [
            (
                ("snapshot", f"{pool}/{volume}/{name}"),
                ("snapshot", f"{pool}/{volume}/{new_name}"),
            ),
        ]
    )

    return (
        True,
        'Renamed RBD snapshot "{}" to "{}" for volume "{}" in pool "{}".'.format(
            name, new_name, volume, pool
        ),
    )


def remove_snapshot(zkhandler, pool, volume, name):
    """Remove snapshot ``name`` of a volume; returns ``(success, message)``."""
    if not verifyVolume(zkhandler, pool, volume):
        return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
            volume, pool
        )
    if not verifySnapshot(zkhandler, pool, volume, name):
        return (
            False,
            'ERROR: No snapshot with name "{}" is present of volume {} in pool {}.'.format(
                name, volume, pool
            ),
        )

    # 1. Remove the snapshot
    retcode, stdout, stderr = common.run_os_command(
        "rbd snap rm {}/{}@{}".format(pool, volume, name)
    )
    if retcode:
        return (
            False,
            'ERROR: Failed to remove RBD snapshot "{}" of volume "{}" in pool "{}": {}'.format(
                name, volume, pool, stderr
            ),
        )

    # 2. Delete snapshot from Zookeeper
    zkhandler.delete([("snapshot", f"{pool}/{volume}/{name}")])

    # 3. Update the count of snapshots on this volume
    _adjust_snapshot_count(zkhandler, pool, volume, -1)

    return True, 'Removed RBD snapshot "{}" of volume "{}" in pool "{}".'.format(
        name, volume, pool
    )


def get_list_snapshot(zkhandler, pool, volume, limit, is_fuzzy=True):
    """Return ``(True, snapshots)`` matching ``limit``, sorted by snapshot name.

    A falsy ``pool`` and/or ``volume`` widens the listing to all pools and/or
    volumes. ``limit`` is a regex matched against the snapshot name
    (fuzzy-wrapped when ``is_fuzzy``). An invalid regex yields
    ``(False, "Regex Error: ...")``.
    """
    if pool and not verifyPool(zkhandler, pool):
        return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
            pool
        )

    if volume:
        # The volume must exist in the given pool, or in any pool if none is given
        if pool:
            volume_exists = verifyVolume(zkhandler, pool, volume)
        else:
            volume_exists = any(
                volume_entry.split("/")[1] == volume
                for volume_entry in getCephVolumes(zkhandler, pool)
            )
        if not volume_exists:
            return (
                False,
                'ERROR: No volume with name "{}" is present in the cluster.'.format(
                    volume
                ),
            )

    try:
        pattern = _compile_limit(limit, is_fuzzy)
    except (re.error, TypeError) as e:
        return False, "Regex Error: {}".format(e)

    snapshot_list = []
    for snapshot in getCephSnapshots(zkhandler, pool, volume):
        volume_entry, snapshot_name = snapshot.split("@")
        pool_name, volume_name = volume_entry.split("/")
        if pattern is None or pattern.fullmatch(snapshot_name):
            snapshot_list.append(
                {"pool": pool_name, "volume": volume_name, "snapshot": snapshot_name}
            )

    return True, sorted(snapshot_list, key=lambda x: str(x["snapshot"]))