Finish up Ceph OSD removal, add locking to commands
This commit is contained in:
parent
89a3e0c7ee
commit
3e4a6086d5
|
@ -23,6 +23,7 @@
|
|||
import re
|
||||
import click
|
||||
import ast
|
||||
import time
|
||||
|
||||
import client_lib.ansiprint as ansiprint
|
||||
import client_lib.zkhandler as zkhandler
|
||||
|
@ -90,7 +91,14 @@ def formatOSDList(zk_conn, osd_list):
|
|||
osd_stats = getOSDInformation(zk_conn, osd)
|
||||
|
||||
# Set the parent node and length
|
||||
osd_node[osd] = osd_stats['node']
|
||||
try:
|
||||
osd_node[osd] = osd_stats['node']
|
||||
# If this happens, the node hasn't checked in fully yet, so just ignore it
|
||||
if osd_node[osd] == '|':
|
||||
continue
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
_osd_node_length = len(osd_node[osd]) + 1
|
||||
if _osd_node_length > osd_node_length:
|
||||
osd_node_length = _osd_node_length
|
||||
|
@ -247,20 +255,51 @@ def get_status(zk_conn):
|
|||
return True, ''
|
||||
|
||||
def add_osd(zk_conn, node, device):
|
||||
# Verify the target node exists
|
||||
if not common.verifyNode(zk_conn, node):
|
||||
return False, 'ERROR: No node named "{}" is present in the cluster.'.format(node)
|
||||
|
||||
# Tell the cluster to create a new OSD for the host
|
||||
add_osd_string = 'add {},{}'.format(node, device)
|
||||
zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': add_osd_string})
|
||||
click.echo('Created new OSD with block device {} on node {}.'.format(device, node))
|
||||
return True, ''
|
||||
# Wait 1/2 second for the cluster to get the message and start working
|
||||
time.sleep(0.5)
|
||||
# Acquire a read lock, so we get the return exclusively
|
||||
lock = zkhandler.readlock(zk_conn, '/ceph/osd_cmd')
|
||||
with lock:
|
||||
result = zkhandler.readdata(zk_conn, '/ceph/osd_cmd').split()[0]
|
||||
if result == 'success-add':
|
||||
success = True
|
||||
else:
|
||||
success = False
|
||||
|
||||
if success:
|
||||
return True, 'Created new OSD with block device {} on node {}.'.format(device, node)
|
||||
else:
|
||||
return False, 'Failed to create new OSD; check node logs for details.'
|
||||
|
||||
def remove_osd(zk_conn, osd_id):
|
||||
if not common.verifyOSD(zk_conn, osd_id):
|
||||
return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(osd_id)
|
||||
|
||||
# Tell the cluster to remove an OSD
|
||||
remove_osd_string = 'remove {}'.format(osd_id)
|
||||
zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': remove_osd_string})
|
||||
click.echo('Removed OSD with ID {} from the cluster.'.format(osd_id))
|
||||
return True, ''
|
||||
# Wait 1/2 second for the cluster to get the message and start working
|
||||
time.sleep(0.5)
|
||||
# Acquire a read lock, so we get the return exclusively
|
||||
lock = zkhandler.readlock(zk_conn, '/ceph/osd_cmd')
|
||||
with lock:
|
||||
result = zkhandler.readdata(zk_conn, '/ceph/osd_cmd').split()[0]
|
||||
if result == 'success-remove':
|
||||
success = True
|
||||
else:
|
||||
success = False
|
||||
|
||||
if success:
|
||||
return True, 'Removed OSD {} from the cluster.'.format(osd_id)
|
||||
else:
|
||||
return False, 'Failed to remove OSD; check node logs for details.'
|
||||
|
||||
def get_list_osd(zk_conn, limit):
|
||||
osd_list = []
|
||||
|
|
|
@ -175,6 +175,15 @@ def verifyNode(zk_conn, node):
|
|||
else:
|
||||
return False
|
||||
|
||||
#
|
||||
# Verify OSD is valid in cluster
|
||||
#
|
||||
def verifyOSD(zk_conn, osd_id):
|
||||
if zkhandler.exists(zk_conn, '/ceph/osds/{}'.format(osd_id)):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
#
|
||||
# Get the primary coordinator node
|
||||
#
|
||||
|
|
|
@ -21,6 +21,8 @@
|
|||
###############################################################################
|
||||
|
||||
import kazoo.client
|
||||
import uuid
|
||||
|
||||
import client_lib.ansiprint as ansiprint
|
||||
|
||||
# Exists function
|
||||
|
@ -38,10 +40,7 @@ def listchildren(zk_conn, key):
|
|||
|
||||
# Delete key function
|
||||
def deletekey(zk_conn, key, recursive=True):
|
||||
try:
|
||||
zk_conn.delete(key, recursive=recursive)
|
||||
except:
|
||||
pass
|
||||
zk_conn.delete(key, recursive=recursive)
|
||||
|
||||
# Data read function
|
||||
def readdata(zk_conn, key):
|
||||
|
@ -88,3 +87,14 @@ def writedata(zk_conn, kv):
|
|||
except Exception:
|
||||
return False
|
||||
|
||||
# Write lock function
|
||||
def writelock(zk_conn, key):
|
||||
lock_id = str(uuid.uuid1())
|
||||
lock = zk_conn.WriteLock('{}'.format(key), lock_id)
|
||||
return lock
|
||||
|
||||
# Read lock function
|
||||
def readlock(zk_conn, key):
|
||||
lock_id = str(uuid.uuid1())
|
||||
lock = zk_conn.ReadLock('{}'.format(key), lock_id)
|
||||
return lock
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
|
||||
import time
|
||||
import ast
|
||||
import json
|
||||
import psutil
|
||||
|
||||
import pvcd.log as log
|
||||
import pvcd.zkhandler as zkhandler
|
||||
|
@ -53,7 +55,7 @@ class CephOSDInstance(object):
|
|||
except AttributeError:
|
||||
data = ''
|
||||
|
||||
if data != self.node:
|
||||
if data and data != self.node:
|
||||
self.node = data
|
||||
|
||||
@self.zk_conn.DataWatch('/ceph/osds/{}/stats'.format(self.osd_id))
|
||||
|
@ -68,8 +70,8 @@ class CephOSDInstance(object):
|
|||
except AttributeError:
|
||||
data = ''
|
||||
|
||||
if data != self.stats:
|
||||
self.stats = dict(ast.literal_eval(data))
|
||||
if data and data != self.stats:
|
||||
self.stats = json.loads(data)
|
||||
|
||||
def add_osd(zk_conn, logger, node, device):
|
||||
# We are ready to create a new OSD on this node
|
||||
|
@ -77,7 +79,8 @@ def add_osd(zk_conn, logger, node, device):
|
|||
try:
|
||||
# 1. Create an OSD; we do this so we know what ID will be gen'd
|
||||
retcode, stdout, stderr = common.run_os_command('ceph osd create')
|
||||
if retcode != 0:
|
||||
if retcode:
|
||||
print('ceph osd create')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
|
@ -85,7 +88,8 @@ def add_osd(zk_conn, logger, node, device):
|
|||
|
||||
# 2. Remove that newly-created OSD
|
||||
retcode, stdout, stderr = common.run_os_command('ceph osd rm {}'.format(osd_id))
|
||||
if retcode != 0:
|
||||
if retcode:
|
||||
print('ceph osd rm')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
|
@ -97,7 +101,8 @@ def add_osd(zk_conn, logger, node, device):
|
|||
device=device
|
||||
)
|
||||
)
|
||||
if retcode != 0:
|
||||
if retcode:
|
||||
print('ceph-volume lvm prepare')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
|
@ -108,7 +113,8 @@ def add_osd(zk_conn, logger, node, device):
|
|||
osdid=osd_id
|
||||
)
|
||||
)
|
||||
if retcode != 0:
|
||||
if retcode:
|
||||
print('ceph-volume lvm activate')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
|
@ -120,7 +126,8 @@ def add_osd(zk_conn, logger, node, device):
|
|||
node=node
|
||||
)
|
||||
)
|
||||
if retcode != 0:
|
||||
if retcode:
|
||||
print('ceph osd crush add')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
|
@ -132,7 +139,8 @@ def add_osd(zk_conn, logger, node, device):
|
|||
osdid=osd_id
|
||||
)
|
||||
)
|
||||
if retcode != 0:
|
||||
if retcode:
|
||||
print('systemctl status')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
|
@ -141,15 +149,16 @@ def add_osd(zk_conn, logger, node, device):
|
|||
zkhandler.writedata(zk_conn, {
|
||||
'/ceph/osds/{}'.format(osd_id): '',
|
||||
'/ceph/osds/{}/node'.format(osd_id): node,
|
||||
'/ceph/osds/{}/size'.format(osd_id): '',
|
||||
'/ceph/osds/{}/stats'.format(osd_id): '{}'
|
||||
})
|
||||
|
||||
# Log it
|
||||
logger.out('Created new OSD disk with ID {}'.format(osd_id), state='o')
|
||||
return True
|
||||
except Exception as e:
|
||||
# Log it
|
||||
logger.out('Failed to create new OSD disk: {}'.format(e), state='e')
|
||||
return False
|
||||
|
||||
def remove_osd(zk_conn, logger, osd_id, osd_obj):
|
||||
logger.out('Removing OSD disk {}'.format(osd_id), state='i')
|
||||
|
@ -163,16 +172,78 @@ def remove_osd(zk_conn, logger, osd_id, osd_obj):
|
|||
|
||||
# 1. Set the OSD out so it will flush
|
||||
retcode, stdout, stderr = common.run_os_command('ceph osd out {}'.format(osd_id))
|
||||
if retcode != 0:
|
||||
if retcode:
|
||||
print('ceph osd out')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
|
||||
# 2. Wait for the OSD to flush
|
||||
osd_string = str()
|
||||
while True:
|
||||
retcode, stdout, stderr = common.run_os_command('ceph health')
|
||||
health_string = stdout
|
||||
except:
|
||||
pass
|
||||
retcode, stdout, stderr = common.run_os_command('ceph pg dump osds --format json')
|
||||
dump_string = json.loads(stdout)
|
||||
for osd in dump_string:
|
||||
if str(osd['osd']) == osd_id:
|
||||
osd_string = osd
|
||||
print(osd_string)
|
||||
num_pgs = osd_string['num_pgs']
|
||||
if num_pgs > 0:
|
||||
time.sleep(5)
|
||||
else:
|
||||
break
|
||||
|
||||
# 3. Stop the OSD process and wait for it to be terminated
|
||||
retcode, stdout, stderr = common.run_os_command('systemctl stop ceph-osd@{}'.format(osd_id))
|
||||
if retcode:
|
||||
print('systemctl stop')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
|
||||
# FIXME: There has to be a better way to do this /shrug
|
||||
while True:
|
||||
is_osd_up = False
|
||||
# Find if there is a process named ceph-osd with arg '--id {id}'
|
||||
for p in psutil.process_iter(attrs=['name', 'cmdline']):
|
||||
if 'ceph-osd' == p.info['name'] and '--id {}'.format(osd_id) in ' '.join(p.info['cmdline']):
|
||||
is_osd_up = True
|
||||
# If there isn't, continue
|
||||
if not is_osd_up:
|
||||
break
|
||||
|
||||
# 4. Delete OSD from ZK
|
||||
zkhandler.deletekey(zk_conn, '/ceph/osds/{}'.format(osd_id))
|
||||
|
||||
# 5. Determine the block devices
|
||||
retcode, stdout, stderr = common.run_os_command('readlink /var/lib/ceph/osd/ceph-{}/block'.format(osd_id))
|
||||
vg_name = stdout.split('/')[-2] # e.g. /dev/ceph-<uuid>/osd-block-<uuid>
|
||||
retcode, stdout, stderr = common.run_os_command('vgs --separator , --noheadings -o pv_name {}'.format(vg_name))
|
||||
pv_block = stdout
|
||||
|
||||
# 6. Zap the volumes
|
||||
retcode, stdout, stderr = common.run_os_command('ceph-volume lvm zap --destroy {}'.format(pv_block))
|
||||
if retcode:
|
||||
print('ceph-volume lvm zap')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
|
||||
# 7. Purge the OSD from Ceph
|
||||
retcode, stdout, stderr = common.run_os_command('ceph osd purge {} --yes-i-really-mean-it'.format(osd_id))
|
||||
if retcode:
|
||||
print('ceph osd purge')
|
||||
print(stdout)
|
||||
print(stderr)
|
||||
raise
|
||||
|
||||
# Log it
|
||||
logger.out('Purged OSD disk with ID {}'.format(osd_id), state='o')
|
||||
return True
|
||||
except Exception as e:
|
||||
# Log it
|
||||
logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e')
|
||||
return False
|
||||
|
||||
class CephPool(object):
|
||||
def __init__(self):
|
||||
|
|
|
@ -308,7 +308,7 @@ def zk_listener(state):
|
|||
|
||||
# Start keepalive thread
|
||||
if update_timer:
|
||||
update_timer = createKeepaliveTimer()
|
||||
update_timer = startKeepaliveTimer()
|
||||
else:
|
||||
pass
|
||||
zk_conn.add_listener(zk_listener)
|
||||
|
@ -669,23 +669,38 @@ def osd_cmd(data, stat, event=''):
|
|||
if command == 'add':
|
||||
node, device = args.split(',')
|
||||
if node == this_node.name:
|
||||
# Clean up the command queue
|
||||
zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': ''})
|
||||
# Add the OSD
|
||||
CephInstance.add_osd(zk_conn, logger, node, device)
|
||||
# Lock the command queue
|
||||
lock = zkhandler.writelock(zk_conn, '/ceph/osd_cmd')
|
||||
with lock:
|
||||
# Add the OSD
|
||||
result = CephInstance.add_osd(zk_conn, logger, node, device)
|
||||
# Command succeeded
|
||||
if result:
|
||||
# Update the command queue
|
||||
zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'success-{}'.format(data)})
|
||||
# Command failed
|
||||
else:
|
||||
# Update the command queue
|
||||
zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'failure-{}'.format(data)})
|
||||
# Removing an OSD
|
||||
elif command == 'remove':
|
||||
osd_id = args
|
||||
|
||||
# Verify osd_id is in the list
|
||||
if not d_osd[osd_id]:
|
||||
return True
|
||||
|
||||
if d_osd[osd_id].node == this_node.name:
|
||||
# Clean up the command queue
|
||||
zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': ''})
|
||||
# Remove the OSD
|
||||
CephInstance.remove_osd(zk_conn, logger, osd_id, d_osd[osd_id])
|
||||
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
|
||||
# Lock the command queue
|
||||
lock = zkhandler.writelock(zk_conn, '/ceph/osd_cmd')
|
||||
with lock:
|
||||
# Remove the OSD
|
||||
result = CephInstance.remove_osd(zk_conn, logger, osd_id, d_osd[osd_id])
|
||||
# Command succeeded
|
||||
if result:
|
||||
# Update the command queue
|
||||
zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'success-{}'.format(data)})
|
||||
# Command failed
|
||||
else:
|
||||
# Update the command queue
|
||||
zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'failure-{}'.format(data)})
|
||||
|
||||
# OSD objects
|
||||
@zk_conn.ChildrenWatch('/ceph/osds')
|
||||
|
@ -784,7 +799,6 @@ def update_zookeeper():
|
|||
rd_data = line[15]
|
||||
state = line[17]
|
||||
osd_status.update({
|
||||
# osd_stats.update({
|
||||
str(osd_id): {
|
||||
'node': node,
|
||||
'used': used,
|
||||
|
@ -808,7 +822,7 @@ def update_zookeeper():
|
|||
for osd in osd_list:
|
||||
if d_osd[osd].node == myhostname:
|
||||
zkhandler.writedata(zk_conn, {
|
||||
'/ceph/osds/{}/stats'.format(osd): str(osd_stats[osd])
|
||||
'/ceph/osds/{}/stats'.format(osd): str(json.dumps(osd_stats[osd]))
|
||||
})
|
||||
osds_this_node += 1
|
||||
|
||||
|
|
|
@ -21,7 +21,9 @@
|
|||
###############################################################################
|
||||
|
||||
import kazoo.client
|
||||
import pvcd.log as log
|
||||
import uuid
|
||||
|
||||
#import pvcd.log as log
|
||||
|
||||
# Child list function
|
||||
def listchildren(zk_conn, key):
|
||||
|
@ -29,8 +31,8 @@ def listchildren(zk_conn, key):
|
|||
return children
|
||||
|
||||
# Key deletion function
|
||||
def delete(zk_conn, key):
|
||||
zk_conn.delete(key, recursive=True)
|
||||
def deletekey(zk_conn, key, recursive=True):
|
||||
zk_conn.delete(key, recursive=recursive)
|
||||
|
||||
# Data read function
|
||||
def readdata(zk_conn, key):
|
||||
|
@ -53,7 +55,7 @@ def writedata(zk_conn, kv):
|
|||
# Check if this key already exists or not
|
||||
if not zk_conn.exists(key):
|
||||
# We're creating a new key
|
||||
zk_transaction.create(key, data.encode('ascii'))
|
||||
zk_transaction.create(key, str(data).encode('ascii'))
|
||||
else:
|
||||
# We're updating a key with version validation
|
||||
orig_data = zk_conn.get(key)
|
||||
|
@ -63,7 +65,7 @@ def writedata(zk_conn, kv):
|
|||
new_version = version + 1
|
||||
|
||||
# Update the data
|
||||
zk_transaction.set_data(key, data.encode('ascii'))
|
||||
zk_transaction.set_data(key, str(data).encode('ascii'))
|
||||
|
||||
# Set up the check
|
||||
try:
|
||||
|
@ -79,3 +81,14 @@ def writedata(zk_conn, kv):
|
|||
except Exception:
|
||||
return False
|
||||
|
||||
# Write lock function
|
||||
def writelock(zk_conn, key):
|
||||
lock_id = str(uuid.uuid1())
|
||||
lock = zk_conn.WriteLock('{}'.format(key), lock_id)
|
||||
return lock
|
||||
|
||||
# Read lock function
|
||||
def readlock(zk_conn, key):
|
||||
lock_id = str(uuid.uuid1())
|
||||
lock = zk_conn.ReadLock('{}'.format(key), lock_id)
|
||||
return lock
|
||||
|
|
Loading…
Reference in New Issue