With the OSD LVM information stored in Zookeeper, we can use it to determine the actual block device to zap rather than relying on runtime determination and guesstimation.
959 lines
35 KiB
Python
959 lines
35 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# CephInstance.py - Class implementing a PVC node Ceph instance
|
|
# Part of the Parallel Virtual Cluster (PVC) system
|
|
#
|
|
# Copyright (C) 2018-2021 Joshua M. Boniface <joshua@boniface.me>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, version 3.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
#
|
|
###############################################################################
|
|
|
|
import time
|
|
import json
|
|
import psutil
|
|
|
|
import daemon_lib.common as common
|
|
|
|
from distutils.util import strtobool
|
|
from re import search, match, sub
|
|
|
|
|
|
def get_detect_device(detect_string):
    """
    Parses a "detect:" string into a normalized block device path using lsscsi.

    A detect string is formatted "detect:<NAME>:<SIZE>:<ID>", where
    NAME is some unique identifier in lsscsi, SIZE is a human-readable
    size value to within +/- 3% of the real size of the device, and
    ID is the Nth (0-indexed) matching entry of that NAME and SIZE.

    Returns the block device column of the matching "lsscsi -s" line, or None
    if the detect string is malformed, lsscsi fails, or nothing matches.
    """

    def split_size(value):
        # Split e.g. "1.92TB" into (1.92, "T"); None when there is no leading
        # number (lsscsi prints "-" for devices without a size). Only the
        # first unit letter is kept so "800G" and "800GB" compare equal.
        parsed = match(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)\s*$", value)
        if parsed is None:
            return None
        return float(parsed.group(1)), parsed.group(2)[:1].upper()

    fields = detect_string.split(":")
    if len(fields) != 4 or fields[0] != "detect":
        return None
    _, name, size, idd = fields

    # Validate the index up front so a bad value yields None, not ValueError
    try:
        index = int(idd)
    except ValueError:
        return None

    retcode, stdout, stderr = common.run_os_command("lsscsi -s")
    if retcode:
        print(f"Failed to run lsscsi: {stderr}")
        return None

    # Get valid lines: keep (raw line, split columns) for every "disk" entry
    disks = list()
    for line in stdout.split("\n"):
        columns = line.split()
        if len(columns) < 2 or columns[1] != "disk":
            continue
        disks.append((line, columns))

    # Handle size determination (+/- 3%): replace the requested size with the
    # exact size string lsscsi reports, so the comparison below can be exact.
    wanted = split_size(size)
    if wanted is not None:
        b_size, b_unit = wanted
        # dict.fromkeys de-duplicates while keeping lsscsi's ordering
        for l_size in dict.fromkeys(columns[-1] for _, columns in disks):
            found = split_size(l_size)
            if found is None:
                continue
            t_size, t_unit = found
            # Only reject on units when both sides actually carry one
            if b_unit and t_unit and b_unit != t_unit:
                continue
            if t_size * 0.97 < b_size < t_size * 1.03:
                size = l_size
                break

    # Collect the block device (second-to-last column) of every disk whose
    # line contains the name (case-insensitive) and whose size matches
    name_lower = name.lower()
    matches = [
        columns[-2]
        for line, columns in disks
        if name_lower in line.lower() and columns[-1] == size
    ]

    # Find the blockdev at index {idd}
    if 0 <= index < len(matches):
        return matches[index]
    return None
|
|
|
|
|
|
class CephOSDInstance(object):
    """
    Node-local view of a single Ceph OSD plus the OSD management commands.

    An instance mirrors the OSD's Zookeeper keys (node, stats, device, VG and
    LV) into attributes through data watches. The static methods implement
    adding/removing OSDs and creating the "osd-db" database volume group and
    its logical volumes by running commands via common.run_os_command.
    """

    def __init__(self, zkhandler, logger, this_node, osd_id):
        self.zkhandler = zkhandler
        self.logger = logger
        self.this_node = this_node
        self.osd_id = osd_id
        self.node = None
        self.device = None
        self.vg = None
        self.lv = None
        self.stats = dict()

        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("osd.node", self.osd_id)
        )
        def watch_osd_node(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode("ascii")
            except AttributeError:
                data = ""

            if data and data != self.node:
                self.node = data

        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("osd.stats", self.osd_id)
        )
        def watch_osd_stats(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode("ascii")
            except AttributeError:
                data = ""

            if data and data != self.stats:
                self.stats = json.loads(data)

        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("osd.device", self.osd_id)
        )
        def watch_osd_device(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode("ascii")
            except AttributeError:
                data = ""

            if data and data != self.device:
                self.device = data

        # Exception conditional for migration from schema v7 to schema v8
        try:

            @self.zkhandler.zk_conn.DataWatch(
                self.zkhandler.schema.path("osd.vg", self.osd_id)
            )
            def watch_osd_vg(data, stat, event=""):
                if event and event.type == "DELETED":
                    # The key has been deleted after existing before; terminate this watcher
                    # because this class instance is about to be reaped in Daemon.py
                    return False

                try:
                    data = data.decode("ascii")
                except AttributeError:
                    data = ""

                if data and data != self.vg:
                    self.vg = data

            @self.zkhandler.zk_conn.DataWatch(
                self.zkhandler.schema.path("osd.lv", self.osd_id)
            )
            def watch_osd_lv(data, stat, event=""):
                if event and event.type == "DELETED":
                    # The key has been deleted after existing before; terminate this watcher
                    # because this class instance is about to be reaped in Daemon.py
                    return False

                try:
                    data = data.decode("ascii")
                except AttributeError:
                    data = ""

                if data and data != self.lv:
                    self.lv = data

            if self.node == self.this_node.name:
                self.update_information()
        except TypeError:
            return

    @staticmethod
    def _parse_lvm_list(stdout):
        """
        Pull the OSD fields out of "ceph-volume lvm list" output.

        Returns a dict with keys "blockdev", "fsid", "clusterfsid" and
        "device"; a value is None when no matching line was present. If a
        label occurs more than once, the last occurrence wins.
        """
        info = {"blockdev": None, "fsid": None, "clusterfsid": None, "device": None}
        for line in (stdout or "").split("\n"):
            if "block device" in line:
                info["blockdev"] = line.split()[-1]
            if "osd fsid" in line:
                info["fsid"] = line.split()[-1]
            if "cluster fsid" in line:
                info["clusterfsid"] = line.split()[-1]
            if "devices" in line:
                info["device"] = line.split()[-1]
        return info

    @staticmethod
    def _initial_stats(node):
        """
        Build the placeholder JSON document written to "osd.stats" for a new OSD.

        Serialized with json.dumps so the result is always valid JSON; every
        field is the "|" placeholder except "up"/"in" (0) and "node".
        """
        stats = {
            "uuid": "|",
            "up": 0,
            "in": 0,
            "primary_affinity": "|",
            "utilization": "|",
            "var": "|",
            "pgs": "|",
            "kb": "|",
            "weight": "|",
            "reweight": "|",
            "node": node,
            "used": "|",
            "avail": "|",
            "wr_ops": "|",
            "wr_data": "|",
            "rd_ops": "|",
            "rd_data": "|",
            "state": "|",
        }
        return json.dumps(stats)

    @staticmethod
    def _is_osd_process(name, cmdline, osd_id):
        """
        Return True if a process named `name` with argument list `cmdline` is
        the ceph-osd daemon for exactly `osd_id`.

        Arguments are compared as whole tokens so that OSD 1 does not match a
        running "--id 10".
        """
        if name != "ceph-osd":
            return False
        args = list(cmdline or [])
        wanted = str(osd_id)
        if "--id={}".format(wanted) in args:
            return True
        return any(
            flag == "--id" and value == wanted for flag, value in zip(args, args[1:])
        )

    @staticmethod
    def _report_failure(label, stdout, stderr):
        """Print the label of a failed command followed by its stdout and stderr."""
        print(label)
        print(stdout)
        print(stderr)

    @staticmethod
    def _raise_unless_forced(logger, force_flag):
        """Log and carry on when force_flag is set; otherwise raise Exception."""
        if force_flag:
            logger.out("Ignoring error due to force flag", state="i")
        else:
            raise Exception

    def update_information(self):
        """
        Refresh this OSD's device, FSID and LVM details in Zookeeper.

        Looks the OSD up with "ceph-volume lvm list", using the stored VG/LV
        when both are known and the stored device otherwise, then writes the
        results to Zookeeper and updates self.device, self.vg and self.lv.
        Logs an error and returns without writing if the output is incomplete.
        """
        if self.vg is not None and self.lv is not None:
            find_device = f"/dev/{self.vg}/{self.lv}"
        else:
            find_device = self.device

        self.logger.out(
            f"Updating stored disk information for OSD {self.osd_id}",
            state="i",
        )

        retcode, stdout, stderr = common.run_os_command(
            f"ceph-volume lvm list {find_device}"
        )
        info = CephOSDInstance._parse_lvm_list(stdout)

        # Split OSD blockdev into VG and LV components
        # osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
        blockdev_parts = (info["blockdev"] or "").split("/")

        # Every field is required for the write below; bail out cleanly rather
        # than failing on a missing value
        if not all(info.values()) or len(blockdev_parts) != 4:
            self.logger.out(
                f"Failed to find updated OSD information via ceph-volume for {find_device}",
                state="e",
            )
            return

        _, _, osd_vg, osd_lv = blockdev_parts
        osd_device = info["device"]

        # Except for potentially the "osd.device", this should never change, but this ensures
        # that the data is added at least once on initialization for existing OSDs.
        self.zkhandler.write(
            [
                (("osd.device", self.osd_id), osd_device),
                (("osd.fsid", self.osd_id), ""),
                (("osd.ofsid", self.osd_id), info["fsid"]),
                (("osd.cfsid", self.osd_id), info["clusterfsid"]),
                (("osd.lvm", self.osd_id), ""),
                (("osd.vg", self.osd_id), osd_vg),
                (("osd.lv", self.osd_id), osd_lv),
            ]
        )
        self.device = osd_device
        self.vg = osd_vg
        self.lv = osd_lv

    @staticmethod
    def add_osd(
        zkhandler, logger, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05
    ):
        """
        Create a new OSD on `device` and register it in Zookeeper.

        `device` may be a block device path or a "detect:" string. When
        ext_db_flag is set, a DB logical volume sized ext_db_ratio of the
        device is created in the "osd-db" VG and passed as --block.db.

        Returns True on success and False on any failure (which is logged).
        """
        # Handle a detect device if that is passed
        if match(r"detect:", device):
            ddevice = get_detect_device(device)
            if ddevice is None:
                logger.out(
                    f"Failed to determine block device from detect string {device}",
                    state="e",
                )
                return False
            else:
                logger.out(
                    f"Determined block device {ddevice} from detect string {device}",
                    state="i",
                )
                device = ddevice

        # We are ready to create a new OSD on this node
        logger.out("Creating new OSD disk on block device {}".format(device), state="i")
        try:
            # 1. Create an OSD; we do this so we know what ID will be gen'd
            retcode, stdout, stderr = common.run_os_command("ceph osd create")
            if retcode:
                CephOSDInstance._report_failure("ceph osd create", stdout, stderr)
                raise Exception
            osd_id = stdout.rstrip()

            # 2. Remove that newly-created OSD
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd rm {}".format(osd_id)
            )
            if retcode:
                CephOSDInstance._report_failure("ceph osd rm", stdout, stderr)
                raise Exception

            # 3a. Zap the disk to ensure it is ready to go
            logger.out("Zapping disk {}".format(device), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm zap --destroy {}".format(device)
            )
            if retcode:
                CephOSDInstance._report_failure("ceph-volume lvm zap", stdout, stderr)
                raise Exception

            dev_flags = "--data {}".format(device)

            # 3b. Prepare the logical volume if ext_db_flag
            if ext_db_flag:
                _, osd_size_bytes, _ = common.run_os_command(
                    "blockdev --getsize64 {}".format(device)
                )
                osd_size_bytes = int(osd_size_bytes)
                result = CephOSDInstance.create_osd_db_lv(
                    zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes
                )
                if not result:
                    raise Exception
                db_device = "osd-db/osd-{}".format(osd_id)
                dev_flags += " --block.db {}".format(db_device)
            else:
                db_device = ""

            # 3c. Create the OSD for real
            logger.out(
                "Preparing LVM for new OSD disk with ID {} on {}".format(
                    osd_id, device
                ),
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm prepare --bluestore {devices}".format(
                    devices=dev_flags
                )
            )
            if retcode:
                CephOSDInstance._report_failure(
                    "ceph-volume lvm prepare", stdout, stderr
                )
                raise Exception

            # 4a. Get OSD information
            logger.out(
                "Getting OSD information for ID {} on {}".format(osd_id, device),
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm list {device}".format(device=device)
            )
            info = CephOSDInstance._parse_lvm_list(stdout)

            # Split OSD blockdev into VG and LV components
            # osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
            blockdev_parts = (info["blockdev"] or "").split("/")

            if not all(info.values()) or len(blockdev_parts) != 4:
                print("ceph-volume lvm list")
                print("Could not find OSD information in data:")
                print(stdout)
                print(stderr)
                raise Exception

            _, _, osd_vg, osd_lv = blockdev_parts
            osd_fsid = info["fsid"]
            osd_clusterfsid = info["clusterfsid"]

            # Reset whatever we were given to Ceph's /dev/xdX naming
            device = info["device"]

            # 4b. Activate the OSD
            logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format(
                    osdid=osd_id, osdfsid=osd_fsid
                )
            )
            if retcode:
                CephOSDInstance._report_failure(
                    "ceph-volume lvm activate", stdout, stderr
                )
                raise Exception

            # 5. Add it to the crush map
            logger.out(
                "Adding new OSD disk with ID {} to CRUSH map".format(osd_id), state="i"
            )
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd crush add osd.{osdid} {weight} root=default host={node}".format(
                    osdid=osd_id, weight=weight, node=node
                )
            )
            if retcode:
                CephOSDInstance._report_failure("ceph osd crush add", stdout, stderr)
                raise Exception
            time.sleep(0.5)

            # 6. Verify it started
            retcode, stdout, stderr = common.run_os_command(
                "systemctl status ceph-osd@{osdid}".format(osdid=osd_id)
            )
            if retcode:
                CephOSDInstance._report_failure("systemctl status", stdout, stderr)
                raise Exception

            # 7. Add the new OSD to the list
            logger.out(
                "Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i"
            )
            zkhandler.write(
                [
                    (("osd", osd_id), ""),
                    (("osd.node", osd_id), node),
                    (("osd.device", osd_id), device),
                    (("osd.db_device", osd_id), db_device),
                    (("osd.fsid", osd_id), ""),
                    (("osd.ofsid", osd_id), osd_fsid),
                    (("osd.cfsid", osd_id), osd_clusterfsid),
                    (("osd.lvm", osd_id), ""),
                    (("osd.vg", osd_id), osd_vg),
                    (("osd.lv", osd_id), osd_lv),
                    (
                        ("osd.stats", osd_id),
                        CephOSDInstance._initial_stats(node),
                    ),
                ]
            )

            # Log it
            logger.out("Created new OSD disk with ID {}".format(osd_id), state="o")
            return True
        except Exception as e:
            # Log it
            logger.out("Failed to create new OSD disk: {}".format(e), state="e")
            return False

    @staticmethod
    def remove_osd(zkhandler, logger, osd_id, osd_obj, force_flag):
        """
        Drain, stop, zap and purge OSD `osd_id`, then delete it from Zookeeper.

        With force_flag set, failures of individual steps are logged and
        ignored instead of aborting. `osd_obj` is accepted for interface
        compatibility and is not used here.

        Returns True on success and False if a step failed without force.
        """
        logger.out("Removing OSD disk {}".format(osd_id), state="i")
        try:
            # Verify the OSD is present
            retcode, stdout, stderr = common.run_os_command("ceph osd ls")
            osd_list = stdout.split("\n")
            if osd_id not in osd_list:
                logger.out(
                    "Could not find OSD {} in the cluster".format(osd_id), state="e"
                )
                if force_flag:
                    logger.out("Ignoring error due to force flag", state="i")
                else:
                    # NOTE(review): reports success although nothing was
                    # removed; kept as-is, confirm this is intended.
                    return True

            # 1. Set the OSD down and out so it will flush
            logger.out("Setting down OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd down {}".format(osd_id)
            )
            if retcode:
                CephOSDInstance._report_failure("ceph osd down", stdout, stderr)
                CephOSDInstance._raise_unless_forced(logger, force_flag)

            logger.out("Setting out OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd out {}".format(osd_id)
            )
            if retcode:
                CephOSDInstance._report_failure("ceph osd out", stdout, stderr)
                CephOSDInstance._raise_unless_forced(logger, force_flag)

            # 2. Wait for the OSD to flush
            logger.out("Flushing OSD disk with ID {}".format(osd_id), state="i")
            while True:
                try:
                    retcode, stdout, stderr = common.run_os_command(
                        "ceph pg dump osds --format json"
                    )
                    dump_string = json.loads(stdout)
                    # Reset each pass so a stale entry cannot keep us waiting
                    osd_string = None
                    for osd in dump_string:
                        if str(osd["osd"]) == osd_id:
                            osd_string = osd
                    if osd_string["num_pgs"] <= 0:
                        # Fully flushed; stop waiting regardless of force_flag
                        break
                except Exception:
                    # Unparseable output or OSD not listed: nothing to wait on
                    break
                time.sleep(5)

            # 3. Stop the OSD process and wait for it to be terminated
            logger.out("Stopping OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "systemctl stop ceph-osd@{}".format(osd_id)
            )
            if retcode:
                CephOSDInstance._report_failure("systemctl stop", stdout, stderr)
                CephOSDInstance._raise_unless_forced(logger, force_flag)

            # FIXME: There has to be a better way to do this /shrug
            while True:
                # Find if there is a process named ceph-osd with arg '--id {id}'
                is_osd_up = any(
                    CephOSDInstance._is_osd_process(
                        p.info["name"], p.info["cmdline"], osd_id
                    )
                    for p in psutil.process_iter(attrs=["name", "cmdline"])
                )
                # If there isn't, continue
                if not is_osd_up:
                    break
                # Poll at a modest rate instead of spinning on the process table
                time.sleep(0.5)

            # 4. Determine the block devices
            osd_vg = zkhandler.read(("osd.vg", osd_id))
            osd_lv = zkhandler.read(("osd.lv", osd_id))
            osd_lvm = f"/dev/{osd_vg}/{osd_lv}"

            logger.out(
                f"Getting disk info for OSD {osd_id} LV {osd_lvm}",
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                f"ceph-volume lvm list {osd_lvm}"
            )
            osd_device = CephOSDInstance._parse_lvm_list(stdout)["device"]

            if not osd_device:
                print("ceph-volume lvm list")
                print("Could not find OSD information in data:")
                print(stdout)
                print(stderr)
                CephOSDInstance._raise_unless_forced(logger, force_flag)

            # 5. Zap the volumes
            if osd_device:
                logger.out(
                    "Zapping OSD {} disk on {}".format(osd_id, osd_device),
                    state="i",
                )
                retcode, stdout, stderr = common.run_os_command(
                    "ceph-volume lvm zap --destroy {}".format(osd_device)
                )
                if retcode:
                    CephOSDInstance._report_failure(
                        "ceph-volume lvm zap", stdout, stderr
                    )
                    CephOSDInstance._raise_unless_forced(logger, force_flag)
            else:
                # Only reachable with force_flag: there is nothing to zap
                logger.out(
                    "Skipping zap for OSD {}: no block device found".format(osd_id),
                    state="i",
                )

            # 6. Purge the OSD from Ceph
            logger.out("Purging OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd purge {} --yes-i-really-mean-it".format(osd_id)
            )
            if retcode:
                CephOSDInstance._report_failure("ceph osd purge", stdout, stderr)
                CephOSDInstance._raise_unless_forced(logger, force_flag)

            # 7. Remove the DB device
            if zkhandler.exists(("osd.db_device", osd_id)):
                db_device = zkhandler.read(("osd.db_device", osd_id))
                # add_osd stores "" when the OSD has no external DB volume
                if db_device:
                    logger.out(
                        'Removing OSD DB logical volume "{}"'.format(db_device),
                        state="i",
                    )
                    retcode, stdout, stderr = common.run_os_command(
                        "lvremove --yes --force {}".format(db_device)
                    )

            # 8. Delete OSD from ZK
            logger.out(
                "Deleting OSD disk with ID {} from Zookeeper".format(osd_id), state="i"
            )
            zkhandler.delete(("osd", osd_id), recursive=True)

            # Log it
            logger.out("Removed OSD disk with ID {}".format(osd_id), state="o")
            return True
        except Exception as e:
            # Log it
            logger.out(
                "Failed to purge OSD disk with ID {}: {}".format(osd_id, e), state="e"
            )
            return False

    @staticmethod
    def add_db_vg(zkhandler, logger, device):
        """
        Create the "osd-db" volume group on `device`.

        `device` may be a block device path or a "detect:" string. The device
        is repartitioned with a single LVM partition that becomes the PV.

        Returns True on success; False if the VG already exists, the device
        cannot be determined, or any command fails.
        """
        # Check if an existing volume group exists
        retcode, stdout, stderr = common.run_os_command("vgdisplay osd-db")
        if retcode != 5:
            logger.out('Ceph OSD database VG "osd-db" already exists', state="e")
            return False

        # Handle a detect device if that is passed
        if match(r"detect:", device):
            ddevice = get_detect_device(device)
            if ddevice is None:
                logger.out(
                    f"Failed to determine block device from detect string {device}",
                    state="e",
                )
                return False
            else:
                logger.out(
                    f"Determined block device {ddevice} from detect string {device}",
                    state="i",
                )
                device = ddevice

        logger.out(
            "Creating new OSD database volume group on block device {}".format(device),
            state="i",
        )
        try:
            # 1. Create an empty partition table
            logger.out(
                "Creating partitions on block device {}".format(device), state="i"
            )
            retcode, stdout, stderr = common.run_os_command(
                "sgdisk --clear {}".format(device)
            )
            if retcode:
                CephOSDInstance._report_failure(
                    "sgdisk create partition table", stdout, stderr
                )
                raise Exception

            retcode, stdout, stderr = common.run_os_command(
                "sgdisk --new 1:: --typecode 1:8e00 {}".format(device)
            )
            if retcode:
                CephOSDInstance._report_failure(
                    "sgdisk create pv partition", stdout, stderr
                )
                raise Exception

            # Handle the partition ID portion
            if search(r"by-path", device) or search(r"by-id", device):
                # /dev/disk/by-path/pci-0000:03:00.0-scsi-0:1:0:0 -> pci-0000:03:00.0-scsi-0:1:0:0-part1
                partition = "{}-part1".format(device)
            elif search(r"nvme", device):
                # /dev/nvme0n1 -> nvme0n1p1
                partition = "{}p1".format(device)
            else:
                # /dev/sda -> sda1
                # No other '/dev/disk/by-*' types are valid for raw block devices anyways
                partition = "{}1".format(device)

            # 2. Create the PV
            logger.out("Creating PV on block device {}".format(partition), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "pvcreate --force {}".format(partition)
            )
            if retcode:
                CephOSDInstance._report_failure("pv creation", stdout, stderr)
                raise Exception

            # 3. Create the VG (named 'osd-db')
            logger.out(
                'Creating VG "osd-db" on block device {}'.format(partition), state="i"
            )
            retcode, stdout, stderr = common.run_os_command(
                "vgcreate --force osd-db {}".format(partition)
            )
            if retcode:
                CephOSDInstance._report_failure("vg creation", stdout, stderr)
                raise Exception

            # Log it
            logger.out(
                "Created new OSD database volume group on block device {}".format(
                    device
                ),
                state="o",
            )
            return True
        except Exception as e:
            # Log it
            logger.out(
                "Failed to create OSD database volume group: {}".format(e), state="e"
            )
            return False

    @staticmethod
    def create_osd_db_lv(zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes):
        """
        Create the DB logical volume "osd-db/osd-<osd_id>".

        The size passed to lvcreate is osd_size_bytes * ext_db_ratio divided
        by 1024 twice and truncated to an integer.

        Returns True on success; False if the LV already exists or creation
        fails.
        """
        logger.out(
            "Creating new OSD database logical volume for OSD ID {}".format(osd_id),
            state="i",
        )
        try:
            # 0. Check if an existing logical volume exists; the name checked
            # must be the same "osd-<id>" name that is created below
            retcode, stdout, stderr = common.run_os_command(
                "lvdisplay osd-db/osd-{}".format(osd_id)
            )
            if retcode != 5:
                logger.out(
                    'Ceph OSD database LV "osd-db/osd-{}" already exists'.format(
                        osd_id
                    ),
                    state="e",
                )
                return False

            # 1. Determine LV sizing
            osd_db_size = int(osd_size_bytes * ext_db_ratio / 1024 / 1024)

            # 2. Create the LV
            logger.out(
                'Creating DB LV "osd-db/osd-{}" of {}M ({} * {})'.format(
                    osd_id, osd_db_size, osd_size_bytes, ext_db_ratio
                ),
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                "lvcreate --yes --name osd-{} --size {} osd-db".format(
                    osd_id, osd_db_size
                )
            )
            if retcode:
                CephOSDInstance._report_failure("db lv creation", stdout, stderr)
                raise Exception

            # Log it
            logger.out(
                'Created new OSD database logical volume "osd-db/osd-{}"'.format(
                    osd_id
                ),
                state="o",
            )
            return True
        except Exception as e:
            # Log it
            logger.out(
                "Failed to create OSD database logical volume: {}".format(e), state="e"
            )
            return False
|
|
|
|
|
|
class CephPoolInstance(object):
    """
    Mirrors a Ceph pool's Zookeeper keys into attributes.

    Two data watches are registered at construction time: one keeps `pgs`
    in sync with the "pool.pgs" key and one keeps `stats` in sync with the
    JSON document stored under "pool.stats".
    """

    def __init__(self, zkhandler, logger, this_node, name):
        self.zkhandler = zkhandler
        self.logger = logger
        self.this_node = this_node
        self.name = name
        self.pgs = ""
        self.stats = {}

        pgs_path = self.zkhandler.schema.path("pool.pgs", self.name)

        @self.zkhandler.zk_conn.DataWatch(pgs_path)
        def watch_pool_node(data, stat, event=""):
            # Returning False ends the watch: the key was deleted and this
            # instance is about to be reaped in Daemon.py
            if event and event.type == "DELETED":
                return False

            try:
                text = data.decode("ascii")
            except AttributeError:
                # No data (e.g. None) is treated as an empty value
                text = ""

            if not text or text == self.pgs:
                return
            self.pgs = text

        stats_path = self.zkhandler.schema.path("pool.stats", self.name)

        @self.zkhandler.zk_conn.DataWatch(stats_path)
        def watch_pool_stats(data, stat, event=""):
            # Returning False ends the watch: the key was deleted and this
            # instance is about to be reaped in Daemon.py
            if event and event.type == "DELETED":
                return False

            try:
                text = data.decode("ascii")
            except AttributeError:
                # No data (e.g. None) is treated as an empty value
                text = ""

            if not text or text == self.stats:
                return
            self.stats = json.loads(text)
|
|
|
|
|
|
class CephVolumeInstance(object):
    """
    Mirrors a Ceph volume's statistics from Zookeeper.

    A data watch on the "volume.stats" key for "<pool>/<name>" keeps `stats`
    in sync with the JSON document stored there.
    """

    def __init__(self, zkhandler, logger, this_node, pool, name):
        self.zkhandler = zkhandler
        self.logger = logger
        self.this_node = this_node
        self.pool = pool
        self.name = name
        self.stats = {}

        volume_key = f"{self.pool}/{self.name}"
        stats_path = self.zkhandler.schema.path("volume.stats", volume_key)

        @self.zkhandler.zk_conn.DataWatch(stats_path)
        def watch_volume_stats(data, stat, event=""):
            # Returning False ends the watch: the key was deleted and this
            # instance is about to be reaped in Daemon.py
            if event and event.type == "DELETED":
                return False

            try:
                text = data.decode("ascii")
            except AttributeError:
                # No data (e.g. None) is treated as an empty value
                text = ""

            if not text or text == self.stats:
                return
            self.stats = json.loads(text)
|
|
|
|
|
|
class CephSnapshotInstance(object):
    """
    Mirrors a Ceph snapshot's statistics from Zookeeper.

    A data watch on the "snapshot.stats" key for "<pool>/<volume>/<name>"
    keeps `stats` in sync with the JSON document stored there.
    """

    def __init__(self, zkhandler, this_node, pool, volume, name):
        self.zkhandler = zkhandler
        self.this_node = this_node
        self.pool = pool
        self.volume = volume
        self.name = name
        self.stats = {}

        snapshot_key = f"{self.pool}/{self.volume}/{self.name}"
        stats_path = self.zkhandler.schema.path("snapshot.stats", snapshot_key)

        @self.zkhandler.zk_conn.DataWatch(stats_path)
        def watch_snapshot_stats(data, stat, event=""):
            # Returning False ends the watch: the key was deleted and this
            # instance is about to be reaped in Daemon.py
            if event and event.type == "DELETED":
                return False

            try:
                text = data.decode("ascii")
            except AttributeError:
                # No data (e.g. None) is treated as an empty value
                text = ""

            if not text or text == self.stats:
                return
            self.stats = json.loads(text)
|
|
|
|
|
|
def _parse_bool_flag(value):
    """
    Convert a textual flag to a bool.

    Accepts the same spellings as distutils' strtobool (y/yes/t/true/on/1 and
    n/no/f/false/off/0, case-insensitive) and raises ValueError for anything
    else. Implemented locally because distutils is removed in Python 3.12.
    """
    lowered = value.lower()
    if lowered in ("y", "yes", "t", "true", "on", "1"):
        return True
    if lowered in ("n", "no", "f", "false", "off", "0"):
        return False
    raise ValueError("invalid truth value {!r}".format(value))


# Primary command function
# This command pipe is used for OSD adds and removes and for DB VG adds
def ceph_command(zkhandler, logger, this_node, data, d_osd):
    """
    Execute a Ceph command received through the "base.cmd.ceph" key.

    `data` is "<command> <comma-separated args>". Supported commands:
      osd_add     node,device,weight,ext_db_flag,ext_db_ratio
      osd_remove  osd_id,force
      db_vg_add   node,device

    A command is only acted on when it targets this node (for osd_remove:
    when `d_osd` holds the OSD and it lives on this node). The result is
    written back to "base.cmd.ceph" as "success-<data>" or "failure-<data>".
    """

    def run_locked(action):
        # Lock the command queue
        zk_lock = zkhandler.writelock("base.cmd.ceph")
        with zk_lock:
            result = action()
            # Update the command queue with the outcome
            outcome = "success" if result else "failure"
            zkhandler.write([("base.cmd.ceph", "{}-{}".format(outcome, data))])
            # Wait 1 seconds before we free the lock, to ensure the client hits the lock
            time.sleep(1)

    # Get the command and args
    command, args = data.split()

    # Adding a new OSD
    if command == "osd_add":
        node, device, weight, ext_db_flag, ext_db_ratio = args.split(",")
        ext_db_flag = _parse_bool_flag(ext_db_flag)
        ext_db_ratio = float(ext_db_ratio)
        if node == this_node.name:
            run_locked(
                lambda: CephOSDInstance.add_osd(
                    zkhandler, logger, node, device, weight, ext_db_flag, ext_db_ratio
                )
            )

    # Removing an OSD
    elif command == "osd_remove":
        osd_id, force = args.split(",")
        force_flag = _parse_bool_flag(force)

        # Verify osd_id is in the list; an unknown ID is ignored rather than
        # raising KeyError
        osd_obj = d_osd[osd_id] if osd_id in d_osd else None
        if osd_obj and osd_obj.node == this_node.name:
            run_locked(
                lambda: CephOSDInstance.remove_osd(
                    zkhandler, logger, osd_id, osd_obj, force_flag
                )
            )

    # Adding a new DB VG
    elif command == "db_vg_add":
        node, device = args.split(",")
        if node == this_node.name:
            run_locked(lambda: CephOSDInstance.add_db_vg(zkhandler, logger, device))
|