Support adding and removing Ceph pools
@@ -9,7 +9,7 @@ WorkingDirectory = /usr/share/pvc
Environment = PYTHONUNBUFFERED=true
Environment = PVCD_CONFIG_FILE=/etc/pvc/pvcd.conf
ExecStart = /usr/share/pvc/pvcd.py
Restart = on-failure
Restart = never

[Install]
WantedBy = multi-user.target
@@ -30,10 +30,6 @@ import pvcd.zkhandler as zkhandler
import pvcd.fencing as fencing
import pvcd.common as common

class CephInstance(object):
    def __init__(self):
        pass

class CephOSDInstance(object):
    def __init__(self, zk_conn, this_node, osd_id):
        self.zk_conn = zk_conn
@@ -149,6 +145,7 @@ def add_osd(zk_conn, logger, node, device):
    zkhandler.writedata(zk_conn, {
        '/ceph/osds/{}'.format(osd_id): '',
        '/ceph/osds/{}/node'.format(osd_id): node,
        '/ceph/osds/{}/device'.format(osd_id): device,
        '/ceph/osds/{}/stats'.format(osd_id): '{}'
    })
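The writedata call above registers a new OSD under a small per-OSD key hierarchy in Zookeeper; the pool code added later in this commit mirrors the same layout under /ceph/pools. As a rough sketch only, here is what an equivalent write looks like with plain kazoo, assuming zkhandler.writedata wraps similar create/set calls (the helper, hostnames, and values below are illustrative, not the PVC implementation):

# Illustrative only: a plain-kazoo version of the key layout used above.
# PVC's zkhandler.writedata is assumed to wrap similar ensure_path/set calls.
from kazoo.client import KazooClient

def write_keys(zk, keys):
    # Create each key if it is missing, then set its value (Zookeeper stores bytes)
    for path, value in keys.items():
        zk.ensure_path(path)
        zk.set(path, str(value).encode('ascii'))

zk = KazooClient(hosts='127.0.0.1:2181')  # assumed local coordinator
zk.start()
osd_id = '0'  # hypothetical OSD ID
write_keys(zk, {
    '/ceph/osds/{}'.format(osd_id): '',
    '/ceph/osds/{}/node'.format(osd_id): 'node1',       # hypothetical node name
    '/ceph/osds/{}/device'.format(osd_id): '/dev/sdb',  # hypothetical device
    '/ceph/osds/{}/stats'.format(osd_id): '{}',
})
zk.stop()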
@@ -186,7 +183,6 @@ def remove_osd(zk_conn, logger, osd_id, osd_obj):
    for osd in dump_string:
        if str(osd['osd']) == osd_id:
            osd_string = osd
    print(osd_string)
    num_pgs = osd_string['num_pgs']
    if num_pgs > 0:
        time.sleep(5)
@@ -245,6 +241,99 @@ def remove_osd(zk_conn, logger, osd_id, osd_obj):
        logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e')
        return False

class CephPool(object):
    def __init__(self):
        pass

class CephPoolInstance(object):
    def __init__(self, zk_conn, this_node, name):
        self.zk_conn = zk_conn
        self.this_node = this_node
        self.name = name
        self.pgs = ''
        self.stats = dict()

        @self.zk_conn.DataWatch('/ceph/pools/{}/pgs'.format(self.name))
        def watch_pool_node(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = ''

            if data and data != self.pgs:
                self.pgs = data

        @self.zk_conn.DataWatch('/ceph/pools/{}/stats'.format(self.name))
        def watch_pool_stats(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = ''

            if data and data != self.stats:
                self.stats = json.loads(data)

def add_pool(zk_conn, logger, name, pgs):
    # We are ready to create a new pool on this node
    logger.out('Creating new RBD pool {}'.format(name), state='i')
    try:
        # 1. Create the pool
        retcode, stdout, stderr = common.run_os_command('ceph osd pool create {} {} replicated'.format(name, pgs))
        if retcode:
            print('ceph osd pool create')
            print(stdout)
            print(stderr)
            raise

        # 2. Enable RBD application
        retcode, stdout, stderr = common.run_os_command('ceph osd pool application enable {} rbd'.format(name))
        if retcode:
            print('ceph osd pool application enable')
            print(stdout)
            print(stderr)
            raise

        # 3. Add the new pool to ZK
        zkhandler.writedata(zk_conn, {
            '/ceph/pools/{}'.format(name): '',
            '/ceph/pools/{}/pgs'.format(name): pgs,
            '/ceph/pools/{}/stats'.format(name): '{}'
        })

        # Log it
        logger.out('Created new RBD pool {}'.format(name), state='o')
        return True
    except Exception as e:
        # Log it
        logger.out('Failed to create new RBD pool {}: {}'.format(name, e), state='e')
        return False

def remove_pool(zk_conn, logger, name):
    # We are ready to remove a pool from this node
    logger.out('Removing RBD pool {}'.format(name), state='i')
    try:
        # Delete pool from ZK
        zkhandler.deletekey(zk_conn, '/ceph/pools/{}'.format(name))

        # Remove the pool
        retcode, stdout, stderr = common.run_os_command('ceph osd pool rm {pool} {pool} --yes-i-really-really-mean-it'.format(pool=name))
        if retcode:
            print('ceph osd pool rm')
            print(stdout)
            print(stderr)
            raise

        # Log it
        logger.out('Removed RBD pool {}'.format(name), state='o')
        return True
    except Exception as e:
        # Log it
        logger.out('Failed to remove RBD pool {}: {}'.format(name, e), state='e')
        return False
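CephPoolInstance relies on kazoo's DataWatch recipe: the decorated callback fires on every change to the watched key, and returning False after a DELETED event stops the watch so the instance can be reaped. A minimal standalone sketch of that pattern, assuming a locally reachable Zookeeper and a hypothetical /ceph/pools/testpool/pgs key:

# Minimal sketch of the DataWatch lifecycle used by CephPoolInstance.
# The path and values here are hypothetical; only the watch/stop pattern matters.
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
zk.ensure_path('/ceph/pools/testpool/pgs')

@zk.DataWatch('/ceph/pools/testpool/pgs')
def watch_pgs(data, stat, event=None):
    if event and event.type == 'DELETED':
        # Returning False tells kazoo to stop calling this watcher
        return False
    try:
        value = data.decode('ascii')
    except AttributeError:
        value = ''
    if value:
        print('pgs is now {}'.format(value))

zk.set('/ceph/pools/testpool/pgs', b'128')  # triggers the watcher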
@@ -525,10 +525,12 @@ d_node = dict()
d_network = dict()
d_domain = dict()
d_osd = dict()
d_pool = dict()
node_list = []
network_list = []
domain_list = []
osd_list = []
pool_list = []

# Create an instance of the DNS Aggregator if we're a coordinator
if config['daemon_mode'] == 'coordinator':
@@ -654,8 +656,8 @@ def update_domains(new_domain_list):
        d_node[node].update_domain_list(d_domain)

# Ceph OSD provisioning key
@zk_conn.DataWatch('/ceph/osd_cmd')
def osd_cmd(data, stat, event=''):
@zk_conn.DataWatch('/ceph/cmd')
def cmd(data, stat, event=''):
    if data:
        data = data.decode('ascii')
    else:
@@ -666,41 +668,84 @@ def osd_cmd(data, stat, event=''):
    command, args = data.split()

    # Adding a new OSD
    if command == 'add':
    if command == 'osd_add':
        node, device = args.split(',')
        if node == this_node.name:
            # Lock the command queue
            lock = zkhandler.writelock(zk_conn, '/ceph/osd_cmd')
            lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
            with lock:
                # Add the OSD
                result = CephInstance.add_osd(zk_conn, logger, node, device)
                # Command succeeded
                if result:
                    # Update the command queue
                    zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'success-{}'.format(data)})
                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
                # Command failed
                else:
                    # Update the command queue
                    zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'failure-{}'.format(data)})
                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
                # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock
                time.sleep(0.5)
    # Removing an OSD
    elif command == 'remove':
    elif command == 'osd_remove':
        osd_id = args

        # Verify osd_id is in the list
        if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
            # Lock the command queue
            lock = zkhandler.writelock(zk_conn, '/ceph/osd_cmd')
            lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
            with lock:
                # Remove the OSD
                result = CephInstance.remove_osd(zk_conn, logger, osd_id, d_osd[osd_id])
                # Command succeeded
                if result:
                    # Update the command queue
                    zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'success-{}'.format(data)})
                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
                # Command failed
                else:
                    # Update the command queue
                    zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'failure-{}'.format(data)})
                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
                # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock
                time.sleep(0.5)
    # Adding a new pool
    if command == 'pool_add':
        name, pgs = args.split(',')
        if this_node.router_state == 'primary':
            # Lock the command queue
            lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
            with lock:
                # Add the pool
                result = CephInstance.add_pool(zk_conn, logger, name, pgs)
                # Command succeeded
                if result:
                    # Update the command queue
                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
                # Command failed
                else:
                    # Update the command queue
                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
                # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock
                time.sleep(0.5)
    # Removing a pool
    elif command == 'pool_remove':
        name = args

        if this_node.router_state == 'primary':
            # Lock the command queue
            lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
            with lock:
                # Remove the pool
                result = CephInstance.remove_pool(zk_conn, logger, name)
                # Command succeeded
                if result:
                    # Update the command queue
                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
                # Command failed
                else:
                    # Update the command queue
                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
                # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock
                time.sleep(0.5)

# OSD objects
@zk_conn.ChildrenWatch('/ceph/osds')
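The pool commands reuse the same /ceph/cmd queue as the OSD commands: a client writes '<command> <args>' to the key, the responsible node (the primary, for pool operations) takes the write lock, does the work, and writes back 'success-<command ...>' or 'failure-<command ...>' before releasing the lock. A rough sketch of what a client of this protocol could look like, assuming a reachable Zookeeper; the polling loop is a simplification of how the real PVC client waits on the result:

# Hypothetical client for the /ceph/cmd queue shown above (sketch only).
import time
from kazoo.client import KazooClient

def run_ceph_command(zk, command, args, timeout=30):
    payload = '{} {}'.format(command, args)
    zk.ensure_path('/ceph/cmd')
    zk.set('/ceph/cmd', payload.encode('ascii'))
    # Poll for the daemon's success-/failure- response
    deadline = time.time() + timeout
    while time.time() < deadline:
        data, _ = zk.get('/ceph/cmd')
        value = data.decode('ascii') if data else ''
        if value.startswith('success-' + payload):
            return True
        if value.startswith('failure-' + payload):
            return False
        time.sleep(0.5)
    raise TimeoutError('no response to {}'.format(payload))

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
run_ceph_command(zk, 'pool_add', 'testpool,128')  # hypothetical pool name and PG count
run_ceph_command(zk, 'pool_remove', 'testpool')
zk.stop()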
@@ -722,6 +767,26 @@ def update_osds(new_osd_list):
    osd_list = new_osd_list
    logger.out('{}OSD list:{} {}'.format(logger.fmt_blue, logger.fmt_end, ' '.join(osd_list)), state='i')

# Pool objects
@zk_conn.ChildrenWatch('/ceph/pools')
def update_pools(new_pool_list):
    global pool_list, d_pool

    # Add any missing Pools to the list
    for pool in new_pool_list:
        if not pool in pool_list:
            d_pool[pool] = CephInstance.CephPoolInstance(zk_conn, this_node, pool)

    # Remove any deleted Pools from the list
    for pool in pool_list:
        if not pool in new_pool_list:
            # Delete the object
            del(d_pool[pool])

    # Update and print new list
    pool_list = new_pool_list
    logger.out('{}Pool list:{} {}'.format(logger.fmt_blue, logger.fmt_end, ' '.join(pool_list)), state='i')

###############################################################################
# PHASE 9 - Run the daemon
###############################################################################
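update_pools follows the same reconcile pattern as the other ChildrenWatch handlers: compare the new child list against the cached one, instantiate objects for children that appeared, and drop objects for children that disappeared. A condensed, self-contained sketch of that reconciliation, with a stand-in class instead of CephInstance.CephPoolInstance:

# Sketch of the ChildrenWatch reconcile pattern used by update_pools.
# PoolStub stands in for CephInstance.CephPoolInstance.
class PoolStub(object):
    def __init__(self, name):
        self.name = name

pool_list = []
d_pool = dict()

def update_pools(new_pool_list):
    global pool_list, d_pool
    # Instantiate any pools that appeared
    for pool in new_pool_list:
        if pool not in pool_list:
            d_pool[pool] = PoolStub(pool)
    # Drop any pools that disappeared
    for pool in pool_list:
        if pool not in new_pool_list:
            del d_pool[pool]
    pool_list = new_pool_list

update_pools(['vms'])          # adds 'vms'
update_pools(['vms', 'data'])  # adds 'data'
update_pools(['data'])         # removes 'vms'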
@@ -764,6 +829,36 @@ def update_zookeeper():
        logger.out('Failed to set Ceph status data', state='e')
        return

    # Set pool information in zookeeper (primary only)
    if this_node.router_state == 'primary':
        # Get pool info
        pool_df = dict()
        retcode, stdout, stderr = common.run_os_command('rados df --format json')
        pool_df_raw = json.loads(stdout)['pools']
        for pool in pool_df_raw:
            pool_df.update({
                str(pool['name']): {
                    'id': pool['id'],
                    'size_bytes': pool['size_bytes'],
                    'num_objects': pool['num_objects'],
                    'num_object_clones': pool['num_object_clones'],
                    'num_object_copies': pool['num_object_copies'],
                    'num_objects_missing_on_primary': pool['num_objects_missing_on_primary'],
                    'num_objects_unfound': pool['num_objects_unfound'],
                    'num_objects_degraded': pool['num_objects_degraded'],
                    'read_ops': pool['read_ops'],
                    'read_bytes': pool['read_bytes'],
                    'write_ops': pool['write_ops'],
                    'write_bytes': pool['write_bytes']
                }
            })

        # Trigger updates for each pool
        for pool in pool_list:
            zkhandler.writedata(zk_conn, {
                '/ceph/pools/{}/stats'.format(pool): str(json.dumps(pool_df[pool]))
            })

    # Get data from Ceph OSDs
    # Parse the dump data
    osd_dump = dict()
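The per-pool statistics come straight from 'rados df --format json': each entry in its 'pools' array is reduced to the fields listed above and written back to the pool's /stats key as a JSON string. A standalone sketch of that collection step, using subprocess directly instead of common.run_os_command and printing instead of writing to Zookeeper; the field names are the ones used above and may vary between Ceph releases:

# Sketch: collect per-pool stats the same way update_zookeeper() does,
# but print them instead of writing them to /ceph/pools/<name>/stats.
import json
import subprocess

wanted = ('id', 'size_bytes', 'num_objects', 'num_object_clones',
          'num_object_copies', 'num_objects_missing_on_primary',
          'num_objects_unfound', 'num_objects_degraded',
          'read_ops', 'read_bytes', 'write_ops', 'write_bytes')

result = subprocess.run(['rados', 'df', '--format', 'json'],
                        capture_output=True, text=True, check=True)
pool_df = {
    str(pool['name']): {key: pool[key] for key in wanted}
    for pool in json.loads(result.stdout)['pools']
}
for name, stats in pool_df.items():
    print(name, json.dumps(stats))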
@@ -779,6 +874,18 @@ def update_zookeeper():
                'primary_affinity': osd['primary_affinity']
            }
        })
    # Parse the df data
    osd_df = dict()
    retcode, stdout, stderr = common.run_os_command('ceph osd df --format json')
    osd_df_raw = json.loads(stdout)['nodes']
    for osd in osd_df_raw:
        osd_df.update({
            str(osd['id']): {
                'utilization': osd['utilization'],
                'var': osd['var'],
                'pgs': osd['pgs'],
            }
        })
    # Parse the status data
    osd_status = dict()
    retcode, stdout, stderr = common.run_os_command('ceph osd status')
@@ -814,6 +921,7 @@ def update_zookeeper():
    osd_stats = dict()
    for osd in osd_list:
        this_dump = osd_dump[osd]
        this_dump.update(osd_df[osd])
        this_dump.update(osd_status[osd])
        osd_stats[osd] = this_dump
@@ -938,13 +1046,15 @@ def update_zookeeper():
    logger.out(
        '{bold}Ceph cluster status:{nofmt} {health_colour}{health}{nofmt} '
        '{bold}Total OSDs:{nofmt} {total_osds} '
        '{bold}Node OSDs:{nofmt} {node_osds}'.format(
        '{bold}Node OSDs:{nofmt} {node_osds} '
        '{bold}Pools:{nofmt} {total_pools} '.format(
            bold=logger.fmt_bold,
            health_colour=ceph_health_colour,
            nofmt=logger.fmt_end,
            health=ceph_health,
            total_osds=len(osd_list),
            node_osds=osds_this_node
            node_osds=osds_this_node,
            total_pools=len(pool_list)
        ),
    )