pvc/daemon-common/ceph.py

1288 lines
41 KiB
Python

#!/usr/bin/env python3
# ceph.py - PVC client function library, Ceph cluster fuctions
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import os
import re
import json
import time
import math
from concurrent.futures import ThreadPoolExecutor
import daemon_lib.vm as vm
import daemon_lib.common as common
#
# Supplemental functions
#
# Verify OSD is valid in cluster
def verifyOSD(zkhandler, osd_id):
return zkhandler.exists(("osd", osd_id))
# Verify Pool is valid in cluster
def verifyPool(zkhandler, name):
return zkhandler.exists(("pool", name))
# Verify Volume is valid in cluster
def verifyVolume(zkhandler, pool, name):
return zkhandler.exists(("volume", f"{pool}/{name}"))
# Verify Snapshot is valid in cluster
def verifySnapshot(zkhandler, pool, volume, name):
return zkhandler.exists(("snapshot", f"{pool}/{volume}/{name}"))
# Verify OSD path is valid in cluster
def verifyOSDBlock(zkhandler, node, device):
for osd in zkhandler.children("base.osd"):
osd_node = zkhandler.read(("osd.node", osd))
osd_device = zkhandler.read(("osd.device", osd))
if node == osd_node and device == osd_device:
return osd
return None
# Matrix of human-to-byte values
byte_unit_matrix = {
"B": 1,
"K": 1024,
"M": 1024 * 1024,
"G": 1024 * 1024 * 1024,
"T": 1024 * 1024 * 1024 * 1024,
"P": 1024 * 1024 * 1024 * 1024 * 1024,
"E": 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
"Z": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
"Y": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
"R": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
"Q": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
}
# Matrix of human-to-metric values
ops_unit_matrix = {
"": 1,
"K": 1000,
"M": 1000 * 1000,
"G": 1000 * 1000 * 1000,
"T": 1000 * 1000 * 1000 * 1000,
"P": 1000 * 1000 * 1000 * 1000 * 1000,
"E": 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
"Z": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
"Y": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
"R": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
"Q": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
}
# Format byte sizes to/from human-readable units
def format_bytes_tohuman(databytes):
datahuman = ""
for unit in sorted(byte_unit_matrix, key=byte_unit_matrix.get, reverse=True):
new_bytes = int(math.ceil(databytes / byte_unit_matrix[unit]))
# Round up if 5 or more digits
if new_bytes > 9999:
# We can jump down another level
continue
else:
# We're at the end, display with this size
datahuman = "{}{}".format(new_bytes, unit)
return datahuman
def format_bytes_fromhuman(datahuman):
if not re.search(r"[A-Za-z]+", datahuman):
dataunit = "B"
datasize = int(datahuman)
else:
dataunit = str(re.match(r"[0-9]+([A-Za-z])[iBb]*", datahuman).group(1))
datasize = int(re.match(r"([0-9]+)[A-Za-z]+", datahuman).group(1))
if byte_unit_matrix.get(dataunit):
databytes = datasize * byte_unit_matrix[dataunit]
return databytes
else:
return None
# Format ops sizes to/from human-readable units
def format_ops_tohuman(dataops):
datahuman = ""
for unit in sorted(ops_unit_matrix, key=ops_unit_matrix.get, reverse=True):
new_ops = int(math.ceil(dataops / ops_unit_matrix[unit]))
# Round up if 5 or more digits
if new_ops > 9999:
# We can jump down another level
continue
else:
# We're at the end, display with this size
datahuman = "{}{}".format(new_ops, unit)
return datahuman
def format_ops_fromhuman(datahuman):
# Trim off human-readable character
dataunit = datahuman[-1]
datasize = int(datahuman[:-1])
dataops = datasize * ops_unit_matrix[dataunit]
return "{}".format(dataops)
def format_pct_tohuman(datapct):
datahuman = "{0:.1f}".format(float(datapct * 100.0))
return datahuman
#
# Status functions
#
def get_status(zkhandler):
primary_node = zkhandler.read("base.config.primary_node")
ceph_status = zkhandler.read("base.storage").rstrip()
# Create a data structure for the information
status_data = {
"type": "status",
"primary_node": primary_node,
"ceph_data": ceph_status,
}
return True, status_data
def get_health(zkhandler):
primary_node = zkhandler.read("base.config.primary_node")
ceph_health = zkhandler.read("base.storage.health").rstrip()
# Create a data structure for the information
status_data = {
"type": "health",
"primary_node": primary_node,
"ceph_data": ceph_health,
}
return True, status_data
def get_util(zkhandler):
primary_node = zkhandler.read("base.config.primary_node")
ceph_df = zkhandler.read("base.storage.util").rstrip()
# Create a data structure for the information
status_data = {
"type": "utilization",
"primary_node": primary_node,
"ceph_data": ceph_df,
}
return True, status_data
#
# OSD functions
#
def getClusterOSDList(zkhandler):
# Get a list of VNIs by listing the children of /networks
return zkhandler.children("base.osd")
def getOSDInformation(zkhandler, osd_id):
# Get the devices
osd_node = zkhandler.read(("osd.node", osd_id))
osd_device = zkhandler.read(("osd.device", osd_id))
osd_db_device = zkhandler.read(("osd.db_device", osd_id))
# Parse the stats data
osd_stats_raw = zkhandler.read(("osd.stats", osd_id))
osd_stats = dict(json.loads(osd_stats_raw))
osd_information = {
"id": osd_id,
"node": osd_node,
"device": osd_device,
"db_device": osd_db_device,
"stats": osd_stats,
}
return osd_information
# OSD DB VG actions use the /cmd/ceph pipe
# These actions must occur on the specific node they reference
def add_osd_db_vg(zkhandler, node, device):
# Verify the target node exists
if not common.verifyNode(zkhandler, node):
return False, 'ERROR: No node named "{}" is present in the cluster.'.format(
node
)
# Tell the cluster to create a new OSD for the host
add_osd_db_vg_string = "db_vg_add {},{}".format(node, device)
zkhandler.write([("base.cmd.ceph", add_osd_db_vg_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock("base.cmd.ceph"):
try:
result = zkhandler.read("base.cmd.ceph").split()[0]
if result == "success-db_vg_add":
message = 'Created new OSD database VG at "{}" on node "{}".'.format(
device, node
)
success = True
else:
message = "ERROR: Failed to create new OSD database VG; check node logs for details."
success = False
except Exception:
message = "ERROR: Command ignored by node."
success = False
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock("base.cmd.ceph"):
time.sleep(0.5)
zkhandler.write([("base.cmd.ceph", "")])
return success, message
# OSD actions use the /cmd/ceph pipe
# These actions must occur on the specific node they reference
def add_osd(zkhandler, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05):
# Verify the target node exists
if not common.verifyNode(zkhandler, node):
return False, 'ERROR: No node named "{}" is present in the cluster.'.format(
node
)
# Verify target block device isn't in use
block_osd = verifyOSDBlock(zkhandler, node, device)
if block_osd:
return (
False,
'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format(
device, node, block_osd
),
)
# Tell the cluster to create a new OSD for the host
add_osd_string = "osd_add {},{},{},{},{}".format(
node, device, weight, ext_db_flag, ext_db_ratio
)
zkhandler.write([("base.cmd.ceph", add_osd_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock("base.cmd.ceph"):
try:
result = zkhandler.read("base.cmd.ceph").split()[0]
if result == "success-osd_add":
message = 'Created new OSD with block device "{}" on node "{}".'.format(
device, node
)
success = True
else:
message = (
"ERROR: Failed to create new OSD; check node logs for details."
)
success = False
except Exception:
message = "ERROR: Command ignored by node."
success = False
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock("base.cmd.ceph"):
time.sleep(0.5)
zkhandler.write([("base.cmd.ceph", "")])
return success, message
def replace_osd(zkhandler, osd_id, new_device, weight):
# Get current OSD information
osd_information = getOSDInformation(zkhandler, osd_id)
node = osd_information["node"]
old_device = osd_information["device"]
ext_db_flag = True if osd_information["db_device"] else False
# Verify target block device isn't in use
block_osd = verifyOSDBlock(zkhandler, node, new_device)
if block_osd and block_osd != osd_id:
return (
False,
'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format(
new_device, node, block_osd
),
)
# Tell the cluster to create a new OSD for the host
replace_osd_string = "osd_replace {},{},{},{},{},{}".format(
node, osd_id, old_device, new_device, weight, ext_db_flag
)
zkhandler.write([("base.cmd.ceph", replace_osd_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock("base.cmd.ceph"):
try:
result = zkhandler.read("base.cmd.ceph").split()[0]
if result == "success-osd_replace":
message = 'Replaced OSD {} with block device "{}" on node "{}".'.format(
osd_id, new_device, node
)
success = True
else:
message = "ERROR: Failed to replace OSD; check node logs for details."
success = False
except Exception:
message = "ERROR: Command ignored by node."
success = False
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock("base.cmd.ceph"):
time.sleep(0.5)
zkhandler.write([("base.cmd.ceph", "")])
return success, message
def refresh_osd(zkhandler, osd_id, device):
# Get current OSD information
osd_information = getOSDInformation(zkhandler, osd_id)
node = osd_information["node"]
ext_db_flag = True if osd_information["db_device"] else False
# Verify target block device isn't in use
block_osd = verifyOSDBlock(zkhandler, node, device)
if not block_osd or block_osd != osd_id:
return (
False,
'ERROR: Block device "{}" on node "{}" is not used by OSD "{}"; use replace instead'.format(
device, node, osd_id
),
)
# Tell the cluster to create a new OSD for the host
refresh_osd_string = "osd_refresh {},{},{},{}".format(
node, osd_id, device, ext_db_flag
)
zkhandler.write([("base.cmd.ceph", refresh_osd_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock("base.cmd.ceph"):
try:
result = zkhandler.read("base.cmd.ceph").split()[0]
if result == "success-osd_refresh":
message = (
'Refreshed OSD {} with block device "{}" on node "{}".'.format(
osd_id, device, node
)
)
success = True
else:
message = "ERROR: Failed to refresh OSD; check node logs for details."
success = False
except Exception:
message = "ERROR: Command ignored by node."
success = False
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock("base.cmd.ceph"):
time.sleep(0.5)
zkhandler.write([("base.cmd.ceph", "")])
return success, message
def remove_osd(zkhandler, osd_id, force_flag):
if not verifyOSD(zkhandler, osd_id):
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
osd_id
)
# Tell the cluster to remove an OSD
remove_osd_string = "osd_remove {},{}".format(osd_id, str(force_flag))
zkhandler.write([("base.cmd.ceph", remove_osd_string)])
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
with zkhandler.readlock("base.cmd.ceph"):
try:
result = zkhandler.read("base.cmd.ceph").split()[0]
if result == "success-osd_remove":
message = 'Removed OSD "{}" from the cluster.'.format(osd_id)
success = True
else:
message = "ERROR: Failed to remove OSD; check node logs for details."
success = False
except Exception:
success = False
message = "ERROR Command ignored by node."
# Acquire a write lock to ensure things go smoothly
with zkhandler.writelock("base.cmd.ceph"):
time.sleep(0.5)
zkhandler.write([("base.cmd.ceph", "")])
return success, message
def in_osd(zkhandler, osd_id):
if not verifyOSD(zkhandler, osd_id):
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
osd_id
)
retcode, stdout, stderr = common.run_os_command("ceph osd in {}".format(osd_id))
if retcode:
return False, "ERROR: Failed to enable OSD {}: {}".format(osd_id, stderr)
return True, "Set OSD {} online.".format(osd_id)
def out_osd(zkhandler, osd_id):
if not verifyOSD(zkhandler, osd_id):
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
osd_id
)
retcode, stdout, stderr = common.run_os_command("ceph osd out {}".format(osd_id))
if retcode:
return False, "ERROR: Failed to disable OSD {}: {}".format(osd_id, stderr)
return True, "Set OSD {} offline.".format(osd_id)
def set_osd(zkhandler, option):
retcode, stdout, stderr = common.run_os_command("ceph osd set {}".format(option))
if retcode:
return False, 'ERROR: Failed to set property "{}": {}'.format(option, stderr)
return True, 'Set OSD property "{}".'.format(option)
def unset_osd(zkhandler, option):
retcode, stdout, stderr = common.run_os_command("ceph osd unset {}".format(option))
if retcode:
return False, 'ERROR: Failed to unset property "{}": {}'.format(option, stderr)
return True, 'Unset OSD property "{}".'.format(option)
def get_list_osd(zkhandler, limit, is_fuzzy=True):
osd_list = []
full_osd_list = zkhandler.children("base.osd")
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
for osd in full_osd_list:
if limit:
try:
if re.fullmatch(limit, osd):
osd_list.append(getOSDInformation(zkhandler, osd))
except Exception as e:
return False, "Regex Error: {}".format(e)
else:
osd_list.append(getOSDInformation(zkhandler, osd))
return True, sorted(osd_list, key=lambda x: int(x["id"]))
#
# Pool functions
#
def getPoolInformation(zkhandler, pool):
# Parse the stats data
pool_stats_raw = zkhandler.read(("pool.stats", pool))
pool_stats = dict(json.loads(pool_stats_raw))
volume_count = len(getCephVolumes(zkhandler, pool))
tier = zkhandler.read(("pool.tier", pool))
if tier is None:
tier = "default"
pgs = zkhandler.read(("pool.pgs", pool))
pool_information = {
"name": pool,
"volume_count": volume_count,
"tier": tier,
"pgs": pgs,
"stats": pool_stats,
}
return pool_information
def add_pool(zkhandler, name, pgs, replcfg, tier=None):
# Prepare the copies/mincopies variables
try:
copies, mincopies = replcfg.split(",")
copies = int(copies.replace("copies=", ""))
mincopies = int(mincopies.replace("mincopies=", ""))
except Exception:
copies = None
mincopies = None
if not copies or not mincopies:
return False, f'ERROR: Replication configuration "{replcfg}" is not valid.'
# Prepare the tiers if applicable
if tier is not None and tier in ["hdd", "ssd", "nvme"]:
crush_rule = f"{tier}_tier"
# Create a CRUSH rule for the relevant tier
retcode, stdout, stderr = common.run_os_command(
f"ceph osd crush rule create-replicated {crush_rule} default host {tier}"
)
if retcode:
return (
False,
f"ERROR: Failed to create CRUSH rule {tier} for pool {name}: {stderr}",
)
else:
tier = "default"
crush_rule = "replicated"
# Create the pool
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool create {name} {pgs} {pgs} {crush_rule}"
)
if retcode:
return False, f'ERROR: Failed to create pool "{name}" with {pgs} PGs: {stderr}'
# Set the size and minsize
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool set {name} size {copies}"
)
if retcode:
return False, f'ERROR: Failed to set pool "{name}" size of {copies}: {stderr}'
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool set {name} min_size {mincopies}"
)
if retcode:
return (
False,
f'ERROR: Failed to set pool "{name}" minimum size of {mincopies}: {stderr}',
)
# Enable RBD application
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool application enable {name} rbd"
)
if retcode:
return (
False,
f'ERROR: Failed to enable RBD application on pool "{name}" : {stderr}',
)
# Add the new pool to Zookeeper
zkhandler.write(
[
(("pool", name), ""),
(("pool.pgs", name), pgs),
(("pool.tier", name), tier),
(("pool.stats", name), "{}"),
(("volume", name), ""),
(("snapshot", name), ""),
]
)
return True, f'Created RBD pool "{name}" with {pgs} PGs'
def remove_pool(zkhandler, name):
if not verifyPool(zkhandler, name):
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
name
)
# 1. Remove pool volumes
for volume in zkhandler.children(("volume", name)):
remove_volume(zkhandler, name, volume)
# 2. Remove the pool
retcode, stdout, stderr = common.run_os_command(
"ceph osd pool rm {pool} {pool} --yes-i-really-really-mean-it".format(pool=name)
)
if retcode:
return False, 'ERROR: Failed to remove pool "{}": {}'.format(name, stderr)
# 3. Delete pool from Zookeeper
zkhandler.delete(
[
("pool", name),
("volume", name),
("snapshot", name),
]
)
return True, 'Removed RBD pool "{}" and all volumes.'.format(name)
def set_pgs_pool(zkhandler, name, pgs):
if not verifyPool(zkhandler, name):
return False, f'ERROR: No pool with name "{name}" is present in the cluster.'
# Validate new PGs count
pgs = int(pgs)
if (pgs == 0) or (pgs & (pgs - 1) != 0):
return (
False,
f'ERROR: Invalid PGs number "{pgs}": must be a non-zero power of 2.',
)
# Set the new pgs number
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool set {name} pg_num {pgs}"
)
if retcode:
return False, f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}"
# Set the new pgps number if increasing
current_pgs = int(zkhandler.read(("pool.pgs", name)))
if current_pgs >= pgs:
retcode, stdout, stderr = common.run_os_command(
f"ceph osd pool set {name} pgp_num {pgs}"
)
if retcode:
return (
False,
f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}",
)
# Update Zookeeper count
zkhandler.write(
[
(("pool.pgs", name), pgs),
]
)
return True, f'Set PGs count to {pgs} for RBD pool "{name}".'
def get_list_pool(zkhandler, limit, is_fuzzy=True):
full_pool_list = zkhandler.children("base.pool")
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
get_pool_info = dict()
for pool in full_pool_list:
is_limit_match = False
if limit:
try:
if re.fullmatch(limit, pool):
is_limit_match = True
except Exception as e:
return False, "Regex Error: {}".format(e)
else:
is_limit_match = True
get_pool_info[pool] = True if is_limit_match else False
pool_execute_list = [pool for pool in full_pool_list if get_pool_info[pool]]
pool_data_list = list()
with ThreadPoolExecutor(max_workers=32, thread_name_prefix="pool_list") as executor:
futures = []
for pool in pool_execute_list:
futures.append(executor.submit(getPoolInformation, zkhandler, pool))
for future in futures:
pool_data_list.append(future.result())
return True, sorted(pool_data_list, key=lambda x: int(x["stats"].get("id", 0)))
#
# Volume functions
#
def getCephVolumes(zkhandler, pool):
volume_list = list()
if not pool:
pool_list = zkhandler.children("base.pool")
else:
pool_list = [pool]
for pool_name in pool_list:
for volume_name in zkhandler.children(("volume", pool_name)):
volume_list.append("{}/{}".format(pool_name, volume_name))
return volume_list
def getVolumeInformation(zkhandler, pool, volume):
# Parse the stats data
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
volume_stats = dict(json.loads(volume_stats_raw))
# Format the size to something nicer
volume_stats["size"] = format_bytes_tohuman(volume_stats["size"])
volume_information = {"name": volume, "pool": pool, "stats": volume_stats}
return volume_information
def add_volume(zkhandler, pool, name, size):
# 1. Verify the size of the volume
pool_information = getPoolInformation(zkhandler, pool)
size_bytes = format_bytes_fromhuman(size)
if size_bytes is None:
return (
False,
f"ERROR: Requested volume size '{size}' does not have a valid SI unit",
)
if size_bytes >= int(pool_information["stats"]["free_bytes"]):
return (
False,
f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the available free space in the pool ('{format_bytes_tohuman(pool_information['stats']['free_bytes'])}')",
)
# 2. Create the volume
retcode, stdout, stderr = common.run_os_command(
"rbd create --size {}B {}/{}".format(size_bytes, pool, name)
)
if retcode:
return False, 'ERROR: Failed to create RBD volume "{}": {}'.format(name, stderr)
# 2. Get volume stats
retcode, stdout, stderr = common.run_os_command(
"rbd info --format json {}/{}".format(pool, name)
)
volstats = stdout
# 3. Add the new volume to Zookeeper
zkhandler.write(
[
(("volume", f"{pool}/{name}"), ""),
(("volume.stats", f"{pool}/{name}"), volstats),
(("snapshot", f"{pool}/{name}"), ""),
]
)
return True, 'Created RBD volume "{}" of size "{}" in pool "{}".'.format(
name, format_bytes_tohuman(size_bytes), pool
)
def clone_volume(zkhandler, pool, name_src, name_new):
if not verifyVolume(zkhandler, pool, name_src):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
name_src, pool
)
# 1. Clone the volume
retcode, stdout, stderr = common.run_os_command(
"rbd copy {}/{} {}/{}".format(pool, name_src, pool, name_new)
)
if retcode:
return (
False,
'ERROR: Failed to clone RBD volume "{}" to "{}" in pool "{}": {}'.format(
name_src, name_new, pool, stderr
),
)
# 2. Get volume stats
retcode, stdout, stderr = common.run_os_command(
"rbd info --format json {}/{}".format(pool, name_new)
)
volstats = stdout
# 3. Add the new volume to Zookeeper
zkhandler.write(
[
(("volume", f"{pool}/{name_new}"), ""),
(("volume.stats", f"{pool}/{name_new}"), volstats),
(("snapshot", f"{pool}/{name_new}"), ""),
]
)
return True, 'Cloned RBD volume "{}" to "{}" in pool "{}"'.format(
name_src, name_new, pool
)
def resize_volume(zkhandler, pool, name, size):
if not verifyVolume(zkhandler, pool, name):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
name, pool
)
# 1. Verify the size of the volume
pool_information = getPoolInformation(zkhandler, pool)
size_bytes = format_bytes_fromhuman(size)
if size_bytes is None:
return (
False,
f"ERROR: Requested volume size '{size}' does not have a valid SI unit",
)
if size_bytes >= int(pool_information["stats"]["free_bytes"]):
return (
False,
f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the available free space in the pool ('{format_bytes_tohuman(pool_information['stats']['free_bytes'])}')",
)
# 2. Resize the volume
retcode, stdout, stderr = common.run_os_command(
"rbd resize --size {} {}/{}".format(
format_bytes_tohuman(size_bytes), pool, name
)
)
if retcode:
return (
False,
'ERROR: Failed to resize RBD volume "{}" to size "{}" in pool "{}": {}'.format(
name, format_bytes_tohuman(size_bytes), pool, stderr
),
)
# 3a. Determine the node running this VM if applicable
active_node = None
volume_vm_name = name.split("_")[0]
retcode, vm_info = vm.get_info(zkhandler, volume_vm_name)
if retcode:
for disk in vm_info["disks"]:
# This block device is present in this VM so we can continue
if disk["name"] == "{}/{}".format(pool, name):
active_node = vm_info["node"]
volume_id = disk["dev"]
# 3b. Perform a live resize in libvirt if the VM is running
if active_node is not None and vm_info.get("state", "") == "start":
import libvirt
# Run the libvirt command against the target host
try:
dest_lv = "qemu+tcp://{}/system".format(active_node)
target_lv_conn = libvirt.open(dest_lv)
target_vm_conn = target_lv_conn.lookupByName(vm_info["name"])
if target_vm_conn:
target_vm_conn.blockResize(
volume_id,
size_bytes,
libvirt.VIR_DOMAIN_BLOCK_RESIZE_BYTES,
)
target_lv_conn.close()
except Exception:
pass
# 4. Get volume stats
retcode, stdout, stderr = common.run_os_command(
"rbd info --format json {}/{}".format(pool, name)
)
volstats = stdout
# 5. Update the volume in Zookeeper
zkhandler.write(
[
(("volume", f"{pool}/{name}"), ""),
(("volume.stats", f"{pool}/{name}"), volstats),
(("snapshot", f"{pool}/{name}"), ""),
]
)
return True, 'Resized RBD volume "{}" to size "{}" in pool "{}".'.format(
name, format_bytes_tohuman(size_bytes), pool
)
def rename_volume(zkhandler, pool, name, new_name):
if not verifyVolume(zkhandler, pool, name):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
name, pool
)
# 1. Rename the volume
retcode, stdout, stderr = common.run_os_command(
"rbd rename {}/{} {}".format(pool, name, new_name)
)
if retcode:
return (
False,
'ERROR: Failed to rename volume "{}" to "{}" in pool "{}": {}'.format(
name, new_name, pool, stderr
),
)
# 2. Rename the volume in Zookeeper
zkhandler.rename(
[
(("volume", f"{pool}/{name}"), ("volume", f"{pool}/{new_name}")),
(("snapshot", f"{pool}/{name}"), ("snapshot", f"{pool}/{new_name}")),
]
)
# 3. Get volume stats
retcode, stdout, stderr = common.run_os_command(
"rbd info --format json {}/{}".format(pool, new_name)
)
volstats = stdout
# 4. Update the volume stats in Zookeeper
zkhandler.write(
[
(("volume.stats", f"{pool}/{new_name}"), volstats),
]
)
return True, 'Renamed RBD volume "{}" to "{}" in pool "{}".'.format(
name, new_name, pool
)
def remove_volume(zkhandler, pool, name):
if not verifyVolume(zkhandler, pool, name):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
name, pool
)
# 1. Remove volume snapshots
for snapshot in zkhandler.children(("snapshot", f"{pool}/{name}")):
remove_snapshot(zkhandler, pool, name, snapshot)
# 2. Remove the volume
retcode, stdout, stderr = common.run_os_command("rbd rm {}/{}".format(pool, name))
if retcode:
return False, 'ERROR: Failed to remove RBD volume "{}" in pool "{}": {}'.format(
name, pool, stderr
)
# 3. Delete volume from Zookeeper
zkhandler.delete(
[
("volume", f"{pool}/{name}"),
("snapshot", f"{pool}/{name}"),
]
)
return True, 'Removed RBD volume "{}" in pool "{}".'.format(name, pool)
def map_volume(zkhandler, pool, name):
if not verifyVolume(zkhandler, pool, name):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
name, pool
)
# 1. Map the volume onto the local system
retcode, stdout, stderr = common.run_os_command("rbd map {}/{}".format(pool, name))
if retcode:
return False, 'ERROR: Failed to map RBD volume "{}" in pool "{}": {}'.format(
name, pool, stderr
)
# 2. Calculate the absolute path to the mapped volume
mapped_volume = "/dev/rbd/{}/{}".format(pool, name)
# 3. Ensure the volume exists
if not os.path.exists(mapped_volume):
return (
False,
'ERROR: Mapped volume not found at expected location "{}".'.format(
mapped_volume
),
)
return True, mapped_volume
def unmap_volume(zkhandler, pool, name):
if not verifyVolume(zkhandler, pool, name):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
name, pool
)
mapped_volume = "/dev/rbd/{}/{}".format(pool, name)
# 1. Ensure the volume exists
if not os.path.exists(mapped_volume):
return (
False,
'ERROR: Mapped volume not found at expected location "{}".'.format(
mapped_volume
),
)
# 2. Unap the volume
retcode, stdout, stderr = common.run_os_command(
"rbd unmap {}".format(mapped_volume)
)
if retcode:
return False, 'ERROR: Failed to unmap RBD volume at "{}": {}'.format(
mapped_volume, stderr
)
return True, 'Unmapped RBD volume at "{}".'.format(mapped_volume)
def get_list_volume(zkhandler, pool, limit, is_fuzzy=True):
if pool and not verifyPool(zkhandler, pool):
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
pool
)
full_volume_list = getCephVolumes(zkhandler, pool)
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
get_volume_info = dict()
for volume in full_volume_list:
pool_name, volume_name = volume.split("/")
is_limit_match = False
# Check on limit
if limit:
# Try to match the limit against the volume name
try:
if re.fullmatch(limit, volume_name):
is_limit_match = True
except Exception as e:
return False, "Regex Error: {}".format(e)
else:
is_limit_match = True
get_volume_info[volume] = True if is_limit_match else False
# Obtain our volume data in a thread pool
volume_execute_list = [
volume for volume in full_volume_list if get_volume_info[volume]
]
volume_data_list = list()
with ThreadPoolExecutor(
max_workers=32, thread_name_prefix="volume_list"
) as executor:
futures = []
for volume in volume_execute_list:
pool_name, volume_name = volume.split("/")
futures.append(
executor.submit(getVolumeInformation, zkhandler, pool_name, volume_name)
)
for future in futures:
volume_data_list.append(future.result())
return True, sorted(volume_data_list, key=lambda x: str(x["name"]))
#
# Snapshot functions
#
def getCephSnapshots(zkhandler, pool, volume):
snapshot_list = list()
volume_list = list()
volume_list = getCephVolumes(zkhandler, pool)
if volume:
for volume_entry in volume_list:
volume_pool, volume_name = volume_entry.split("/")
if volume_name == volume:
volume_list = ["{}/{}".format(volume_pool, volume_name)]
for volume_entry in volume_list:
for snapshot_name in zkhandler.children(("snapshot", volume_entry)):
snapshot_list.append("{}@{}".format(volume_entry, snapshot_name))
return snapshot_list
def add_snapshot(zkhandler, pool, volume, name):
if not verifyVolume(zkhandler, pool, volume):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
volume, pool
)
# 1. Create the snapshot
retcode, stdout, stderr = common.run_os_command(
"rbd snap create {}/{}@{}".format(pool, volume, name)
)
if retcode:
return (
False,
'ERROR: Failed to create RBD snapshot "{}" of volume "{}" in pool "{}": {}'.format(
name, volume, pool, stderr
),
)
# 2. Add the snapshot to Zookeeper
zkhandler.write(
[
(("snapshot", f"{pool}/{volume}/{name}"), ""),
(("snapshot.stats", f"{pool}/{volume}/{name}"), "{}"),
]
)
# 3. Update the count of snapshots on this volume
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
volume_stats = dict(json.loads(volume_stats_raw))
# Format the size to something nicer
volume_stats["snapshot_count"] = volume_stats["snapshot_count"] + 1
volume_stats_raw = json.dumps(volume_stats)
zkhandler.write(
[
(("volume.stats", f"{pool}/{volume}"), volume_stats_raw),
]
)
return True, 'Created RBD snapshot "{}" of volume "{}" in pool "{}".'.format(
name, volume, pool
)
def rename_snapshot(zkhandler, pool, volume, name, new_name):
if not verifyVolume(zkhandler, pool, volume):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
volume, pool
)
if not verifySnapshot(zkhandler, pool, volume, name):
return (
False,
'ERROR: No snapshot with name "{}" is present for volume "{}" in pool "{}".'.format(
name, volume, pool
),
)
# 1. Rename the snapshot
retcode, stdout, stderr = common.run_os_command(
"rbd snap rename {pool}/{volume}@{name} {pool}/{volume}@{new_name}".format(
pool=pool, volume=volume, name=name, new_name=new_name
)
)
if retcode:
return (
False,
'ERROR: Failed to rename RBD snapshot "{}" to "{}" for volume "{}" in pool "{}": {}'.format(
name, new_name, volume, pool, stderr
),
)
# 2. Rename the snapshot in ZK
zkhandler.rename(
[
(
("snapshot", f"{pool}/{volume}/{name}"),
("snapshot", f"{pool}/{volume}/{new_name}"),
),
]
)
return (
True,
'Renamed RBD snapshot "{}" to "{}" for volume "{}" in pool "{}".'.format(
name, new_name, volume, pool
),
)
def remove_snapshot(zkhandler, pool, volume, name):
if not verifyVolume(zkhandler, pool, volume):
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
volume, pool
)
if not verifySnapshot(zkhandler, pool, volume, name):
return (
False,
'ERROR: No snapshot with name "{}" is present of volume {} in pool {}.'.format(
name, volume, pool
),
)
# 1. Remove the snapshot
retcode, stdout, stderr = common.run_os_command(
"rbd snap rm {}/{}@{}".format(pool, volume, name)
)
if retcode:
return (
False,
'Failed to remove RBD snapshot "{}" of volume "{}" in pool "{}": {}'.format(
name, volume, pool, stderr
),
)
# 2. Delete snapshot from Zookeeper
zkhandler.delete([("snapshot", f"{pool}/{volume}/{name}")])
# 3. Update the count of snapshots on this volume
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
volume_stats = dict(json.loads(volume_stats_raw))
# Format the size to something nicer
volume_stats["snapshot_count"] = volume_stats["snapshot_count"] - 1
volume_stats_raw = json.dumps(volume_stats)
zkhandler.write([(("volume.stats", f"{pool}/{volume}"), volume_stats_raw)])
return True, 'Removed RBD snapshot "{}" of volume "{}" in pool "{}".'.format(
name, volume, pool
)
def get_list_snapshot(zkhandler, pool, volume, limit, is_fuzzy=True):
snapshot_list = []
if pool and not verifyPool(zkhandler, pool):
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
pool
)
if volume and not verifyPool(zkhandler, volume):
return (
False,
'ERROR: No volume with name "{}" is present in the cluster.'.format(volume),
)
full_snapshot_list = getCephSnapshots(zkhandler, pool, volume)
if is_fuzzy and limit:
# Implicitly assume fuzzy limits
if not re.match(r"\^.*", limit):
limit = ".*" + limit
if not re.match(r".*\$", limit):
limit = limit + ".*"
for snapshot in full_snapshot_list:
volume, snapshot_name = snapshot.split("@")
pool_name, volume_name = volume.split("/")
if limit:
try:
if re.fullmatch(limit, snapshot_name):
snapshot_list.append(
{
"pool": pool_name,
"volume": volume_name,
"snapshot": snapshot_name,
}
)
except Exception as e:
return False, "Regex Error: {}".format(e)
else:
snapshot_list.append(
{"pool": pool_name, "volume": volume_name, "snapshot": snapshot_name}
)
return True, sorted(snapshot_list, key=lambda x: str(x["snapshot"]))