2019-09-26 14:07:52 -04:00
#!/usr/bin/env python3
2018-10-27 18:11:58 -04:00
# ceph.py - PVC client function library, Ceph cluster fuctions
# Part of the Parallel Virtual Cluster (PVC) system
#
2023-12-29 11:16:59 -05:00
# Copyright (C) 2018-2024 Joshua M. Boniface <joshua@boniface.me>
2018-10-27 18:11:58 -04:00
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
2021-03-25 16:57:17 -04:00
# the Free Software Foundation, version 3.
2018-10-27 18:11:58 -04:00
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
2020-02-09 13:43:48 -05:00
import os
2018-10-27 18:11:58 -04:00
import re
2018-10-31 23:38:17 -04:00
import json
2018-10-30 22:41:44 -04:00
import time
2018-10-31 23:38:17 -04:00
import math
2018-10-27 18:11:58 -04:00
2021-07-01 17:33:13 -04:00
from concurrent . futures import ThreadPoolExecutor
2023-11-03 01:45:49 -04:00
from distutils . util import strtobool
2023-11-09 14:05:15 -05:00
from json import loads as jloads
from re import match , search
from uuid import uuid4
from os import path
2021-07-01 17:33:13 -04:00
2020-08-11 21:46:12 -04:00
import daemon_lib . vm as vm
2020-02-08 18:48:59 -05:00
import daemon_lib . common as common
2018-10-27 18:11:58 -04:00
2023-11-09 14:05:15 -05:00
from daemon_lib . celery import start , log_info , log_warn , update , fail , finish
2020-11-07 14:45:24 -05:00
2018-10-27 18:11:58 -04:00
#
# Supplemental functions
#
2023-09-12 16:41:02 -04:00
2018-10-31 23:38:17 -04:00
# Verify OSD is valid in cluster
2021-05-29 20:29:51 -04:00
def verifyOSD ( zkhandler , osd_id ) :
2021-11-06 03:02:43 -04:00
return zkhandler . exists ( ( " osd " , osd_id ) )
2018-10-31 23:38:17 -04:00
2020-11-07 14:45:24 -05:00
2018-11-01 19:55:13 -04:00
# Verify Pool is valid in cluster
2021-05-29 20:29:51 -04:00
def verifyPool ( zkhandler , name ) :
2021-11-06 03:02:43 -04:00
return zkhandler . exists ( ( " pool " , name ) )
2018-11-01 19:55:13 -04:00
2020-11-07 14:45:24 -05:00
2019-06-19 00:12:44 -04:00
# Verify Volume is valid in cluster
2021-05-29 20:29:51 -04:00
def verifyVolume ( zkhandler , pool , name ) :
2021-11-06 03:02:43 -04:00
return zkhandler . exists ( ( " volume " , f " { pool } / { name } " ) )
2019-06-19 00:12:44 -04:00
2020-11-07 14:45:24 -05:00
2019-06-19 00:12:44 -04:00
# Verify Snapshot is valid in cluster
2021-05-29 20:29:51 -04:00
def verifySnapshot ( zkhandler , pool , volume , name ) :
2021-11-06 03:02:43 -04:00
return zkhandler . exists ( ( " snapshot " , f " { pool } / { volume } / { name } " ) )
2019-06-19 00:12:44 -04:00
2020-11-07 14:45:24 -05:00
2018-10-31 23:38:17 -04:00
# Verify OSD path is valid in cluster
2021-05-29 20:29:51 -04:00
def verifyOSDBlock ( zkhandler , node , device ) :
2021-11-06 03:02:43 -04:00
for osd in zkhandler . children ( " base.osd " ) :
osd_node = zkhandler . read ( ( " osd.node " , osd ) )
osd_device = zkhandler . read ( ( " osd.device " , osd ) )
2018-10-31 23:38:17 -04:00
if node == osd_node and device == osd_device :
return osd
return None
2020-11-07 13:17:49 -05:00
2020-11-07 14:45:24 -05:00
# Matrix of human-to-byte values
2019-07-08 22:03:34 -04:00
byte_unit_matrix = {
2021-11-06 03:02:43 -04:00
" B " : 1 ,
" K " : 1024 ,
" M " : 1024 * 1024 ,
" G " : 1024 * 1024 * 1024 ,
" T " : 1024 * 1024 * 1024 * 1024 ,
" P " : 1024 * 1024 * 1024 * 1024 * 1024 ,
2023-04-28 12:01:11 -04:00
" E " : 1024 * 1024 * 1024 * 1024 * 1024 * 1024 ,
" Z " : 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 ,
" Y " : 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 ,
" R " : 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 ,
" Q " : 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 * 1024 ,
2019-06-21 09:22:24 -04:00
}
2020-11-07 14:45:24 -05:00
# Matrix of human-to-metric values
ops_unit_matrix = {
2021-11-06 03:02:43 -04:00
" " : 1 ,
" K " : 1000 ,
" M " : 1000 * 1000 ,
" G " : 1000 * 1000 * 1000 ,
" T " : 1000 * 1000 * 1000 * 1000 ,
" P " : 1000 * 1000 * 1000 * 1000 * 1000 ,
2023-04-28 12:01:11 -04:00
" E " : 1000 * 1000 * 1000 * 1000 * 1000 * 1000 ,
" Z " : 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 ,
" Y " : 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 ,
" R " : 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 ,
" Q " : 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 ,
2020-11-07 14:45:24 -05:00
}
# Format byte sizes to/from human-readable units
2019-06-21 09:22:24 -04:00
def format_bytes_tohuman ( databytes ) :
2021-11-06 03:02:43 -04:00
datahuman = " "
2019-07-08 22:03:34 -04:00
for unit in sorted ( byte_unit_matrix , key = byte_unit_matrix . get , reverse = True ) :
new_bytes = int ( math . ceil ( databytes / byte_unit_matrix [ unit ] ) )
2019-06-21 09:22:24 -04:00
# Round up if 5 or more digits
if new_bytes > 9999 :
# We can jump down another level
continue
else :
# We're at the end, display with this size
2021-11-06 03:02:43 -04:00
datahuman = " {} {} " . format ( new_bytes , unit )
2019-06-21 09:22:24 -04:00
return datahuman
2020-11-07 14:45:24 -05:00
2019-06-21 09:22:24 -04:00
def format_bytes_fromhuman ( datahuman ) :
2023-04-28 12:01:11 -04:00
if not re . search ( r " [A-Za-z]+ " , datahuman ) :
2021-11-06 03:02:43 -04:00
dataunit = " B "
2023-12-26 12:43:31 -05:00
datasize = float ( datahuman )
2023-04-28 12:01:11 -04:00
else :
2023-12-26 12:43:31 -05:00
dataunit = str ( re . match ( r " [0-9 \ .]+([A-Za-z])[iBb]* " , datahuman ) . group ( 1 ) )
datasize = float ( re . match ( r " ([0-9 \ .]+)[A-Za-z]+ " , datahuman ) . group ( 1 ) )
2023-04-28 12:01:11 -04:00
2023-12-26 12:43:31 -05:00
if byte_unit_matrix . get ( dataunit . upper ( ) ) :
databytes = int ( datasize * byte_unit_matrix [ dataunit . upper ( ) ] )
2023-04-28 12:01:11 -04:00
return databytes
else :
return None
2018-11-01 23:03:27 -04:00
2020-11-07 13:17:49 -05:00
2019-07-08 22:03:34 -04:00
# Format ops sizes to/from human-readable units
def format_ops_tohuman ( dataops ) :
2021-11-06 03:02:43 -04:00
datahuman = " "
2019-07-08 22:03:34 -04:00
for unit in sorted ( ops_unit_matrix , key = ops_unit_matrix . get , reverse = True ) :
new_ops = int ( math . ceil ( dataops / ops_unit_matrix [ unit ] ) )
# Round up if 5 or more digits
if new_ops > 9999 :
# We can jump down another level
continue
else :
# We're at the end, display with this size
2021-11-06 03:02:43 -04:00
datahuman = " {} {} " . format ( new_ops , unit )
2019-07-08 22:03:34 -04:00
return datahuman
2020-11-07 14:45:24 -05:00
2019-07-08 22:03:34 -04:00
def format_ops_fromhuman ( datahuman ) :
# Trim off human-readable character
dataunit = datahuman [ - 1 ]
datasize = int ( datahuman [ : - 1 ] )
2023-12-26 12:43:31 -05:00
dataops = datasize * ops_unit_matrix [ dataunit . upper ( ) ]
2021-11-06 03:02:43 -04:00
return " {} " . format ( dataops )
2019-07-08 22:03:34 -04:00
2020-11-07 14:45:24 -05:00
2019-12-06 00:06:21 -05:00
def format_pct_tohuman ( datapct ) :
datahuman = " {0:.1f} " . format ( float ( datapct * 100.0 ) )
return datahuman
2020-11-07 14:45:24 -05:00
2018-10-30 09:17:32 -04:00
#
2019-07-04 23:09:16 -04:00
# Status functions
2018-10-30 09:17:32 -04:00
#
2021-05-29 20:29:51 -04:00
def get_status ( zkhandler ) :
2021-11-06 03:02:43 -04:00
primary_node = zkhandler . read ( " base.config.primary_node " )
ceph_status = zkhandler . read ( " base.storage " ) . rstrip ( )
2019-07-05 00:29:47 -04:00
# Create a data structure for the information
status_data = {
2021-11-06 03:02:43 -04:00
" type " : " status " ,
" primary_node " : primary_node ,
" ceph_data " : ceph_status ,
2019-07-08 10:56:33 -04:00
}
return True , status_data
2020-11-07 14:45:24 -05:00
2023-02-15 15:16:02 -05:00
def get_health ( zkhandler ) :
primary_node = zkhandler . read ( " base.config.primary_node " )
ceph_health = zkhandler . read ( " base.storage.health " ) . rstrip ( )
# Create a data structure for the information
status_data = {
" type " : " health " ,
" primary_node " : primary_node ,
" ceph_data " : ceph_health ,
}
return True , status_data
2021-05-29 20:29:51 -04:00
def get_util ( zkhandler ) :
2021-11-06 03:02:43 -04:00
primary_node = zkhandler . read ( " base.config.primary_node " )
ceph_df = zkhandler . read ( " base.storage.util " ) . rstrip ( )
2019-07-08 10:56:33 -04:00
# Create a data structure for the information
status_data = {
2021-11-06 03:02:43 -04:00
" type " : " utilization " ,
" primary_node " : primary_node ,
" ceph_data " : ceph_df ,
2019-07-05 00:29:47 -04:00
}
2019-07-05 00:44:40 -04:00
return True , status_data
2019-07-26 15:10:47 -04:00
2019-07-04 23:09:16 -04:00
#
# OSD functions
#
2021-05-29 20:29:51 -04:00
def getClusterOSDList ( zkhandler ) :
2018-10-30 09:17:32 -04:00
# Get a list of VNIs by listing the children of /networks
2021-11-06 03:02:43 -04:00
return zkhandler . children ( " base.osd " )
2018-10-30 09:17:32 -04:00
2020-11-07 14:45:24 -05:00
2021-05-29 20:29:51 -04:00
def getOSDInformation ( zkhandler , osd_id ) :
2023-12-10 22:24:38 -05:00
(
osd_fsid ,
osd_node ,
osd_device ,
_osd_is_split ,
osd_db_device ,
osd_stats_raw ,
) = zkhandler . read_many (
[
( " osd.ofsid " , osd_id ) ,
( " osd.node " , osd_id ) ,
( " osd.device " , osd_id ) ,
( " osd.is_split " , osd_id ) ,
( " osd.db_device " , osd_id ) ,
( " osd.stats " , osd_id ) ,
]
)
osd_is_split = bool ( strtobool ( _osd_is_split ) )
2018-10-30 09:17:32 -04:00
# Parse the stats data
2018-10-31 23:38:17 -04:00
osd_stats = dict ( json . loads ( osd_stats_raw ) )
2018-10-30 09:17:32 -04:00
2019-07-05 00:29:47 -04:00
osd_information = {
2021-11-06 03:02:43 -04:00
" id " : osd_id ,
2023-11-03 16:37:55 -04:00
" fsid " : osd_fsid ,
2022-04-29 13:26:36 -04:00
" node " : osd_node ,
2021-11-06 03:02:43 -04:00
" device " : osd_device ,
2023-11-01 21:17:38 -04:00
" is_split " : osd_is_split ,
2021-11-06 03:02:43 -04:00
" db_device " : osd_db_device ,
" stats " : osd_stats ,
2019-07-05 00:29:47 -04:00
}
return osd_information
2020-11-07 14:45:24 -05:00
2021-05-29 20:29:51 -04:00
def in_osd ( zkhandler , osd_id ) :
if not verifyOSD ( zkhandler , osd_id ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No OSD with ID " {} " is present in the cluster. ' . format (
osd_id
)
2019-07-05 00:29:47 -04:00
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command ( " ceph osd in {} " . format ( osd_id ) )
2020-02-08 23:35:30 -05:00
if retcode :
2021-11-06 03:02:43 -04:00
return False , " ERROR: Failed to enable OSD {} : {} " . format ( osd_id , stderr )
2019-07-05 00:29:47 -04:00
2021-11-06 03:02:43 -04:00
return True , " Set OSD {} online. " . format ( osd_id )
2019-07-05 00:29:47 -04:00
2020-11-07 14:45:24 -05:00
2021-05-29 20:29:51 -04:00
def out_osd ( zkhandler , osd_id ) :
if not verifyOSD ( zkhandler , osd_id ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No OSD with ID " {} " is present in the cluster. ' . format (
osd_id
)
2019-07-05 00:29:47 -04:00
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command ( " ceph osd out {} " . format ( osd_id ) )
2020-02-08 23:35:30 -05:00
if retcode :
2021-11-06 03:02:43 -04:00
return False , " ERROR: Failed to disable OSD {} : {} " . format ( osd_id , stderr )
2019-07-05 00:29:47 -04:00
2021-11-06 03:02:43 -04:00
return True , " Set OSD {} offline. " . format ( osd_id )
2019-07-05 00:29:47 -04:00
2020-11-07 14:45:24 -05:00
2021-05-29 20:29:51 -04:00
def set_osd ( zkhandler , option ) :
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command ( " ceph osd set {} " . format ( option ) )
2020-02-08 23:35:30 -05:00
if retcode :
return False , ' ERROR: Failed to set property " {} " : {} ' . format ( option , stderr )
2019-07-05 00:29:47 -04:00
2020-02-08 23:35:30 -05:00
return True , ' Set OSD property " {} " . ' . format ( option )
2019-07-05 00:29:47 -04:00
2020-11-07 14:45:24 -05:00
2021-05-29 20:29:51 -04:00
def unset_osd ( zkhandler , option ) :
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command ( " ceph osd unset {} " . format ( option ) )
2020-02-08 23:35:30 -05:00
if retcode :
return False , ' ERROR: Failed to unset property " {} " : {} ' . format ( option , stderr )
2019-07-05 00:29:47 -04:00
2020-02-08 23:35:30 -05:00
return True , ' Unset OSD property " {} " . ' . format ( option )
2019-07-05 00:29:47 -04:00
2020-11-07 14:45:24 -05:00
2023-12-09 13:43:58 -05:00
def get_list_osd ( zkhandler , limit = None , is_fuzzy = True ) :
2019-07-05 00:29:47 -04:00
osd_list = [ ]
2021-11-06 03:02:43 -04:00
full_osd_list = zkhandler . children ( " base.osd " )
2019-07-05 00:29:47 -04:00
2019-07-26 15:03:48 -04:00
if is_fuzzy and limit :
# Implicitly assume fuzzy limits
2021-11-06 03:02:43 -04:00
if not re . match ( r " \ ^.* " , limit ) :
limit = " .* " + limit
if not re . match ( r " .* \ $ " , limit ) :
limit = limit + " .* "
2019-07-26 15:03:48 -04:00
2019-07-05 00:29:47 -04:00
for osd in full_osd_list :
if limit :
try :
2021-12-06 16:35:29 -05:00
if re . fullmatch ( limit , osd ) :
2021-05-29 20:29:51 -04:00
osd_list . append ( getOSDInformation ( zkhandler , osd ) )
2019-07-05 00:29:47 -04:00
except Exception as e :
2021-11-06 03:02:43 -04:00
return False , " Regex Error: {} " . format ( e )
2019-07-05 00:29:47 -04:00
else :
2021-05-29 20:29:51 -04:00
osd_list . append ( getOSDInformation ( zkhandler , osd ) )
2019-07-05 00:29:47 -04:00
2021-11-06 03:02:43 -04:00
return True , sorted ( osd_list , key = lambda x : int ( x [ " id " ] ) )
2019-07-05 00:29:47 -04:00
2018-10-31 23:38:17 -04:00
2019-07-05 00:29:47 -04:00
#
# Pool functions
#
2021-05-29 20:29:51 -04:00
def getPoolInformation ( zkhandler , pool ) :
2019-07-05 00:29:47 -04:00
# Parse the stats data
2024-04-19 10:26:06 -04:00
(
pool_stats_raw ,
tier ,
pgs ,
) = zkhandler . read_many (
2023-12-11 10:25:36 -05:00
[
( " pool.stats " , pool ) ,
( " pool.tier " , pool ) ,
( " pool.pgs " , pool ) ,
]
)
2019-07-05 00:29:47 -04:00
pool_stats = dict ( json . loads ( pool_stats_raw ) )
2021-05-29 20:29:51 -04:00
volume_count = len ( getCephVolumes ( zkhandler , pool ) )
2021-12-28 20:39:50 -05:00
if tier is None :
tier = " default "
pool_information = {
" name " : pool ,
" volume_count " : volume_count ,
" tier " : tier ,
2021-12-28 21:08:04 -05:00
" pgs " : pgs ,
2021-12-28 20:39:50 -05:00
" stats " : pool_stats ,
}
2019-07-05 00:29:47 -04:00
return pool_information
2018-10-31 23:38:17 -04:00
2020-11-07 14:45:24 -05:00
2021-12-28 20:39:50 -05:00
def add_pool ( zkhandler , name , pgs , replcfg , tier = None ) :
2020-02-08 23:35:30 -05:00
# Prepare the copies/mincopies variables
try :
2021-11-06 03:02:43 -04:00
copies , mincopies = replcfg . split ( " , " )
copies = int ( copies . replace ( " copies= " , " " ) )
mincopies = int ( mincopies . replace ( " mincopies= " , " " ) )
2020-11-06 19:24:10 -05:00
except Exception :
2020-02-08 23:35:30 -05:00
copies = None
mincopies = None
if not copies or not mincopies :
2021-12-28 20:39:50 -05:00
return False , f ' ERROR: Replication configuration " { replcfg } " is not valid. '
# Prepare the tiers if applicable
if tier is not None and tier in [ " hdd " , " ssd " , " nvme " ] :
crush_rule = f " { tier } _tier "
# Create a CRUSH rule for the relevant tier
retcode , stdout , stderr = common . run_os_command (
f " ceph osd crush rule create-replicated { crush_rule } default host { tier } "
2021-11-06 03:02:43 -04:00
)
2021-12-28 20:39:50 -05:00
if retcode :
return (
False ,
f " ERROR: Failed to create CRUSH rule { tier } for pool { name } : { stderr } " ,
)
else :
tier = " default "
crush_rule = " replicated "
# Create the pool
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
2021-12-28 20:39:50 -05:00
f " ceph osd pool create { name } { pgs } { pgs } { crush_rule } "
2021-11-06 03:02:43 -04:00
)
2020-02-08 23:35:30 -05:00
if retcode :
2021-12-28 20:39:50 -05:00
return False , f ' ERROR: Failed to create pool " { name } " with { pgs } PGs: { stderr } '
2020-11-06 19:05:48 -05:00
2021-12-28 20:39:50 -05:00
# Set the size and minsize
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
2021-12-28 20:39:50 -05:00
f " ceph osd pool set { name } size { copies } "
2021-11-06 03:02:43 -04:00
)
2020-02-08 23:35:30 -05:00
if retcode :
2021-12-28 20:39:50 -05:00
return False , f ' ERROR: Failed to set pool " { name } " size of { copies } : { stderr } '
2020-02-08 23:35:30 -05:00
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
2021-12-28 20:39:50 -05:00
f " ceph osd pool set { name } min_size { mincopies } "
2021-11-06 03:02:43 -04:00
)
2020-02-08 23:35:30 -05:00
if retcode :
2021-12-28 20:39:50 -05:00
return (
False ,
f ' ERROR: Failed to set pool " { name } " minimum size of { mincopies } : { stderr } ' ,
2021-11-06 03:02:43 -04:00
)
2020-02-08 23:35:30 -05:00
2021-12-28 20:39:50 -05:00
# Enable RBD application
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
2021-12-28 20:39:50 -05:00
f " ceph osd pool application enable { name } rbd "
2021-11-06 03:02:43 -04:00
)
2020-02-08 23:35:30 -05:00
if retcode :
2021-11-06 03:02:43 -04:00
return (
False ,
2021-12-28 20:39:50 -05:00
f ' ERROR: Failed to enable RBD application on pool " { name } " : { stderr } ' ,
2021-11-06 03:02:43 -04:00
)
2020-02-08 23:35:30 -05:00
2021-12-28 20:39:50 -05:00
# Add the new pool to Zookeeper
2021-11-06 03:02:43 -04:00
zkhandler . write (
[
( ( " pool " , name ) , " " ) ,
( ( " pool.pgs " , name ) , pgs ) ,
2021-12-28 20:39:50 -05:00
( ( " pool.tier " , name ) , tier ) ,
2021-11-06 03:02:43 -04:00
( ( " pool.stats " , name ) , " {} " ) ,
( ( " volume " , name ) , " " ) ,
( ( " snapshot " , name ) , " " ) ,
]
)
2020-02-08 23:35:30 -05:00
2021-12-28 20:39:50 -05:00
return True , f ' Created RBD pool " { name } " with { pgs } PGs '
2019-07-04 23:09:16 -04:00
2020-11-07 14:45:24 -05:00
2021-05-29 20:29:51 -04:00
def remove_pool ( zkhandler , name ) :
if not verifyPool ( zkhandler , name ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No pool with name " {} " is present in the cluster. ' . format (
name
)
2019-07-04 23:09:16 -04:00
2020-02-08 23:35:30 -05:00
# 1. Remove pool volumes
2021-11-06 03:02:43 -04:00
for volume in zkhandler . children ( ( " volume " , name ) ) :
2021-05-29 20:29:51 -04:00
remove_volume ( zkhandler , name , volume )
2019-07-04 23:09:16 -04:00
2020-02-08 23:35:30 -05:00
# 2. Remove the pool
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
" ceph osd pool rm {pool} {pool} --yes-i-really-really-mean-it " . format ( pool = name )
)
2020-02-08 23:35:30 -05:00
if retcode :
return False , ' ERROR: Failed to remove pool " {} " : {} ' . format ( name , stderr )
2019-07-04 23:09:16 -04:00
2020-02-08 23:35:30 -05:00
# 3. Delete pool from Zookeeper
2021-11-06 03:02:43 -04:00
zkhandler . delete (
[
( " pool " , name ) ,
( " volume " , name ) ,
( " snapshot " , name ) ,
]
)
2020-02-08 23:35:30 -05:00
return True , ' Removed RBD pool " {} " and all volumes. ' . format ( name )
2019-07-04 23:09:16 -04:00
2020-11-07 14:45:24 -05:00
2021-12-28 21:41:41 -05:00
def set_pgs_pool ( zkhandler , name , pgs ) :
if not verifyPool ( zkhandler , name ) :
return False , f ' ERROR: No pool with name " { name } " is present in the cluster. '
# Validate new PGs count
pgs = int ( pgs )
if ( pgs == 0 ) or ( pgs & ( pgs - 1 ) != 0 ) :
return (
False ,
f ' ERROR: Invalid PGs number " { pgs } " : must be a non-zero power of 2. ' ,
)
# Set the new pgs number
retcode , stdout , stderr = common . run_os_command (
f " ceph osd pool set { name } pg_num { pgs } "
)
if retcode :
return False , f " ERROR: Failed to set pg_num on pool { name } to { pgs } : { stderr } "
# Set the new pgps number if increasing
current_pgs = int ( zkhandler . read ( ( " pool.pgs " , name ) ) )
if current_pgs > = pgs :
retcode , stdout , stderr = common . run_os_command (
f " ceph osd pool set { name } pgp_num { pgs } "
)
if retcode :
return (
False ,
f " ERROR: Failed to set pg_num on pool { name } to { pgs } : { stderr } " ,
)
# Update Zookeeper count
zkhandler . write (
[
( ( " pool.pgs " , name ) , pgs ) ,
]
)
return True , f ' Set PGs count to { pgs } for RBD pool " { name } " . '
2023-12-09 13:43:58 -05:00
def get_list_pool ( zkhandler , limit = None , is_fuzzy = True ) :
2021-11-06 03:02:43 -04:00
full_pool_list = zkhandler . children ( " base.pool " )
2019-07-04 23:09:16 -04:00
2021-12-06 16:35:29 -05:00
if is_fuzzy and limit :
# Implicitly assume fuzzy limits
if not re . match ( r " \ ^.* " , limit ) :
limit = " .* " + limit
if not re . match ( r " .* \ $ " , limit ) :
limit = limit + " .* "
2020-11-06 19:05:48 -05:00
2021-07-01 17:33:13 -04:00
get_pool_info = dict ( )
2019-07-05 00:29:47 -04:00
for pool in full_pool_list :
2021-07-01 17:33:13 -04:00
is_limit_match = False
2019-07-05 00:29:47 -04:00
if limit :
try :
2021-12-06 16:35:29 -05:00
if re . fullmatch ( limit , pool ) :
2021-07-01 17:33:13 -04:00
is_limit_match = True
2019-07-05 00:29:47 -04:00
except Exception as e :
2021-11-06 03:02:43 -04:00
return False , " Regex Error: {} " . format ( e )
2019-07-05 00:29:47 -04:00
else :
2021-07-01 17:33:13 -04:00
is_limit_match = True
get_pool_info [ pool ] = True if is_limit_match else False
pool_execute_list = [ pool for pool in full_pool_list if get_pool_info [ pool ] ]
pool_data_list = list ( )
2021-11-06 03:02:43 -04:00
with ThreadPoolExecutor ( max_workers = 32 , thread_name_prefix = " pool_list " ) as executor :
2021-07-01 17:33:13 -04:00
futures = [ ]
for pool in pool_execute_list :
futures . append ( executor . submit ( getPoolInformation , zkhandler , pool ) )
for future in futures :
pool_data_list . append ( future . result ( ) )
2019-07-04 23:09:16 -04:00
2021-12-28 21:03:10 -05:00
return True , sorted ( pool_data_list , key = lambda x : int ( x [ " stats " ] . get ( " id " , 0 ) ) )
2019-07-04 23:09:16 -04:00
2018-10-31 23:38:17 -04:00
2019-07-05 00:29:47 -04:00
#
# Volume functions
#
2021-05-29 20:29:51 -04:00
def getCephVolumes ( zkhandler , pool ) :
2019-07-05 00:29:47 -04:00
volume_list = list ( )
2019-07-05 13:57:15 -04:00
if not pool :
2021-11-06 03:02:43 -04:00
pool_list = zkhandler . children ( " base.pool " )
2019-07-05 00:29:47 -04:00
else :
2020-11-07 13:02:54 -05:00
pool_list = [ pool ]
2018-10-31 23:38:17 -04:00
2019-07-05 00:29:47 -04:00
for pool_name in pool_list :
2021-11-06 03:02:43 -04:00
for volume_name in zkhandler . children ( ( " volume " , pool_name ) ) :
volume_list . append ( " {} / {} " . format ( pool_name , volume_name ) )
2019-06-21 09:05:00 -04:00
2019-07-05 00:29:47 -04:00
return volume_list
2018-10-28 22:15:25 -04:00
2020-11-07 14:45:24 -05:00
2021-05-29 20:29:51 -04:00
def getVolumeInformation ( zkhandler , pool , volume ) :
2019-07-05 00:29:47 -04:00
# Parse the stats data
2021-11-06 03:02:43 -04:00
volume_stats_raw = zkhandler . read ( ( " volume.stats " , f " { pool } / { volume } " ) )
2019-07-05 00:29:47 -04:00
volume_stats = dict ( json . loads ( volume_stats_raw ) )
# Format the size to something nicer
2021-11-06 03:02:43 -04:00
volume_stats [ " size " ] = format_bytes_tohuman ( volume_stats [ " size " ] )
2018-10-30 22:41:44 -04:00
2021-11-06 03:02:43 -04:00
volume_information = { " name " : volume , " pool " : pool , " stats " : volume_stats }
2019-07-05 00:29:47 -04:00
return volume_information
2019-07-04 23:09:16 -04:00
2020-11-07 14:45:24 -05:00
2024-02-02 10:13:46 -05:00
def add_volume ( zkhandler , pool , name , size , force_flag = False ) :
2021-02-17 11:27:26 -05:00
# 1. Verify the size of the volume
2021-05-29 20:29:51 -04:00
pool_information = getPoolInformation ( zkhandler , pool )
2021-02-17 11:27:26 -05:00
size_bytes = format_bytes_fromhuman ( size )
2023-04-28 12:01:11 -04:00
if size_bytes is None :
return (
False ,
f " ERROR: Requested volume size ' { size } ' does not have a valid SI unit " ,
)
2024-02-02 10:13:46 -05:00
pool_total_free_bytes = int ( pool_information [ " stats " ] [ " free_bytes " ] )
if size_bytes > = pool_total_free_bytes :
2021-11-06 03:02:43 -04:00
return (
False ,
2023-04-28 12:01:11 -04:00
f " ERROR: Requested volume size ' { format_bytes_tohuman ( size_bytes ) } ' is greater than the available free space in the pool ( ' { format_bytes_tohuman ( pool_information [ ' stats ' ] [ ' free_bytes ' ] ) } ' ) " ,
2021-11-06 03:02:43 -04:00
)
2021-07-29 15:30:00 -04:00
2024-02-02 10:13:46 -05:00
# Check if we're greater than 80% utilization after the create; error if so unless we have the force flag
pool_total_bytes = (
int ( pool_information [ " stats " ] [ " used_bytes " ] ) + pool_total_free_bytes
)
pool_safe_total_bytes = int ( pool_total_bytes * 0.80 )
pool_safe_free_bytes = pool_safe_total_bytes - int (
pool_information [ " stats " ] [ " used_bytes " ]
)
if size_bytes > = pool_safe_free_bytes and not force_flag :
return (
False ,
f " ERROR: Requested volume size ' { format_bytes_tohuman ( size_bytes ) } ' is greater than the safe free space in the pool ( ' { format_bytes_tohuman ( pool_safe_free_bytes ) } ' for 80% full); retry with force to ignore this error " ,
)
2021-02-17 11:27:26 -05:00
# 2. Create the volume
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
2023-09-30 12:37:43 -04:00
" rbd create --size {} B {} / {} " . format ( size_bytes , pool , name )
2021-11-06 03:02:43 -04:00
)
2020-02-08 23:35:30 -05:00
if retcode :
return False , ' ERROR: Failed to create RBD volume " {} " : {} ' . format ( name , stderr )
# 2. Get volume stats
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
" rbd info --format json {} / {} " . format ( pool , name )
)
2020-02-08 23:35:30 -05:00
volstats = stdout
# 3. Add the new volume to Zookeeper
2021-11-06 03:02:43 -04:00
zkhandler . write (
[
( ( " volume " , f " { pool } / { name } " ) , " " ) ,
( ( " volume.stats " , f " { pool } / { name } " ) , volstats ) ,
( ( " snapshot " , f " { pool } / { name } " ) , " " ) ,
]
)
2020-02-08 23:35:30 -05:00
2023-04-28 12:01:11 -04:00
return True , ' Created RBD volume " {} " of size " {} " in pool " {} " . ' . format (
name , format_bytes_tohuman ( size_bytes ) , pool
)
2020-02-08 23:35:30 -05:00
2020-11-07 14:45:24 -05:00
2024-02-02 10:29:34 -05:00
def clone_volume ( zkhandler , pool , name_src , name_new , force_flag = False ) :
# 1. Verify the volume
2021-05-29 20:29:51 -04:00
if not verifyVolume ( zkhandler , pool , name_src ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No volume with name " {} " is present in pool " {} " . ' . format (
name_src , pool
)
2020-02-08 23:35:30 -05:00
2024-02-02 10:29:34 -05:00
volume_stats_raw = zkhandler . read ( ( " volume.stats " , f " { pool } / { name_src } " ) )
volume_stats = dict ( json . loads ( volume_stats_raw ) )
size_bytes = volume_stats [ " size " ]
pool_information = getPoolInformation ( zkhandler , pool )
pool_total_free_bytes = int ( pool_information [ " stats " ] [ " free_bytes " ] )
if size_bytes > = pool_total_free_bytes :
return (
False ,
f " ERROR: Clone volume size ' { format_bytes_tohuman ( size_bytes ) } ' is greater than the available free space in the pool ( ' { format_bytes_tohuman ( pool_information [ ' stats ' ] [ ' free_bytes ' ] ) } ' ) " ,
)
# Check if we're greater than 80% utilization after the create; error if so unless we have the force flag
pool_total_bytes = (
int ( pool_information [ " stats " ] [ " used_bytes " ] ) + pool_total_free_bytes
)
pool_safe_total_bytes = int ( pool_total_bytes * 0.80 )
pool_safe_free_bytes = pool_safe_total_bytes - int (
pool_information [ " stats " ] [ " used_bytes " ]
)
if size_bytes > = pool_safe_free_bytes and not force_flag :
return (
False ,
f " ERROR: Clone volume size ' { format_bytes_tohuman ( size_bytes ) } ' is greater than the safe free space in the pool ( ' { format_bytes_tohuman ( pool_safe_free_bytes ) } ' for 80% full); retry with force to ignore this error " ,
)
# 2. Clone the volume
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
" rbd copy {} / {} {} / {} " . format ( pool , name_src , pool , name_new )
)
2020-02-08 23:35:30 -05:00
if retcode :
2021-11-06 03:02:43 -04:00
return (
False ,
' ERROR: Failed to clone RBD volume " {} " to " {} " in pool " {} " : {} ' . format (
name_src , name_new , pool , stderr
) ,
)
2020-02-08 23:35:30 -05:00
2024-02-02 10:29:34 -05:00
# 3. Get volume stats
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
" rbd info --format json {} / {} " . format ( pool , name_new )
)
2020-02-08 23:35:30 -05:00
volstats = stdout
2024-02-02 10:29:34 -05:00
# 4. Add the new volume to Zookeeper
2021-11-06 03:02:43 -04:00
zkhandler . write (
[
( ( " volume " , f " { pool } / { name_new } " ) , " " ) ,
( ( " volume.stats " , f " { pool } / { name_new } " ) , volstats ) ,
( ( " snapshot " , f " { pool } / { name_new } " ) , " " ) ,
]
)
2020-02-08 23:35:30 -05:00
2021-11-06 03:02:43 -04:00
return True , ' Cloned RBD volume " {} " to " {} " in pool " {} " ' . format (
name_src , name_new , pool
)
2019-07-26 14:24:22 -04:00
2020-11-07 14:45:24 -05:00
2024-02-02 10:13:46 -05:00
def resize_volume ( zkhandler , pool , name , size , force_flag = False ) :
2021-05-29 20:29:51 -04:00
if not verifyVolume ( zkhandler , pool , name ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No volume with name " {} " is present in pool " {} " . ' . format (
name , pool
)
2019-07-26 14:24:22 -04:00
2021-09-12 19:54:51 -04:00
# 1. Verify the size of the volume
pool_information = getPoolInformation ( zkhandler , pool )
size_bytes = format_bytes_fromhuman ( size )
2023-04-28 12:01:11 -04:00
if size_bytes is None :
return (
False ,
f " ERROR: Requested volume size ' { size } ' does not have a valid SI unit " ,
)
2024-02-02 10:13:46 -05:00
pool_total_free_bytes = int ( pool_information [ " stats " ] [ " free_bytes " ] )
if size_bytes > = pool_total_free_bytes :
2021-11-06 03:02:43 -04:00
return (
False ,
2023-04-28 12:01:11 -04:00
f " ERROR: Requested volume size ' { format_bytes_tohuman ( size_bytes ) } ' is greater than the available free space in the pool ( ' { format_bytes_tohuman ( pool_information [ ' stats ' ] [ ' free_bytes ' ] ) } ' ) " ,
2021-11-06 03:02:43 -04:00
)
2021-09-12 19:54:51 -04:00
2024-02-02 10:13:46 -05:00
# Check if we're greater than 80% utilization after the create; error if so unless we have the force flag
pool_total_bytes = (
int ( pool_information [ " stats " ] [ " used_bytes " ] ) + pool_total_free_bytes
)
pool_safe_total_bytes = int ( pool_total_bytes * 0.80 )
pool_safe_free_bytes = pool_safe_total_bytes - int (
pool_information [ " stats " ] [ " used_bytes " ]
)
if size_bytes > = pool_safe_free_bytes and not force_flag :
return (
False ,
f " ERROR: Requested volume size ' { format_bytes_tohuman ( size_bytes ) } ' is greater than the safe free space in the pool ( ' { format_bytes_tohuman ( pool_safe_free_bytes ) } ' for 80% full); retry with force to ignore this error " ,
)
2021-09-12 19:54:51 -04:00
# 2. Resize the volume
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
2023-04-28 12:01:11 -04:00
" rbd resize --size {} {} / {} " . format (
format_bytes_tohuman ( size_bytes ) , pool , name
)
2021-11-06 03:02:43 -04:00
)
2020-02-08 23:35:30 -05:00
if retcode :
2021-11-06 03:02:43 -04:00
return (
False ,
' ERROR: Failed to resize RBD volume " {} " to size " {} " in pool " {} " : {} ' . format (
2023-04-28 12:01:11 -04:00
name , format_bytes_tohuman ( size_bytes ) , pool , stderr
2021-11-06 03:02:43 -04:00
) ,
)
2019-07-26 14:24:22 -04:00
2021-09-12 19:54:51 -04:00
# 3a. Determine the node running this VM if applicable
2020-08-11 21:46:12 -04:00
active_node = None
2021-11-06 03:02:43 -04:00
volume_vm_name = name . split ( " _ " ) [ 0 ]
2021-05-29 20:29:51 -04:00
retcode , vm_info = vm . get_info ( zkhandler , volume_vm_name )
2020-08-11 21:46:12 -04:00
if retcode :
2021-11-06 03:02:43 -04:00
for disk in vm_info [ " disks " ] :
2020-08-11 21:46:12 -04:00
# This block device is present in this VM so we can continue
2021-11-06 03:02:43 -04:00
if disk [ " name " ] == " {} / {} " . format ( pool , name ) :
active_node = vm_info [ " node " ]
volume_id = disk [ " dev " ]
2021-09-12 19:54:51 -04:00
# 3b. Perform a live resize in libvirt if the VM is running
2021-11-06 03:02:43 -04:00
if active_node is not None and vm_info . get ( " state " , " " ) == " start " :
2020-08-11 21:46:12 -04:00
import libvirt
2021-11-06 03:02:43 -04:00
2020-08-11 21:46:12 -04:00
# Run the libvirt command against the target host
try :
2021-11-06 03:02:43 -04:00
dest_lv = " qemu+tcp:// {} /system " . format ( active_node )
2020-08-11 21:46:12 -04:00
target_lv_conn = libvirt . open ( dest_lv )
2021-11-06 03:02:43 -04:00
target_vm_conn = target_lv_conn . lookupByName ( vm_info [ " name " ] )
2020-08-11 21:46:12 -04:00
if target_vm_conn :
2021-11-06 03:02:43 -04:00
target_vm_conn . blockResize (
volume_id ,
2023-04-28 12:01:11 -04:00
size_bytes ,
2021-11-06 03:02:43 -04:00
libvirt . VIR_DOMAIN_BLOCK_RESIZE_BYTES ,
)
2020-08-11 21:46:12 -04:00
target_lv_conn . close ( )
2020-11-06 19:24:10 -05:00
except Exception :
2020-08-11 21:46:12 -04:00
pass
2021-09-12 19:54:51 -04:00
# 4. Get volume stats
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
" rbd info --format json {} / {} " . format ( pool , name )
)
2020-02-08 23:35:30 -05:00
volstats = stdout
2019-07-26 14:24:22 -04:00
2021-09-12 19:54:51 -04:00
# 5. Update the volume in Zookeeper
2021-11-06 03:02:43 -04:00
zkhandler . write (
[
( ( " volume " , f " { pool } / { name } " ) , " " ) ,
( ( " volume.stats " , f " { pool } / { name } " ) , volstats ) ,
( ( " snapshot " , f " { pool } / { name } " ) , " " ) ,
]
)
2019-07-26 14:24:22 -04:00
2021-11-06 03:02:43 -04:00
return True , ' Resized RBD volume " {} " to size " {} " in pool " {} " . ' . format (
2023-04-28 12:01:11 -04:00
name , format_bytes_tohuman ( size_bytes ) , pool
2021-11-06 03:02:43 -04:00
)
2019-07-26 14:24:22 -04:00
2020-11-07 14:45:24 -05:00
2021-05-29 20:29:51 -04:00
def rename_volume ( zkhandler , pool , name , new_name ) :
if not verifyVolume ( zkhandler , pool , name ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No volume with name " {} " is present in pool " {} " . ' . format (
name , pool
)
2018-10-30 22:41:44 -04:00
2020-02-08 23:35:30 -05:00
# 1. Rename the volume
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
" rbd rename {} / {} {} " . format ( pool , name , new_name )
)
2020-02-08 23:35:30 -05:00
if retcode :
2021-11-06 03:02:43 -04:00
return (
False ,
' ERROR: Failed to rename volume " {} " to " {} " in pool " {} " : {} ' . format (
name , new_name , pool , stderr
) ,
)
2019-06-21 09:05:00 -04:00
2020-02-08 23:35:30 -05:00
# 2. Rename the volume in Zookeeper
2021-11-06 03:02:43 -04:00
zkhandler . rename (
[
( ( " volume " , f " { pool } / { name } " ) , ( " volume " , f " { pool } / { new_name } " ) ) ,
( ( " snapshot " , f " { pool } / { name } " ) , ( " snapshot " , f " { pool } / { new_name } " ) ) ,
]
)
2018-10-30 09:17:32 -04:00
2020-02-08 23:35:30 -05:00
# 3. Get volume stats
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
" rbd info --format json {} / {} " . format ( pool , new_name )
)
2020-02-08 23:35:30 -05:00
volstats = stdout
2019-10-10 14:09:07 -04:00
2020-02-08 23:35:30 -05:00
# 4. Update the volume stats in Zookeeper
2021-11-06 03:02:43 -04:00
zkhandler . write (
[
( ( " volume.stats " , f " { pool } / { new_name } " ) , volstats ) ,
]
)
2019-10-10 14:09:07 -04:00
2021-11-06 03:02:43 -04:00
return True , ' Renamed RBD volume " {} " to " {} " in pool " {} " . ' . format (
name , new_name , pool
)
2019-10-10 14:09:07 -04:00
2020-11-07 14:45:24 -05:00
2021-05-29 20:29:51 -04:00
def remove_volume ( zkhandler , pool , name ) :
if not verifyVolume ( zkhandler , pool , name ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No volume with name " {} " is present in pool " {} " . ' . format (
name , pool
)
2018-11-01 22:00:59 -04:00
2024-04-19 10:31:11 -04:00
# 1a. Remove PVC-managed volume snapshots
2021-11-06 03:02:43 -04:00
for snapshot in zkhandler . children ( ( " snapshot " , f " { pool } / { name } " ) ) :
2021-05-29 20:29:51 -04:00
remove_snapshot ( zkhandler , pool , name , snapshot )
2018-11-01 22:00:59 -04:00
2024-04-19 10:31:11 -04:00
# 1b. Purge any remaining volume snapshots
retcode , stdout , stderr = common . run_os_command (
" rbd snap purge {} / {} " . format ( pool , name )
)
if retcode :
return (
False ,
' ERROR: Failed to purge snapshots from RBD volume " {} " in pool " {} " : {} ' . format (
name , pool , stderr
) ,
)
2020-02-08 23:35:30 -05:00
# 2. Remove the volume
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command ( " rbd rm {} / {} " . format ( pool , name ) )
2020-02-08 23:35:30 -05:00
if retcode :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: Failed to remove RBD volume " {} " in pool " {} " : {} ' . format (
name , pool , stderr
)
2019-06-21 09:05:00 -04:00
2020-02-08 23:35:30 -05:00
# 3. Delete volume from Zookeeper
2021-11-06 03:02:43 -04:00
zkhandler . delete (
[
( " volume " , f " { pool } / { name } " ) ,
( " snapshot " , f " { pool } / { name } " ) ,
]
)
2020-02-08 23:35:30 -05:00
return True , ' Removed RBD volume " {} " in pool " {} " . ' . format ( name , pool )
2018-11-01 22:00:59 -04:00
2020-11-07 14:45:24 -05:00
2021-05-29 20:29:51 -04:00
def map_volume ( zkhandler , pool , name ) :
if not verifyVolume ( zkhandler , pool , name ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No volume with name " {} " is present in pool " {} " . ' . format (
name , pool
)
2020-02-09 13:43:48 -05:00
# 1. Map the volume onto the local system
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command ( " rbd map {} / {} " . format ( pool , name ) )
2020-02-09 13:43:48 -05:00
if retcode :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: Failed to map RBD volume " {} " in pool " {} " : {} ' . format (
name , pool , stderr
)
2020-02-09 13:43:48 -05:00
# 2. Calculate the absolute path to the mapped volume
2021-11-06 03:02:43 -04:00
mapped_volume = " /dev/rbd/ {} / {} " . format ( pool , name )
2020-02-09 13:43:48 -05:00
# 3. Ensure the volume exists
if not os . path . exists ( mapped_volume ) :
2021-11-06 03:02:43 -04:00
return (
False ,
' ERROR: Mapped volume not found at expected location " {} " . ' . format (
mapped_volume
) ,
)
2020-02-09 13:43:48 -05:00
return True , mapped_volume
2020-11-07 14:45:24 -05:00
2021-05-29 20:29:51 -04:00
def unmap_volume ( zkhandler , pool , name ) :
if not verifyVolume ( zkhandler , pool , name ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No volume with name " {} " is present in pool " {} " . ' . format (
name , pool
)
2020-02-09 13:43:48 -05:00
2021-11-06 03:02:43 -04:00
mapped_volume = " /dev/rbd/ {} / {} " . format ( pool , name )
2020-02-09 13:43:48 -05:00
# 1. Ensure the volume exists
if not os . path . exists ( mapped_volume ) :
2021-11-06 03:02:43 -04:00
return (
False ,
' ERROR: Mapped volume not found at expected location " {} " . ' . format (
mapped_volume
) ,
)
2020-02-09 13:43:48 -05:00
# 2. Unap the volume
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
" rbd unmap {} " . format ( mapped_volume )
)
2020-02-09 13:43:48 -05:00
if retcode :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: Failed to unmap RBD volume at " {} " : {} ' . format (
mapped_volume , stderr
)
2020-02-09 13:43:48 -05:00
return True , ' Unmapped RBD volume at " {} " . ' . format ( mapped_volume )
2020-11-07 14:45:24 -05:00
2023-12-09 13:43:58 -05:00
def get_list_volume ( zkhandler , pool , limit = None , is_fuzzy = True ) :
2021-05-29 20:29:51 -04:00
if pool and not verifyPool ( zkhandler , pool ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No pool with name " {} " is present in the cluster. ' . format (
pool
)
2018-11-01 22:00:59 -04:00
2021-05-29 20:29:51 -04:00
full_volume_list = getCephVolumes ( zkhandler , pool )
2018-11-01 22:00:59 -04:00
2021-12-06 16:35:29 -05:00
if is_fuzzy and limit :
# Implicitly assume fuzzy limits
if not re . match ( r " \ ^.* " , limit ) :
limit = " .* " + limit
if not re . match ( r " .* \ $ " , limit ) :
limit = limit + " .* "
2019-07-26 15:10:47 -04:00
2021-07-01 17:33:13 -04:00
get_volume_info = dict ( )
2019-07-05 00:29:47 -04:00
for volume in full_volume_list :
2021-11-06 03:02:43 -04:00
pool_name , volume_name = volume . split ( " / " )
2021-07-01 17:33:13 -04:00
is_limit_match = False
# Check on limit
2019-07-05 00:29:47 -04:00
if limit :
2021-07-01 17:33:13 -04:00
# Try to match the limit against the volume name
2019-07-05 00:29:47 -04:00
try :
2021-12-06 16:35:29 -05:00
if re . fullmatch ( limit , volume_name ) :
2021-07-01 17:33:13 -04:00
is_limit_match = True
2019-07-05 00:29:47 -04:00
except Exception as e :
2021-11-06 03:02:43 -04:00
return False , " Regex Error: {} " . format ( e )
2019-07-05 00:29:47 -04:00
else :
2021-07-01 17:33:13 -04:00
is_limit_match = True
get_volume_info [ volume ] = True if is_limit_match else False
# Obtain our volume data in a thread pool
2021-11-06 03:02:43 -04:00
volume_execute_list = [
volume for volume in full_volume_list if get_volume_info [ volume ]
]
2021-07-01 17:33:13 -04:00
volume_data_list = list ( )
2021-11-06 03:02:43 -04:00
with ThreadPoolExecutor (
max_workers = 32 , thread_name_prefix = " volume_list "
) as executor :
2021-07-01 17:33:13 -04:00
futures = [ ]
for volume in volume_execute_list :
2021-11-06 03:02:43 -04:00
pool_name , volume_name = volume . split ( " / " )
futures . append (
executor . submit ( getVolumeInformation , zkhandler , pool_name , volume_name )
)
2021-07-01 17:33:13 -04:00
for future in futures :
volume_data_list . append ( future . result ( ) )
2021-11-06 03:02:43 -04:00
return True , sorted ( volume_data_list , key = lambda x : str ( x [ " name " ] ) )
2018-11-01 22:00:59 -04:00
2018-10-31 23:38:17 -04:00
2019-07-05 00:29:47 -04:00
#
# Snapshot functions
#
2021-05-29 20:29:51 -04:00
def getCephSnapshots ( zkhandler , pool , volume ) :
2019-07-05 00:29:47 -04:00
snapshot_list = list ( )
volume_list = list ( )
2018-10-31 23:38:17 -04:00
2021-05-29 20:29:51 -04:00
volume_list = getCephVolumes ( zkhandler , pool )
2019-07-05 13:57:15 -04:00
if volume :
2019-07-05 00:29:47 -04:00
for volume_entry in volume_list :
2021-11-06 03:02:43 -04:00
volume_pool , volume_name = volume_entry . split ( " / " )
2019-07-05 00:29:47 -04:00
if volume_name == volume :
2021-11-06 03:02:43 -04:00
volume_list = [ " {} / {} " . format ( volume_pool , volume_name ) ]
2018-10-31 23:38:17 -04:00
2019-07-05 00:29:47 -04:00
for volume_entry in volume_list :
2021-11-06 03:02:43 -04:00
for snapshot_name in zkhandler . children ( ( " snapshot " , volume_entry ) ) :
snapshot_list . append ( " {} @ {} " . format ( volume_entry , snapshot_name ) )
2018-10-31 23:38:17 -04:00
2019-07-05 00:29:47 -04:00
return snapshot_list
2018-10-31 23:38:17 -04:00
2020-11-07 14:45:24 -05:00
2023-10-24 00:23:12 -04:00
def add_snapshot ( zkhandler , pool , volume , name , zk_only = False ) :
2021-05-29 20:29:51 -04:00
if not verifyVolume ( zkhandler , pool , volume ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No volume with name " {} " is present in pool " {} " . ' . format (
volume , pool
)
2020-02-08 23:35:30 -05:00
# 1. Create the snapshot
2023-10-24 00:23:12 -04:00
if not zk_only :
retcode , stdout , stderr = common . run_os_command (
" rbd snap create {} / {} @ {} " . format ( pool , volume , name )
2021-11-06 03:02:43 -04:00
)
2023-10-24 00:23:12 -04:00
if retcode :
return (
False ,
' ERROR: Failed to create RBD snapshot " {} " of volume " {} " in pool " {} " : {} ' . format (
name , volume , pool , stderr
) ,
)
2020-02-08 23:35:30 -05:00
# 2. Add the snapshot to Zookeeper
2021-11-06 03:02:43 -04:00
zkhandler . write (
[
( ( " snapshot " , f " { pool } / { volume } / { name } " ) , " " ) ,
( ( " snapshot.stats " , f " { pool } / { volume } / { name } " ) , " {} " ) ,
]
)
2020-11-06 19:05:48 -05:00
2021-02-14 17:02:49 -05:00
# 3. Update the count of snapshots on this volume
2021-11-06 03:02:43 -04:00
volume_stats_raw = zkhandler . read ( ( " volume.stats " , f " { pool } / { volume } " ) )
2021-02-14 17:02:49 -05:00
volume_stats = dict ( json . loads ( volume_stats_raw ) )
# Format the size to something nicer
2021-11-06 03:02:43 -04:00
volume_stats [ " snapshot_count " ] = volume_stats [ " snapshot_count " ] + 1
2021-02-14 17:02:49 -05:00
volume_stats_raw = json . dumps ( volume_stats )
2021-11-06 03:02:43 -04:00
zkhandler . write (
[
( ( " volume.stats " , f " { pool } / { volume } " ) , volume_stats_raw ) ,
]
)
2021-02-14 17:02:49 -05:00
2021-11-06 03:02:43 -04:00
return True , ' Created RBD snapshot " {} " of volume " {} " in pool " {} " . ' . format (
name , volume , pool
)
2019-07-28 23:00:35 -04:00
2020-11-07 14:45:24 -05:00
2021-05-29 20:29:51 -04:00
def rename_snapshot ( zkhandler , pool , volume , name , new_name ) :
if not verifyVolume ( zkhandler , pool , volume ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No volume with name " {} " is present in pool " {} " . ' . format (
volume , pool
)
2021-05-29 20:29:51 -04:00
if not verifySnapshot ( zkhandler , pool , volume , name ) :
2021-11-06 03:02:43 -04:00
return (
False ,
' ERROR: No snapshot with name " {} " is present for volume " {} " in pool " {} " . ' . format (
name , volume , pool
) ,
)
2019-07-28 23:00:35 -04:00
2020-02-08 23:35:30 -05:00
# 1. Rename the snapshot
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
" rbd snap rename {pool} / {volume} @ {name} {pool} / {volume} @ {new_name} " . format (
pool = pool , volume = volume , name = name , new_name = new_name
)
)
2020-02-08 23:35:30 -05:00
if retcode :
2021-11-06 03:02:43 -04:00
return (
False ,
' ERROR: Failed to rename RBD snapshot " {} " to " {} " for volume " {} " in pool " {} " : {} ' . format (
name , new_name , volume , pool , stderr
) ,
)
2019-06-19 00:12:44 -04:00
2020-02-08 23:35:30 -05:00
# 2. Rename the snapshot in ZK
2021-11-06 03:02:43 -04:00
zkhandler . rename (
[
(
( " snapshot " , f " { pool } / { volume } / { name } " ) ,
( " snapshot " , f " { pool } / { volume } / { new_name } " ) ,
) ,
]
)
return (
True ,
' Renamed RBD snapshot " {} " to " {} " for volume " {} " in pool " {} " . ' . format (
name , new_name , volume , pool
) ,
)
2019-06-19 00:12:44 -04:00
2020-11-07 14:45:24 -05:00
2024-05-13 15:22:03 -04:00
def rollback_snapshot ( zkhandler , pool , volume , name ) :
if not verifyVolume ( zkhandler , pool , volume ) :
return False , ' ERROR: No volume with name " {} " is present in pool " {} " . ' . format (
volume , pool
)
if not verifySnapshot ( zkhandler , pool , volume , name ) :
return (
False ,
' ERROR: No snapshot with name " {} " is present for volume " {} " in pool " {} " . ' . format (
name , volume , pool
) ,
)
# 1. Roll back the snapshot
retcode , stdout , stderr = common . run_os_command (
" rbd snap rollback {} / {} @ {} " . format ( pool , volume , name )
)
if retcode :
return (
False ,
' ERROR: Failed to roll back RBD volume " {} " in pool " {} " to snapshot " {} " : {} ' . format (
volume , pool , name , stderr
) ,
)
return True , ' Rolled back RBD volume " {} " in pool " {} " to snapshot " {} " . ' . format (
volume , pool , name
)
2021-05-29 20:29:51 -04:00
def remove_snapshot ( zkhandler , pool , volume , name ) :
if not verifyVolume ( zkhandler , pool , volume ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No volume with name " {} " is present in pool " {} " . ' . format (
volume , pool
)
2021-05-29 20:29:51 -04:00
if not verifySnapshot ( zkhandler , pool , volume , name ) :
2021-11-06 03:02:43 -04:00
return (
False ,
' ERROR: No snapshot with name " {} " is present of volume {} in pool {} . ' . format (
name , volume , pool
) ,
)
2019-06-19 00:12:44 -04:00
2020-02-08 23:35:30 -05:00
# 1. Remove the snapshot
2021-11-06 03:02:43 -04:00
retcode , stdout , stderr = common . run_os_command (
" rbd snap rm {} / {} @ {} " . format ( pool , volume , name )
)
2020-02-08 23:35:30 -05:00
if retcode :
2021-11-06 03:02:43 -04:00
return (
False ,
' Failed to remove RBD snapshot " {} " of volume " {} " in pool " {} " : {} ' . format (
name , volume , pool , stderr
) ,
)
2019-06-19 00:12:44 -04:00
2020-02-08 23:35:30 -05:00
# 2. Delete snapshot from Zookeeper
2021-11-06 03:02:43 -04:00
zkhandler . delete ( [ ( " snapshot " , f " { pool } / { volume } / { name } " ) ] )
2019-06-19 00:12:44 -04:00
2021-02-14 17:02:49 -05:00
# 3. Update the count of snapshots on this volume
2021-11-06 03:02:43 -04:00
volume_stats_raw = zkhandler . read ( ( " volume.stats " , f " { pool } / { volume } " ) )
2021-02-14 17:02:49 -05:00
volume_stats = dict ( json . loads ( volume_stats_raw ) )
# Format the size to something nicer
2021-11-06 03:02:43 -04:00
volume_stats [ " snapshot_count " ] = volume_stats [ " snapshot_count " ] - 1
2021-02-14 17:02:49 -05:00
volume_stats_raw = json . dumps ( volume_stats )
2021-11-06 03:02:43 -04:00
zkhandler . write ( [ ( ( " volume.stats " , f " { pool } / { volume } " ) , volume_stats_raw ) ] )
2021-02-14 17:02:49 -05:00
2021-11-06 03:02:43 -04:00
return True , ' Removed RBD snapshot " {} " of volume " {} " in pool " {} " . ' . format (
name , volume , pool
)
2019-06-19 00:12:44 -04:00
2020-11-07 14:45:24 -05:00
2023-12-09 13:43:58 -05:00
def get_list_snapshot ( zkhandler , pool , volume , limit = None , is_fuzzy = True ) :
2019-07-05 00:29:47 -04:00
snapshot_list = [ ]
2021-05-29 20:29:51 -04:00
if pool and not verifyPool ( zkhandler , pool ) :
2021-11-06 03:02:43 -04:00
return False , ' ERROR: No pool with name " {} " is present in the cluster. ' . format (
pool
)
2019-06-19 15:32:32 -04:00
2021-05-29 20:29:51 -04:00
if volume and not verifyPool ( zkhandler , volume ) :
2021-11-06 03:02:43 -04:00
return (
False ,
' ERROR: No volume with name " {} " is present in the cluster. ' . format ( volume ) ,
)
2019-07-04 23:09:16 -04:00
2021-05-29 20:29:51 -04:00
full_snapshot_list = getCephSnapshots ( zkhandler , pool , volume )
2019-07-04 23:09:16 -04:00
2019-07-26 15:03:48 -04:00
if is_fuzzy and limit :
# Implicitly assume fuzzy limits
2021-11-06 03:02:43 -04:00
if not re . match ( r " \ ^.* " , limit ) :
limit = " .* " + limit
if not re . match ( r " .* \ $ " , limit ) :
limit = limit + " .* "
2019-07-26 15:03:48 -04:00
2019-07-05 00:29:47 -04:00
for snapshot in full_snapshot_list :
2021-11-06 03:02:43 -04:00
volume , snapshot_name = snapshot . split ( " @ " )
pool_name , volume_name = volume . split ( " / " )
2019-07-05 00:29:47 -04:00
if limit :
try :
2021-12-06 16:35:29 -05:00
if re . fullmatch ( limit , snapshot_name ) :
2021-11-06 03:02:43 -04:00
snapshot_list . append (
{
" pool " : pool_name ,
" volume " : volume_name ,
" snapshot " : snapshot_name ,
}
)
2019-07-05 00:29:47 -04:00
except Exception as e :
2021-11-06 03:02:43 -04:00
return False , " Regex Error: {} " . format ( e )
2019-07-05 00:29:47 -04:00
else :
2021-11-06 03:02:43 -04:00
snapshot_list . append (
{ " pool " : pool_name , " volume " : volume_name , " snapshot " : snapshot_name }
)
2019-07-04 23:09:16 -04:00
2021-11-06 03:02:43 -04:00
return True , sorted ( snapshot_list , key = lambda x : str ( x [ " snapshot " ] ) )
2023-11-09 14:05:15 -05:00
#
# Celery worker tasks (must be run on node, outputs log messages to worker)
#
def osd_worker_helper_find_osds_from_block ( device ) :
# Try to query the passed block device directly
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm list --format json { device } "
)
if retcode :
found_osds = [ ]
else :
found_osds = jloads ( stdout )
return found_osds
def osd_worker_add_osd (
zkhandler ,
celery ,
node ,
device ,
weight ,
ext_db_ratio = None ,
ext_db_size = None ,
split_count = None ,
) :
current_stage = 0
2023-11-09 23:51:05 -05:00
total_stages = 5
2023-11-09 14:05:15 -05:00
if split_count is None :
_split_count = 1
else :
_split_count = split_count
total_stages = total_stages + 3 * int ( _split_count )
if ext_db_ratio is not None or ext_db_size is not None :
total_stages = total_stages + 3 * int ( _split_count ) + 1
start (
celery ,
f " Adding { _split_count } new OSD(s) on device { device } with weight { weight } " ,
current = current_stage ,
total = total_stages ,
)
# Handle a detect device if that is passed
if match ( r " detect: " , device ) :
ddevice = common . get_detect_device ( device )
if ddevice is None :
fail (
celery ,
f " Failed to determine block device from detect string { device } " ,
)
return
else :
log_info (
celery , f " Determined block device { ddevice } from detect string { device } "
)
device = ddevice
if ext_db_size is not None and ext_db_ratio is not None :
fail (
celery ,
" Invalid configuration: both an ext_db_size and ext_db_ratio were specified " ,
)
return
# Check if device has a partition table; it's not valid if it does
retcode , _ , _ = common . run_os_command ( f " sfdisk --dump { device } " )
if retcode < 1 :
fail (
celery ,
f " Device { device } has a partition table and is unsuitable for an OSD " ,
)
return
if ext_db_size is not None or ext_db_ratio is not None :
ext_db_flag = True
else :
ext_db_flag = False
if split_count is not None :
split_flag = f " --osds-per-device { split_count } "
is_split = True
log_info (
celery , f " Creating { split_count } new OSD disks on block device { device } "
)
else :
split_flag = " "
is_split = False
log_info ( celery , f " Creating 1 new OSD disk on block device { device } " )
if " nvme " in device :
class_flag = " --crush-device-class nvme "
else :
class_flag = " --crush-device-class ssd "
# 1. Zap the block device
current_stage + = 1
update (
celery ,
f " Zapping block device { device } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm zap --destroy { device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to perform ceph-volume lvm zap on { device } " )
return
# 2. Prepare the OSD(s)
current_stage + = 1
update (
celery ,
f " Preparing OSD(s) on device { device } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm batch --yes --prepare --bluestore { split_flag } { class_flag } { device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to perform ceph-volume lvm batch on { device } " )
return
# 3. Get the list of created OSDs on the device (initial pass)
current_stage + = 1
update (
celery ,
f " Querying OSD(s) on device { device } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm list --format json { device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to perform ceph-volume lvm list on { device } " )
return
created_osds = jloads ( stdout )
# 4. Prepare the WAL and DB devices
if ext_db_flag :
for created_osd in created_osds :
# 4a. Get the OSD FSID and ID from the details
osd_details = created_osds [ created_osd ] [ 0 ]
osd_fsid = osd_details [ " tags " ] [ " ceph.osd_fsid " ]
osd_id = osd_details [ " tags " ] [ " ceph.osd_id " ]
osd_lv = osd_details [ " lv_path " ]
current_stage + = 1
update (
celery ,
f " Preparing DB LV for OSD { osd_id } " ,
current = current_stage ,
total = total_stages ,
)
# 4b. Prepare the logical volume if ext_db_flag
if ext_db_ratio is not None :
_ , osd_size_bytes , _ = common . run_os_command (
f " blockdev --getsize64 { osd_lv } "
)
osd_size_bytes = int ( osd_size_bytes )
osd_db_size_bytes = int ( osd_size_bytes * ext_db_ratio )
if ext_db_size is not None :
osd_db_size_bytes = format_bytes_fromhuman ( ext_db_size )
osd_db_size_mb = int ( osd_db_size_bytes / 1024 / 1024 )
db_device = f " osd-db/osd- { osd_id } "
current_stage + = 1
update (
celery ,
f " Preparing Bluestore DB volume for OSD { osd_id } on { db_device } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " lvcreate -L { osd_db_size_mb } M -n osd- { osd_id } --yes osd-db "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run lvcreate on { db_device } " )
return
# 4c. Attach the new DB device to the OSD
current_stage + = 1
update (
celery ,
f " Attaching Bluestore DB volume to OSD { osd_id } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm new-db --osd-id { osd_id } --osd-fsid { osd_fsid } --target { db_device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail (
celery , f " Failed to perform ceph-volume lvm new-db on OSD { osd_id } "
)
return
# 4d. Get the list of created OSDs on the device (final pass)
current_stage + = 1
update (
celery ,
f " Requerying OSD(s) on device { device } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm list --format json { device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to perform ceph-volume lvm list on { device } " )
return
created_osds = jloads ( stdout )
# 5. Activate the OSDs
for created_osd in created_osds :
# 5a. Get the OSD FSID and ID from the details
osd_details = created_osds [ created_osd ] [ 0 ]
osd_clusterfsid = osd_details [ " tags " ] [ " ceph.cluster_fsid " ]
osd_fsid = osd_details [ " tags " ] [ " ceph.osd_fsid " ]
osd_id = osd_details [ " tags " ] [ " ceph.osd_id " ]
db_device = osd_details [ " tags " ] . get ( " ceph.db_device " , " " )
osd_vg = osd_details [ " vg_name " ]
osd_lv = osd_details [ " lv_name " ]
# 5b. Add it to the crush map
current_stage + = 1
update (
celery ,
f " Adding OSD { osd_id } to CRUSH map " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph osd crush add osd. { osd_id } { weight } root=default host= { node } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to perform ceph osd crush add for OSD { osd_id } " )
return
# 5c. Activate the OSD
current_stage + = 1
update (
celery ,
f " Activating OSD { osd_id } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm activate --bluestore { osd_id } { osd_fsid } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to perform ceph osd crush add for OSD { osd_id } " )
return
# 5d. Wait 1 second for it to activate
time . sleep ( 1 )
# 5e. Verify it started
retcode , stdout , stderr = common . run_os_command (
f " systemctl status ceph-osd@ { osd_id } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to start OSD { osd_id } process " )
return
# 5f. Add the new OSD to PVC
current_stage + = 1
update (
celery ,
f " Adding OSD { osd_id } to PVC " ,
current = current_stage ,
total = total_stages ,
)
zkhandler . write (
[
( ( " osd " , osd_id ) , " " ) ,
( ( " osd.node " , osd_id ) , node ) ,
( ( " osd.device " , osd_id ) , device ) ,
( ( " osd.db_device " , osd_id ) , db_device ) ,
( ( " osd.fsid " , osd_id ) , " " ) ,
( ( " osd.ofsid " , osd_id ) , osd_fsid ) ,
( ( " osd.cfsid " , osd_id ) , osd_clusterfsid ) ,
( ( " osd.lvm " , osd_id ) , " " ) ,
( ( " osd.vg " , osd_id ) , osd_vg ) ,
( ( " osd.lv " , osd_id ) , osd_lv ) ,
( ( " osd.is_split " , osd_id ) , is_split ) ,
(
( " osd.stats " , osd_id ) ,
' { " uuid " : " | " , " up " : 0, " in " : 0, " primary_affinity " : " | " , " utilization " : " | " , " var " : " | " , " pgs " : " | " , " kb " : " | " , " weight " : " | " , " reweight " : " | " , " node " : " | " , " used " : " | " , " avail " : " | " , " wr_ops " : " | " , " wr_data " : " | " , " rd_ops " : " | " , " rd_data " : " | " , " state " : " | " } ' ,
) ,
]
)
2023-11-09 23:51:05 -05:00
# 6. Wait for OSD to check in
current_stage + = 1
update (
celery ,
" Waiting for new OSD(s) to report stats " ,
current = current_stage ,
total = total_stages ,
)
last_osd = None
for osd in created_osds :
last_osd = osd
while (
jloads (
zkhandler . read (
( " osd.stats " , created_osds [ last_osd ] [ 0 ] [ " tags " ] [ " ceph.osd_id " ] )
)
) [ " weight " ]
== " | "
) :
time . sleep ( 1 )
# 7. Log it
2023-11-09 14:05:15 -05:00
current_stage + = 1
return finish (
celery ,
f " Successfully created { len ( created_osds . keys ( ) ) } new OSD(s) { ' , ' . join ( created_osds . keys ( ) ) } on device { device } " ,
current = current_stage ,
total = total_stages ,
)
def osd_worker_replace_osd (
zkhandler ,
celery ,
node ,
osd_id ,
new_device ,
old_device = None ,
weight = None ,
ext_db_ratio = None ,
ext_db_size = None ,
) :
# Try to determine if any other OSDs shared a block device with this OSD
_ , osd_list = get_list_osd ( zkhandler , None )
osd_block = zkhandler . read ( ( " osd.device " , osd_id ) )
all_osds_on_block = [
o for o in osd_list if o [ " node " ] == node and o [ " device " ] == osd_block
]
all_osds_on_block_ids = [ o [ " id " ] for o in all_osds_on_block ]
# Set up stages
current_stage = 0
total_stages = 3
_split_count = len ( all_osds_on_block_ids )
total_stages = total_stages + 10 * int ( _split_count )
if (
ext_db_ratio is not None
or ext_db_size is not None
or any ( [ True for o in all_osds_on_block if o [ " db_device " ] ] )
) :
total_stages = total_stages + 2 * int ( _split_count )
start (
celery ,
f " Replacing OSD(s) { ' , ' . join ( all_osds_on_block_ids ) } with device { new_device } " ,
current = current_stage ,
total = total_stages ,
)
# Handle a detect device if that is passed
if match ( r " detect: " , new_device ) :
ddevice = common . get_detect_device ( new_device )
if ddevice is None :
fail (
celery ,
f " Failed to determine block device from detect string { new_device } " ,
)
return
else :
log_info (
celery ,
f " Determined block device { ddevice } from detect string { new_device } " ,
)
new_device = ddevice
# Check if device has a partition table; it's not valid if it does
retcode , _ , _ = common . run_os_command ( f " sfdisk --dump { new_device } " )
if retcode < 1 :
fail (
celery ,
f " Device { new_device } has a partition table and is unsuitable for an OSD " ,
)
return
# Phase 1: Try to determine what we can about the old device
real_old_device = None
# Determine information from a passed old_device
if old_device is not None :
found_osds = osd_worker_helper_find_osds_from_block ( old_device )
if found_osds and osd_id in found_osds . keys ( ) :
real_old_device = old_device
else :
log_warn (
celery ,
f " No OSD(s) found on device { old_device } ; falling back to PVC detection " ,
)
# Try to get an old_device from our PVC information
if real_old_device is None :
found_osds = osd_worker_helper_find_osds_from_block ( osd_block )
if osd_id in found_osds . keys ( ) :
real_old_device = osd_block
if real_old_device is None :
skip_zap = True
log_warn (
celery ,
" No valid old block device found for OSD(s); skipping zap " ,
)
else :
skip_zap = False
# Determine the weight of the OSD(s)
if weight is None :
weight = all_osds_on_block [ 0 ] [ " stats " ] [ " weight " ]
# Take down the OSD(s), but keep it's CRUSH map details and IDs
for osd in all_osds_on_block :
osd_id = osd [ " id " ]
# 1. Set the OSD down and out so it will flush
current_stage + = 1
update (
celery ,
f " Setting OSD { osd_id } down " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command ( f " ceph osd down { osd_id } " )
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to set OSD { osd_id } down " )
return
current_stage + = 1
update (
celery ,
f " Setting OSD { osd_id } out " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command ( f " ceph osd out { osd_id } " )
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to set OSD { osd_id } out " )
return
# 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
current_stage + = 1
update (
celery ,
f " Waiting for OSD { osd_id } to be safe to remove " ,
current = current_stage ,
total = total_stages ,
)
tcount = 0
while True :
retcode , stdout , stderr = common . run_os_command (
f " ceph osd safe-to-destroy osd. { osd_id } "
)
if int ( retcode ) in [ 0 , 11 ] :
break
else :
common . run_os_command ( f " ceph osd down { osd_id } " )
common . run_os_command ( f " ceph osd out { osd_id } " )
time . sleep ( 1 )
tcount + = 1
if tcount > 60 :
log_warn (
celery ,
f " Timed out (60s) waiting for OSD { osd_id } to be safe to remove; proceeding anyways " ,
)
break
# 3. Stop the OSD process and wait for it to be terminated
current_stage + = 1
update (
celery ,
f " Stopping OSD { osd_id } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " systemctl stop ceph-osd@ { osd_id } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to stop OSD { osd_id } " )
return
time . sleep ( 5 )
# 4. Destroy the OSD
current_stage + = 1
update (
celery ,
f " Destroying OSD { osd_id } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph osd destroy { osd_id } --yes-i-really-mean-it "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to destroy OSD { osd_id } " )
return
current_stage + = 1
update (
celery ,
f " Zapping old disk { real_old_device } if possible " ,
current = current_stage ,
total = total_stages ,
)
if not skip_zap :
# 5. Zap the old disk
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm zap --destroy { real_old_device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
log_warn (
celery , f " Failed to zap old disk { real_old_device } ; proceeding anyways "
)
# 6. Prepare the volume group on the new device
current_stage + = 1
update (
celery ,
f " Preparing LVM volume group on new disk { new_device } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm zap --destroy { new_device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run ceph-volume lvm zap on new disk { new_device } " )
return
retcode , stdout , stderr = common . run_os_command ( f " pvcreate { new_device } " )
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run pvcreate on new disk { new_device } " )
return
vg_uuid = str ( uuid4 ( ) )
retcode , stdout , stderr = common . run_os_command (
f " vgcreate ceph- { vg_uuid } { new_device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run vgcreate on new disk { new_device } " )
return
# Determine how many OSDs we want on the new device
osds_count = len ( all_osds_on_block )
# Determine the size of the new device
_ , new_device_size_bytes , _ = common . run_os_command (
f " blockdev --getsize64 { new_device } "
)
# Calculate the size of each OSD (in MB) based on the default 4M extent size
new_osd_size_mb = (
int ( int ( int ( new_device_size_bytes ) / osds_count ) / 1024 / 1024 / 4 ) * 4
)
# Calculate the size, if applicable, of the OSD block if we were passed a ratio
if ext_db_ratio is not None :
osd_new_db_size_mb = int ( int ( int ( new_osd_size_mb * ext_db_ratio ) / 4 ) * 4 )
elif ext_db_size is not None :
osd_new_db_size_mb = int (
int ( int ( format_bytes_fromhuman ( ext_db_size ) ) / 1024 / 1024 / 4 ) * 4
)
else :
if all_osds_on_block [ 0 ] [ " db_device " ] :
_ , new_device_size_bytes , _ = common . run_os_command (
f " blockdev --getsize64 { all_osds_on_block [ 0 ] [ ' db_device ' ] } "
)
osd_new_db_size_mb = int (
int ( int ( new_device_size_bytes ) / 1024 / 1024 / 4 ) * 4
)
else :
osd_new_db_size_mb = None
for osd in all_osds_on_block :
osd_id = osd [ " id " ]
osd_fsid = osd [ " fsid " ]
current_stage + = 1
update (
celery ,
f " Preparing LVM logical volume for OSD { osd_id } on new disk { new_device } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " lvcreate -L { new_osd_size_mb } M -n osd-block- { osd_fsid } ceph- { vg_uuid } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run lvcreate for OSD { osd_id } " )
return
current_stage + = 1
update (
celery ,
f " Preparing OSD { osd_id } on new disk { new_device } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm prepare --bluestore --osd-id { osd_id } --osd-fsid { osd_fsid } --data /dev/ceph- { vg_uuid } /osd-block- { osd_fsid } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run ceph-volume lvm prepare for OSD { osd_id } " )
return
for osd in all_osds_on_block :
osd_id = osd [ " id " ]
osd_fsid = osd [ " fsid " ]
if osd [ " db_device " ] :
db_device = f " osd-db/osd- { osd_id } "
current_stage + = 1
update (
celery ,
f " Preparing Bluestore DB volume for OSD { osd_id } on { db_device } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " lvremove --force { db_device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run lvremove on { db_device } " )
return
retcode , stdout , stderr = common . run_os_command (
f " lvcreate -L { osd_new_db_size_mb } M -n osd- { osd_id } --yes osd-db "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run lvcreate on { db_device } " )
return
current_stage + = 1
update (
celery ,
f " Attaching Bluestore DB volume to OSD { osd_id } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm new-db --osd-id { osd_id } --osd-fsid { osd_fsid } --target { db_device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run ceph-volume lvm new-db for OSD { osd_id } " )
return
current_stage + = 1
update (
celery ,
f " Updating OSD { osd_id } in CRUSH map " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph osd crush add osd. { osd_id } { weight } root=default host= { node } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run ceph osd crush add for OSD { osd_id } " )
return
current_stage + = 1
update (
celery ,
f " Activating OSD { osd_id } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm activate --bluestore { osd_id } { osd_fsid } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run ceph-volume lvm activate for OSD { osd_id } " )
return
# Wait 1 second for it to activate
time . sleep ( 1 )
# Verify it started
retcode , stdout , stderr = common . run_os_command (
f " systemctl status ceph-osd@ { osd_id } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to start OSD { osd_id } process " )
return
current_stage + = 1
update (
celery ,
f " Updating OSD { osd_id } in PVC " ,
current = current_stage ,
total = total_stages ,
)
zkhandler . write (
[
( ( " osd.device " , osd_id ) , new_device ) ,
( ( " osd.vg " , osd_id ) , f " ceph- { vg_uuid } " ) ,
( ( " osd.lv " , osd_id ) , f " osd-block- { osd_fsid } " ) ,
]
)
# 6. Log it
current_stage + = 1
return finish (
celery ,
f " Successfully replaced OSD(s) { ' , ' . join ( all_osds_on_block_ids ) } on new disk { new_device } " ,
current = current_stage ,
total = total_stages ,
)
def osd_worker_refresh_osd (
zkhandler ,
celery ,
node ,
osd_id ,
device ,
ext_db_flag ,
) :
# Try to determine if any other OSDs shared a block device with this OSD
_ , osd_list = get_list_osd ( zkhandler , None )
osd_block = zkhandler . read ( ( " osd.device " , osd_id ) )
all_osds_on_block = [
o for o in osd_list if o [ " node " ] == node and o [ " device " ] == osd_block
]
all_osds_on_block_ids = [ o [ " id " ] for o in all_osds_on_block ]
# Set up stages
current_stage = 0
total_stages = 1
_split_count = len ( all_osds_on_block_ids )
total_stages = total_stages + 3 * int ( _split_count )
start (
celery ,
f " Refreshing/reimporting OSD(s) { ' , ' . join ( all_osds_on_block_ids ) } on device { device } " ,
current = current_stage ,
total = total_stages ,
)
# Handle a detect device if that is passed
if match ( r " detect: " , device ) :
ddevice = common . get_detect_device ( device )
if ddevice is None :
fail (
celery ,
f " Failed to determine block device from detect string { device } " ,
)
return
else :
log_info (
celery ,
f " Determined block device { ddevice } from detect string { device } " ,
)
device = ddevice
retcode , stdout , stderr = common . run_os_command ( " ceph osd ls " )
osd_list = stdout . split ( " \n " )
if osd_id not in osd_list :
fail (
celery ,
f " Could not find OSD { osd_id } in the cluster " ,
)
return
found_osds = osd_worker_helper_find_osds_from_block ( device )
if osd_id not in found_osds . keys ( ) :
fail (
celery ,
f " Could not find OSD { osd_id } on device { device } " ,
)
return
for osd in found_osds :
found_osd = found_osds [ osd ] [ 0 ]
lv_device = found_osd [ " lv_path " ]
_ , osd_pvc_information = get_list_osd ( zkhandler , osd )
osd_information = osd_pvc_information [ 0 ]
current_stage + = 1
update (
celery ,
" Querying for OSD on device " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm list --format json { lv_device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run ceph-volume lvm list for OSD { osd } " )
return
osd_detail = jloads ( stdout ) [ osd ] [ 0 ]
osd_fsid = osd_detail [ " tags " ] [ " ceph.osd_fsid " ]
if osd_fsid != osd_information [ " fsid " ] :
fail (
celery ,
f " OSD { osd } FSID { osd_information [ ' fsid ' ] } does not match volume FSID { osd_fsid } ; OSD cannot be imported " ,
)
return
dev_flags = f " --data { lv_device } "
if ext_db_flag :
db_device = " osd-db/osd- {osd} "
dev_flags + = f " --block.db { db_device } "
if not path . exists ( f " /dev/ { db_device } " ) :
fail (
celery ,
f " OSD Bluestore DB volume { db_device } does not exist; OSD cannot be imported " ,
)
return
else :
db_device = " "
current_stage + = 1
update (
celery ,
f " Activating OSD { osd } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm activate --bluestore { osd } { osd_fsid } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to run ceph-volume lvm activate for OSD { osd } " )
return
# Wait 1 second for it to activate
time . sleep ( 1 )
# Verify it started
retcode , stdout , stderr = common . run_os_command (
f " systemctl status ceph-osd@ { osd } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail ( celery , f " Failed to start OSD { osd } process " )
return
current_stage + = 1
update (
celery ,
f " Updating OSD { osd } in PVC " ,
current = current_stage ,
total = total_stages ,
)
zkhandler . write (
[
( ( " osd.device " , osd ) , device ) ,
( ( " osd.vg " , osd ) , osd_detail [ " vg_name " ] ) ,
( ( " osd.lv " , osd ) , osd_detail [ " lv_name " ] ) ,
]
)
# 6. Log it
current_stage + = 1
return finish (
celery ,
f " Successfully reimported OSD(s) { ' , ' . join ( all_osds_on_block_ids ) } on device { device } " ,
current = current_stage ,
total = total_stages ,
)
def osd_worker_remove_osd (
zkhandler , celery , node , osd_id , force_flag = False , skip_zap_flag = False
) :
# Get initial data
data_device = zkhandler . read ( ( " osd.device " , osd_id ) )
if zkhandler . exists ( ( " osd.db_device " , osd_id ) ) :
db_device = zkhandler . read ( ( " osd.db_device " , osd_id ) )
else :
db_device = None
# Set up stages
current_stage = 0
total_stages = 7
if not force_flag :
total_stages + = 1
if not skip_zap_flag :
total_stages + = 2
2023-11-09 23:51:05 -05:00
if db_device :
2023-11-09 14:05:15 -05:00
total_stages + = 1
start (
celery ,
f " Removing OSD { osd_id } " ,
current = current_stage ,
total = total_stages ,
)
# Verify the OSD is present
retcode , stdout , stderr = common . run_os_command ( " ceph osd ls " )
osd_list = stdout . split ( " \n " )
if osd_id not in osd_list :
if not force_flag :
fail (
celery ,
f " OSD { osd_id } not found in Ceph " ,
)
return
else :
log_warn (
celery ,
f " OSD { osd_id } not found in Ceph; ignoring error due to force flag " ,
)
# 1. Set the OSD down and out so it will flush
current_stage + = 1
update (
celery ,
f " Setting OSD { osd_id } down " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command ( f " ceph osd down { osd_id } " )
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
if not force_flag :
fail (
celery ,
f " Failed to set OSD { osd_id } down " ,
)
return
else :
log_warn (
celery ,
f " Failed to set OSD { osd_id } down; ignoring error due to force flag " ,
)
current_stage + = 1
update (
celery ,
f " Setting OSD { osd_id } out " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command ( f " ceph osd out { osd_id } " )
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
if not force_flag :
fail (
celery ,
f " Failed to set OSD { osd_id } down " ,
)
return
else :
log_warn (
celery ,
f " Failed to set OSD { osd_id } down; ignoring error due to force flag " ,
)
# 2. Wait for the OSD to be safe to remove (but don't wait for rebalancing to complete)
if not force_flag :
current_stage + = 1
update (
celery ,
f " Waiting for OSD { osd_id } to be safe to remove " ,
current = current_stage ,
total = total_stages ,
)
tcount = 0
while True :
retcode , stdout , stderr = common . run_os_command (
f " ceph osd safe-to-destroy osd. { osd_id } "
)
if int ( retcode ) in [ 0 , 11 ] :
break
else :
common . run_os_command ( f " ceph osd down { osd_id } " )
common . run_os_command ( f " ceph osd out { osd_id } " )
time . sleep ( 1 )
tcount + = 1
if tcount > 60 :
log_warn (
celery ,
f " Timed out (60s) waiting for OSD { osd_id } to be safe to remove; proceeding anyways " ,
)
break
# 3. Stop the OSD process and wait for it to be terminated
current_stage + = 1
update (
celery ,
f " Stopping OSD { osd_id } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command ( f " systemctl stop ceph-osd@ { osd_id } " )
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
if not force_flag :
fail (
celery ,
f " Failed to stop OSD { osd_id } process " ,
)
return
else :
log_warn (
celery ,
f " Failed to stop OSD { osd_id } process; ignoring error due to force flag " ,
)
time . sleep ( 5 )
# 4. Delete OSD from ZK
current_stage + = 1
update (
celery ,
f " Deleting OSD { osd_id } from PVC " ,
current = current_stage ,
total = total_stages ,
)
zkhandler . delete ( ( " osd " , osd_id ) , recursive = True )
# 5a. Destroy the OSD from Ceph
current_stage + = 1
update (
celery ,
f " Destroying OSD { osd_id } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph osd destroy { osd_id } --yes-i-really-mean-it "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
if not force_flag :
fail (
celery ,
f " Failed to destroy OSD { osd_id } " ,
)
return
else :
log_warn (
celery ,
f " Failed to destroy OSD { osd_id } ; ignoring error due to force flag " ,
)
time . sleep ( 2 )
# 5b. Purge the OSD from Ceph
current_stage + = 1
update (
celery ,
f " Removing OSD { osd_id } from CRUSH map " ,
current = current_stage ,
total = total_stages ,
)
# Remove the OSD from the CRUSH map
retcode , stdout , stderr = common . run_os_command ( f " ceph osd crush rm osd. { osd_id } " )
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
if not force_flag :
fail (
celery ,
f " Failed to remove OSD { osd_id } from CRUSH map " ,
)
return
else :
log_warn (
celery ,
f " Failed to remove OSD { osd_id } from CRUSH map; ignoring error due to force flag " ,
)
# Purge the OSD
current_stage + = 1
update (
celery ,
f " Purging OSD { osd_id } " ,
current = current_stage ,
total = total_stages ,
)
if force_flag :
force_arg = " --force "
else :
force_arg = " "
retcode , stdout , stderr = common . run_os_command (
f " ceph osd purge { osd_id } { force_arg } --yes-i-really-mean-it "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
if not force_flag :
fail (
celery ,
f " Failed to purge OSD { osd_id } " ,
)
return
else :
log_warn (
celery ,
f " Failed to purge OSD { osd_id } ; ignoring error due to force flag " ,
)
# 6. Remove the DB device
2023-11-09 23:51:05 -05:00
if db_device :
2023-11-09 14:05:15 -05:00
current_stage + = 1
update (
celery ,
f " Removing OSD DB logical volume { db_device } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
f " lvremove --yes --force { db_device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
if not force_flag :
fail (
celery ,
f " Failed to remove OSD DB logical volume { db_device } " ,
)
return
else :
log_warn (
celery ,
f " Failed to remove OSD DB logical volume { db_device } ; ignoring error due to force flag " ,
)
if not skip_zap_flag :
current_stage + = 1
update (
celery ,
f " Zapping old disk { data_device } if possible " ,
current = current_stage ,
total = total_stages ,
)
# 7. Determine the block devices
found_osds = osd_worker_helper_find_osds_from_block ( data_device )
if osd_id in found_osds . keys ( ) :
# Try to determine if any other OSDs shared a block device with this OSD
_ , osd_list = get_list_osd ( zkhandler , None )
all_osds_on_block = [
o for o in osd_list if o [ " node " ] == node and o [ " device " ] == data_device
]
if len ( all_osds_on_block ) < 1 :
log_info (
celery ,
f " Found no peer split OSD(s) on { data_device } ; zapping disk " ,
)
retcode , stdout , stderr = common . run_os_command (
f " ceph-volume lvm zap --destroy { data_device } "
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
if not force_flag :
fail (
celery ,
f " Failed to run ceph-volume lvm zap on device { data_device } " ,
)
return
else :
log_warn (
celery ,
f " Failed to run ceph-volume lvm zap on device { data_device } ; ignoring error due to force flag " ,
)
else :
log_warn (
celery ,
f " Found { len ( all_osds_on_block ) } OSD(s) still remaining on { data_device } ; skipping zap " ,
)
else :
log_warn (
celery ,
f " Could not find OSD { osd_id } on device { data_device } ; skipping zap " ,
)
# 6. Log it
current_stage + = 1
return finish (
celery ,
f " Successfully removed OSD { osd_id } " ,
current = current_stage ,
total = total_stages ,
)
def osd_worker_add_db_vg ( zkhandler , celery , device ) :
# Set up stages
current_stage = 0
total_stages = 4
start (
celery ,
f " Creating new OSD database volume group on device { device } " ,
current = current_stage ,
total = total_stages ,
)
# Check if an existsing volume group exists
retcode , stdout , stderr = common . run_os_command ( " vgdisplay osd-db " )
if retcode != 5 :
fail (
celery ,
" Ceph OSD database VG already exists " ,
)
return
# Handle a detect device if that is passed
if match ( r " detect: " , device ) :
ddevice = common . get_detect_device ( device )
if ddevice is None :
fail (
celery ,
f " Failed to determine block device from detect string { device } " ,
)
return
else :
log_info (
celery ,
f " Determined block device { ddevice } from detect string { device } " ,
)
device = ddevice
# 1. Create an empty partition table
current_stage + = 1
update (
celery ,
f " Creating partitions on device { device } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command ( " sgdisk --clear {} " . format ( device ) )
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail (
celery ,
f " Failed to create partition table on device { device } " ,
)
return
retcode , stdout , stderr = common . run_os_command (
" sgdisk --new 1:: --typecode 1:8e00 {} " . format ( device )
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail (
celery ,
f " Failed to set partition type to LVM PV on device { device } " ,
)
return
# Handle the partition ID portion
if search ( r " by-path " , device ) or search ( r " by-id " , device ) :
# /dev/disk/by-path/pci-0000:03:00.0-scsi-0:1:0:0 -> pci-0000:03:00.0-scsi-0:1:0:0-part1
partition = " {} -part1 " . format ( device )
elif search ( r " nvme " , device ) :
# /dev/nvme0n1 -> nvme0n1p1
partition = " {} p1 " . format ( device )
else :
# /dev/sda -> sda1
# No other '/dev/disk/by-*' types are valid for raw block devices anyways
partition = " {} 1 " . format ( device )
# 2. Create the PV
current_stage + = 1
update (
celery ,
f " Creating LVM PV on device { device } " ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
" pvcreate --force {} " . format ( partition )
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail (
celery ,
f " Failed to create LVM PV on device { device } " ,
)
return
# 2. Create the VG (named 'osd-db')
current_stage + = 1
update (
celery ,
f ' Creating LVM VG " osd-db " on device { device } ' ,
current = current_stage ,
total = total_stages ,
)
retcode , stdout , stderr = common . run_os_command (
" vgcreate --force osd-db {} " . format ( partition )
)
log_info ( celery , f " stdout: { stdout } " )
log_info ( celery , f " stderr: { stderr } " )
if retcode :
fail (
celery ,
f " Failed to create LVM VG on device { device } " ,
)
return
# Log it
current_stage + = 1
return finish (
celery ,
f " Successfully created new OSD DB volume group on device { device } " ,
current = current_stage ,
total = total_stages ,
)