Returns garbage data, but avoids causing the entire API call to fail if this happens on very old clusters (or after any sort of ZK corruption).
2615 lines
82 KiB
Python
2615 lines
82 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# ceph.py - PVC client function library, Ceph cluster fuctions
|
|
# Part of the Parallel Virtual Cluster (PVC) system
|
|
#
|
|
# Copyright (C) 2018-2024 Joshua M. Boniface <joshua@boniface.me>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, version 3.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
#
|
|
###############################################################################
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
import time
|
|
import math
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from distutils.util import strtobool
|
|
from json import loads as jloads
|
|
from re import match, search
|
|
from uuid import uuid4
|
|
from os import path
|
|
|
|
import daemon_lib.vm as vm
|
|
import daemon_lib.common as common
|
|
|
|
from daemon_lib.celery import start, log_info, log_warn, update, fail, finish
|
|
|
|
|
|
#
|
|
# Supplemental functions
|
|
#
|
|
|
|
|
|
# Verify OSD is valid in cluster
|
|
def verifyOSD(zkhandler, osd_id):
|
|
return zkhandler.exists(("osd", osd_id))
|
|
|
|
|
|
# Verify Pool is valid in cluster
|
|
def verifyPool(zkhandler, name):
|
|
return zkhandler.exists(("pool", name))
|
|
|
|
|
|
# Verify Volume is valid in cluster
|
|
def verifyVolume(zkhandler, pool, name):
|
|
return zkhandler.exists(("volume", f"{pool}/{name}"))
|
|
|
|
|
|
# Verify Snapshot is valid in cluster
|
|
def verifySnapshot(zkhandler, pool, volume, name):
|
|
return zkhandler.exists(("snapshot", f"{pool}/{volume}/{name}"))
|
|
|
|
|
|
# Verify OSD path is valid in cluster
|
|
def verifyOSDBlock(zkhandler, node, device):
|
|
for osd in zkhandler.children("base.osd"):
|
|
osd_node = zkhandler.read(("osd.node", osd))
|
|
osd_device = zkhandler.read(("osd.device", osd))
|
|
if node == osd_node and device == osd_device:
|
|
return osd
|
|
return None
|
|
|
|
|
|
# Matrix of human-to-byte values
|
|
byte_unit_matrix = {
|
|
"B": 1,
|
|
"K": 1024,
|
|
"M": 1024 * 1024,
|
|
"G": 1024 * 1024 * 1024,
|
|
"T": 1024 * 1024 * 1024 * 1024,
|
|
"P": 1024 * 1024 * 1024 * 1024 * 1024,
|
|
"E": 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
|
|
"Z": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
|
|
"Y": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
|
|
"R": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
|
|
"Q": 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
|
|
}
|
|
|
|
# Matrix of human-to-metric values
|
|
ops_unit_matrix = {
|
|
"": 1,
|
|
"K": 1000,
|
|
"M": 1000 * 1000,
|
|
"G": 1000 * 1000 * 1000,
|
|
"T": 1000 * 1000 * 1000 * 1000,
|
|
"P": 1000 * 1000 * 1000 * 1000 * 1000,
|
|
"E": 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
|
|
"Z": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
|
|
"Y": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
|
|
"R": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
|
|
"Q": 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
|
|
}
|
|
|
|
|
|
# Format byte sizes to/from human-readable units
|
|
def format_bytes_tohuman(databytes):
|
|
datahuman = ""
|
|
for unit in sorted(byte_unit_matrix, key=byte_unit_matrix.get, reverse=True):
|
|
new_bytes = int(math.ceil(databytes / byte_unit_matrix[unit]))
|
|
# Round up if 5 or more digits
|
|
if new_bytes > 9999:
|
|
# We can jump down another level
|
|
continue
|
|
else:
|
|
# We're at the end, display with this size
|
|
datahuman = "{}{}".format(new_bytes, unit)
|
|
|
|
return datahuman
|
|
|
|
|
|
def format_bytes_fromhuman(datahuman):
|
|
if not isinstance(datahuman, str):
|
|
datahuman = str(datahuman)
|
|
|
|
if not re.search(r"[A-Za-z]+", datahuman):
|
|
dataunit = "B"
|
|
datasize = float(datahuman)
|
|
else:
|
|
dataunit = str(re.match(r"[0-9\.]+([A-Za-z])[iBb]*", datahuman).group(1))
|
|
datasize = float(re.match(r"([0-9\.]+)[A-Za-z]+", datahuman).group(1))
|
|
|
|
if byte_unit_matrix.get(dataunit.upper()):
|
|
databytes = int(datasize * byte_unit_matrix[dataunit.upper()])
|
|
return databytes
|
|
else:
|
|
return None
|
|
|
|
|
|
# Format ops sizes to/from human-readable units
|
|
def format_ops_tohuman(dataops):
|
|
datahuman = ""
|
|
for unit in sorted(ops_unit_matrix, key=ops_unit_matrix.get, reverse=True):
|
|
new_ops = int(math.ceil(dataops / ops_unit_matrix[unit]))
|
|
# Round up if 5 or more digits
|
|
if new_ops > 9999:
|
|
# We can jump down another level
|
|
continue
|
|
else:
|
|
# We're at the end, display with this size
|
|
datahuman = "{}{}".format(new_ops, unit)
|
|
|
|
return datahuman
|
|
|
|
|
|
def format_ops_fromhuman(datahuman):
|
|
# Trim off human-readable character
|
|
dataunit = datahuman[-1]
|
|
datasize = int(datahuman[:-1])
|
|
dataops = datasize * ops_unit_matrix[dataunit.upper()]
|
|
return "{}".format(dataops)
|
|
|
|
|
|
def format_pct_tohuman(datapct):
|
|
datahuman = "{0:.1f}".format(float(datapct * 100.0))
|
|
return datahuman
|
|
|
|
|
|
#
|
|
# Status functions
|
|
#
|
|
def get_status(zkhandler):
|
|
primary_node = zkhandler.read("base.config.primary_node")
|
|
ceph_status = zkhandler.read("base.storage").rstrip()
|
|
|
|
# Create a data structure for the information
|
|
status_data = {
|
|
"type": "status",
|
|
"primary_node": primary_node,
|
|
"ceph_data": ceph_status,
|
|
}
|
|
return True, status_data
|
|
|
|
|
|
def get_health(zkhandler):
|
|
primary_node = zkhandler.read("base.config.primary_node")
|
|
ceph_health = zkhandler.read("base.storage.health").rstrip()
|
|
|
|
# Create a data structure for the information
|
|
status_data = {
|
|
"type": "health",
|
|
"primary_node": primary_node,
|
|
"ceph_data": ceph_health,
|
|
}
|
|
return True, status_data
|
|
|
|
|
|
def get_util(zkhandler):
|
|
primary_node = zkhandler.read("base.config.primary_node")
|
|
ceph_df = zkhandler.read("base.storage.util").rstrip()
|
|
|
|
# Create a data structure for the information
|
|
status_data = {
|
|
"type": "utilization",
|
|
"primary_node": primary_node,
|
|
"ceph_data": ceph_df,
|
|
}
|
|
return True, status_data
|
|
|
|
|
|
#
|
|
# OSD functions
|
|
#
|
|
def getClusterOSDList(zkhandler):
|
|
# Get a list of VNIs by listing the children of /networks
|
|
return zkhandler.children("base.osd")
|
|
|
|
|
|
def getOSDInformation(zkhandler, osd_id):
|
|
(
|
|
osd_fsid,
|
|
osd_node,
|
|
osd_device,
|
|
_osd_is_split,
|
|
osd_db_device,
|
|
osd_stats_raw,
|
|
) = zkhandler.read_many(
|
|
[
|
|
("osd.ofsid", osd_id),
|
|
("osd.node", osd_id),
|
|
("osd.device", osd_id),
|
|
("osd.is_split", osd_id),
|
|
("osd.db_device", osd_id),
|
|
("osd.stats", osd_id),
|
|
]
|
|
)
|
|
|
|
osd_is_split = bool(strtobool(_osd_is_split))
|
|
# Parse the stats data
|
|
osd_stats = dict(json.loads(osd_stats_raw))
|
|
|
|
osd_information = {
|
|
"id": osd_id,
|
|
"fsid": osd_fsid,
|
|
"node": osd_node,
|
|
"device": osd_device,
|
|
"is_split": osd_is_split,
|
|
"db_device": osd_db_device,
|
|
"stats": osd_stats,
|
|
}
|
|
return osd_information
|
|
|
|
|
|
def in_osd(zkhandler, osd_id):
|
|
if not verifyOSD(zkhandler, osd_id):
|
|
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
|
|
osd_id
|
|
)
|
|
|
|
retcode, stdout, stderr = common.run_os_command("ceph osd in {}".format(osd_id))
|
|
if retcode:
|
|
return False, "ERROR: Failed to enable OSD {}: {}".format(osd_id, stderr)
|
|
|
|
return True, "Set OSD {} online.".format(osd_id)
|
|
|
|
|
|
def out_osd(zkhandler, osd_id):
|
|
if not verifyOSD(zkhandler, osd_id):
|
|
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(
|
|
osd_id
|
|
)
|
|
|
|
retcode, stdout, stderr = common.run_os_command("ceph osd out {}".format(osd_id))
|
|
if retcode:
|
|
return False, "ERROR: Failed to disable OSD {}: {}".format(osd_id, stderr)
|
|
|
|
return True, "Set OSD {} offline.".format(osd_id)
|
|
|
|
|
|
def set_osd(zkhandler, option):
|
|
retcode, stdout, stderr = common.run_os_command("ceph osd set {}".format(option))
|
|
if retcode:
|
|
return False, 'ERROR: Failed to set property "{}": {}'.format(option, stderr)
|
|
|
|
return True, 'Set OSD property "{}".'.format(option)
|
|
|
|
|
|
def unset_osd(zkhandler, option):
|
|
retcode, stdout, stderr = common.run_os_command("ceph osd unset {}".format(option))
|
|
if retcode:
|
|
return False, 'ERROR: Failed to unset property "{}": {}'.format(option, stderr)
|
|
|
|
return True, 'Unset OSD property "{}".'.format(option)
|
|
|
|
|
|
def get_list_osd(zkhandler, limit=None, is_fuzzy=True):
|
|
osd_list = []
|
|
full_osd_list = zkhandler.children("base.osd")
|
|
|
|
if is_fuzzy and limit:
|
|
# Implicitly assume fuzzy limits
|
|
if not re.match(r"\^.*", limit):
|
|
limit = ".*" + limit
|
|
if not re.match(r".*\$", limit):
|
|
limit = limit + ".*"
|
|
|
|
for osd in full_osd_list:
|
|
if limit:
|
|
try:
|
|
if re.fullmatch(limit, osd):
|
|
osd_list.append(getOSDInformation(zkhandler, osd))
|
|
except Exception as e:
|
|
return False, "Regex Error: {}".format(e)
|
|
else:
|
|
osd_list.append(getOSDInformation(zkhandler, osd))
|
|
|
|
return True, sorted(osd_list, key=lambda x: int(x["id"]))
|
|
|
|
|
|
#
|
|
# Pool functions
|
|
#
|
|
def getPoolInformation(zkhandler, pool):
|
|
# Parse the stats data
|
|
(
|
|
pool_stats_raw,
|
|
tier,
|
|
pgs,
|
|
) = zkhandler.read_many(
|
|
[
|
|
("pool.stats", pool),
|
|
("pool.tier", pool),
|
|
("pool.pgs", pool),
|
|
]
|
|
)
|
|
|
|
pool_stats = dict(json.loads(pool_stats_raw))
|
|
volume_count = len(getCephVolumes(zkhandler, pool))
|
|
if tier is None:
|
|
tier = "default"
|
|
|
|
pool_information = {
|
|
"name": pool,
|
|
"volume_count": volume_count,
|
|
"tier": tier,
|
|
"pgs": pgs,
|
|
"stats": pool_stats,
|
|
}
|
|
return pool_information
|
|
|
|
|
|
def add_pool(zkhandler, name, pgs, replcfg, tier=None):
|
|
# Prepare the copies/mincopies variables
|
|
try:
|
|
copies, mincopies = replcfg.split(",")
|
|
copies = int(copies.replace("copies=", ""))
|
|
mincopies = int(mincopies.replace("mincopies=", ""))
|
|
except Exception:
|
|
copies = None
|
|
mincopies = None
|
|
if not copies or not mincopies:
|
|
return False, f'ERROR: Replication configuration "{replcfg}" is not valid.'
|
|
|
|
# Prepare the tiers if applicable
|
|
if tier is not None and tier in ["hdd", "ssd", "nvme"]:
|
|
crush_rule = f"{tier}_tier"
|
|
# Create a CRUSH rule for the relevant tier
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd crush rule create-replicated {crush_rule} default host {tier}"
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
f"ERROR: Failed to create CRUSH rule {tier} for pool {name}: {stderr}",
|
|
)
|
|
else:
|
|
tier = "default"
|
|
crush_rule = "replicated"
|
|
|
|
# Create the pool
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd pool create {name} {pgs} {pgs} {crush_rule}"
|
|
)
|
|
if retcode:
|
|
return False, f'ERROR: Failed to create pool "{name}" with {pgs} PGs: {stderr}'
|
|
|
|
# Set the size and minsize
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd pool set {name} size {copies}"
|
|
)
|
|
if retcode:
|
|
return False, f'ERROR: Failed to set pool "{name}" size of {copies}: {stderr}'
|
|
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd pool set {name} min_size {mincopies}"
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
f'ERROR: Failed to set pool "{name}" minimum size of {mincopies}: {stderr}',
|
|
)
|
|
|
|
# Enable RBD application
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd pool application enable {name} rbd"
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
f'ERROR: Failed to enable RBD application on pool "{name}" : {stderr}',
|
|
)
|
|
|
|
# Add the new pool to Zookeeper
|
|
zkhandler.write(
|
|
[
|
|
(("pool", name), ""),
|
|
(("pool.pgs", name), pgs),
|
|
(("pool.tier", name), tier),
|
|
(("pool.stats", name), "{}"),
|
|
(("volume", name), ""),
|
|
(("snapshot", name), ""),
|
|
]
|
|
)
|
|
|
|
return True, f'Created RBD pool "{name}" with {pgs} PGs'
|
|
|
|
|
|
def remove_pool(zkhandler, name):
|
|
if not verifyPool(zkhandler, name):
|
|
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
|
|
name
|
|
)
|
|
|
|
# 1. Remove pool volumes
|
|
for volume in zkhandler.children(("volume", name)):
|
|
remove_volume(zkhandler, name, volume)
|
|
|
|
# 2. Remove the pool
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"ceph osd pool rm {pool} {pool} --yes-i-really-really-mean-it".format(pool=name)
|
|
)
|
|
if retcode:
|
|
return False, 'ERROR: Failed to remove pool "{}": {}'.format(name, stderr)
|
|
|
|
# 3. Delete pool from Zookeeper
|
|
zkhandler.delete(
|
|
[
|
|
("pool", name),
|
|
("volume", name),
|
|
("snapshot", name),
|
|
]
|
|
)
|
|
|
|
return True, 'Removed RBD pool "{}" and all volumes.'.format(name)
|
|
|
|
|
|
def set_pgs_pool(zkhandler, name, pgs):
|
|
if not verifyPool(zkhandler, name):
|
|
return False, f'ERROR: No pool with name "{name}" is present in the cluster.'
|
|
|
|
# Validate new PGs count
|
|
pgs = int(pgs)
|
|
if (pgs == 0) or (pgs & (pgs - 1) != 0):
|
|
return (
|
|
False,
|
|
f'ERROR: Invalid PGs number "{pgs}": must be a non-zero power of 2.',
|
|
)
|
|
|
|
# Set the new pgs number
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd pool set {name} pg_num {pgs}"
|
|
)
|
|
if retcode:
|
|
return False, f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}"
|
|
|
|
# Set the new pgps number if increasing
|
|
current_pgs = int(zkhandler.read(("pool.pgs", name)))
|
|
if current_pgs >= pgs:
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd pool set {name} pgp_num {pgs}"
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
f"ERROR: Failed to set pg_num on pool {name} to {pgs}: {stderr}",
|
|
)
|
|
|
|
# Update Zookeeper count
|
|
zkhandler.write(
|
|
[
|
|
(("pool.pgs", name), pgs),
|
|
]
|
|
)
|
|
|
|
return True, f'Set PGs count to {pgs} for RBD pool "{name}".'
|
|
|
|
|
|
def get_list_pool(zkhandler, limit=None, is_fuzzy=True):
|
|
full_pool_list = zkhandler.children("base.pool")
|
|
|
|
if is_fuzzy and limit:
|
|
# Implicitly assume fuzzy limits
|
|
if not re.match(r"\^.*", limit):
|
|
limit = ".*" + limit
|
|
if not re.match(r".*\$", limit):
|
|
limit = limit + ".*"
|
|
|
|
get_pool_info = dict()
|
|
for pool in full_pool_list:
|
|
is_limit_match = False
|
|
if limit:
|
|
try:
|
|
if re.fullmatch(limit, pool):
|
|
is_limit_match = True
|
|
except Exception as e:
|
|
return False, "Regex Error: {}".format(e)
|
|
else:
|
|
is_limit_match = True
|
|
|
|
get_pool_info[pool] = True if is_limit_match else False
|
|
|
|
pool_execute_list = [pool for pool in full_pool_list if get_pool_info[pool]]
|
|
pool_data_list = list()
|
|
with ThreadPoolExecutor(max_workers=32, thread_name_prefix="pool_list") as executor:
|
|
futures = []
|
|
for pool in pool_execute_list:
|
|
futures.append(executor.submit(getPoolInformation, zkhandler, pool))
|
|
for future in futures:
|
|
pool_data_list.append(future.result())
|
|
|
|
return True, sorted(pool_data_list, key=lambda x: int(x["stats"].get("id", 0)))
|
|
|
|
|
|
#
|
|
# Volume functions
|
|
#
|
|
def getCephVolumes(zkhandler, pool):
|
|
volume_list = list()
|
|
if not pool:
|
|
pool_list = zkhandler.children("base.pool")
|
|
else:
|
|
pool_list = [pool]
|
|
|
|
for pool_name in pool_list:
|
|
children = zkhandler.children(("volume", pool_name))
|
|
if children is None:
|
|
continue
|
|
for volume_name in children:
|
|
volume_list.append("{}/{}".format(pool_name, volume_name))
|
|
|
|
return volume_list
|
|
|
|
|
|
def getVolumeInformation(zkhandler, pool, volume):
|
|
# Parse the stats data
|
|
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
|
|
try:
|
|
volume_stats = dict(json.loads(volume_stats_raw))
|
|
# Format the size to something nicer
|
|
volume_stats["size"] = format_bytes_tohuman(volume_stats["size"])
|
|
except Exception:
|
|
volume_stats = dict(
|
|
json.loads(
|
|
f'{"name":"{volume}","id":"","size":0,"objects":0,"order":0,"object_size":0,"snapshot_count":0,"block_name_prefix":"","format":0,"features":[],"op_features":[],"flags":[],"create_timestamp":"","access_timestamp":"","modify_timestamp":""}'
|
|
)
|
|
)
|
|
|
|
volume_information = {"name": volume, "pool": pool, "stats": volume_stats}
|
|
return volume_information
|
|
|
|
|
|
def scan_volume(zkhandler, pool, name):
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd info --format json {}/{}".format(pool, name)
|
|
)
|
|
volstats = stdout
|
|
|
|
# 3. Add the new volume to Zookeeper
|
|
zkhandler.write(
|
|
[
|
|
(("volume.stats", f"{pool}/{name}"), volstats),
|
|
]
|
|
)
|
|
|
|
|
|
def add_volume(zkhandler, pool, name, size, force_flag=False, zk_only=False):
|
|
# 1. Verify the size of the volume
|
|
pool_information = getPoolInformation(zkhandler, pool)
|
|
size_bytes = format_bytes_fromhuman(size)
|
|
if size_bytes is None:
|
|
return (
|
|
False,
|
|
f"ERROR: Requested volume size '{size}' does not have a valid SI unit",
|
|
)
|
|
|
|
pool_total_free_bytes = int(pool_information["stats"]["free_bytes"])
|
|
if size_bytes >= pool_total_free_bytes:
|
|
return (
|
|
False,
|
|
f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the available free space in the pool ('{format_bytes_tohuman(pool_information['stats']['free_bytes'])}')",
|
|
)
|
|
|
|
# Check if we're greater than 80% utilization after the create; error if so unless we have the force flag
|
|
pool_total_bytes = (
|
|
int(pool_information["stats"]["stored_bytes"]) + pool_total_free_bytes
|
|
)
|
|
pool_safe_total_bytes = int(pool_total_bytes * 0.80)
|
|
pool_safe_free_bytes = pool_safe_total_bytes - int(
|
|
pool_information["stats"]["stored_bytes"]
|
|
)
|
|
if size_bytes >= pool_safe_free_bytes and not force_flag:
|
|
return (
|
|
False,
|
|
f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the safe free space in the pool ('{format_bytes_tohuman(pool_safe_free_bytes)}' for 80% full); retry with force to ignore this error",
|
|
)
|
|
|
|
# 2. Create the volume
|
|
# zk_only flag skips actually creating the volume - this would be done by some other mechanism
|
|
if not zk_only:
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd create --size {}B {}/{}".format(size_bytes, pool, name)
|
|
)
|
|
if retcode:
|
|
return False, 'ERROR: Failed to create RBD volume "{}": {}'.format(
|
|
name, stderr
|
|
)
|
|
|
|
# 3. Add the new volume to Zookeeper
|
|
zkhandler.write(
|
|
[
|
|
(("volume", f"{pool}/{name}"), ""),
|
|
(("volume.stats", f"{pool}/{name}"), ""),
|
|
(("snapshot", f"{pool}/{name}"), ""),
|
|
]
|
|
)
|
|
|
|
# 4. Scan the volume stats
|
|
scan_volume(zkhandler, pool, name)
|
|
|
|
return True, 'Created RBD volume "{}" of size "{}" in pool "{}".'.format(
|
|
name, format_bytes_tohuman(size_bytes), pool
|
|
)
|
|
|
|
|
|
def clone_volume(zkhandler, pool, name_src, name_new, force_flag=False):
|
|
# 1. Verify the volume
|
|
if not verifyVolume(zkhandler, pool, name_src):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
name_src, pool
|
|
)
|
|
|
|
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{name_src}"))
|
|
volume_stats = dict(json.loads(volume_stats_raw))
|
|
size_bytes = volume_stats["size"]
|
|
pool_information = getPoolInformation(zkhandler, pool)
|
|
pool_total_free_bytes = int(pool_information["stats"]["free_bytes"])
|
|
if size_bytes >= pool_total_free_bytes:
|
|
return (
|
|
False,
|
|
f"ERROR: Clone volume size '{format_bytes_tohuman(size_bytes)}' is greater than the available free space in the pool ('{format_bytes_tohuman(pool_information['stats']['free_bytes'])}')",
|
|
)
|
|
|
|
# Check if we're greater than 80% utilization after the create; error if so unless we have the force flag
|
|
pool_total_bytes = (
|
|
int(pool_information["stats"]["stored_bytes"]) + pool_total_free_bytes
|
|
)
|
|
pool_safe_total_bytes = int(pool_total_bytes * 0.80)
|
|
pool_safe_free_bytes = pool_safe_total_bytes - int(
|
|
pool_information["stats"]["stored_bytes"]
|
|
)
|
|
if size_bytes >= pool_safe_free_bytes and not force_flag:
|
|
return (
|
|
False,
|
|
f"ERROR: Clone volume size '{format_bytes_tohuman(size_bytes)}' is greater than the safe free space in the pool ('{format_bytes_tohuman(pool_safe_free_bytes)}' for 80% full); retry with force to ignore this error",
|
|
)
|
|
|
|
# 2. Clone the volume
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd copy {}/{} {}/{}".format(pool, name_src, pool, name_new)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'ERROR: Failed to clone RBD volume "{}" to "{}" in pool "{}": {}'.format(
|
|
name_src, name_new, pool, stderr
|
|
),
|
|
)
|
|
|
|
# 3. Add the new volume to Zookeeper
|
|
zkhandler.write(
|
|
[
|
|
(("volume", f"{pool}/{name_new}"), ""),
|
|
(("volume.stats", f"{pool}/{name_new}"), ""),
|
|
(("snapshot", f"{pool}/{name_new}"), ""),
|
|
]
|
|
)
|
|
|
|
# 4. Scan the volume stats
|
|
scan_volume(zkhandler, pool, name_new)
|
|
|
|
return True, 'Cloned RBD volume "{}" to "{}" in pool "{}"'.format(
|
|
name_src, name_new, pool
|
|
)
|
|
|
|
|
|
def resize_volume(zkhandler, pool, name, size, force_flag=False):
|
|
if not verifyVolume(zkhandler, pool, name):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
name, pool
|
|
)
|
|
|
|
# 1. Verify the size of the volume
|
|
pool_information = getPoolInformation(zkhandler, pool)
|
|
size_bytes = format_bytes_fromhuman(size)
|
|
if size_bytes is None:
|
|
return (
|
|
False,
|
|
f"ERROR: Requested volume size '{size}' does not have a valid SI unit",
|
|
)
|
|
|
|
pool_total_free_bytes = int(pool_information["stats"]["free_bytes"])
|
|
if size_bytes >= pool_total_free_bytes:
|
|
return (
|
|
False,
|
|
f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the available free space in the pool ('{format_bytes_tohuman(pool_information['stats']['free_bytes'])}')",
|
|
)
|
|
|
|
# Check if we're greater than 80% utilization after the create; error if so unless we have the force flag
|
|
pool_total_bytes = (
|
|
int(pool_information["stats"]["stored_bytes"]) + pool_total_free_bytes
|
|
)
|
|
pool_safe_total_bytes = int(pool_total_bytes * 0.80)
|
|
pool_safe_free_bytes = pool_safe_total_bytes - int(
|
|
pool_information["stats"]["stored_bytes"]
|
|
)
|
|
if size_bytes >= pool_safe_free_bytes and not force_flag:
|
|
return (
|
|
False,
|
|
f"ERROR: Requested volume size '{format_bytes_tohuman(size_bytes)}' is greater than the safe free space in the pool ('{format_bytes_tohuman(pool_safe_free_bytes)}' for 80% full); retry with force to ignore this error",
|
|
)
|
|
|
|
# 2. Resize the volume
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd resize --size {} {}/{}".format(
|
|
format_bytes_tohuman(size_bytes), pool, name
|
|
)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'ERROR: Failed to resize RBD volume "{}" to size "{}" in pool "{}": {}'.format(
|
|
name, format_bytes_tohuman(size_bytes), pool, stderr
|
|
),
|
|
)
|
|
|
|
# 3a. Determine the node running this VM if applicable
|
|
active_node = None
|
|
volume_vm_name = name.split("_")[0]
|
|
retcode, vm_info = vm.get_info(zkhandler, volume_vm_name)
|
|
if retcode:
|
|
for disk in vm_info["disks"]:
|
|
# This block device is present in this VM so we can continue
|
|
if disk["name"] == "{}/{}".format(pool, name):
|
|
active_node = vm_info["node"]
|
|
volume_id = disk["dev"]
|
|
# 3b. Perform a live resize in libvirt if the VM is running
|
|
if active_node is not None and vm_info.get("state", "") == "start":
|
|
import libvirt
|
|
|
|
# Run the libvirt command against the target host
|
|
try:
|
|
dest_lv = "qemu+tcp://{}/system".format(active_node)
|
|
target_lv_conn = libvirt.open(dest_lv)
|
|
target_vm_conn = target_lv_conn.lookupByName(vm_info["name"])
|
|
if target_vm_conn:
|
|
target_vm_conn.blockResize(
|
|
volume_id,
|
|
size_bytes,
|
|
libvirt.VIR_DOMAIN_BLOCK_RESIZE_BYTES,
|
|
)
|
|
target_lv_conn.close()
|
|
except Exception:
|
|
pass
|
|
|
|
# 4. Scan the volume stats
|
|
scan_volume(zkhandler, pool, name)
|
|
|
|
return True, 'Resized RBD volume "{}" to size "{}" in pool "{}".'.format(
|
|
name, format_bytes_tohuman(size_bytes), pool
|
|
)
|
|
|
|
|
|
def rename_volume(zkhandler, pool, name, new_name):
|
|
if not verifyVolume(zkhandler, pool, name):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
name, pool
|
|
)
|
|
|
|
# 1. Rename the volume
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd rename {}/{} {}".format(pool, name, new_name)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'ERROR: Failed to rename volume "{}" to "{}" in pool "{}": {}'.format(
|
|
name, new_name, pool, stderr
|
|
),
|
|
)
|
|
|
|
# 2. Rename the volume in Zookeeper
|
|
zkhandler.rename(
|
|
[
|
|
(("volume", f"{pool}/{name}"), ("volume", f"{pool}/{new_name}")),
|
|
(("snapshot", f"{pool}/{name}"), ("snapshot", f"{pool}/{new_name}")),
|
|
]
|
|
)
|
|
|
|
# 3. Scan the volume stats
|
|
scan_volume(zkhandler, pool, new_name)
|
|
|
|
return True, 'Renamed RBD volume "{}" to "{}" in pool "{}".'.format(
|
|
name, new_name, pool
|
|
)
|
|
|
|
|
|
def remove_volume(zkhandler, pool, name):
|
|
if not verifyVolume(zkhandler, pool, name):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
name, pool
|
|
)
|
|
|
|
# 1a. Remove PVC-managed volume snapshots
|
|
for snapshot in zkhandler.children(("snapshot", f"{pool}/{name}")):
|
|
remove_snapshot(zkhandler, pool, name, snapshot)
|
|
|
|
# 1b. Purge any remaining volume snapshots
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd snap purge {}/{}".format(pool, name)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'ERROR: Failed to purge snapshots from RBD volume "{}" in pool "{}": {}'.format(
|
|
name, pool, stderr
|
|
),
|
|
)
|
|
|
|
# 2. Remove the volume
|
|
retcode, stdout, stderr = common.run_os_command("rbd rm {}/{}".format(pool, name))
|
|
if retcode:
|
|
return False, 'ERROR: Failed to remove RBD volume "{}" in pool "{}": {}'.format(
|
|
name, pool, stderr
|
|
)
|
|
|
|
# 3. Delete volume from Zookeeper
|
|
zkhandler.delete(
|
|
[
|
|
("volume", f"{pool}/{name}"),
|
|
("snapshot", f"{pool}/{name}"),
|
|
]
|
|
)
|
|
|
|
return True, 'Removed RBD volume "{}" in pool "{}".'.format(name, pool)
|
|
|
|
|
|
def map_volume(zkhandler, pool, name):
|
|
if not verifyVolume(zkhandler, pool, name):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
name, pool
|
|
)
|
|
|
|
# 1. Map the volume onto the local system
|
|
retcode, stdout, stderr = common.run_os_command("rbd map {}/{}".format(pool, name))
|
|
if retcode:
|
|
return False, 'ERROR: Failed to map RBD volume "{}" in pool "{}": {}'.format(
|
|
name, pool, stderr
|
|
)
|
|
|
|
# 2. Calculate the absolute path to the mapped volume
|
|
mapped_volume = "/dev/rbd/{}/{}".format(pool, name)
|
|
|
|
# 3. Ensure the volume exists
|
|
if not os.path.exists(mapped_volume):
|
|
return (
|
|
False,
|
|
'ERROR: Mapped volume not found at expected location "{}".'.format(
|
|
mapped_volume
|
|
),
|
|
)
|
|
|
|
return True, mapped_volume
|
|
|
|
|
|
def unmap_volume(zkhandler, pool, name):
|
|
if not verifyVolume(zkhandler, pool, name):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
name, pool
|
|
)
|
|
|
|
mapped_volume = "/dev/rbd/{}/{}".format(pool, name)
|
|
|
|
# 1. Ensure the volume exists
|
|
if not os.path.exists(mapped_volume):
|
|
return (
|
|
False,
|
|
'ERROR: Mapped volume not found at expected location "{}".'.format(
|
|
mapped_volume
|
|
),
|
|
)
|
|
|
|
# 2. Unap the volume
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd unmap {}".format(mapped_volume)
|
|
)
|
|
if retcode:
|
|
return False, 'ERROR: Failed to unmap RBD volume at "{}": {}'.format(
|
|
mapped_volume, stderr
|
|
)
|
|
|
|
return True, 'Unmapped RBD volume at "{}".'.format(mapped_volume)
|
|
|
|
|
|
def get_list_volume(zkhandler, pool, limit=None, is_fuzzy=True):
|
|
if pool and not verifyPool(zkhandler, pool):
|
|
return False, 'ERROR: No pool with name "{}" is present in the cluster.'.format(
|
|
pool
|
|
)
|
|
|
|
full_volume_list = getCephVolumes(zkhandler, pool)
|
|
|
|
if is_fuzzy and limit:
|
|
# Implicitly assume fuzzy limits
|
|
if not re.match(r"\^.*", limit):
|
|
limit = ".*" + limit
|
|
if not re.match(r".*\$", limit):
|
|
limit = limit + ".*"
|
|
|
|
get_volume_info = dict()
|
|
for volume in full_volume_list:
|
|
pool_name, volume_name = volume.split("/")
|
|
is_limit_match = False
|
|
|
|
# Check on limit
|
|
if limit:
|
|
# Try to match the limit against the volume name
|
|
try:
|
|
if re.fullmatch(limit, volume_name):
|
|
is_limit_match = True
|
|
except Exception as e:
|
|
return False, "Regex Error: {}".format(e)
|
|
else:
|
|
is_limit_match = True
|
|
|
|
get_volume_info[volume] = True if is_limit_match else False
|
|
|
|
# Obtain our volume data in a thread pool
|
|
volume_execute_list = [
|
|
volume for volume in full_volume_list if get_volume_info[volume]
|
|
]
|
|
volume_data_list = list()
|
|
with ThreadPoolExecutor(
|
|
max_workers=32, thread_name_prefix="volume_list"
|
|
) as executor:
|
|
futures = []
|
|
for volume in volume_execute_list:
|
|
pool_name, volume_name = volume.split("/")
|
|
futures.append(
|
|
executor.submit(getVolumeInformation, zkhandler, pool_name, volume_name)
|
|
)
|
|
for future in futures:
|
|
volume_data_list.append(future.result())
|
|
|
|
return True, sorted(volume_data_list, key=lambda x: str(x["name"]))
|
|
|
|
|
|
#
|
|
# Snapshot functions
|
|
#
|
|
def getCephSnapshots(zkhandler, pool, volume):
|
|
snapshot_list = list()
|
|
volume_list = list()
|
|
|
|
volume_list = getCephVolumes(zkhandler, pool)
|
|
if volume:
|
|
for volume_entry in volume_list:
|
|
volume_pool, volume_name = volume_entry.split("/")
|
|
if volume_name == volume:
|
|
volume_list = ["{}/{}".format(volume_pool, volume_name)]
|
|
|
|
for volume_entry in volume_list:
|
|
for snapshot_name in zkhandler.children(("snapshot", volume_entry)):
|
|
snapshot_list.append("{}@{}".format(volume_entry, snapshot_name))
|
|
|
|
return snapshot_list
|
|
|
|
|
|
def add_snapshot(zkhandler, pool, volume, name, zk_only=False):
|
|
if not verifyVolume(zkhandler, pool, volume):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
volume, pool
|
|
)
|
|
|
|
# 1. Create the snapshot
|
|
if not zk_only:
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd snap create {}/{}@{}".format(pool, volume, name)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'ERROR: Failed to create RBD snapshot "{}" of volume "{}" in pool "{}": {}'.format(
|
|
name, volume, pool, stderr
|
|
),
|
|
)
|
|
|
|
# 2. Get snapshot stats
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd info --format json {}/{}@{}".format(pool, volume, name)
|
|
)
|
|
snapstats = stdout
|
|
|
|
# 3. Add the snapshot to Zookeeper
|
|
zkhandler.write(
|
|
[
|
|
(("snapshot", f"{pool}/{volume}/{name}"), ""),
|
|
(("snapshot.stats", f"{pool}/{volume}/{name}"), snapstats),
|
|
]
|
|
)
|
|
|
|
# 4. Update the count of snapshots on this volume
|
|
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
|
|
volume_stats = dict(json.loads(volume_stats_raw))
|
|
volume_stats["snapshot_count"] = volume_stats["snapshot_count"] + 1
|
|
zkhandler.write(
|
|
[
|
|
(("volume.stats", f"{pool}/{volume}"), json.dumps(volume_stats)),
|
|
]
|
|
)
|
|
|
|
return True, 'Created RBD snapshot "{}" of volume "{}" in pool "{}".'.format(
|
|
name, volume, pool
|
|
)
|
|
|
|
|
|
def rename_snapshot(zkhandler, pool, volume, name, new_name):
|
|
if not verifyVolume(zkhandler, pool, volume):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
volume, pool
|
|
)
|
|
if not verifySnapshot(zkhandler, pool, volume, name):
|
|
return (
|
|
False,
|
|
'ERROR: No snapshot with name "{}" is present for volume "{}" in pool "{}".'.format(
|
|
name, volume, pool
|
|
),
|
|
)
|
|
|
|
# 1. Rename the snapshot
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd snap rename {pool}/{volume}@{name} {pool}/{volume}@{new_name}".format(
|
|
pool=pool, volume=volume, name=name, new_name=new_name
|
|
)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'ERROR: Failed to rename RBD snapshot "{}" to "{}" for volume "{}" in pool "{}": {}'.format(
|
|
name, new_name, volume, pool, stderr
|
|
),
|
|
)
|
|
|
|
# 2. Rename the snapshot in ZK
|
|
zkhandler.rename(
|
|
[
|
|
(
|
|
("snapshot", f"{pool}/{volume}/{name}"),
|
|
("snapshot", f"{pool}/{volume}/{new_name}"),
|
|
),
|
|
]
|
|
)
|
|
|
|
return (
|
|
True,
|
|
'Renamed RBD snapshot "{}" to "{}" for volume "{}" in pool "{}".'.format(
|
|
name, new_name, volume, pool
|
|
),
|
|
)
|
|
|
|
|
|
def rollback_snapshot(zkhandler, pool, volume, name):
|
|
if not verifyVolume(zkhandler, pool, volume):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
volume, pool
|
|
)
|
|
if not verifySnapshot(zkhandler, pool, volume, name):
|
|
return (
|
|
False,
|
|
'ERROR: No snapshot with name "{}" is present for volume "{}" in pool "{}".'.format(
|
|
name, volume, pool
|
|
),
|
|
)
|
|
|
|
# 1. Roll back the snapshot
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd snap rollback {}/{}@{}".format(pool, volume, name)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'ERROR: Failed to roll back RBD volume "{}" in pool "{}" to snapshot "{}": {}'.format(
|
|
volume, pool, name, stderr
|
|
),
|
|
)
|
|
|
|
return True, 'Rolled back RBD volume "{}" in pool "{}" to snapshot "{}".'.format(
|
|
volume, pool, name
|
|
)
|
|
|
|
|
|
def remove_snapshot(zkhandler, pool, volume, name):
|
|
if not verifyVolume(zkhandler, pool, volume):
|
|
return False, 'ERROR: No volume with name "{}" is present in pool "{}".'.format(
|
|
volume, pool
|
|
)
|
|
if not verifySnapshot(zkhandler, pool, volume, name):
|
|
return (
|
|
False,
|
|
'ERROR: No snapshot with name "{}" is present of volume {} in pool {}.'.format(
|
|
name, volume, pool
|
|
),
|
|
)
|
|
|
|
# 1. Remove the snapshot
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"rbd snap rm {}/{}@{}".format(pool, volume, name)
|
|
)
|
|
if retcode:
|
|
return (
|
|
False,
|
|
'Failed to remove RBD snapshot "{}" of volume "{}" in pool "{}": {}'.format(
|
|
name, volume, pool, stderr
|
|
),
|
|
)
|
|
|
|
# 2. Delete snapshot from Zookeeper
|
|
zkhandler.delete([("snapshot", f"{pool}/{volume}/{name}")])
|
|
|
|
# 3. Update the count of snapshots on this volume
|
|
volume_stats_raw = zkhandler.read(("volume.stats", f"{pool}/{volume}"))
|
|
volume_stats = dict(json.loads(volume_stats_raw))
|
|
# Format the size to something nicer
|
|
volume_stats["snapshot_count"] = volume_stats["snapshot_count"] - 1
|
|
volume_stats_raw = json.dumps(volume_stats)
|
|
zkhandler.write([(("volume.stats", f"{pool}/{volume}"), volume_stats_raw)])
|
|
|
|
return True, 'Removed RBD snapshot "{}" of volume "{}" in pool "{}".'.format(
|
|
name, volume, pool
|
|
)
|
|
|
|
|
|
def get_list_snapshot(zkhandler, target_pool, target_volume, limit=None, is_fuzzy=True):
|
|
snapshot_list = []
|
|
full_snapshot_list = getCephSnapshots(zkhandler, target_pool, target_volume)
|
|
|
|
if is_fuzzy and limit:
|
|
# Implicitly assume fuzzy limits
|
|
if not re.match(r"\^.*", limit):
|
|
limit = ".*" + limit
|
|
if not re.match(r".*\$", limit):
|
|
limit = limit + ".*"
|
|
|
|
for snapshot in full_snapshot_list:
|
|
volume, snapshot_name = snapshot.split("@")
|
|
pool_name, volume_name = volume.split("/")
|
|
if target_pool and pool_name != target_pool:
|
|
continue
|
|
if target_volume and volume_name != target_volume:
|
|
continue
|
|
try:
|
|
snapshot_stats = json.loads(
|
|
zkhandler.read(
|
|
("snapshot.stats", f"{pool_name}/{volume_name}/{snapshot_name}")
|
|
)
|
|
)
|
|
except Exception:
|
|
snapshot_stats = []
|
|
if limit:
|
|
try:
|
|
if re.fullmatch(limit, snapshot_name):
|
|
snapshot_list.append(
|
|
{
|
|
"pool": pool_name,
|
|
"volume": volume_name,
|
|
"snapshot": snapshot_name,
|
|
"stats": snapshot_stats,
|
|
}
|
|
)
|
|
except Exception as e:
|
|
return False, "Regex Error: {}".format(e)
|
|
else:
|
|
snapshot_list.append(
|
|
{
|
|
"pool": pool_name,
|
|
"volume": volume_name,
|
|
"snapshot": snapshot_name,
|
|
"stats": snapshot_stats,
|
|
}
|
|
)
|
|
|
|
return True, sorted(snapshot_list, key=lambda x: str(x["snapshot"]))
|
|
|
|
|
|
#
|
|
# Celery worker tasks (must be run on node, outputs log messages to worker)
|
|
#
|
|
def osd_worker_helper_find_osds_from_block(device):
|
|
# Try to query the passed block device directly
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm list --format json {device}"
|
|
)
|
|
if retcode:
|
|
found_osds = []
|
|
else:
|
|
found_osds = jloads(stdout)
|
|
|
|
return found_osds
|
|
|
|
|
|
def osd_worker_add_osd(
|
|
zkhandler,
|
|
celery,
|
|
node,
|
|
device,
|
|
weight,
|
|
ext_db_ratio=None,
|
|
ext_db_size=None,
|
|
split_count=None,
|
|
):
|
|
current_stage = 0
|
|
total_stages = 5
|
|
if split_count is None:
|
|
split_count = 1
|
|
else:
|
|
split_count = int(split_count)
|
|
total_stages = total_stages + 3 * int(split_count)
|
|
if ext_db_ratio is not None or ext_db_size is not None:
|
|
total_stages = total_stages + 3 * int(split_count) + 1
|
|
|
|
start(
|
|
celery,
|
|
f"Adding {split_count} new OSD(s) on device {device} with weight {weight}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
# Handle a detect device if that is passed
|
|
if match(r"detect:", device):
|
|
ddevice = common.get_detect_device(device)
|
|
if ddevice is None:
|
|
fail(
|
|
celery,
|
|
f"Failed to determine block device from detect string {device}",
|
|
)
|
|
return
|
|
else:
|
|
log_info(
|
|
celery, f"Determined block device {ddevice} from detect string {device}"
|
|
)
|
|
device = ddevice
|
|
|
|
if ext_db_size is not None and ext_db_ratio is not None:
|
|
fail(
|
|
celery,
|
|
"Invalid configuration: both an ext_db_size and ext_db_ratio were specified",
|
|
)
|
|
return
|
|
|
|
# Check if device has a partition table; it's not valid if it does
|
|
retcode, _, _ = common.run_os_command(f"sfdisk --dump {device}")
|
|
if retcode < 1:
|
|
fail(
|
|
celery,
|
|
f"Device {device} has a partition table and is unsuitable for an OSD",
|
|
)
|
|
return
|
|
|
|
if ext_db_size is not None or ext_db_ratio is not None:
|
|
ext_db_flag = True
|
|
else:
|
|
ext_db_flag = False
|
|
|
|
if split_count > 1:
|
|
split_flag = f"--osds-per-device {split_count}"
|
|
is_split = True
|
|
log_info(
|
|
celery, f"Creating {split_count} new OSD disks on block device {device}"
|
|
)
|
|
else:
|
|
split_flag = ""
|
|
is_split = False
|
|
log_info(celery, f"Creating 1 new OSD disk on block device {device}")
|
|
|
|
if "nvme" in device:
|
|
class_flag = "--crush-device-class nvme"
|
|
else:
|
|
class_flag = "--crush-device-class ssd"
|
|
|
|
# 1. Zap the block device
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Zapping block device {device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm zap --destroy {device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to perform ceph-volume lvm zap on {device}")
|
|
return
|
|
|
|
# 2. Prepare the OSD(s)
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Preparing OSD(s) on device {device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm batch --yes --prepare --bluestore {split_flag} {class_flag} {device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to perform ceph-volume lvm batch on {device}")
|
|
return
|
|
|
|
# 3. Get the list of created OSDs on the device (initial pass)
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Querying OSD(s) on device {device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm list --format json {device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to perform ceph-volume lvm list on {device}")
|
|
return
|
|
|
|
created_osds = jloads(stdout)
|
|
|
|
# 4. Prepare the WAL and DB devices
|
|
if ext_db_flag:
|
|
for created_osd in created_osds:
|
|
# 4a. Get the OSD FSID and ID from the details
|
|
osd_details = created_osds[created_osd][0]
|
|
osd_fsid = osd_details["tags"]["ceph.osd_fsid"]
|
|
osd_id = osd_details["tags"]["ceph.osd_id"]
|
|
osd_lv = osd_details["lv_path"]
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Preparing DB LV for OSD {osd_id}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
# 4b. Prepare the logical volume if ext_db_flag
|
|
if ext_db_ratio is not None:
|
|
_, osd_size_bytes, _ = common.run_os_command(
|
|
f"blockdev --getsize64 {osd_lv}"
|
|
)
|
|
osd_size_bytes = int(osd_size_bytes)
|
|
osd_db_size_bytes = int(osd_size_bytes * ext_db_ratio)
|
|
if ext_db_size is not None:
|
|
osd_db_size_bytes = format_bytes_fromhuman(ext_db_size)
|
|
osd_db_size_mb = int(osd_db_size_bytes / 1024 / 1024)
|
|
|
|
db_device = f"osd-db/osd-{osd_id}"
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Preparing Bluestore DB volume for OSD {osd_id} on {db_device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"lvcreate -L {osd_db_size_mb}M -n osd-{osd_id} --yes osd-db"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run lvcreate on {db_device}")
|
|
return
|
|
|
|
# 4c. Attach the new DB device to the OSD
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Attaching Bluestore DB volume to OSD {osd_id}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm new-db --osd-id {osd_id} --osd-fsid {osd_fsid} --target {db_device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(
|
|
celery, f"Failed to perform ceph-volume lvm new-db on OSD {osd_id}"
|
|
)
|
|
return
|
|
|
|
# 4d. Get the list of created OSDs on the device (final pass)
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Requerying OSD(s) on device {device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm list --format json {device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to perform ceph-volume lvm list on {device}")
|
|
return
|
|
|
|
created_osds = jloads(stdout)
|
|
|
|
# 5. Activate the OSDs
|
|
for created_osd in created_osds:
|
|
# 5a. Get the OSD FSID and ID from the details
|
|
osd_details = created_osds[created_osd][0]
|
|
osd_clusterfsid = osd_details["tags"]["ceph.cluster_fsid"]
|
|
osd_fsid = osd_details["tags"]["ceph.osd_fsid"]
|
|
osd_id = osd_details["tags"]["ceph.osd_id"]
|
|
db_device = osd_details["tags"].get("ceph.db_device", "")
|
|
osd_vg = osd_details["vg_name"]
|
|
osd_lv = osd_details["lv_name"]
|
|
|
|
# 5b. Add it to the crush map
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Adding OSD {osd_id} to CRUSH map",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd crush add osd.{osd_id} {weight} root=default host={node}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to perform ceph osd crush add for OSD {osd_id}")
|
|
return
|
|
|
|
# 5c. Activate the OSD
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Activating OSD {osd_id}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm activate --bluestore {osd_id} {osd_fsid}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to perform ceph osd crush add for OSD {osd_id}")
|
|
return
|
|
|
|
# 5d. Wait 1 second for it to activate
|
|
time.sleep(1)
|
|
|
|
# 5e. Verify it started
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"systemctl status ceph-osd@{osd_id}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to start OSD {osd_id} process")
|
|
return
|
|
|
|
# 5f. Add the new OSD to PVC
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Adding OSD {osd_id} to PVC",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
zkhandler.write(
|
|
[
|
|
(("osd", osd_id), ""),
|
|
(("osd.node", osd_id), node),
|
|
(("osd.device", osd_id), device),
|
|
(("osd.db_device", osd_id), db_device),
|
|
(("osd.fsid", osd_id), ""),
|
|
(("osd.ofsid", osd_id), osd_fsid),
|
|
(("osd.cfsid", osd_id), osd_clusterfsid),
|
|
(("osd.lvm", osd_id), ""),
|
|
(("osd.vg", osd_id), osd_vg),
|
|
(("osd.lv", osd_id), osd_lv),
|
|
(("osd.is_split", osd_id), is_split),
|
|
(
|
|
("osd.stats", osd_id),
|
|
'{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
|
|
),
|
|
]
|
|
)
|
|
|
|
# 6. Wait for OSD to check in
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
"Waiting for new OSD(s) to report stats",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
last_osd = None
|
|
for osd in created_osds:
|
|
last_osd = osd
|
|
|
|
while (
|
|
jloads(
|
|
zkhandler.read(
|
|
("osd.stats", created_osds[last_osd][0]["tags"]["ceph.osd_id"])
|
|
)
|
|
)["weight"]
|
|
== "|"
|
|
):
|
|
time.sleep(1)
|
|
|
|
# 7. Log it
|
|
current_stage += 1
|
|
return finish(
|
|
celery,
|
|
f"Successfully created {len(created_osds.keys())} new OSD(s) {','.join(created_osds.keys())} on device {device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
|
|
def osd_worker_replace_osd(
|
|
zkhandler,
|
|
celery,
|
|
node,
|
|
osd_id,
|
|
new_device,
|
|
old_device=None,
|
|
weight=None,
|
|
ext_db_ratio=None,
|
|
ext_db_size=None,
|
|
):
|
|
# Try to determine if any other OSDs shared a block device with this OSD
|
|
_, osd_list = get_list_osd(zkhandler, None)
|
|
osd_block = zkhandler.read(("osd.device", osd_id))
|
|
all_osds_on_block = [
|
|
o for o in osd_list if o["node"] == node and o["device"] == osd_block
|
|
]
|
|
all_osds_on_block_ids = [o["id"] for o in all_osds_on_block]
|
|
|
|
# Set up stages
|
|
current_stage = 0
|
|
total_stages = 3
|
|
_split_count = len(all_osds_on_block_ids)
|
|
total_stages = total_stages + 10 * int(_split_count)
|
|
if (
|
|
ext_db_ratio is not None
|
|
or ext_db_size is not None
|
|
or any([True for o in all_osds_on_block if o["db_device"]])
|
|
):
|
|
total_stages = total_stages + 2 * int(_split_count)
|
|
|
|
start(
|
|
celery,
|
|
f"Replacing OSD(s) {','.join(all_osds_on_block_ids)} with device {new_device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
# Handle a detect device if that is passed
|
|
if match(r"detect:", new_device):
|
|
ddevice = common.get_detect_device(new_device)
|
|
if ddevice is None:
|
|
fail(
|
|
celery,
|
|
f"Failed to determine block device from detect string {new_device}",
|
|
)
|
|
return
|
|
else:
|
|
log_info(
|
|
celery,
|
|
f"Determined block device {ddevice} from detect string {new_device}",
|
|
)
|
|
new_device = ddevice
|
|
|
|
# Check if device has a partition table; it's not valid if it does
|
|
retcode, _, _ = common.run_os_command(f"sfdisk --dump {new_device}")
|
|
if retcode < 1:
|
|
fail(
|
|
celery,
|
|
f"Device {new_device} has a partition table and is unsuitable for an OSD",
|
|
)
|
|
return
|
|
|
|
# Phase 1: Try to determine what we can about the old device
|
|
real_old_device = None
|
|
|
|
# Determine information from a passed old_device
|
|
if old_device is not None:
|
|
found_osds = osd_worker_helper_find_osds_from_block(old_device)
|
|
if found_osds and osd_id in found_osds.keys():
|
|
real_old_device = old_device
|
|
else:
|
|
log_warn(
|
|
celery,
|
|
f"No OSD(s) found on device {old_device}; falling back to PVC detection",
|
|
)
|
|
|
|
# Try to get an old_device from our PVC information
|
|
if real_old_device is None:
|
|
found_osds = osd_worker_helper_find_osds_from_block(osd_block)
|
|
|
|
if osd_id in found_osds.keys():
|
|
real_old_device = osd_block
|
|
|
|
if real_old_device is None:
|
|
skip_zap = True
|
|
log_warn(
|
|
celery,
|
|
"No valid old block device found for OSD(s); skipping zap",
|
|
)
|
|
else:
|
|
skip_zap = False
|
|
|
|
# Determine the weight of the OSD(s)
|
|
if weight is None:
|
|
weight = all_osds_on_block[0]["stats"]["weight"]
|
|
|
|
# Take down the OSD(s), but keep it's CRUSH map details and IDs
|
|
for osd in all_osds_on_block:
|
|
osd_id = osd["id"]
|
|
|
|
# 1. Set the OSD down and out so it will flush
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Setting OSD {osd_id} down",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(f"ceph osd down {osd_id}")
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to set OSD {osd_id} down")
|
|
return
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Setting OSD {osd_id} out",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(f"ceph osd out {osd_id}")
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to set OSD {osd_id} out")
|
|
return
|
|
|
|
# 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Waiting for OSD {osd_id} to be safe to remove",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
tcount = 0
|
|
while True:
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd safe-to-destroy osd.{osd_id}"
|
|
)
|
|
if int(retcode) in [0, 11]:
|
|
break
|
|
else:
|
|
common.run_os_command(f"ceph osd down {osd_id}")
|
|
common.run_os_command(f"ceph osd out {osd_id}")
|
|
time.sleep(1)
|
|
tcount += 1
|
|
if tcount > 60:
|
|
log_warn(
|
|
celery,
|
|
f"Timed out (60s) waiting for OSD {osd_id} to be safe to remove; proceeding anyways",
|
|
)
|
|
break
|
|
|
|
# 3. Stop the OSD process and wait for it to be terminated
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Stopping OSD {osd_id}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"systemctl stop ceph-osd@{osd_id}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to stop OSD {osd_id}")
|
|
return
|
|
time.sleep(5)
|
|
|
|
# 4. Destroy the OSD
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Destroying OSD {osd_id}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd destroy {osd_id} --yes-i-really-mean-it"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to destroy OSD {osd_id}")
|
|
return
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Zapping old disk {real_old_device} if possible",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
if not skip_zap:
|
|
# 5. Zap the old disk
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm zap --destroy {real_old_device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
log_warn(
|
|
celery, f"Failed to zap old disk {real_old_device}; proceeding anyways"
|
|
)
|
|
|
|
# 6. Prepare the volume group on the new device
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Preparing LVM volume group on new disk {new_device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm zap --destroy {new_device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run ceph-volume lvm zap on new disk {new_device}")
|
|
return
|
|
|
|
retcode, stdout, stderr = common.run_os_command(f"pvcreate {new_device}")
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run pvcreate on new disk {new_device}")
|
|
return
|
|
|
|
vg_uuid = str(uuid4())
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"vgcreate ceph-{vg_uuid} {new_device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run vgcreate on new disk {new_device}")
|
|
return
|
|
|
|
# Determine how many OSDs we want on the new device
|
|
osds_count = len(all_osds_on_block)
|
|
|
|
# Determine the size of the new device
|
|
_, new_device_size_bytes, _ = common.run_os_command(
|
|
f"blockdev --getsize64 {new_device}"
|
|
)
|
|
|
|
# Calculate the size of each OSD (in MB) based on the default 4M extent size
|
|
new_osd_size_mb = (
|
|
int(int(int(new_device_size_bytes) / osds_count) / 1024 / 1024 / 4) * 4
|
|
)
|
|
|
|
# Calculate the size, if applicable, of the OSD block if we were passed a ratio
|
|
if ext_db_ratio is not None:
|
|
osd_new_db_size_mb = int(int(int(new_osd_size_mb * ext_db_ratio) / 4) * 4)
|
|
elif ext_db_size is not None:
|
|
osd_new_db_size_mb = int(
|
|
int(int(format_bytes_fromhuman(ext_db_size)) / 1024 / 1024 / 4) * 4
|
|
)
|
|
else:
|
|
if all_osds_on_block[0]["db_device"]:
|
|
_, new_device_size_bytes, _ = common.run_os_command(
|
|
f"blockdev --getsize64 {all_osds_on_block[0]['db_device']}"
|
|
)
|
|
osd_new_db_size_mb = int(
|
|
int(int(new_device_size_bytes) / 1024 / 1024 / 4) * 4
|
|
)
|
|
else:
|
|
osd_new_db_size_mb = None
|
|
|
|
for osd in all_osds_on_block:
|
|
osd_id = osd["id"]
|
|
osd_fsid = osd["fsid"]
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Preparing LVM logical volume for OSD {osd_id} on new disk {new_device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"lvcreate -L {new_osd_size_mb}M -n osd-block-{osd_fsid} ceph-{vg_uuid}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run lvcreate for OSD {osd_id}")
|
|
return
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Preparing OSD {osd_id} on new disk {new_device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm prepare --bluestore --osd-id {osd_id} --osd-fsid {osd_fsid} --data /dev/ceph-{vg_uuid}/osd-block-{osd_fsid}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run ceph-volume lvm prepare for OSD {osd_id}")
|
|
return
|
|
|
|
for osd in all_osds_on_block:
|
|
osd_id = osd["id"]
|
|
osd_fsid = osd["fsid"]
|
|
|
|
if osd["db_device"]:
|
|
db_device = f"osd-db/osd-{osd_id}"
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Preparing Bluestore DB volume for OSD {osd_id} on {db_device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"lvremove --force {db_device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run lvremove on {db_device}")
|
|
return
|
|
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"lvcreate -L {osd_new_db_size_mb}M -n osd-{osd_id} --yes osd-db"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run lvcreate on {db_device}")
|
|
return
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Attaching Bluestore DB volume to OSD {osd_id}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm new-db --osd-id {osd_id} --osd-fsid {osd_fsid} --target {db_device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run ceph-volume lvm new-db for OSD {osd_id}")
|
|
return
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Updating OSD {osd_id} in CRUSH map",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd crush add osd.{osd_id} {weight} root=default host={node}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run ceph osd crush add for OSD {osd_id}")
|
|
return
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Activating OSD {osd_id}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm activate --bluestore {osd_id} {osd_fsid}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run ceph-volume lvm activate for OSD {osd_id}")
|
|
return
|
|
|
|
# Wait 1 second for it to activate
|
|
time.sleep(1)
|
|
|
|
# Verify it started
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"systemctl status ceph-osd@{osd_id}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to start OSD {osd_id} process")
|
|
return
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Updating OSD {osd_id} in PVC",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
zkhandler.write(
|
|
[
|
|
(("osd.device", osd_id), new_device),
|
|
(("osd.vg", osd_id), f"ceph-{vg_uuid}"),
|
|
(("osd.lv", osd_id), f"osd-block-{osd_fsid}"),
|
|
]
|
|
)
|
|
|
|
# 6. Log it
|
|
current_stage += 1
|
|
return finish(
|
|
celery,
|
|
f"Successfully replaced OSD(s) {','.join(all_osds_on_block_ids)} on new disk {new_device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
|
|
def osd_worker_refresh_osd(
|
|
zkhandler,
|
|
celery,
|
|
node,
|
|
osd_id,
|
|
device,
|
|
ext_db_flag,
|
|
):
|
|
# Try to determine if any other OSDs shared a block device with this OSD
|
|
_, osd_list = get_list_osd(zkhandler, None)
|
|
osd_block = zkhandler.read(("osd.device", osd_id))
|
|
all_osds_on_block = [
|
|
o for o in osd_list if o["node"] == node and o["device"] == osd_block
|
|
]
|
|
all_osds_on_block_ids = [o["id"] for o in all_osds_on_block]
|
|
|
|
# Set up stages
|
|
current_stage = 0
|
|
total_stages = 1
|
|
_split_count = len(all_osds_on_block_ids)
|
|
total_stages = total_stages + 3 * int(_split_count)
|
|
|
|
start(
|
|
celery,
|
|
f"Refreshing/reimporting OSD(s) {','.join(all_osds_on_block_ids)} on device {device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
# Handle a detect device if that is passed
|
|
if match(r"detect:", device):
|
|
ddevice = common.get_detect_device(device)
|
|
if ddevice is None:
|
|
fail(
|
|
celery,
|
|
f"Failed to determine block device from detect string {device}",
|
|
)
|
|
return
|
|
else:
|
|
log_info(
|
|
celery,
|
|
f"Determined block device {ddevice} from detect string {device}",
|
|
)
|
|
device = ddevice
|
|
|
|
retcode, stdout, stderr = common.run_os_command("ceph osd ls")
|
|
osd_list = stdout.split("\n")
|
|
if osd_id not in osd_list:
|
|
fail(
|
|
celery,
|
|
f"Could not find OSD {osd_id} in the cluster",
|
|
)
|
|
return
|
|
|
|
found_osds = osd_worker_helper_find_osds_from_block(device)
|
|
if osd_id not in found_osds.keys():
|
|
fail(
|
|
celery,
|
|
f"Could not find OSD {osd_id} on device {device}",
|
|
)
|
|
return
|
|
|
|
for osd in found_osds:
|
|
found_osd = found_osds[osd][0]
|
|
lv_device = found_osd["lv_path"]
|
|
|
|
_, osd_pvc_information = get_list_osd(zkhandler, osd)
|
|
osd_information = osd_pvc_information[0]
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
"Querying for OSD on device",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm list --format json {lv_device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run ceph-volume lvm list for OSD {osd}")
|
|
return
|
|
|
|
osd_detail = jloads(stdout)[osd][0]
|
|
|
|
osd_fsid = osd_detail["tags"]["ceph.osd_fsid"]
|
|
if osd_fsid != osd_information["fsid"]:
|
|
fail(
|
|
celery,
|
|
f"OSD {osd} FSID {osd_information['fsid']} does not match volume FSID {osd_fsid}; OSD cannot be imported",
|
|
)
|
|
return
|
|
|
|
dev_flags = f"--data {lv_device}"
|
|
|
|
if ext_db_flag:
|
|
db_device = "osd-db/osd-{osd}"
|
|
dev_flags += f" --block.db {db_device}"
|
|
|
|
if not path.exists(f"/dev/{db_device}"):
|
|
fail(
|
|
celery,
|
|
f"OSD Bluestore DB volume {db_device} does not exist; OSD cannot be imported",
|
|
)
|
|
return
|
|
else:
|
|
db_device = ""
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Activating OSD {osd}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm activate --bluestore {osd} {osd_fsid}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to run ceph-volume lvm activate for OSD {osd}")
|
|
return
|
|
|
|
# Wait 1 second for it to activate
|
|
time.sleep(1)
|
|
|
|
# Verify it started
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"systemctl status ceph-osd@{osd}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(celery, f"Failed to start OSD {osd} process")
|
|
return
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Updating OSD {osd} in PVC",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
zkhandler.write(
|
|
[
|
|
(("osd.device", osd), device),
|
|
(("osd.vg", osd), osd_detail["vg_name"]),
|
|
(("osd.lv", osd), osd_detail["lv_name"]),
|
|
]
|
|
)
|
|
|
|
# 6. Log it
|
|
current_stage += 1
|
|
return finish(
|
|
celery,
|
|
f"Successfully reimported OSD(s) {','.join(all_osds_on_block_ids)} on device {device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
|
|
def osd_worker_remove_osd(
|
|
zkhandler, celery, node, osd_id, force_flag=False, skip_zap_flag=False
|
|
):
|
|
# Get initial data
|
|
data_device = zkhandler.read(("osd.device", osd_id))
|
|
if zkhandler.exists(("osd.db_device", osd_id)):
|
|
db_device = zkhandler.read(("osd.db_device", osd_id))
|
|
else:
|
|
db_device = None
|
|
|
|
# Set up stages
|
|
current_stage = 0
|
|
total_stages = 7
|
|
if not force_flag:
|
|
total_stages += 1
|
|
if not skip_zap_flag:
|
|
total_stages += 2
|
|
if db_device:
|
|
total_stages += 1
|
|
|
|
start(
|
|
celery,
|
|
f"Removing OSD {osd_id}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
# Verify the OSD is present
|
|
retcode, stdout, stderr = common.run_os_command("ceph osd ls")
|
|
osd_list = stdout.split("\n")
|
|
if osd_id not in osd_list:
|
|
if not force_flag:
|
|
fail(
|
|
celery,
|
|
f"OSD {osd_id} not found in Ceph",
|
|
)
|
|
return
|
|
else:
|
|
log_warn(
|
|
celery,
|
|
f"OSD {osd_id} not found in Ceph; ignoring error due to force flag",
|
|
)
|
|
|
|
# 1. Set the OSD down and out so it will flush
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Setting OSD {osd_id} down",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(f"ceph osd down {osd_id}")
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
if not force_flag:
|
|
fail(
|
|
celery,
|
|
f"Failed to set OSD {osd_id} down",
|
|
)
|
|
return
|
|
else:
|
|
log_warn(
|
|
celery,
|
|
f"Failed to set OSD {osd_id} down; ignoring error due to force flag",
|
|
)
|
|
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Setting OSD {osd_id} out",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(f"ceph osd out {osd_id}")
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
if not force_flag:
|
|
fail(
|
|
celery,
|
|
f"Failed to set OSD {osd_id} down",
|
|
)
|
|
return
|
|
else:
|
|
log_warn(
|
|
celery,
|
|
f"Failed to set OSD {osd_id} down; ignoring error due to force flag",
|
|
)
|
|
|
|
# 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
|
|
if not force_flag:
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Waiting for OSD {osd_id} to be safe to remove",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
tcount = 0
|
|
while True:
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd safe-to-destroy osd.{osd_id}"
|
|
)
|
|
if int(retcode) in [0, 11]:
|
|
break
|
|
else:
|
|
common.run_os_command(f"ceph osd down {osd_id}")
|
|
common.run_os_command(f"ceph osd out {osd_id}")
|
|
time.sleep(1)
|
|
tcount += 1
|
|
if tcount > 60:
|
|
log_warn(
|
|
celery,
|
|
f"Timed out (60s) waiting for OSD {osd_id} to be safe to remove; proceeding anyways",
|
|
)
|
|
break
|
|
|
|
# 3. Stop the OSD process and wait for it to be terminated
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Stopping OSD {osd_id}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(f"systemctl stop ceph-osd@{osd_id}")
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
if not force_flag:
|
|
fail(
|
|
celery,
|
|
f"Failed to stop OSD {osd_id} process",
|
|
)
|
|
return
|
|
else:
|
|
log_warn(
|
|
celery,
|
|
f"Failed to stop OSD {osd_id} process; ignoring error due to force flag",
|
|
)
|
|
time.sleep(5)
|
|
|
|
# 4. Delete OSD from ZK
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Deleting OSD {osd_id} from PVC",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
zkhandler.delete(("osd", osd_id), recursive=True)
|
|
|
|
# 5a. Destroy the OSD from Ceph
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Destroying OSD {osd_id}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd destroy {osd_id} --yes-i-really-mean-it"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
if not force_flag:
|
|
fail(
|
|
celery,
|
|
f"Failed to destroy OSD {osd_id}",
|
|
)
|
|
return
|
|
else:
|
|
log_warn(
|
|
celery,
|
|
f"Failed to destroy OSD {osd_id}; ignoring error due to force flag",
|
|
)
|
|
time.sleep(2)
|
|
|
|
# 5b. Purge the OSD from Ceph
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Removing OSD {osd_id} from CRUSH map",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
# Remove the OSD from the CRUSH map
|
|
retcode, stdout, stderr = common.run_os_command(f"ceph osd crush rm osd.{osd_id}")
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
if not force_flag:
|
|
fail(
|
|
celery,
|
|
f"Failed to remove OSD {osd_id} from CRUSH map",
|
|
)
|
|
return
|
|
else:
|
|
log_warn(
|
|
celery,
|
|
f"Failed to remove OSD {osd_id} from CRUSH map; ignoring error due to force flag",
|
|
)
|
|
|
|
# Purge the OSD
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Purging OSD {osd_id}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
if force_flag:
|
|
force_arg = "--force"
|
|
else:
|
|
force_arg = ""
|
|
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph osd purge {osd_id} {force_arg} --yes-i-really-mean-it"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
if not force_flag:
|
|
fail(
|
|
celery,
|
|
f"Failed to purge OSD {osd_id}",
|
|
)
|
|
return
|
|
else:
|
|
log_warn(
|
|
celery,
|
|
f"Failed to purge OSD {osd_id}; ignoring error due to force flag",
|
|
)
|
|
|
|
# 6. Remove the DB device
|
|
if db_device:
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Removing OSD DB logical volume {db_device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"lvremove --yes --force {db_device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
if not force_flag:
|
|
fail(
|
|
celery,
|
|
f"Failed to remove OSD DB logical volume {db_device}",
|
|
)
|
|
return
|
|
else:
|
|
log_warn(
|
|
celery,
|
|
f"Failed to remove OSD DB logical volume {db_device}; ignoring error due to force flag",
|
|
)
|
|
|
|
if not skip_zap_flag:
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Zapping old disk {data_device} if possible",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
# 7. Determine the block devices
|
|
found_osds = osd_worker_helper_find_osds_from_block(data_device)
|
|
if osd_id in found_osds.keys():
|
|
# Try to determine if any other OSDs shared a block device with this OSD
|
|
_, osd_list = get_list_osd(zkhandler, None)
|
|
all_osds_on_block = [
|
|
o for o in osd_list if o["node"] == node and o["device"] == data_device
|
|
]
|
|
|
|
if len(all_osds_on_block) < 1:
|
|
log_info(
|
|
celery,
|
|
f"Found no peer split OSD(s) on {data_device}; zapping disk",
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
f"ceph-volume lvm zap --destroy {data_device}"
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
if not force_flag:
|
|
fail(
|
|
celery,
|
|
f"Failed to run ceph-volume lvm zap on device {data_device}",
|
|
)
|
|
return
|
|
else:
|
|
log_warn(
|
|
celery,
|
|
f"Failed to run ceph-volume lvm zap on device {data_device}; ignoring error due to force flag",
|
|
)
|
|
else:
|
|
log_warn(
|
|
celery,
|
|
f"Found {len(all_osds_on_block)} OSD(s) still remaining on {data_device}; skipping zap",
|
|
)
|
|
else:
|
|
log_warn(
|
|
celery,
|
|
f"Could not find OSD {osd_id} on device {data_device}; skipping zap",
|
|
)
|
|
|
|
# 6. Log it
|
|
current_stage += 1
|
|
return finish(
|
|
celery,
|
|
f"Successfully removed OSD {osd_id}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
|
|
|
|
def osd_worker_add_db_vg(zkhandler, celery, device):
|
|
# Set up stages
|
|
current_stage = 0
|
|
total_stages = 4
|
|
|
|
start(
|
|
celery,
|
|
f"Creating new OSD database volume group on device {device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
# Check if an existsing volume group exists
|
|
retcode, stdout, stderr = common.run_os_command("vgdisplay osd-db")
|
|
if retcode != 5:
|
|
fail(
|
|
celery,
|
|
"Ceph OSD database VG already exists",
|
|
)
|
|
return
|
|
|
|
# Handle a detect device if that is passed
|
|
if match(r"detect:", device):
|
|
ddevice = common.get_detect_device(device)
|
|
if ddevice is None:
|
|
fail(
|
|
celery,
|
|
f"Failed to determine block device from detect string {device}",
|
|
)
|
|
return
|
|
else:
|
|
log_info(
|
|
celery,
|
|
f"Determined block device {ddevice} from detect string {device}",
|
|
)
|
|
device = ddevice
|
|
|
|
# 1. Create an empty partition table
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Creating partitions on device {device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command("sgdisk --clear {}".format(device))
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(
|
|
celery,
|
|
f"Failed to create partition table on device {device}",
|
|
)
|
|
return
|
|
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"sgdisk --new 1:: --typecode 1:8e00 {}".format(device)
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(
|
|
celery,
|
|
f"Failed to set partition type to LVM PV on device {device}",
|
|
)
|
|
return
|
|
|
|
# Handle the partition ID portion
|
|
if search(r"by-path", device) or search(r"by-id", device):
|
|
# /dev/disk/by-path/pci-0000:03:00.0-scsi-0:1:0:0 -> pci-0000:03:00.0-scsi-0:1:0:0-part1
|
|
partition = "{}-part1".format(device)
|
|
elif search(r"nvme", device):
|
|
# /dev/nvme0n1 -> nvme0n1p1
|
|
partition = "{}p1".format(device)
|
|
else:
|
|
# /dev/sda -> sda1
|
|
# No other '/dev/disk/by-*' types are valid for raw block devices anyways
|
|
partition = "{}1".format(device)
|
|
|
|
# 2. Create the PV
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f"Creating LVM PV on device {device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"pvcreate --force {}".format(partition)
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(
|
|
celery,
|
|
f"Failed to create LVM PV on device {device}",
|
|
)
|
|
return
|
|
|
|
# 2. Create the VG (named 'osd-db')
|
|
current_stage += 1
|
|
update(
|
|
celery,
|
|
f'Creating LVM VG "osd-db" on device {device}',
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|
|
retcode, stdout, stderr = common.run_os_command(
|
|
"vgcreate --force osd-db {}".format(partition)
|
|
)
|
|
log_info(celery, f"stdout: {stdout}")
|
|
log_info(celery, f"stderr: {stderr}")
|
|
if retcode:
|
|
fail(
|
|
celery,
|
|
f"Failed to create LVM VG on device {device}",
|
|
)
|
|
return
|
|
|
|
# Log it
|
|
current_stage += 1
|
|
return finish(
|
|
celery,
|
|
f"Successfully created new OSD DB volume group on device {device}",
|
|
current=current_stage,
|
|
total=total_stages,
|
|
)
|