#!/usr/bin/env python3
# CephInstance.py - Class implementing a PVC node Ceph instance
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018-2022 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################

import time
import json

import daemon_lib.common as common

from distutils.util import strtobool

from re import search, match, sub


def get_detect_device(detect_string):
    """
    Parses a "detect:" string into a normalized block device path using lsscsi.

    A detect string is formatted "detect:<NAME>:<SIZE>:<ID>", where
    NAME is some unique identifier in lsscsi, SIZE is a human-readable
    size value to within +/- 3% of the real size of the device, and
    ID is the Nth (0-indexed) matching entry of that NAME and SIZE.
    """
    _, name, size, idd = detect_string.split(":")
    if _ != "detect":
        return None

    retcode, stdout, stderr = common.run_os_command("lsscsi -s")
    if retcode:
        print(f"Failed to run lsscsi: {stderr}")
        return None

    # Get valid lines
    lsscsi_lines_raw = stdout.split("\n")
    lsscsi_lines = list()
    for line in lsscsi_lines_raw:
        if not line:
            continue
        split_line = line.split()
        if split_line[1] != "disk":
            continue
        lsscsi_lines.append(line)

    # Handle size determination (+/- 3%)
    lsscsi_sizes = set()
    for line in lsscsi_lines:
        # Use this line's own size column (the last field), not the leftover
        # split_line from the loop above
        lsscsi_sizes.add(line.split()[-1])
    for l_size in lsscsi_sizes:
        b_size = float(sub(r"\D.", "", size))
        t_size = float(sub(r"\D.", "", l_size))
        plusthreepct = t_size * 1.03
        minusthreepct = t_size * 0.97
        if b_size > minusthreepct and b_size < plusthreepct:
            size = l_size
            break
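
    # For example (illustrative): if lsscsi reports a size of 800GB
    # (t_size 800.0), any requested size between 776GB and 824GB matches,
    # and the canonical lsscsi value is adopted for the exact-match pass below.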
    matches = list()
    for line in lsscsi_lines:
        # Skip non-disk entries
        if line.split()[1] != "disk":
            continue
        # Skip if name is not contained in the line (case-insensitive)
        if name.lower() not in line.lower():
            continue
        # Skip if the size does not match
        if size != line.split()[-1]:
            continue
        # Get our blockdev and append to the list
        matches.append(line.split()[-2])

    blockdev = None
    # Find the blockdev at index {idd}
    for idx, _blockdev in enumerate(matches):
        if int(idx) == int(idd):
            blockdev = _blockdev
            break

    return blockdev


class CephOSDInstance(object):
    def __init__(self, zkhandler, logger, this_node, osd_id):
        self.zkhandler = zkhandler
        self.logger = logger
        self.this_node = this_node
        self.osd_id = osd_id
        self.node = None
        self.device = None
        self.vg = None
        self.lv = None
        self.stats = dict()

        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("osd.node", self.osd_id)
        )
        def watch_osd_node(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode("ascii")
            except AttributeError:
                data = ""

            if data and data != self.node:
                self.node = data

        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("osd.stats", self.osd_id)
        )
        def watch_osd_stats(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode("ascii")
            except AttributeError:
                data = ""

            if data and data != self.stats:
                self.stats = json.loads(data)

        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("osd.device", self.osd_id)
        )
        def watch_osd_device(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode("ascii")
            except AttributeError:
                data = ""

            if data and data != self.device:
                self.device = data

        # Exception conditional for migration from schema v7 to schema v8
        try:

            @self.zkhandler.zk_conn.DataWatch(
                self.zkhandler.schema.path("osd.vg", self.osd_id)
            )
            def watch_osd_vg(data, stat, event=""):
                if event and event.type == "DELETED":
                    # The key has been deleted after existing before; terminate this watcher
                    # because this class instance is about to be reaped in Daemon.py
                    return False

                try:
                    data = data.decode("ascii")
                except AttributeError:
                    data = ""

                if data and data != self.vg:
                    self.vg = data

            @self.zkhandler.zk_conn.DataWatch(
                self.zkhandler.schema.path("osd.lv", self.osd_id)
            )
            def watch_osd_lv(data, stat, event=""):
                if event and event.type == "DELETED":
                    # The key has been deleted after existing before; terminate this watcher
                    # because this class instance is about to be reaped in Daemon.py
                    return False

                try:
                    data = data.decode("ascii")
                except AttributeError:
                    data = ""

                if data and data != self.lv:
                    self.lv = data

            if self.node == self.this_node.name:
                self.update_information()
        # Note (assumption): on a pre-v8 schema the "osd.vg"/"osd.lv" paths do
        # not exist yet, so schema.path() fails with a TypeError; skip these
        # watchers until the schema has been migrated.
        except TypeError:
            return

    def update_information(self):
        if self.vg is not None and self.lv is not None:
            find_device = f"/dev/{self.vg}/{self.lv}"
        else:
            find_device = self.device

        self.logger.out(
            f"Updating stored disk information for OSD {self.osd_id}",
            state="i",
        )
        retcode, stdout, stderr = common.run_os_command(
            f"ceph-volume lvm list {find_device}"
        )
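
        # The fields parsed below correspond to lines of "ceph-volume lvm list"
        # output which look roughly like this (abridged, illustrative values):
        #     block device    /dev/ceph-<uuid>/osd-block-<uuid>
        #     osd fsid        <uuid>
        #     cluster fsid    <uuid>
        #     devices         /dev/sdX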
        osd_blockdev = None
        osd_fsid = None
        osd_clusterfsid = None
        osd_device = None
        for line in stdout.split("\n"):
            if "block device" in line:
                osd_blockdev = line.split()[-1]
            if "osd fsid" in line:
                osd_fsid = line.split()[-1]
            if "cluster fsid" in line:
                osd_clusterfsid = line.split()[-1]
            if "devices" in line:
                osd_device = line.split()[-1]

        if not osd_blockdev or not osd_fsid or not osd_clusterfsid or not osd_device:
            self.logger.out(
                f"Failed to find updated OSD information via ceph-volume for {find_device}",
                state="e",
            )
            return

        # Split OSD blockdev into VG and LV components
        # osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
        _, _, osd_vg, osd_lv = osd_blockdev.split("/")

        # Except for potentially the "osd.device", this should never change, but this ensures
        # that the data is added at least once on initialization for existing OSDs.
        self.zkhandler.write(
            [
                (("osd.device", self.osd_id), osd_device),
                (("osd.fsid", self.osd_id), ""),
                (("osd.ofsid", self.osd_id), osd_fsid),
                (("osd.cfsid", self.osd_id), osd_clusterfsid),
                (("osd.lvm", self.osd_id), ""),
                (("osd.vg", self.osd_id), osd_vg),
                (("osd.lv", self.osd_id), osd_lv),
            ]
        )

        self.device = osd_device
        self.vg = osd_vg
        self.lv = osd_lv

    @staticmethod
    def add_osd(
        zkhandler, logger, node, device, weight, ext_db_flag=False, ext_db_ratio=0.05
    ):
        # Handle a detect device if that is passed
        if match(r"detect:", device):
            ddevice = get_detect_device(device)
            if ddevice is None:
                logger.out(
                    f"Failed to determine block device from detect string {device}",
                    state="e",
                )
                return False
            else:
                logger.out(
                    f"Determined block device {ddevice} from detect string {device}",
                    state="i",
                )
                device = ddevice

        # We are ready to create a new OSD on this node
        logger.out("Creating new OSD disk on block device {}".format(device), state="i")
        try:
            # 1. Create an OSD; we do this so we know what ID will be gen'd
            retcode, stdout, stderr = common.run_os_command("ceph osd create")
            if retcode:
                print("ceph osd create")
                print(stdout)
                print(stderr)
                raise Exception

            osd_id = stdout.rstrip()
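
            # Note: "ceph osd create" allocates and returns the next free OSD ID,
            # and the immediate "ceph osd rm" below releases it again. This dance
            # exists so we know the ID the cluster will assign before ceph-volume
            # actually creates the OSD, letting us name the DB LV and Zookeeper
            # entries after it.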

            # 2. Remove that newly-created OSD
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd rm {}".format(osd_id)
            )
            if retcode:
                print("ceph osd rm")
                print(stdout)
                print(stderr)
                raise Exception

            # 3a. Zap the disk to ensure it is ready to go
            logger.out("Zapping disk {}".format(device), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm zap --destroy {}".format(device)
            )
            if retcode:
                print("ceph-volume lvm zap")
                print(stdout)
                print(stderr)
                raise Exception

            dev_flags = "--data {}".format(device)

            # 3b. Prepare the logical volume if ext_db_flag
            if ext_db_flag:
                _, osd_size_bytes, _ = common.run_os_command(
                    "blockdev --getsize64 {}".format(device)
                )
                osd_size_bytes = int(osd_size_bytes)
                result = CephOSDInstance.create_osd_db_lv(
                    zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes
                )
                if not result:
                    raise Exception
                db_device = "osd-db/osd-{}".format(osd_id)
                dev_flags += " --block.db {}".format(db_device)
            else:
                db_device = ""
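
            # Sizing note (illustrative): with the default ext_db_ratio of 0.05,
            # a 4 TB data device would get a DB LV of roughly 200 GB; see
            # create_osd_db_lv() below for the actual computation.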

            # 3c. Create the OSD for real
            logger.out(
                "Preparing LVM for new OSD disk with ID {} on {}".format(
                    osd_id, device
                ),
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm prepare --bluestore {devices}".format(
                    devices=dev_flags
                )
            )
            if retcode:
                print("ceph-volume lvm prepare")
                print(stdout)
                print(stderr)
                raise Exception

            # 4a. Get OSD information
            logger.out(
                "Getting OSD information for ID {} on {}".format(osd_id, device),
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm list {device}".format(device=device)
            )
            for line in stdout.split("\n"):
                if "block device" in line:
                    osd_blockdev = line.split()[-1]
                if "osd fsid" in line:
                    osd_fsid = line.split()[-1]
                if "cluster fsid" in line:
                    osd_clusterfsid = line.split()[-1]
                if "devices" in line:
                    osd_device = line.split()[-1]

            if not osd_fsid:
                print("ceph-volume lvm list")
                print("Could not find OSD information in data:")
                print(stdout)
                print(stderr)
                raise Exception

            # Split OSD blockdev into VG and LV components
            # osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
            _, _, osd_vg, osd_lv = osd_blockdev.split("/")

            # Reset whatever we were given to Ceph's /dev/xdX naming
            if device != osd_device:
                device = osd_device

            # 4b. Activate the OSD
            logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format(
                    osdid=osd_id, osdfsid=osd_fsid
                )
            )
            if retcode:
                print("ceph-volume lvm activate")
                print(stdout)
                print(stderr)
                raise Exception

            # 5. Add it to the crush map
            logger.out(
                "Adding new OSD disk with ID {} to CRUSH map".format(osd_id), state="i"
            )
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd crush add osd.{osdid} {weight} root=default host={node}".format(
                    osdid=osd_id, weight=weight, node=node
                )
            )
            if retcode:
                print("ceph osd crush add")
                print(stdout)
                print(stderr)
                raise Exception

            time.sleep(0.5)

            # 6. Verify it started
            retcode, stdout, stderr = common.run_os_command(
                "systemctl status ceph-osd@{osdid}".format(osdid=osd_id)
            )
            if retcode:
                print("systemctl status")
                print(stdout)
                print(stderr)
                raise Exception

            # 7. Add the new OSD to the list
            logger.out(
                "Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i"
            )
            zkhandler.write(
                [
                    (("osd", osd_id), ""),
                    (("osd.node", osd_id), node),
                    (("osd.device", osd_id), device),
                    (("osd.db_device", osd_id), db_device),
                    (("osd.fsid", osd_id), ""),
                    (("osd.ofsid", osd_id), osd_fsid),
                    (("osd.cfsid", osd_id), osd_clusterfsid),
                    (("osd.lvm", osd_id), ""),
                    (("osd.vg", osd_id), osd_vg),
                    (("osd.lv", osd_id), osd_lv),
                    (
                        ("osd.stats", osd_id),
                        '{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
                    ),
                ]
            )
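            # The stats entry above is seeded with "|" placeholder values;
            # these are presumably replaced by the daemon's periodic OSD stats
            # collection once the OSD reports in.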

            # Log it
            logger.out("Created new OSD disk with ID {}".format(osd_id), state="o")
            return True
        except Exception as e:
            # Log it
            logger.out("Failed to create new OSD disk: {}".format(e), state="e")
            return False

    @staticmethod
    def replace_osd(
        zkhandler,
        logger,
        node,
        osd_id,
        old_device,
        new_device,
        weight,
        ext_db_flag=False,
    ):
        # Handle a detect device if that is passed
        if match(r"detect:", new_device):
            ddevice = get_detect_device(new_device)
            if ddevice is None:
                logger.out(
                    f"Failed to determine block device from detect string {new_device}",
                    state="e",
                )
                return False
            else:
                logger.out(
                    f"Determined block device {ddevice} from detect string {new_device}",
                    state="i",
                )
                new_device = ddevice

        # We are ready to replace the OSD disk on this node
        logger.out(
            "Replacing OSD {} disk with block device {}".format(osd_id, new_device),
            state="i",
        )
        try:
            # Verify the OSD is present
            retcode, stdout, stderr = common.run_os_command("ceph osd ls")
            osd_list = stdout.split("\n")
            if osd_id not in osd_list:
                logger.out(
                    "Could not find OSD {} in the cluster".format(osd_id), state="e"
                )
                return True

            # 1. Set the OSD down and out so it will flush
            logger.out("Setting down OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd down {}".format(osd_id)
            )
            if retcode:
                print("ceph osd down")
                print(stdout)
                print(stderr)
                raise Exception

            logger.out("Setting out OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd out {}".format(osd_id)
            )
            if retcode:
                print("ceph osd out")
                print(stdout)
                print(stderr)
                raise Exception

            # 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
            logger.out(f"Waiting for OSD {osd_id} to be safe to remove", state="i")
            while True:
                retcode, stdout, stderr = common.run_os_command(
                    f"ceph osd safe-to-destroy osd.{osd_id}"
                )
                if retcode in [0, 11]:
                    # Code 0 = success
                    # Code 11 = "Error EAGAIN: OSD(s) 5 have no reported stats, and not all PGs are active+clean; we cannot draw any conclusions." which means all PGs have been remapped but backfill is still occurring
                    break
                else:
                    time.sleep(5)

            # 3. Stop the OSD process
            logger.out("Stopping OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "systemctl stop ceph-osd@{}".format(osd_id)
            )
            if retcode:
                print("systemctl stop")
                print(stdout)
                print(stderr)
                raise Exception

            time.sleep(2)

            # 4. Destroy the OSD
            logger.out(f"Destroying OSD with ID {osd_id}", state="i")
            retcode, stdout, stderr = common.run_os_command(
                f"ceph osd destroy {osd_id} --yes-i-really-mean-it"
            )
            if retcode:
                print("ceph osd destroy")
                print(stdout)
                print(stderr)
                raise Exception

            # 5. Adjust the weight
            logger.out(
                "Adjusting weight of OSD disk with ID {} in CRUSH map".format(osd_id),
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd crush reweight osd.{osdid} {weight}".format(
                    osdid=osd_id, weight=weight
                )
            )
            if retcode:
                print("ceph osd crush reweight")
                print(stdout)
                print(stderr)
                raise Exception

            # 6a. Zap the new disk to ensure it is ready to go
            logger.out("Zapping disk {}".format(new_device), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm zap --destroy {}".format(new_device)
            )
            if retcode:
                print("ceph-volume lvm zap")
                print(stdout)
                print(stderr)
                raise Exception

            dev_flags = "--data {}".format(new_device)

            # 6b. Prepare the logical volume if ext_db_flag
            if ext_db_flag:
                db_device = "osd-db/osd-{}".format(osd_id)
                dev_flags += " --block.db {}".format(db_device)
            else:
                db_device = ""

            # 6c. Replace the OSD
            logger.out(
                "Preparing LVM for replaced OSD {} disk on {}".format(
                    osd_id, new_device
                ),
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm prepare --osd-id {osdid} --bluestore {devices}".format(
                    osdid=osd_id, devices=dev_flags
                )
            )
            if retcode:
                print("ceph-volume lvm prepare")
                print(stdout)
                print(stderr)
                raise Exception

            # 7a. Get OSD information
            logger.out(
                "Getting OSD information for ID {} on {}".format(osd_id, new_device),
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm list {device}".format(device=new_device)
            )
            for line in stdout.split("\n"):
                if "block device" in line:
                    osd_blockdev = line.split()[-1]
                if "osd fsid" in line:
                    osd_fsid = line.split()[-1]
                if "cluster fsid" in line:
                    osd_clusterfsid = line.split()[-1]
                if "devices" in line:
                    osd_device = line.split()[-1]

            if not osd_fsid:
                print("ceph-volume lvm list")
                print("Could not find OSD information in data:")
                print(stdout)
                print(stderr)
                raise Exception

            # Split OSD blockdev into VG and LV components
            # osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
            _, _, osd_vg, osd_lv = osd_blockdev.split("/")

            # Reset whatever we were given to Ceph's /dev/xdX naming
            if new_device != osd_device:
                new_device = osd_device

            # 7b. Activate the OSD
            logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format(
                    osdid=osd_id, osdfsid=osd_fsid
                )
            )
            if retcode:
                print("ceph-volume lvm activate")
                print(stdout)
                print(stderr)
                raise Exception

            time.sleep(0.5)

            # 8. Verify it started
            retcode, stdout, stderr = common.run_os_command(
                "systemctl status ceph-osd@{osdid}".format(osdid=osd_id)
            )
            if retcode:
                print("systemctl status")
                print(stdout)
                print(stderr)
                raise Exception

            # 9. Update Zookeeper information
            logger.out(
                "Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i"
            )
            zkhandler.write(
                [
                    (("osd", osd_id), ""),
                    (("osd.node", osd_id), node),
                    (("osd.device", osd_id), new_device),
                    (("osd.db_device", osd_id), db_device),
                    (("osd.fsid", osd_id), ""),
                    (("osd.ofsid", osd_id), osd_fsid),
                    (("osd.cfsid", osd_id), osd_clusterfsid),
                    (("osd.lvm", osd_id), ""),
                    (("osd.vg", osd_id), osd_vg),
                    (("osd.lv", osd_id), osd_lv),
                    (
                        ("osd.stats", osd_id),
                        '{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
                    ),
                ]
            )

            # Log it
            logger.out(
                "Replaced OSD {} disk with device {}".format(osd_id, new_device),
                state="o",
            )
            return True
        except Exception as e:
            # Log it
            logger.out("Failed to replace OSD {} disk: {}".format(osd_id, e), state="e")
            return False

    @staticmethod
    def refresh_osd(zkhandler, logger, node, osd_id, device, ext_db_flag):
        # Handle a detect device if that is passed
        if match(r"detect:", device):
            ddevice = get_detect_device(device)
            if ddevice is None:
                logger.out(
                    f"Failed to determine block device from detect string {device}",
                    state="e",
                )
                return False
            else:
                logger.out(
                    f"Determined block device {ddevice} from detect string {device}",
                    state="i",
                )
                device = ddevice

        # We are ready to refresh the OSD on this node
        logger.out(
            "Refreshing OSD {} disk on block device {}".format(osd_id, device),
            state="i",
        )
        try:
            # 1. Verify the OSD is present
            retcode, stdout, stderr = common.run_os_command("ceph osd ls")
            osd_list = stdout.split("\n")
            if osd_id not in osd_list:
                logger.out(
                    "Could not find OSD {} in the cluster".format(osd_id), state="e"
                )
                return True

            dev_flags = "--data {}".format(device)
            if ext_db_flag:
                db_device = "osd-db/osd-{}".format(osd_id)
                dev_flags += " --block.db {}".format(db_device)
            else:
                db_device = ""

            # 2. Get OSD information
            logger.out(
                "Getting OSD information for ID {} on {}".format(osd_id, device),
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm list {device}".format(device=device)
            )
            for line in stdout.split("\n"):
                if "block device" in line:
                    osd_blockdev = line.split()[-1]
                if "osd fsid" in line:
                    osd_fsid = line.split()[-1]
                if "cluster fsid" in line:
                    osd_clusterfsid = line.split()[-1]
                if "devices" in line:
                    osd_device = line.split()[-1]

            if not osd_fsid:
                print("ceph-volume lvm list")
                print("Could not find OSD information in data:")
                print(stdout)
                print(stderr)
                raise Exception

            # Split OSD blockdev into VG and LV components
            # osd_blockdev = /dev/ceph-<uuid>/osd-block-<uuid>
            _, _, osd_vg, osd_lv = osd_blockdev.split("/")

            # Reset whatever we were given to Ceph's /dev/xdX naming
            if device != osd_device:
                device = osd_device

            # 3. Activate the OSD
            logger.out("Activating new OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm activate --bluestore {osdid} {osdfsid}".format(
                    osdid=osd_id, osdfsid=osd_fsid
                )
            )
            if retcode:
                print("ceph-volume lvm activate")
                print(stdout)
                print(stderr)
                raise Exception

            time.sleep(0.5)

            # 4. Verify it started
            retcode, stdout, stderr = common.run_os_command(
                "systemctl status ceph-osd@{osdid}".format(osdid=osd_id)
            )
            if retcode:
                print("systemctl status")
                print(stdout)
                print(stderr)
                raise Exception

            # 5. Update Zookeeper information
            logger.out(
                "Adding new OSD disk with ID {} to Zookeeper".format(osd_id), state="i"
            )
            zkhandler.write(
                [
                    (("osd", osd_id), ""),
                    (("osd.node", osd_id), node),
                    (("osd.device", osd_id), device),
                    (("osd.db_device", osd_id), db_device),
                    (("osd.fsid", osd_id), ""),
                    (("osd.ofsid", osd_id), osd_fsid),
                    (("osd.cfsid", osd_id), osd_clusterfsid),
                    (("osd.lvm", osd_id), ""),
                    (("osd.vg", osd_id), osd_vg),
                    (("osd.lv", osd_id), osd_lv),
                    (
                        ("osd.stats", osd_id),
                        '{"uuid": "|", "up": 0, "in": 0, "primary_affinity": "|", "utilization": "|", "var": "|", "pgs": "|", "kb": "|", "weight": "|", "reweight": "|", "node": "|", "used": "|", "avail": "|", "wr_ops": "|", "wr_data": "|", "rd_ops": "|", "rd_data": "|", "state": "|"}',
                    ),
                ]
            )

            # Log it
            logger.out("Refreshed OSD {} disk on {}".format(osd_id, device), state="o")
            return True
        except Exception as e:
            # Log it
            logger.out("Failed to refresh OSD {} disk: {}".format(osd_id, e), state="e")
            return False

    @staticmethod
    def remove_osd(zkhandler, logger, osd_id, osd_obj, force_flag):
        logger.out("Removing OSD disk {}".format(osd_id), state="i")
        try:
            # Verify the OSD is present
            retcode, stdout, stderr = common.run_os_command("ceph osd ls")
            osd_list = stdout.split("\n")
            if osd_id not in osd_list:
                logger.out(
                    "Could not find OSD {} in the cluster".format(osd_id), state="e"
                )
                if force_flag:
                    logger.out("Ignoring error due to force flag", state="i")
                else:
                    return True

            # 1. Set the OSD down and out so it will flush
            logger.out("Setting down OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd down {}".format(osd_id)
            )
            if retcode:
                print("ceph osd down")
                print(stdout)
                print(stderr)
                if force_flag:
                    logger.out("Ignoring error due to force flag", state="i")
                else:
                    raise Exception

            logger.out("Setting out OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd out {}".format(osd_id)
            )
            if retcode:
                print("ceph osd out")
                print(stdout)
                print(stderr)
                if force_flag:
                    logger.out("Ignoring error due to force flag", state="i")
                else:
                    raise Exception

            # 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
            logger.out(f"Waiting for OSD {osd_id} to be safe to remove", state="i")
            while True:
                retcode, stdout, stderr = common.run_os_command(
                    f"ceph osd safe-to-destroy osd.{osd_id}"
                )
                if int(retcode) in [0, 11]:
                    # Code 0 = success; code 11 (EAGAIN) = PGs remapped but backfill ongoing
                    break
                else:
                    time.sleep(5)

            # 3. Stop the OSD process and wait for it to be terminated
            logger.out("Stopping OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "systemctl stop ceph-osd@{}".format(osd_id)
            )
            if retcode:
                print("systemctl stop")
                print(stdout)
                print(stderr)
                if force_flag:
                    logger.out("Ignoring error due to force flag", state="i")
                else:
                    raise Exception

            time.sleep(2)

            # 4. Determine the block devices
            osd_vg = zkhandler.read(("osd.vg", osd_id))
            osd_lv = zkhandler.read(("osd.lv", osd_id))
            osd_lvm = f"/dev/{osd_vg}/{osd_lv}"
            osd_device = None

            logger.out(
                f"Getting disk info for OSD {osd_id} LV {osd_lvm}",
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                f"ceph-volume lvm list {osd_lvm}"
            )
            for line in stdout.split("\n"):
                if "devices" in line:
                    osd_device = line.split()[-1]

            if not osd_device:
                print("ceph-volume lvm list")
                print("Could not find OSD information in data:")
                print(stdout)
                print(stderr)
                if force_flag:
                    logger.out("Ignoring error due to force flag", state="i")
                else:
                    raise Exception

            # 5. Zap the volumes
            logger.out(
                "Zapping OSD {} disk on {}".format(osd_id, osd_device),
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                "ceph-volume lvm zap --destroy {}".format(osd_device)
            )
            if retcode:
                print("ceph-volume lvm zap")
                print(stdout)
                print(stderr)
                if force_flag:
                    logger.out("Ignoring error due to force flag", state="i")
                else:
                    raise Exception

            # 6. Purge the OSD from Ceph
            logger.out("Purging OSD disk with ID {}".format(osd_id), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "ceph osd purge {} --yes-i-really-mean-it".format(osd_id)
            )
            if retcode:
                print("ceph osd purge")
                print(stdout)
                print(stderr)
                if force_flag:
                    logger.out("Ignoring error due to force flag", state="i")
                else:
                    raise Exception

            # 7. Remove the DB device
            if zkhandler.exists(("osd.db_device", osd_id)):
                db_device = zkhandler.read(("osd.db_device", osd_id))
                logger.out(
                    'Removing OSD DB logical volume "{}"'.format(db_device), state="i"
                )
                retcode, stdout, stderr = common.run_os_command(
                    "lvremove --yes --force {}".format(db_device)
                )

            # 8. Delete OSD from ZK
            logger.out(
                "Deleting OSD disk with ID {} from Zookeeper".format(osd_id), state="i"
            )
            zkhandler.delete(("osd", osd_id), recursive=True)

            # Log it
            logger.out("Removed OSD disk with ID {}".format(osd_id), state="o")
            return True
        except Exception as e:
            # Log it
            logger.out(
                "Failed to purge OSD disk with ID {}: {}".format(osd_id, e), state="e"
            )
            return False

    @staticmethod
    def add_db_vg(zkhandler, logger, device):
        # Check if the "osd-db" volume group already exists
        retcode, stdout, stderr = common.run_os_command("vgdisplay osd-db")
        if retcode != 5:
            logger.out('Ceph OSD database VG "osd-db" already exists', state="e")
            return False
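
        # Note: LVM display commands exit with code 5 when the requested object
        # does not exist, so any other return code above means "osd-db" is
        # already present (the same check is used for the LV in create_osd_db_lv).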

        # Handle a detect device if that is passed
        if match(r"detect:", device):
            ddevice = get_detect_device(device)
            if ddevice is None:
                logger.out(
                    f"Failed to determine block device from detect string {device}",
                    state="e",
                )
                return False
            else:
                logger.out(
                    f"Determined block device {ddevice} from detect string {device}",
                    state="i",
                )
                device = ddevice

        logger.out(
            "Creating new OSD database volume group on block device {}".format(device),
            state="i",
        )

        try:
            # 1. Create an empty partition table
            logger.out(
                "Creating partitions on block device {}".format(device), state="i"
            )
            retcode, stdout, stderr = common.run_os_command(
                "sgdisk --clear {}".format(device)
            )
            if retcode:
                print("sgdisk create partition table")
                print(stdout)
                print(stderr)
                raise Exception

            retcode, stdout, stderr = common.run_os_command(
                "sgdisk --new 1:: --typecode 1:8e00 {}".format(device)
            )
            if retcode:
                print("sgdisk create pv partition")
                print(stdout)
                print(stderr)
                raise Exception
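
            # Note: 8e00 is the GPT partition type code for Linux LVM, and
            # "--new 1::" spans the whole device, so the single new partition
            # is ready for pvcreate.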

            # Handle the partition ID portion
            if search(r"by-path", device) or search(r"by-id", device):
                # /dev/disk/by-path/pci-0000:03:00.0-scsi-0:1:0:0 -> pci-0000:03:00.0-scsi-0:1:0:0-part1
                partition = "{}-part1".format(device)
            elif search(r"nvme", device):
                # /dev/nvme0n1 -> nvme0n1p1
                partition = "{}p1".format(device)
            else:
                # /dev/sda -> sda1
                # No other '/dev/disk/by-*' types are valid for raw block devices anyways
                partition = "{}1".format(device)

            # 2. Create the PV
            logger.out("Creating PV on block device {}".format(partition), state="i")
            retcode, stdout, stderr = common.run_os_command(
                "pvcreate --force {}".format(partition)
            )
            if retcode:
                print("pv creation")
                print(stdout)
                print(stderr)
                raise Exception

            # 3. Create the VG (named 'osd-db')
            logger.out(
                'Creating VG "osd-db" on block device {}'.format(partition), state="i"
            )
            retcode, stdout, stderr = common.run_os_command(
                "vgcreate --force osd-db {}".format(partition)
            )
            if retcode:
                print("vg creation")
                print(stdout)
                print(stderr)
                raise Exception

            # Log it
            logger.out(
                "Created new OSD database volume group on block device {}".format(
                    device
                ),
                state="o",
            )
            return True
        except Exception as e:
            # Log it
            logger.out(
                "Failed to create OSD database volume group: {}".format(e), state="e"
            )
            return False

    @staticmethod
    def create_osd_db_lv(zkhandler, logger, osd_id, ext_db_ratio, osd_size_bytes):
        logger.out(
            "Creating new OSD database logical volume for OSD ID {}".format(osd_id),
            state="i",
        )
        try:
            # 0. Check if an existing logical volume exists
            retcode, stdout, stderr = common.run_os_command(
                "lvdisplay osd-db/osd-{}".format(osd_id)
            )
            if retcode != 5:
                logger.out(
                    'Ceph OSD database LV "osd-db/osd-{}" already exists'.format(
                        osd_id
                    ),
                    state="e",
                )
                return False

            # 1. Determine LV sizing
            osd_db_size = int(osd_size_bytes * ext_db_ratio / 1024 / 1024)
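            # Worked example (illustrative): a 4000000000000-byte (4 TB) OSD at
            # the default ratio of 0.05 gives int(4000000000000 * 0.05 / 1024 / 1024)
            # = 190734, i.e. a ~190 GiB DB LV (lvcreate sizes default to MiB).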

            # 2. Create the LV
            logger.out(
                'Creating DB LV "osd-db/osd-{}" of {}M ({} * {})'.format(
                    osd_id, osd_db_size, osd_size_bytes, ext_db_ratio
                ),
                state="i",
            )
            retcode, stdout, stderr = common.run_os_command(
                "lvcreate --yes --name osd-{} --size {} osd-db".format(
                    osd_id, osd_db_size
                )
            )
            if retcode:
                print("db lv creation")
                print(stdout)
                print(stderr)
                raise Exception

            # Log it
            logger.out(
                'Created new OSD database logical volume "osd-db/osd-{}"'.format(
                    osd_id
                ),
                state="o",
            )
            return True
        except Exception as e:
            # Log it
            logger.out(
                "Failed to create OSD database logical volume: {}".format(e), state="e"
            )
            return False


class CephPoolInstance(object):
    def __init__(self, zkhandler, logger, this_node, name):
        self.zkhandler = zkhandler
        self.logger = logger
        self.this_node = this_node
        self.name = name
        self.pgs = ""
        self.stats = dict()

        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("pool.pgs", self.name)
        )
        def watch_pool_node(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode("ascii")
            except AttributeError:
                data = ""

            if data and data != self.pgs:
                self.pgs = data

        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("pool.stats", self.name)
        )
        def watch_pool_stats(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode("ascii")
            except AttributeError:
                data = ""

            if data and data != self.stats:
                self.stats = json.loads(data)


class CephVolumeInstance(object):
    def __init__(self, zkhandler, logger, this_node, pool, name):
        self.zkhandler = zkhandler
        self.logger = logger
        self.this_node = this_node
        self.pool = pool
        self.name = name
        self.stats = dict()

        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path("volume.stats", f"{self.pool}/{self.name}")
        )
        def watch_volume_stats(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode("ascii")
            except AttributeError:
                data = ""

            if data and data != self.stats:
                self.stats = json.loads(data)


class CephSnapshotInstance(object):
    def __init__(self, zkhandler, this_node, pool, volume, name):
        self.zkhandler = zkhandler
        self.this_node = this_node
        self.pool = pool
        self.volume = volume
        self.name = name
        self.stats = dict()

        @self.zkhandler.zk_conn.DataWatch(
            self.zkhandler.schema.path(
                "snapshot.stats", f"{self.pool}/{self.volume}/{self.name}"
            )
        )
        def watch_snapshot_stats(data, stat, event=""):
            if event and event.type == "DELETED":
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode("ascii")
            except AttributeError:
                data = ""

            if data and data != self.stats:
                self.stats = json.loads(data)


# Primary command function
# This command pipe is only used for OSD adds and removes
def ceph_command(zkhandler, logger, this_node, data, d_osd):
    # Get the command and args; the * + join ensures arguments with spaces (e.g. detect strings) are recombined right
    command, *args = data.split()
    args = " ".join(args)
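    # For example (hypothetical payload): data = "osd_add hv1,detect:INTEL SSD:800GB:0,1.0,True,0.05"
    # splits on whitespace into ["osd_add", "hv1,detect:INTEL", "SSD:800GB:0,1.0,True,0.05"],
    # then rejoins to command = "osd_add" and
    # args = "hv1,detect:INTEL SSD:800GB:0,1.0,True,0.05" for the comma-splits below.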

    # Adding a new OSD
    if command == "osd_add":
        node, device, weight, ext_db_flag, ext_db_ratio = args.split(",")
        ext_db_flag = bool(strtobool(ext_db_flag))
        ext_db_ratio = float(ext_db_ratio)
        if node == this_node.name:
            # Lock the command queue
            zk_lock = zkhandler.writelock("base.cmd.ceph")
            with zk_lock:
                # Add the OSD
                result = CephOSDInstance.add_osd(
                    zkhandler, logger, node, device, weight, ext_db_flag, ext_db_ratio
                )
                # Command succeeded
                if result:
                    # Update the command queue
                    zkhandler.write([("base.cmd.ceph", "success-{}".format(data))])
                # Command failed
                else:
                    # Update the command queue
                    zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))])
                # Wait 1 second before we free the lock, to ensure the client hits the lock
                time.sleep(1)

    # Replacing an OSD
    if command == "osd_replace":
        node, osd_id, old_device, new_device, weight, ext_db_flag = args.split(",")
        ext_db_flag = bool(strtobool(ext_db_flag))
        if node == this_node.name:
            # Lock the command queue
            zk_lock = zkhandler.writelock("base.cmd.ceph")
            with zk_lock:
                # Replace the OSD
                result = CephOSDInstance.replace_osd(
                    zkhandler,
                    logger,
                    node,
                    osd_id,
                    old_device,
                    new_device,
                    weight,
                    ext_db_flag,
                )
                # Command succeeded
                if result:
                    # Update the command queue
                    zkhandler.write([("base.cmd.ceph", "success-{}".format(data))])
                # Command failed
                else:
                    # Update the command queue
                    zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))])
                # Wait 1 second before we free the lock, to ensure the client hits the lock
                time.sleep(1)

    # Refreshing an OSD
    if command == "osd_refresh":
        node, osd_id, device, ext_db_flag = args.split(",")
        ext_db_flag = bool(strtobool(ext_db_flag))
        if node == this_node.name:
            # Lock the command queue
            zk_lock = zkhandler.writelock("base.cmd.ceph")
            with zk_lock:
                # Refresh the OSD
                result = CephOSDInstance.refresh_osd(
                    zkhandler, logger, node, osd_id, device, ext_db_flag
                )
                # Command succeeded
                if result:
                    # Update the command queue
                    zkhandler.write([("base.cmd.ceph", "success-{}".format(data))])
                # Command failed
                else:
                    # Update the command queue
                    zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))])
                # Wait 1 second before we free the lock, to ensure the client hits the lock
                time.sleep(1)

    # Removing an OSD
    elif command == "osd_remove":
        osd_id, force = args.split(",")
        force_flag = bool(strtobool(force))

        # Verify osd_id is in the list
        if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
            # Lock the command queue
            zk_lock = zkhandler.writelock("base.cmd.ceph")
            with zk_lock:
                # Remove the OSD
                result = CephOSDInstance.remove_osd(
                    zkhandler, logger, osd_id, d_osd[osd_id], force_flag
                )
                # Command succeeded
                if result:
                    # Update the command queue
                    zkhandler.write([("base.cmd.ceph", "success-{}".format(data))])
                # Command failed
                else:
                    # Update the command queue
                    zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))])
                # Wait 1 second before we free the lock, to ensure the client hits the lock
                time.sleep(1)

    # Adding a new DB VG
    elif command == "db_vg_add":
        node, device = args.split(",")
        if node == this_node.name:
            # Lock the command queue
            zk_lock = zkhandler.writelock("base.cmd.ceph")
            with zk_lock:
                # Add the VG
                result = CephOSDInstance.add_db_vg(zkhandler, logger, device)
                # Command succeeded
                if result:
                    # Update the command queue
                    zkhandler.write([("base.cmd.ceph", "success-{}".format(data))])
                # Command failed
                else:
                    # Update the command queue
                    zkhandler.write([("base.cmd.ceph", "failure-{}".format(data))])
                # Wait 1 second before we free the lock, to ensure the client hits the lock
                time.sleep(1)