1296 lines
41 KiB
Python
1296 lines
41 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# ceph.py - PVC client function library, Ceph cluster fuctions
|
|
# Part of the Parallel Virtual Cluster (PVC) system
|
|
#
|
|
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, version 3.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
#
|
|
###############################################################################
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
import time
|
|
import math
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
import daemon_lib.vm as vm
|
|
import daemon_lib.common as common
|
|
|
|
|
|
#
|
|
# Supplemental functions
|
|
#
|
|
|
|
|
|
# Verify OSD is valid in cluster
|
|
def verifyOSD(zkhandler, osd_id):
|
|
return zkhandler.exists(("osd", osd_id))
|
|
|
|
|
|
# Verify Pool is valid in cluster
|
|
def verifyPool(zkhandler, name):
|
|
return zkhandler.exists(("pool", name))
|
|
|
|
|
|
# Verify Volume is valid in cluster
|
|
def verifyVolume(zkhandler, pool, name):
|
|
return zkhandler.exists(("volume", f"{pool}/{name}"))
|
|
|
|
|
|
# Verify Snapshot is valid in cluster
|
|
def verifySnapshot(zkhandler, pool, volume, name):
|
|
return zkhandler.exists(("snapshot", f"{pool}/{volume}/{name}"))
|
|
|
|
|
|
# Verify OSD path is valid in cluster
|
|
def verifyOSDBlock(zkhandler, node, device):
|
|
for osd in zkhandler.children("base.osd"):
|
|
osd_node = zkhandler.read(("osd.node", osd))
|
|
osd_device = zkhandler.read(("osd.device", osd))
|
|
if node == osd_node and device == osd_device:
|
|
return osd
|
|
return None
|
|
|
|
|
|
# Matrix of human-to-byte values
|
|
byte_unit_matrix = {
|
|
"B": 1,
|
|
"K": 1024,
|
|
"M": 1024 * 1024,
|
|
"G": 1024 * 1024 * 1024,
|
|
"T": 1024 * 1024 * 1024 * 1024,
|
|
"P": 1024 * 1024 * 1024 * 1024 * 1024,
|
|
"E": 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
|
|
"Z": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
|
|
"Y": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
|
|
"R": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
|
|
"Q": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
|
|
}
|
|
|
|
# Matrix of human-to-metric values
|
|
ops_unit_matrix = {
|
|
"": 1,
|
|
"K": 1000,
|
|
"M": 1000 * 1000,
|
|
"G": 1000 * 1000 * 1000,
|
|
"T": 1000 * 1000 * 1000 * 1000,
|
|
"P": 1000 * 1000 * 1000 * 1000 * 1000,
|
|
"E": 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
|
|
"Z": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
|
|
"Y": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
|
|
"R": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
|
|
"Q": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
|
|
}
|
|
|
|
|
|
# Format byte sizes to/from human-readable units
|
|
def format_bytes_tohuman(databytes):
|
|
datahuman = ""
|
|
for unit in sorted(byte_unit_matrix, key=byte_unit_matrix.get, reverse=True):
|
|
new_bytes = int(math.ceil(databytes / byte_unit_matrix[unit]))
|
|
# Round up if 5 or more digits
|
|
if new_bytes > 9999:
|
|
# We can jump down another level
|
|
continue
|
|
else:
|
|
# We're at the end, display with this size
|
|
datahuman = "{}{}".format(new_bytes, unit)
|
|
|
|
return datahuman
|
|
|
|
|
|
def format_bytes_fromhuman(datahuman):
|
|
if not re.search(r"[A-Za-z]+", datahuman):
|
|
dataunit = "B"
|
|
datasize = int(datahuman)
|
|
else:
|
|
dataunit = str(re.match(r"[0-9]+([A-Za-z])[iBb]*", datahuman).group(1))
|
|
datasize = int(re.match(r"([0-9]+)[A-Za-z]+", datahuman).group(1))
|
|
|
|
if byte_unit_matrix.get(dataunit):
|
|
databytes = datasize * byte_unit_matrix[dataunit]
|
|
return databytes
|
|
else:
|
|
return None
|
|
|
|
|
|
# Format ops sizes to/from human-readable units
|
|
def format_ops_tohuman(dataops):
|
|
datahuman = ""
|
|
for unit in sorted(ops_unit_matrix, key=ops_unit_matrix.get, reverse=True):
|
|
new_ops = int(math.ceil(dataops / ops_unit_matrix[unit]))
|
|
# Round up if 5 or more digits
|
|
if new_ops > 9999:
|
|
# We can jump down another level
|
|
continue
|
|
else:
|
|
# We're at the end, display with this size
|
|
datahuman = "{}{}".format(new_ops, unit)
|
|
|
|
return datahuman
|
|
|
|
|
|
def format_ops_fromhuman(datahuman):
|
|
# Trim off human-readable character
|
|
dataunit = datahuman[-1]
|
|
datasize = int(datahuman[:-1])
|
|
dataops = datasize * ops_unit_matrix[dataunit]
|
|
return "{}".format(dataops)
|
|
|
|
|
|
def format_pct_tohuman(datapct):
|
|
datahuman = "{0:.1f}".format(float(datapct * 100.0))
|
|
return datahuman
|
|
|
|
|
|
#
|
|
# Status functions
|
|
#
|
|
def get_status(zkhandler):
|
|
primary_node = zkhandler.read("base.config.primary_node")
|
|
ceph_status = zkhandler.read("base.storage").rstrip()
|
|
|
|
# Create a data structure for the information
|
|
status_data = {
|
|
"type": "status",
|
|
"primary_node": primary_node,
|
|
"ceph_data": ceph_status,
|
|
}
|
|
return True, status_data
|
|
|
|
|
|
def get_health(zkhandler):
|
|
primary_node = zkhandler.read("base.config.primary_node")
|
|
ceph_health = zkhandler.read("base.storage.health").rstrip()
|
|
|
|
# Create a data structure for the information
|
|
status_data = {
|
|
"type": "health",
|
|
"primary_node": primary_node,
|
|
"ceph_data": ceph_health,
|
|
}
|
|
return True, status_data
|
|
|
|
|
|
def get_util(zkhandler):
|
|
primary_node = zkhandler.read("base.config.primary_node")
|
|
ceph_df = zkhandler.read("base.storage.util").rstrip()
|
|
|
|
# Create a data structure for the information
|
|
status_data = {
|
|
"type": "utilization",
|
|
"primary_node": primary_node,
|
|
"ceph_data": ceph_df,
|
|
}
|
|
return True, status_data
|
|
|
|
|
|
#
|
|
# OSD functions
|
|
#
|
|
def getClusterOSDList(zkhandler):
|
|
# Get a list of VNIs by listing the children of /networks
|
|
return zkhandler.children("base.osd")
|
|
|
|
|
|
def getOSDInformation(zkhandler, osd_id):
|
|
# Get the devices
|
|
osd_node = zkhandler.read(("osd.node", osd_id))
|
|
osd_device = zkhandler.read(("osd.device", osd_id))
|
|
osd_is_split = zkhandler.read(("osd.is_split", osd_id))
|
|
osd_db_device = zkhandler.read(("osd.db_device", osd_id))
|
|
# Parse the stats data
|
|
osd_stats_raw = zkhandler.read(("osd.stats", osd_id))
|
|
osd_stats = dict(json.loads(osd_stats_raw))
|
|
|
|
osd_information = {
|
|
"id": osd_id,
|
|
"node": osd_node,
|
|
"device": osd_device,
|
|
"is_split": osd_is_split,
|
|
"db_device": osd_db_device,
|
|
"stats": osd_stats,
|
|
}
|
|
return osd_information
|
|
|
|
|
|
# OSD DB VG actions use the /cmd/ceph pipe
|
|
# These actions must occur on the specific node they reference
|
|
def add_osd_db_vg(zkhandler, node, device):
|
|
# Verify the target node exists
|
|
if not common.verifyNode(zkhandler, node):
|
|
return False, 'ERROR: No node named "{}" is present in the cluster.'.format(
|
|
node
|
|
)
|
|
|
|
# Tell the cluster to create a new OSD for the host
|
|
add_osd_db_vg_string = "db_vg_add {},{}".format(node, device)
|
|
zkhandler.write([("base.cmd.ceph", add_osd_db_vg_string)])
|
|
# Wait 1/2 second for the cluster to get the message and start working
|
|
time.sleep(0.5)
|
|
# Acquire a read lock, so we get the return exclusively
|
|
with zkhandler.readlock("base.cmd.ceph"):
|
|
try:
|
|
result = zkhandler.read("base.cmd.ceph").split()[0]
|
|
if result == "success-db_vg_add":
|
|
message = 'Created new OSD database VG at "{}" on node "{}".'.format(
|
|
device, node
|
|
)
|
|
success = True
|
|
else:
|
|
message = "ERROR: Failed to create new OSD database VG; check node logs for details."
|
|
success = False
|
|
except Exception:
|
|
message = "ERROR: Command ignored by node."
|
|
success = False
|
|
|
|
# Acquire a write lock to ensure things go smoothly
|
|
with zkhandler.writelock("base.cmd.ceph"):
|
|
time.sleep(0.5)
|
|
zkhandler.write([("base.cmd.ceph", "")])
|
|
|
|
return success, message
|
|
|
|
|
|
# OSD actions use the /cmd/ceph pipe
|
|
# These actions must occur on the specific node they reference
|
|
def add_osd(
|
|
zkhandler,
|
|
node,
|
|
device,
|
|
weight,
|
|
ext_db_flag=False,
|
|
ext_db_ratio=0.05,
|
|
split_flag=False,
|
|
split_count=1,
|
|
):
|
|
# Verify the target node exists
|
|
if not common.verifyNode(zkhandler, node):
|
|
return False, 'ERROR: No node named "{}" is present in the cluster.'.format(
|
|
node
|
|
)
|
|
|
|
# Verify target block device isn't in use
|
|
block_osd = verifyOSDBlock(zkhandler, node, device)
|
|
if block_osd:
|
|
return (
|
|
False,
|
|
'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format(
|
|
device, node, block_osd
|
|
),
|
|
)
|
|
|
|
# Tell the cluster to create a new OSD for the host
|
|
add_osd_string = "osd_add {},{},{},{},{},{},{}".format(
|
|
node, device, weight, ext_db_flag, ext_db_ratio, split_flag, split_count
|
|
)
|
|
zkhandler.write([("base.cmd.ceph", add_osd_string)])
|
|
# Wait 1/2 second for the cluster to get the message and start working
|
|
time.sleep(0.5)
|
|
# Acquire a read lock, so we get the return exclusively
|
|
with zkhandler.readlock("base.cmd.ceph"):
|
|
try:
|
|
result = zkhandler.read("base.cmd.ceph").split()[0]
|
|
if result == "success-osd_add":
|
|
message = f'Created {split_count} new OSD(s) on node "{node}" block device "{device}"'
|
|
success = True
|
|
else:
|
|
message = "ERROR: Failed to create OSD(s); check node logs for details."
|
|
success = False
|
|
except Exception:
|
|
message = "ERROR: Command ignored by node."
|
|
success = False
|
|
|
|
# Acquire a write lock to ensure things go smoothly
|
|
with zkhandler.writelock("base.cmd.ceph"):
|
|
time.sleep(0.5)
|
|
zkhandler.write([("base.cmd.ceph", "")])
|
|
|
|
return success, message
|
|
|
|
|
|
def replace_osd(zkhandler, osd_id, new_device, weight):
|
|
# Get current OSD information
|
|
osd_information = getOSDInformation(zkhandler, osd_id)
|
|
node = osd_information["node"]
|
|
old_device = osd_information["device"]
|
|
ext_db_flag = True if osd_information["db_device"] else False
|
|
|
|
# Verify target block device isn't in use
|
|
block_osd = verifyOSDBlock(zkhandler, node, new_device)
|
|
if block_osd and block_osd != osd_id:
|
|
return (
|
|
False,
|
|
'ERROR: Block device "{}" on node "{}" is used by OSD "{}"'.format(
|
|
new_device, node, block_osd
|
|
),
|
|
)
|
|
|
|
# Tell the cluster to create a new OSD for the host
|
|
replace_osd_string = "osd_replace {},{},{},{},{},{}".format(
|
|
node, osd_id, old_device, new_device, weight, ext_db_flag
|
|
)
|
|
zkhandler.write([("base.cmd.ceph", replace_osd_string)])
|
|
# Wait 1/2 second for the cluster to get the message and start working
|
|
time.sleep(0.5)
|
|
# Acquire a read lock, so we get the return exclusively
|
|
with zkhandler.readlock("base.cmd.ceph"):
|
|
try:
|
|
result = zkhandler.read("base.cmd.ceph").split()[0]
|
|
if result == "success-osd_replace":
|
|
message = 'Replaced OSD {} with block device "{}" on node "{}".'.format(
|
|
osd_id, new_device, node
|
|
)
|
|
success = True
|
|
else:
|
|
message = "ERROR: Failed to replace OSD; check node logs for details."
|
|
success = False
|
|
except Exception:
|
|
message = "ERROR: Command ignored by node."
|
|
success = False
|
|
|
|
# Acquire a write lock to ensure things go smoothly
|
|
with zkhandler.writelock("base.cmd.ceph"):
|
|
time.sleep(0.5)
|
|
zkhandler.write([("base.cmd.ceph", "")])
|
|
|
|
return success, message
|
|
|
|
|
|
def refresh_osd(zkhandler, osd_id, device):
|
|
# Get current OSD information
|
|
osd_information = getOSDInformation(zkhandler, osd_id)
|
|
node = osd_information["node"]
|
|
ext_db_flag = True if osd_information["db_device"] else False
|
|
|
|
# Verify target block device isn't in use
|
|
block_osd = verifyOSDBlock(zkhandler, node, device)
|
|
if not block_osd or block_osd != osd_id:
|
|
return (
|
|
False,
|
|
'ERROR: Block device "{}" on node "{}" is not used by OSD "{}"; use replace instead'.format(
|
|
device, node, osd_id
|
|
),
|
|
)
|
|
|
|
# Tell the cluster to create a new OSD for the host
|
|
refresh_osd_string = "osd_refresh {},{},{},{}".format(
|
|
node, osd_id, device, ext_db_flag
|
|
)
|
|
zkhandler.write([("base.cmd.ceph", refresh_osd_string)])
|
|
# Wait 1/2 second for the cluster to get the message and start working
|
|
time.sleep(0.5)
|
|
# Acquire a read lock, so we get the return exclusively
|
|
with zkhandler.readlock("base.cmd.ceph"):
|
|
try:
|
|
result = zkhandler.read("base.cmd.ceph").split()[0]
|
|
if result == "success-osd_refresh":
|
|
message = (
|
|
'Refreshed OSD {} with block device "{}" on node "{}".'.format(
|
|
osd_id, device, node
|
|
)
|
|
)
|
|
success = True
|
|
else:
|
|
message = "ERROR: Failed to refresh OSD; check node logs for details."
|
|
success = False
|
|
except Exception:
|
|
message = "ERROR: Command ignored by node."
|
|
success = False
|
|
|
|
# Acquire a write lock to ensure things go smoothly
|
|
with zkhandler.writelock("base.cmd.ceph"):
|
|
time.sleep(0.5)
|
|
zkhandler.write([("base.cmd.ceph", "")])
|
|
|
|
return success, message
|
|
|
|
|
|
def remove_osd(zkhandler, osd_id, force_flag):
|
|
if not verifyOSD(zkhandler, osd_id):
|
|
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
|
|
osd_id
|
|
)
|
|
|
|
# Tell the cluster to remove an OSD
|
|
remove_osd_string = "osd_remove {},{}".format(osd_id, str(force_flag))
|
|
zkhandler.write([("base.cmd.ceph", remove_osd_string)])
|
|
# Wait 1/2 second for the cluster to get the message and start working
|
|
time.sleep(0.5)
|
|
# Acquire a read lock, so we get the return exclusively
|
|
with zkhandler.readlock("base.cmd.ceph"):
|
|
try:
|
|
result = zkhandler.read("base.cmd.ceph").split()[0]
|
|
if result == "success-osd_remove":
|
|
message = 'Removed OSD "{}" from the cluster.'.format(osd_id)
|
|
success = True
|
|
else:
|
|
message = "ERROR: Failed to remove OSD; check node logs for details."
|
|
success = False
|
|
except Exception:
|
|
success = False
|
|
message = "ERROR Command ignored by node."
|
|
|
|
# Acquire a write lock to ensure things go smoothly
|
|
with zkhandler.writelock("base.cmd.ceph"):
|
|
time.sleep(0.5)
|
|
zkhandler.write([("base.cmd.ceph", "")])
|
|
|
|
return success, message
|
|
|
|
|
|
def in_osd(zkhandler, osd_id):
|
|
if not verifyOSD(zkhandler, osd_id):
|
|
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
|
|
osd_id
|
|
)
|
|
|
|
retcode, stdout, stderr = common.run_os_command("ceph osd in {}".format(osd_id))
|
|
if retcode:
|
|
return False, "ERROR: Failed to enable OSD {}: {}".format(osd_id, stderr)
|
|
|
|
return True, "Set OSD {} online.".format(osd_id)
|
|
|
|
|
|
def out_osd(zkhandler, osd_id):
|
|
if not verifyOSD(zkhandler, osd_id):
|
|
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
|
|
osd_id
|
|
)
|
|
|
|
retcode, stdout, stderr = common.run_os_command("ceph osd out {}".format(osd_id))
|
|
if retcode:
|
|
return False, "ERROR: Failed to disable OSD {}: {}".format(osd_id, stderr)
|
|
|
|
return True, "Set OSD {} offline.".format(osd_id)
|
|
|
|
|
|
def set_osd(zkhandler, option):
|
|
retcode, stdout, stderr = common.run_os_command("ceph osd set {}".format(option))
|
|
if retcode:
|
|
return False, 'ERROR: Failed to set property "{}": {}'.format(option, stderr)
|
|
|
|
return True, 'Set OSD property "{}".'.format(option)
|
|
|
|
|
|
def unset_osd(zkhandler, option):
|
|
retcode, stdout, stderr = common.run_os_command("ceph osd unset {}".format(option))
|
|
if retcode:
|
|
return False, 'ERROR: Failed to unset property "{}": {}'.format(option, stderr)
|
|
|
|
return True, 'Unset OSD property "{}".'.format(option)
|
|
|
|
|
|
def get_list_osd(zkhandler, limit, is_fuzzy=True):
|
|
osd_list = []
|
|
full_osd_list = zkhandler.children("base.osd")
|
|
|
|
if is_fuzzy and limit:
|
|
# Implicitly assume fuzzy limits
|
|
if not re.match(r"\^.*", limit):
|
|
limit = ".*" + limit
|
|
if not re.match(r".*\$", limit):
|
|
limit = limit + ".*"
|
|
|
|
for osd in full_osd_list:
|
|
if limit:
|
|
try:
|
|
if re.fullmatch(limit, osd):
|
|
osd_list.append(getOSDInformation(zkhandler, osd))
|
|
except Exception as e:
|
|
return False, "Regex Error: {}".format(e)
|
|
else:
|
|
osd_list.append(getOSDInformation(zkhandler, osd))
|
|
|
|
return True, sorted(osd_list, key=lambda x: int(x["id"]))
|
|
|
|
|
|
#
|
|
# Pool functions
|
|
#
|
|
def getPoolInformation(zkhandler, pool):
|
|
# Parse the stats data
|
|
pool_stats_raw = zkhandler.read(("pool.stats", pool))
|
|
pool_stats = dict(json.loads(pool_stats_raw))
|
|
volume_count = len(getCephVolumes(zkhandler, pool))
|
|
tier = zkhandler.read(("pool.tier", pool))
|
|
if tier is None:
|
|
tier = "default"
|
|
pgs = zkhandler.read(("pool.pgs", pool))
|
|
|
|
pool_information = {
|
|
"name": pool,
|
|
"volume_count": volume_count,
|
|
"tier": tier,
|
|
"pgs": pgs,
|
|
"stats": pool_stats,
|
|
}
|
|
return pool_information
|
|
|
|
|
|
def add_pool(zkhandler, name, pgs, replcfg, tier=None):
|
|
# Prepare the copies/mincopies variables
|
|
try:
|
|
copies, mincopies = replcfg.split(",")
|
|
copies = int(copies.replace("copies=", ""))
|
|
mincopies = int(mincopies.replace("mincopies=", ""))
|
|
except Exception:
|
|
copies = None
|
|
mincopies = None
|
|
if not copies or not mincopies:
|
|
return False, f'ERROR: Replication configuration "{replcfg}" is not valid.'
|
|
|
|
# Prepare the tiers if applicable
|
|
if tier is not None and tier in ["hdd", "ssd", "nvme"]:
|
|
crush_rule = f"{tier}_tier"
|
|
# Create a CRUSH rule for the relevant tier
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd crush rule create-replicated {crush_rule} default host {tier}"
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
f"ERROR: Failed to create CRUSH rule {tier} for pool {name}: {stderr}",
|
|
)
|
|
else:
|
|
tier = "default"
|
|
crush_rule = "replicated"
|
|
|
|
# Create the pool
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd pool create {name} {pgs} {pgs} {crush_rule}"
|
|
)
|
|
if retcode:
|
|
return False, f'ERROR: Failed to create pool "{name}" with {pgs} PGs: {stderr}'
|
|
|
|
# Set the size and minsize
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd pool set {name} size {copies}"
|
|
)
|
|
if retcode:
|
|
return False, f'ERROR: Failed to set pool "{name}" size of {copies}: {stderr}'
|
|
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd pool set {name} min_size {mincopies}"
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
f'ERROR: Failed to set pool "{name}" minimum size of {mincopies}: {stderr}',
|
|
)
|
|
|
|
# Enable RBD application
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd pool application enable {name} rbd"
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
f'ERROR: Failed to enable RBD application on pool "{name}" : {stderr}',
|
|
)
|
|
|
|
# Add the new pool to Zookeeper
|
|
zkhandler.write(
|
|
[
|
|
(("pool", name), ""),
|
|
(("pool.pgs", name), pgs),
|
|
(("pool.tier", name), tier),
|
|
(("pool.stats", name), "{}"),
|
|
(("volume", name), ""),
|
|
(("snapshot", name), ""),
|
|
]
|
|
)
|
|
|
|
return True, f'Created RBD pool "{name}" with {pgs} PGs'
|
|
|
|
|
|
def remove_pool(zkhandler, name):
|
|
if not verifyPool(zkhandler, name):
|
|
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
|
|
name
|
|
)
|
|
|
|
# 1. Remove pool volumes
|
|
for volume in zkhandler.children(("volume", name)):
|
|
remove_volume(zkhandler, name, volume)
|
|
|
|
# 2. Remove the pool
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"ceph osd pool rm {pool} {pool} --yes-i-really-really-mean-it".format(pool=name)
|
|
)
|
|
if retcode:
|
|
return False, 'ERROR: Failed to remove pool "{}": {}'.format(name, stderr)
|
|
|
|
# 3. Delete pool from Zookeeper
|
|
zkhandler.delete(
|
|
[
|
|
("pool", name),
|
|
("volume", name),
|
|
("snapshot", name),
|
|
]
|
|
)
|
|
|
|
return True, 'Removed RBD pool "{}" and all volumes.'.format(name)
|
|
|
|
|
|
def set_pgs_pool(zkhandler, name, pgs):
|
|
if not verifyPool(zkhandler, name):
|
|
return False, f'ERROR: No pool with name "{name}" is present in the cluster.'
|
|
|
|
# Validate new PGs count
|
|
pgs = int(pgs)
|
|
if (pgs == 0) or (pgs & (pgs - 1) != 0):
|
|
return (
|
|
False,
|
|
f'ERROR: Invalid PGs number "{pgs}": must be a non-zero power of 2.',
|
|
)
|
|
|
|
# Set the new pgs number
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd pool set {name} pg_num {pgs}"
|
|
)
|
|
if retcode:
|
|
return False, f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}"
|
|
|
|
# Set the new pgps number if increasing
|
|
current_pgs = int(zkhandler.read(("pool.pgs", name)))
|
|
if current_pgs >= pgs:
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd pool set {name} pgp_num {pgs}"
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}",
|
|
)
|
|
|
|
# Update Zookeeper count
|
|
zkhandler.write(
|
|
[
|
|
(("pool.pgs", name), pgs),
|
|
]
|
|
)
|
|
|
|
return True, f'Set PGs count to {pgs} for RBD pool "{name}".'
|
|
|
|
|
|
def get_list_pool(zkhandler, limit, is_fuzzy=True):
|
|
full_pool_list = zkhandler.children("base.pool")
|
|
|
|
if is_fuzzy and limit:
|
|
# Implicitly assume fuzzy limits
|
|
if not re.match(r"\^.*", limit):
|
|
limit = ".*" + limit
|
|
if not re.match(r".*\$", limit):
|
|
limit = limit + ".*"
|
|
|
|
get_pool_info = dict()
|
|
for pool in full_pool_list:
|
|
is_limit_match = False
|
|
if limit:
|
|
try:
|
|
if re.fullmatch(limit, pool):
|
|
is_limit_match = True
|
|
except Exception as e:
|
|
return False, "Regex Error: {}".format(e)
|
|
else:
|
|
is_limit_match = True
|
|
|
|
get_pool_info[pool] = True if is_limit_match else False
|
|
|
|
pool_execute_list = [pool for pool in full_pool_list if get_pool_info[pool]]
|
|
pool_data_list = list()
|
|
with ThreadPoolExecutor(max_workers=32, thread_name_prefix="pool_list") as executor:
|
|
futures = []
|
|
for pool in pool_execute_list:
|
|
futures.append(executor.submit(getPoolInformation, zkhandler, pool))
|
|
for future in futures:
|
|
pool_data_list.append(future.result())
|
|
|
|
return True, sorted(pool_data_list, key=lambda x: int(x["stats"].get("id", 0)))
|
|
|
|
|
|
#
|
|
# Volume functions
|
|
#
|
|
def getCephVolumes(zkhandler, pool):
|
|
volume_list = list()
|
|
if not pool:
|
|
pool_list = zkhandler.children("base.pool")
|
|
else:
|
|
pool_list = [pool]
|
|
|
|
for pool_name in pool_list:
|
|
for volume_name in zkhandler.children(("volume", pool_name)):
|
|
volume_list.append("{}/{}".format(pool_name, volume_name))
|
|
|
|
return volume_list
|
|
|
|
|
|
def getVolumeInformation(zkhandler, pool, volume):
|
|
# Parse the stats data
|
|
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
|
|
volume_stats = dict(json.loads(volume_stats_raw))
|
|
# Format the size to something nicer
|
|
volume_stats["size"] = format_bytes_tohuman(volume_stats["size"])
|
|
|
|
volume_information = {"name": volume, "pool": pool, "stats": volume_stats}
|
|
return volume_information
|
|
|
|
|
|
def add_volume(zkhandler, pool, name, size):
|
|
# 1. Verify the size of the volume
|
|
pool_information = getPoolInformation(zkhandler, pool)
|
|
size_bytes = format_bytes_fromhuman(size)
|
|
if size_bytes is None:
|
|
return (
|
|
False,
|
|
f"ERROR: Requested volume size '{size}' does not have a valid SI unit",
|
|
)
|
|
|
|
if size_bytes >= int(pool_information["stats"]["free_bytes"]):
|
|
return (
|
|
False,
|
|
f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the available free space in the pool ('{format_bytes_tohuman(pool_information['stats']['free_bytes'])}')",
|
|
)
|
|
|
|
# 2. Create the volume
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd create --size {}B {}/{}".format(size_bytes, pool, name)
|
|
)
|
|
if retcode:
|
|
return False, 'ERROR: Failed to create RBD volume "{}": {}'.format(name, stderr)
|
|
|
|
# 2. Get volume stats
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd info --format json {}/{}".format(pool, name)
|
|
)
|
|
volstats = stdout
|
|
|
|
# 3. Add the new volume to Zookeeper
|
|
zkhandler.write(
|
|
[
|
|
(("volume", f"{pool}/{name}"), ""),
|
|
(("volume.stats", f"{pool}/{name}"), volstats),
|
|
(("snapshot", f"{pool}/{name}"), ""),
|
|
]
|
|
)
|
|
|
|
return True, 'Created RBD volume "{}" of size "{}" in pool "{}".'.format(
|
|
name, format_bytes_tohuman(size_bytes), pool
|
|
)
|
|
|
|
|
|
def clone_volume(zkhandler, pool, name_src, name_new):
|
|
if not verifyVolume(zkhandler, pool, name_src):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
name_src, pool
|
|
)
|
|
|
|
# 1. Clone the volume
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd copy {}/{} {}/{}".format(pool, name_src, pool, name_new)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'ERROR: Failed to clone RBD volume "{}" to "{}" in pool "{}": {}'.format(
|
|
name_src, name_new, pool, stderr
|
|
),
|
|
)
|
|
|
|
# 2. Get volume stats
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd info --format json {}/{}".format(pool, name_new)
|
|
)
|
|
volstats = stdout
|
|
|
|
# 3. Add the new volume to Zookeeper
|
|
zkhandler.write(
|
|
[
|
|
(("volume", f"{pool}/{name_new}"), ""),
|
|
(("volume.stats", f"{pool}/{name_new}"), volstats),
|
|
(("snapshot", f"{pool}/{name_new}"), ""),
|
|
]
|
|
)
|
|
|
|
return True, 'Cloned RBD volume "{}" to "{}" in pool "{}"'.format(
|
|
name_src, name_new, pool
|
|
)
|
|
|
|
|
|
def resize_volume(zkhandler, pool, name, size):
|
|
if not verifyVolume(zkhandler, pool, name):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
name, pool
|
|
)
|
|
|
|
# 1. Verify the size of the volume
|
|
pool_information = getPoolInformation(zkhandler, pool)
|
|
size_bytes = format_bytes_fromhuman(size)
|
|
if size_bytes is None:
|
|
return (
|
|
False,
|
|
f"ERROR: Requested volume size '{size}' does not have a valid SI unit",
|
|
)
|
|
|
|
if size_bytes >= int(pool_information["stats"]["free_bytes"]):
|
|
return (
|
|
False,
|
|
f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the available free space in the pool ('{format_bytes_tohuman(pool_information['stats']['free_bytes'])}')",
|
|
)
|
|
|
|
# 2. Resize the volume
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd resize --size {} {}/{}".format(
|
|
format_bytes_tohuman(size_bytes), pool, name
|
|
)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'ERROR: Failed to resize RBD volume "{}" to size "{}" in pool "{}": {}'.format(
|
|
name, format_bytes_tohuman(size_bytes), pool, stderr
|
|
),
|
|
)
|
|
|
|
# 3a. Determine the node running this VM if applicable
|
|
active_node = None
|
|
volume_vm_name = name.split("_")[0]
|
|
retcode, vm_info = vm.get_info(zkhandler, volume_vm_name)
|
|
if retcode:
|
|
for disk in vm_info["disks"]:
|
|
# This block device is present in this VM so we can continue
|
|
if disk["name"] == "{}/{}".format(pool, name):
|
|
active_node = vm_info["node"]
|
|
volume_id = disk["dev"]
|
|
# 3b. Perform a live resize in libvirt if the VM is running
|
|
if active_node is not None and vm_info.get("state", "") == "start":
|
|
import libvirt
|
|
|
|
# Run the libvirt command against the target host
|
|
try:
|
|
dest_lv = "qemu+tcp://{}/system".format(active_node)
|
|
target_lv_conn = libvirt.open(dest_lv)
|
|
target_vm_conn = target_lv_conn.lookupByName(vm_info["name"])
|
|
if target_vm_conn:
|
|
target_vm_conn.blockResize(
|
|
volume_id,
|
|
size_bytes,
|
|
libvirt.VIR_DOMAIN_BLOCK_RESIZE_BYTES,
|
|
)
|
|
target_lv_conn.close()
|
|
except Exception:
|
|
pass
|
|
|
|
# 4. Get volume stats
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd info --format json {}/{}".format(pool, name)
|
|
)
|
|
volstats = stdout
|
|
|
|
# 5. Update the volume in Zookeeper
|
|
zkhandler.write(
|
|
[
|
|
(("volume", f"{pool}/{name}"), ""),
|
|
(("volume.stats", f"{pool}/{name}"), volstats),
|
|
(("snapshot", f"{pool}/{name}"), ""),
|
|
]
|
|
)
|
|
|
|
return True, 'Resized RBD volume "{}" to size "{}" in pool "{}".'.format(
|
|
name, format_bytes_tohuman(size_bytes), pool
|
|
)
|
|
|
|
|
|
def rename_volume(zkhandler, pool, name, new_name):
|
|
if not verifyVolume(zkhandler, pool, name):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
name, pool
|
|
)
|
|
|
|
# 1. Rename the volume
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd rename {}/{} {}".format(pool, name, new_name)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'ERROR: Failed to rename volume "{}" to "{}" in pool "{}": {}'.format(
|
|
name, new_name, pool, stderr
|
|
),
|
|
)
|
|
|
|
# 2. Rename the volume in Zookeeper
|
|
zkhandler.rename(
|
|
[
|
|
(("volume", f"{pool}/{name}"), ("volume", f"{pool}/{new_name}")),
|
|
(("snapshot", f"{pool}/{name}"), ("snapshot", f"{pool}/{new_name}")),
|
|
]
|
|
)
|
|
|
|
# 3. Get volume stats
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd info --format json {}/{}".format(pool, new_name)
|
|
)
|
|
volstats = stdout
|
|
|
|
# 4. Update the volume stats in Zookeeper
|
|
zkhandler.write(
|
|
[
|
|
(("volume.stats", f"{pool}/{new_name}"), volstats),
|
|
]
|
|
)
|
|
|
|
return True, 'Renamed RBD volume "{}" to "{}" in pool "{}".'.format(
|
|
name, new_name, pool
|
|
)
|
|
|
|
|
|
def remove_volume(zkhandler, pool, name):
|
|
if not verifyVolume(zkhandler, pool, name):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
name, pool
|
|
)
|
|
|
|
# 1. Remove volume snapshots
|
|
for snapshot in zkhandler.children(("snapshot", f"{pool}/{name}")):
|
|
remove_snapshot(zkhandler, pool, name, snapshot)
|
|
|
|
# 2. Remove the volume
|
|
retcode, stdout, stderr = common.run_os_command("rbd rm {}/{}".format(pool, name))
|
|
if retcode:
|
|
return False, 'ERROR: Failed to remove RBD volume "{}" in pool "{}": {}'.format(
|
|
name, pool, stderr
|
|
)
|
|
|
|
# 3. Delete volume from Zookeeper
|
|
zkhandler.delete(
|
|
[
|
|
("volume", f"{pool}/{name}"),
|
|
("snapshot", f"{pool}/{name}"),
|
|
]
|
|
)
|
|
|
|
return True, 'Removed RBD volume "{}" in pool "{}".'.format(name, pool)
|
|
|
|
|
|
def map_volume(zkhandler, pool, name):
|
|
if not verifyVolume(zkhandler, pool, name):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
name, pool
|
|
)
|
|
|
|
# 1. Map the volume onto the local system
|
|
retcode, stdout, stderr = common.run_os_command("rbd map {}/{}".format(pool, name))
|
|
if retcode:
|
|
return False, 'ERROR: Failed to map RBD volume "{}" in pool "{}": {}'.format(
|
|
name, pool, stderr
|
|
)
|
|
|
|
# 2. Calculate the absolute path to the mapped volume
|
|
mapped_volume = "/dev/rbd/{}/{}".format(pool, name)
|
|
|
|
# 3. Ensure the volume exists
|
|
if not os.path.exists(mapped_volume):
|
|
return (
|
|
False,
|
|
'ERROR: Mapped volume not found at expected location "{}".'.format(
|
|
mapped_volume
|
|
),
|
|
)
|
|
|
|
return True, mapped_volume
|
|
|
|
|
|
def unmap_volume(zkhandler, pool, name):
|
|
if not verifyVolume(zkhandler, pool, name):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
name, pool
|
|
)
|
|
|
|
mapped_volume = "/dev/rbd/{}/{}".format(pool, name)
|
|
|
|
# 1. Ensure the volume exists
|
|
if not os.path.exists(mapped_volume):
|
|
return (
|
|
False,
|
|
'ERROR: Mapped volume not found at expected location "{}".'.format(
|
|
mapped_volume
|
|
),
|
|
)
|
|
|
|
# 2. Unap the volume
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd unmap {}".format(mapped_volume)
|
|
)
|
|
if retcode:
|
|
return False, 'ERROR: Failed to unmap RBD volume at "{}": {}'.format(
|
|
mapped_volume, stderr
|
|
)
|
|
|
|
return True, 'Unmapped RBD volume at "{}".'.format(mapped_volume)
|
|
|
|
|
|
def get_list_volume(zkhandler, pool, limit, is_fuzzy=True):
|
|
if pool and not verifyPool(zkhandler, pool):
|
|
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
|
|
pool
|
|
)
|
|
|
|
full_volume_list = getCephVolumes(zkhandler, pool)
|
|
|
|
if is_fuzzy and limit:
|
|
# Implicitly assume fuzzy limits
|
|
if not re.match(r"\^.*", limit):
|
|
limit = ".*" + limit
|
|
if not re.match(r".*\$", limit):
|
|
limit = limit + ".*"
|
|
|
|
get_volume_info = dict()
|
|
for volume in full_volume_list:
|
|
pool_name, volume_name = volume.split("/")
|
|
is_limit_match = False
|
|
|
|
# Check on limit
|
|
if limit:
|
|
# Try to match the limit against the volume name
|
|
try:
|
|
if re.fullmatch(limit, volume_name):
|
|
is_limit_match = True
|
|
except Exception as e:
|
|
return False, "Regex Error: {}".format(e)
|
|
else:
|
|
is_limit_match = True
|
|
|
|
get_volume_info[volume] = True if is_limit_match else False
|
|
|
|
# Obtain our volume data in a thread pool
|
|
volume_execute_list = [
|
|
volume for volume in full_volume_list if get_volume_info[volume]
|
|
]
|
|
volume_data_list = list()
|
|
with ThreadPoolExecutor(
|
|
max_workers=32, thread_name_prefix="volume_list"
|
|
) as executor:
|
|
futures = []
|
|
for volume in volume_execute_list:
|
|
pool_name, volume_name = volume.split("/")
|
|
futures.append(
|
|
executor.submit(getVolumeInformation, zkhandler, pool_name, volume_name)
|
|
)
|
|
for future in futures:
|
|
volume_data_list.append(future.result())
|
|
|
|
return True, sorted(volume_data_list, key=lambda x: str(x["name"]))
|
|
|
|
|
|
#
|
|
# Snapshot functions
|
|
#
|
|
def getCephSnapshots(zkhandler, pool, volume):
|
|
snapshot_list = list()
|
|
volume_list = list()
|
|
|
|
volume_list = getCephVolumes(zkhandler, pool)
|
|
if volume:
|
|
for volume_entry in volume_list:
|
|
volume_pool, volume_name = volume_entry.split("/")
|
|
if volume_name == volume:
|
|
volume_list = ["{}/{}".format(volume_pool, volume_name)]
|
|
|
|
for volume_entry in volume_list:
|
|
for snapshot_name in zkhandler.children(("snapshot", volume_entry)):
|
|
snapshot_list.append("{}@{}".format(volume_entry, snapshot_name))
|
|
|
|
return snapshot_list
|
|
|
|
|
|
def add_snapshot(zkhandler, pool, volume, name, zk_only=False):
|
|
if not verifyVolume(zkhandler, pool, volume):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
volume, pool
|
|
)
|
|
|
|
# 1. Create the snapshot
|
|
if not zk_only:
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd snap create {}/{}@{}".format(pool, volume, name)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'ERROR: Failed to create RBD snapshot "{}" of volume "{}" in pool "{}": {}'.format(
|
|
name, volume, pool, stderr
|
|
),
|
|
)
|
|
|
|
# 2. Add the snapshot to Zookeeper
|
|
zkhandler.write(
|
|
[
|
|
(("snapshot", f"{pool}/{volume}/{name}"), ""),
|
|
(("snapshot.stats", f"{pool}/{volume}/{name}"), "{}"),
|
|
]
|
|
)
|
|
|
|
# 3. Update the count of snapshots on this volume
|
|
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
|
|
volume_stats = dict(json.loads(volume_stats_raw))
|
|
# Format the size to something nicer
|
|
volume_stats["snapshot_count"] = volume_stats["snapshot_count"] + 1
|
|
volume_stats_raw = json.dumps(volume_stats)
|
|
zkhandler.write(
|
|
[
|
|
(("volume.stats", f"{pool}/{volume}"), volume_stats_raw),
|
|
]
|
|
)
|
|
|
|
return True, 'Created RBD snapshot "{}" of volume "{}" in pool "{}".'.format(
|
|
name, volume, pool
|
|
)
|
|
|
|
|
|
def rename_snapshot(zkhandler, pool, volume, name, new_name):
|
|
if not verifyVolume(zkhandler, pool, volume):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
volume, pool
|
|
)
|
|
if not verifySnapshot(zkhandler, pool, volume, name):
|
|
return (
|
|
False,
|
|
'ERROR: No snapshot with name "{}" is present for volume "{}" in pool "{}".'.format(
|
|
name, volume, pool
|
|
),
|
|
)
|
|
|
|
# 1. Rename the snapshot
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd snap rename {pool}/{volume}@{name} {pool}/{volume}@{new_name}".format(
|
|
pool=pool, volume=volume, name=name, new_name=new_name
|
|
)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'ERROR: Failed to rename RBD snapshot "{}" to "{}" for volume "{}" in pool "{}": {}'.format(
|
|
name, new_name, volume, pool, stderr
|
|
),
|
|
)
|
|
|
|
# 2. Rename the snapshot in ZK
|
|
zkhandler.rename(
|
|
[
|
|
(
|
|
("snapshot", f"{pool}/{volume}/{name}"),
|
|
("snapshot", f"{pool}/{volume}/{new_name}"),
|
|
),
|
|
]
|
|
)
|
|
|
|
return (
|
|
True,
|
|
'Renamed RBD snapshot "{}" to "{}" for volume "{}" in pool "{}".'.format(
|
|
name, new_name, volume, pool
|
|
),
|
|
)
|
|
|
|
|
|
def remove_snapshot(zkhandler, pool, volume, name):
|
|
if not verifyVolume(zkhandler, pool, volume):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
volume, pool
|
|
)
|
|
if not verifySnapshot(zkhandler, pool, volume, name):
|
|
return (
|
|
False,
|
|
'ERROR: No snapshot with name "{}" is present of volume {} in pool {}.'.format(
|
|
name, volume, pool
|
|
),
|
|
)
|
|
|
|
# 1. Remove the snapshot
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd snap rm {}/{}@{}".format(pool, volume, name)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'Failed to remove RBD snapshot "{}" of volume "{}" in pool "{}": {}'.format(
|
|
name, volume, pool, stderr
|
|
),
|
|
)
|
|
|
|
# 2. Delete snapshot from Zookeeper
|
|
zkhandler.delete([("snapshot", f"{pool}/{volume}/{name}")])
|
|
|
|
# 3. Update the count of snapshots on this volume
|
|
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
|
|
volume_stats = dict(json.loads(volume_stats_raw))
|
|
# Format the size to something nicer
|
|
volume_stats["snapshot_count"] = volume_stats["snapshot_count"] - 1
|
|
volume_stats_raw = json.dumps(volume_stats)
|
|
zkhandler.write([(("volume.stats", f"{pool}/{volume}"), volume_stats_raw)])
|
|
|
|
return True, 'Removed RBD snapshot "{}" of volume "{}" in pool "{}".'.format(
|
|
name, volume, pool
|
|
)
|
|
|
|
|
|
def get_list_snapshot(zkhandler, pool, volume, limit, is_fuzzy=True):
|
|
snapshot_list = []
|
|
if pool and not verifyPool(zkhandler, pool):
|
|
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
|
|
pool
|
|
)
|
|
|
|
if volume and not verifyPool(zkhandler, volume):
|
|
return (
|
|
False,
|
|
'ERROR: No volume with name "{}" is present in the cluster.'.format(volume),
|
|
)
|
|
|
|
full_snapshot_list = getCephSnapshots(zkhandler, pool, volume)
|
|
|
|
if is_fuzzy and limit:
|
|
# Implicitly assume fuzzy limits
|
|
if not re.match(r"\^.*", limit):
|
|
limit = ".*" + limit
|
|
if not re.match(r".*\$", limit):
|
|
limit = limit + ".*"
|
|
|
|
for snapshot in full_snapshot_list:
|
|
volume, snapshot_name = snapshot.split("@")
|
|
pool_name, volume_name = volume.split("/")
|
|
if limit:
|
|
try:
|
|
if re.fullmatch(limit, snapshot_name):
|
|
snapshot_list.append(
|
|
{
|
|
"pool": pool_name,
|
|
"volume": volume_name,
|
|
"snapshot": snapshot_name,
|
|
}
|
|
)
|
|
except Exception as e:
|
|
return False, "Regex Error: {}".format(e)
|
|
else:
|
|
snapshot_list.append(
|
|
{"pool": pool_name, "volume": volume_name, "snapshot": snapshot_name}
|
|
)
|
|
|
|
return True, sorted(snapshot_list, key=lambda x: str(x["snapshot"]))
|