Store additional OSD information in ZK

Ensures that information such as the FSIDs and the OSD LVM volume is
stored in Zookeeper at creation time and updated at daemon start time
(to ensure the data is populated at least once, or updated if the /dev/sdX
path changes).

This will allow safer OSD removal operations and the potential
implementation of OSD re-activation after node replacements.
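
As context for the diff below, the stored values (device path, OSD FSID, cluster FSID, and the LVM VG/LV) are all derived from the output of ceph-volume lvm list. A minimal sketch of that parsing approach, mirroring the logic added in CephInstance.py; the helper name and return shape are illustrative and not part of this commit:

    # Sketch only: pick the last whitespace-separated field of the relevant
    # "ceph-volume lvm list" output lines, then split the block device into
    # its LVM VG and LV components.
    def parse_ceph_volume_list(stdout):
        osd_blockdev = osd_fsid = osd_clusterfsid = osd_device = None
        for line in stdout.split("\n"):
            if "block device" in line:
                osd_blockdev = line.split()[-1]
            if "osd fsid" in line:
                osd_fsid = line.split()[-1]
            if "cluster fsid" in line:
                osd_clusterfsid = line.split()[-1]
            if "devices" in line:
                osd_device = line.split()[-1]
        if not osd_fsid or not osd_blockdev:
            return None
        # osd_blockdev looks like /dev/ceph-<uuid>/osd-block-<uuid>
        _, _, osd_vg, osd_lv = osd_blockdev.split("/")
        return osd_device, osd_fsid, osd_clusterfsid, osd_vg, osd_lv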
Joshua Boniface 2022-05-02 12:11:32 -04:00
parent cea8832f90
commit 464f0e0356
4 changed files with 159 additions and 11 deletions

@@ -0,0 +1 @@
+{"version": "8", "root": "", "base": {"root": "", "schema": "/schema", "schema.version": "/schema/version", "config": "/config", "config.maintenance": "/config/maintenance", "config.primary_node": "/config/primary_node", "config.primary_node.sync_lock": "/config/primary_node/sync_lock", "config.upstream_ip": "/config/upstream_ip", "config.migration_target_selector": "/config/migration_target_selector", "cmd": "/cmd", "cmd.node": "/cmd/nodes", "cmd.domain": "/cmd/domains", "cmd.ceph": "/cmd/ceph", "logs": "/logs", "node": "/nodes", "domain": "/domains", "network": "/networks", "storage": "/ceph", "storage.util": "/ceph/util", "osd": "/ceph/osds", "pool": "/ceph/pools", "volume": "/ceph/volumes", "snapshot": "/ceph/snapshots"}, "logs": {"node": "", "messages": "/messages"}, "node": {"name": "", "keepalive": "/keepalive", "mode": "/daemonmode", "data.active_schema": "/activeschema", "data.latest_schema": "/latestschema", "data.static": "/staticdata", "data.pvc_version": "/pvcversion", "running_domains": "/runningdomains", "count.provisioned_domains": "/domainscount", "count.networks": "/networkscount", "state.daemon": "/daemonstate", "state.router": "/routerstate", "state.domain": "/domainstate", "cpu.load": "/cpuload", "vcpu.allocated": "/vcpualloc", "memory.total": "/memtotal", "memory.used": "/memused", "memory.free": "/memfree", "memory.allocated": "/memalloc", "memory.provisioned": "/memprov", "ipmi.hostname": "/ipmihostname", "ipmi.username": "/ipmiusername", "ipmi.password": "/ipmipassword", "sriov": "/sriov", "sriov.pf": "/sriov/pf", "sriov.vf": "/sriov/vf"}, "sriov_pf": {"phy": "", "mtu": "/mtu", "vfcount": "/vfcount"}, "sriov_vf": {"phy": "", "pf": "/pf", "mtu": "/mtu", "mac": "/mac", "phy_mac": "/phy_mac", "config": "/config", "config.vlan_id": "/config/vlan_id", "config.vlan_qos": "/config/vlan_qos", "config.tx_rate_min": "/config/tx_rate_min", "config.tx_rate_max": "/config/tx_rate_max", "config.spoof_check": "/config/spoof_check", "config.link_state": "/config/link_state", "config.trust": "/config/trust", "config.query_rss": "/config/query_rss", "pci": "/pci", "pci.domain": "/pci/domain", "pci.bus": "/pci/bus", "pci.slot": "/pci/slot", "pci.function": "/pci/function", "used": "/used", "used_by": "/used_by"}, "domain": {"name": "", "xml": "/xml", "state": "/state", "profile": "/profile", "stats": "/stats", "node": "/node", "last_node": "/lastnode", "failed_reason": "/failedreason", "storage.volumes": "/rbdlist", "console.log": "/consolelog", "console.vnc": "/vnc", "meta.autostart": "/node_autostart", "meta.migrate_method": "/migration_method", "meta.node_selector": "/node_selector", "meta.node_limit": "/node_limit", "meta.tags": "/tags", "migrate.sync_lock": "/migrate_sync_lock"}, "tag": {"name": "", "type": "/type", "protected": "/protected"}, "network": {"vni": "", "type": "/nettype", "mtu": "/mtu", "rule": "/firewall_rules", "rule.in": "/firewall_rules/in", "rule.out": "/firewall_rules/out", "nameservers": "/name_servers", "domain": "/domain", "reservation": "/dhcp4_reservations", "lease": "/dhcp4_leases", "ip4.gateway": "/ip4_gateway", "ip4.network": "/ip4_network", "ip4.dhcp": "/dhcp4_flag", "ip4.dhcp_start": "/dhcp4_start", "ip4.dhcp_end": "/dhcp4_end", "ip6.gateway": "/ip6_gateway", "ip6.network": "/ip6_network", "ip6.dhcp": "/dhcp6_flag"}, "reservation": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname"}, "lease": {"mac": "", "ip": "/ipaddr", "hostname": "/hostname", "expiry": "/expiry", "client_id": "/clientid"}, "rule": {"description": "", "rule": "/rule", "order": "/order"}, "osd": {"id": "", "node": "/node", "device": "/device", "db_device": "/db_device", "fsid": "/fsid", "ofsid": "/fsid/osd", "cfsid": "/fsid/cluster", "lvm": "/lvm", "vg": "/lvm/vg", "lv": "/lvm/lv", "stats": "/stats"}, "pool": {"name": "", "pgs": "/pgs", "tier": "/tier", "stats": "/stats"}, "volume": {"name": "", "stats": "/stats"}, "snapshot": {"name": "", "stats": "/stats"}}

@@ -540,7 +540,7 @@ class ZKHandler(object):
 #
 class ZKSchema(object):
     # Current version
-    _version = 7
+    _version = 8

     # Root for doing nested keys
     _schema_root = ""
@@ -700,6 +700,12 @@ class ZKSchema(object):
             "node": "/node",
             "device": "/device",
             "db_device": "/db_device",
+            "fsid": "/fsid",
+            "ofsid": "/fsid/osd",
+            "cfsid": "/fsid/cluster",
+            "lvm": "/lvm",
+            "vg": "/lvm/vg",
+            "lv": "/lvm/lv",
             "stats": "/stats",
         },
         # The schema of an individual pool entry (/ceph/pools/{pool_name})

@@ -943,7 +943,9 @@ def entrypoint():

             # Add any missing OSDs to the list
             for osd in [osd for osd in new_osd_list if osd not in osd_list]:
-                d_osd[osd] = CephInstance.CephOSDInstance(zkhandler, this_node, osd)
+                d_osd[osd] = CephInstance.CephOSDInstance(
+                    zkhandler, logger, this_node, osd
+                )

             # Remove any deleted OSDs from the list
             for osd in [osd for osd in osd_list if osd not in new_osd_list]:
@@ -963,7 +965,9 @@ def entrypoint():

             # Add any missing pools to the list
             for pool in [pool for pool in new_pool_list if pool not in pool_list]:
-                d_pool[pool] = CephInstance.CephPoolInstance(zkhandler, this_node, pool)
+                d_pool[pool] = CephInstance.CephPoolInstance(
+                    zkhandler, logger, this_node, pool
+                )
                 # Prepare the volume components for this pool
                 volume_list[pool] = list()
                 d_volume[pool] = dict()
@@ -993,7 +997,7 @@ def entrypoint():
                    if volume not in volume_list[pool]
                ]:
                    d_volume[pool][volume] = CephInstance.CephVolumeInstance(
-                        zkhandler, this_node, pool, volume
+                        zkhandler, logger, this_node, pool, volume
                    )

                # Remove any deleted volumes from the list

@@ -99,12 +99,15 @@ def get_detect_device(detect_string):


 class CephOSDInstance(object):
-    def __init__(self, zkhandler, this_node, osd_id):
+    def __init__(self, zkhandler, logger, this_node, osd_id):
         self.zkhandler = zkhandler
+        self.logger = logger
         self.this_node = this_node
         self.osd_id = osd_id
         self.node = None
-        self.size = None
+        self.device = None
+        self.vg = None
+        self.lv = None
         self.stats = dict()

         @self.zkhandler.zk_conn.DataWatch(
@@ -141,6 +144,117 @@ class CephOSDInstance(object):
             if data and data != self.stats:
                 self.stats = json.loads(data)

+        @self.zkhandler.zk_conn.DataWatch(
+            self.zkhandler.schema.path("osd.device", self.osd_id)
+        )
+        def watch_osd_device(data, stat, event=""):
+            if event and event.type == "DELETED":
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode("ascii")
+            except AttributeError:
+                data = ""
+
+            if data and data != self.device:
+                self.device = data
+
+        # Exception conditional for migration from schema v7 to schema v8
+        try:
+
+            @self.zkhandler.zk_conn.DataWatch(
+                self.zkhandler.schema.path("osd.vg", self.osd_id)
+            )
+            def watch_osd_vg(data, stat, event=""):
+                if event and event.type == "DELETED":
+                    # The key has been deleted after existing before; terminate this watcher
+                    # because this class instance is about to be reaped in Daemon.py
+                    return False
+
+                try:
+                    data = data.decode("ascii")
+                except AttributeError:
+                    data = ""
+
+                if data and data != self.vg:
+                    self.vg = data
+
+            @self.zkhandler.zk_conn.DataWatch(
+                self.zkhandler.schema.path("osd.lv", self.osd_id)
+            )
+            def watch_osd_lv(data, stat, event=""):
+                if event and event.type == "DELETED":
+                    # The key has been deleted after existing before; terminate this watcher
+                    # because this class instance is about to be reaped in Daemon.py
+                    return False
+
+                try:
+                    data = data.decode("ascii")
+                except AttributeError:
+                    data = ""
+
+                if data and data != self.lv:
+                    self.lv = data
+
+            if self.node == self.this_node.name:
+                self.update_information()
+        except TypeError:
+            return
+
+    def update_information(self):
+        if self.vg is not None and self.lv is not None:
+            find_device = f"/dev/{self.vg}/{self.lv}"
+        else:
+            find_device = self.device
+
+        self.logger.out(
+            f"Updating stored disk information for OSD {self.osd_id}",
+            state="i",
+        )
+        retcode, stdout, stderr = common.run_os_command(
+            f"ceph-volume lvm list {find_device}"
+        )
+        for line in stdout.split("\n"):
+            if "block device" in line:
+                osd_blockdev = line.split()[-1]
+            if "osd fsid" in line:
+                osd_fsid = line.split()[-1]
+            if "cluster fsid" in line:
+                osd_clusterfsid = line.split()[-1]
+            if "devices" in line:
+                osd_device = line.split()[-1]
+
+        if not osd_fsid:
+            self.logger.out(
+                f"Failed to find updated OSD information via ceph-volume for {find_device}",
+                state="e",
+            )
+            return
+
+        # Split OSD blockdev into VG and LV components
+        # osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
+        _, _, osd_vg, osd_lv = osd_blockdev.split("/")
+
+        # Except for potentially the "osd.device", this should never change, but this ensures
+        # that the data is added at least once on initialization for existing OSDs.
+        self.zkhandler.write(
+            [
+                (("osd.device", self.osd_id), osd_device),
+                (("osd.fsid", self.osd_id), ""),
+                (("osd.ofsid", self.osd_id), osd_fsid),
+                (("osd.cfsid", self.osd_id), osd_clusterfsid),
+                (("osd.lvm", self.osd_id), ""),
+                (("osd.vg", self.osd_id), osd_vg),
+                (("osd.lv", self.osd_id), osd_lv),
+            ]
+        )
+        self.device = osd_device
+        self.vg = osd_vg
+        self.lv = osd_lv
+
     @staticmethod
     def add_osd(
         zkhandler, logger, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05
@@ -230,24 +344,39 @@ class CephOSDInstance(object):
                print(stderr)
                raise Exception

-            # 4a. Get OSD FSID
+            # 4a. Get OSD information
            logger.out(
-                "Getting OSD FSID for ID {} on {}".format(osd_id, device), state="i"
+                "Getting OSD information for ID {} on {}".format(osd_id, device),
+                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm list {device}".format(device=device)
            )
            for line in stdout.split("\n"):
+                if "block device" in line:
+                    osd_blockdev = line.split()[-1]
                if "osd fsid" in line:
                    osd_fsid = line.split()[-1]
+                if "cluster fsid" in line:
+                    osd_clusterfsid = line.split()[-1]
+                if "devices" in line:
+                    osd_device = line.split()[-1]

            if not osd_fsid:
                print("ceph-volume lvm list")
-                print("Could not find OSD fsid in data:")
+                print("Could not find OSD information in data:")
                print(stdout)
                print(stderr)
                raise Exception

+            # Split OSD blockdev into VG and LV components
+            # osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
+            _, _, osd_vg, osd_lv = osd_blockdev.split("/")
+
+            # Reset whatever we were given to Ceph's /dev/xdX naming
+            if device != osd_device:
+                device = osd_device
+
            # 4b. Activate the OSD
            logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
@@ -297,6 +426,12 @@ class CephOSDInstance(object):
                    (("osd.node", osd_id), node),
                    (("osd.device", osd_id), device),
                    (("osd.db_device", osd_id), db_device),
+                    (("osd.fsid", osd_id), ""),
+                    (("osd.ofsid", osd_id), osd_fsid),
+                    (("osd.cfsid", osd_id), osd_clusterfsid),
+                    (("osd.lvm", osd_id), ""),
+                    (("osd.vg", osd_id), osd_vg),
+                    (("osd.lv", osd_id), osd_lv),
                    (
                        ("osd.stats", osd_id),
                        f'{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "{node}", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", state="|" }',
@@ -647,8 +782,9 @@ class CephOSDInstance(object):


 class CephPoolInstance(object):
-    def __init__(self, zkhandler, this_node, name):
+    def __init__(self, zkhandler, logger, this_node, name):
         self.zkhandler = zkhandler
+        self.logger = logger
         self.this_node = this_node
         self.name = name
         self.pgs = ""
@@ -690,8 +826,9 @@ class CephPoolInstance(object):


 class CephVolumeInstance(object):
-    def __init__(self, zkhandler, this_node, pool, name):
+    def __init__(self, zkhandler, logger, this_node, pool, name):
         self.zkhandler = zkhandler
+        self.logger = logger
         self.this_node = this_node
         self.pool = pool
         self.name = name