From 99fcb21e3b65873adb3b9e3cd3e76dc239048093 Mon Sep 17 00:00:00 2001 From: Joshua Boniface Date: Wed, 31 Oct 2018 23:38:17 -0400 Subject: [PATCH] Support adding and removing Ceph pools --- client-cli/pvc.py | 65 ++++- client-common/ceph.py | 430 ++++++++++++++++++++++++++++--- client-common/common.py | 9 - node-daemon/pvcd.service | 2 +- node-daemon/pvcd/CephInstance.py | 105 +++++++- node-daemon/pvcd/Daemon.py | 134 +++++++++- 6 files changed, 681 insertions(+), 64 deletions(-) diff --git a/client-cli/pvc.py b/client-cli/pvc.py index c2e6725d..41517403 100755 --- a/client-cli/pvc.py +++ b/client-cli/pvc.py @@ -912,6 +912,8 @@ def net_acl_list(net, limit, direction): def cli_ceph(): """ Manage the Ceph storage of the PVC cluster. + + NOTE: The PVC Ceph interface is limited to the most common tasks. Any other administrative tasks must be performed on a node directly. """ pass @@ -999,6 +1001,61 @@ def ceph_pool(): """ pass +############################################################################### +# pvc ceph pool add +############################################################################### +@click.command(name='add', short_help='Add new RBD pool.') +@click.argument( + 'name' +) +@click.argument( + 'pgs' +) +def ceph_pool_add(name, pgs): + """ + Add a new Ceph RBD pool with name NAME and PGS placement groups. + """ + + zk_conn = pvc_common.startZKConnection(zk_host) + retcode, retmsg = pvc_ceph.add_pool(zk_conn, name, pgs) + cleanup(retcode, retmsg, zk_conn) + +############################################################################### +# pvc ceph pool remove +############################################################################### +@click.command(name='remove', short_help='Remove RBD pool.') +@click.argument( + 'name' +) +@click.option('--yes', is_flag=True, + expose_value=False, + prompt='DANGER: This command will destroy this pool and all volumes. Do you want to continue?' +) +def ceph_pool_remove(name): + """ + Remove a Ceph RBD pool with name NAME and all volumes on it. + """ + + zk_conn = pvc_common.startZKConnection(zk_host) + retcode, retmsg = pvc_ceph.remove_pool(zk_conn, name) + cleanup(retcode, retmsg, zk_conn) + +############################################################################### +# pvc ceph pool list +############################################################################### +@click.command(name='list', short_help='List cluster RBD pools.') +@click.argument( + 'limit', default=None, required=False +) +def ceph_pool_list(limit): + """ + List all Ceph RBD pools in the cluster; optinally only match elements matching name regex LIMIT. 
+ """ + + zk_conn = pvc_common.startZKConnection(zk_host) + retcode, retmsg = pvc_ceph.get_list_pool(zk_conn, limit) + cleanup(retcode, retmsg, zk_conn) + ############################################################################### # pvc init @@ -1134,13 +1191,11 @@ ceph_osd.add_command(ceph_osd_remove) #ceph_osd.add_command(ceph_osd_out) #ceph_osd.add_command(ceph_osd_set) #ceph_osd.add_command(ceph_osd_unset) -#ceph_osd.add_command(ceph_osd_info) ceph_osd.add_command(ceph_osd_list) -#ceph_pool.add_command(ceph_pool_add) -#ceph_pool.add_command(ceph_pool_remove) -#ceph_pool.add_command(ceph_pool_info) -#ceph_pool.add_command(ceph_pool_list) +ceph_pool.add_command(ceph_pool_add) +ceph_pool.add_command(ceph_pool_remove) +ceph_pool.add_command(ceph_pool_list) cli_ceph.add_command(ceph_status) cli_ceph.add_command(ceph_osd) diff --git a/client-common/ceph.py b/client-common/ceph.py index a214af43..a5cbdcb3 100644 --- a/client-common/ceph.py +++ b/client-common/ceph.py @@ -22,8 +22,9 @@ import re import click -import ast +import json import time +import math import client_lib.ansiprint as ansiprint import client_lib.zkhandler as zkhandler @@ -33,6 +34,22 @@ import client_lib.common as common # Supplemental functions # +# Verify OSD is valid in cluster +def verifyOSD(zk_conn, osd_id): + if zkhandler.exists(zk_conn, '/ceph/osds/{}'.format(osd_id)): + return True + else: + return False + +# Verify OSD path is valid in cluster +def verifyOSDBlock(zk_conn, node, device): + for osd in zkhandler.listchildren(zk_conn, '/ceph/osds'): + osd_node = zkhandler.readdata(zk_conn, '/ceph/osds/{}/node'.format(osd)) + osd_device = zkhandler.readdata(zk_conn, '/ceph/osds/{}/device'.format(osd)) + if node == osd_node and device == osd_device: + return osd + return None + # # Cluster search functions # @@ -44,7 +61,7 @@ def getClusterOSDList(zk_conn): def getOSDInformation(zk_conn, osd_id): # Parse the stats data osd_stats_raw = zkhandler.readdata(zk_conn, '/ceph/osds/{}/stats'.format(osd_id)) - osd_stats = dict(ast.literal_eval(osd_stats_raw)) + osd_stats = dict(json.loads(osd_stats_raw)) return osd_stats def getCephOSDs(zk_conn): @@ -53,15 +70,19 @@ def getCephOSDs(zk_conn): def formatOSDList(zk_conn, osd_list): osd_list_output = [] + osd_uuid = dict() osd_up = dict() osd_up_colour = dict() osd_in = dict() osd_in_colour = dict() osd_weight = dict() + osd_pgs = dict() osd_node = dict() osd_used = dict() osd_free = dict() + osd_util = dict() + osd_var= dict() osd_wrops = dict() osd_wrdata = dict() osd_rdops = dict() @@ -73,9 +94,12 @@ def formatOSDList(zk_conn, osd_list): osd_up_length = 4 osd_in_length = 4 osd_weight_length = 7 + osd_pgs_length = 4 osd_node_length = 5 osd_used_length = 5 osd_free_length = 6 + osd_util_length = 6 + osd_var_length = 6 osd_wrops_length = 4 osd_wrdata_length = 5 osd_rdops_length = 4 @@ -125,7 +149,13 @@ def formatOSDList(zk_conn, osd_list): if _osd_weight_length > osd_weight_length: osd_weight_length = _osd_weight_length - # Set the used/available space and length + # Set the pgs and length + osd_pgs[osd] = osd_stats['pgs'] + _osd_pgs_length = len(str(osd_pgs[osd])) + 1 + if _osd_pgs_length > osd_pgs_length: + osd_pgs_length = _osd_pgs_length + + # Set the used/available/utlization%/variance and lengths osd_used[osd] = osd_stats['used'] _osd_used_length = len(osd_used[osd]) + 1 if _osd_used_length > osd_used_length: @@ -134,6 +164,14 @@ def formatOSDList(zk_conn, osd_list): _osd_free_length = len(osd_free[osd]) + 1 if _osd_free_length > osd_free_length: osd_free_length = 
_osd_free_length + osd_util[osd] = round(osd_stats['utilization'], 2) + _osd_util_length = len(str(osd_util[osd])) + 1 + if _osd_util_length > osd_util_length: + osd_util_length = _osd_util_length + osd_var[osd] = round(osd_stats['var'], 2) + _osd_var_length = len(str(osd_var[osd])) + 1 + if _osd_var_length > osd_var_length: + osd_var_length = _osd_var_length # Set the write IOPS/data and length osd_wrops[osd] = osd_stats['wr_ops'] @@ -162,8 +200,11 @@ def formatOSDList(zk_conn, osd_list): {osd_up: <{osd_up_length}} \ {osd_in: <{osd_in_length}} \ {osd_weight: <{osd_weight_length}} \ +{osd_pgs: <{osd_pgs_length}} \ Space: {osd_used: <{osd_used_length}} \ {osd_free: <{osd_free_length}} \ +{osd_util: <{osd_util_length}} \ +{osd_var: <{osd_var_length}} \ Write: {osd_wrops: <{osd_wrops_length}} \ {osd_wrdata: <{osd_wrdata_length}} \ Read: {osd_rdops: <{osd_rdops_length}} \ @@ -176,8 +217,11 @@ Read: {osd_rdops: <{osd_rdops_length}} \ osd_up_length=osd_up_length, osd_in_length=osd_in_length, osd_weight_length=osd_weight_length, + osd_pgs_length=osd_pgs_length, osd_used_length=osd_used_length, osd_free_length=osd_free_length, + osd_util_length=osd_util_length, + osd_var_length=osd_var_length, osd_wrops_length=osd_wrops_length, osd_wrdata_length=osd_wrdata_length, osd_rdops_length=osd_rdops_length, @@ -187,8 +231,11 @@ Read: {osd_rdops: <{osd_rdops_length}} \ osd_up='Up', osd_in='In', osd_weight='Weight', + osd_pgs='PGs', osd_used='Used', osd_free='Free', + osd_util='Util%', + osd_var='Var', osd_wrops='OPS', osd_wrdata='Data', osd_rdops='OPS', @@ -203,8 +250,11 @@ Read: {osd_rdops: <{osd_rdops_length}} \ {osd_up_colour}{osd_up: <{osd_up_length}}{end_colour} \ {osd_in_colour}{osd_in: <{osd_in_length}}{end_colour} \ {osd_weight: <{osd_weight_length}} \ +{osd_pgs: <{osd_pgs_length}} \ {osd_used: <{osd_used_length}} \ {osd_free: <{osd_free_length}} \ +{osd_util: <{osd_util_length}} \ +{osd_var: <{osd_var_length}} \ {osd_wrops: <{osd_wrops_length}} \ {osd_wrdata: <{osd_wrdata_length}} \ {osd_rdops: <{osd_rdops_length}} \ @@ -218,8 +268,11 @@ Read: {osd_rdops: <{osd_rdops_length}} \ osd_up_length=osd_up_length, osd_in_length=osd_in_length, osd_weight_length=osd_weight_length, + osd_pgs_length=osd_pgs_length, osd_used_length=osd_used_length, osd_free_length=osd_free_length, + osd_util_length=osd_util_length, + osd_var_length=osd_var_length, osd_wrops_length=osd_wrops_length, osd_wrdata_length=osd_wrdata_length, osd_rdops_length=osd_rdops_length, @@ -231,8 +284,11 @@ Read: {osd_rdops: <{osd_rdops_length}} \ osd_in_colour=osd_in_colour[osd], osd_in=osd_in[osd], osd_weight=osd_weight[osd], + osd_pgs=osd_pgs[osd], osd_used=osd_used[osd], osd_free=osd_free[osd], + osd_util=osd_util[osd], + osd_var=osd_var[osd], osd_wrops=osd_wrops[osd], osd_wrdata=osd_wrdata[osd], osd_rdops=osd_rdops[osd], @@ -243,6 +299,232 @@ Read: {osd_rdops: <{osd_rdops_length}} \ output_string = osd_list_output_header + '\n' + '\n'.join(sorted(osd_list_output)) return output_string +def getClusterPoolList(zk_conn): + # Get a list of pools under /ceph/pools + pool_list = zkhandler.listchildren(zk_conn, '/ceph/pools') + return pool_list + +def getPoolInformation(zk_conn, name): + # Parse the stats data + pool_stats_raw = zkhandler.readdata(zk_conn, '/ceph/pools/{}/stats'.format(name)) + pool_stats = dict(json.loads(pool_stats_raw)) + # Deal with the size issues + size_matrix = { + 'b': 1, + 'K': 1024, + 'M': 1024*1024, + 'G': 1024*1024*1024, + 'T': 1024*1024*1024*1024, + 'P': 1024*1024*1024*1024*1024 + } + for datatype in 'size_bytes', 
'read_bytes', 'write_bytes': + databytes = pool_stats[datatype] + databytes_formatted = '' + if databytes > 9999: + for unit in sorted(size_matrix, key=size_matrix.get, reverse=True): + new_bytes = int(math.ceil(databytes / size_matrix[unit])) + # Round up if 5 or more digits + if new_bytes > 9999: + # We can jump down another level + continue + else: + # We're at the end, display with this size + databytes_formatted = '{}{}'.format(new_bytes, unit) + else: + databytes_formatted = '{}B'.format(databytes) + new_name = datatype.replace('bytes', 'formatted') + pool_stats[new_name] = databytes_formatted + return pool_stats + +def getCephPools(zk_conn): + pool_list = zkhandler.listchildren(zk_conn, '/ceph/pools') + return pool_list + +def formatPoolList(zk_conn, pool_list): + pool_list_output = [] + + pool_id = dict() + pool_size = dict() + pool_num_objects = dict() + pool_num_clones = dict() + pool_num_copies = dict() + pool_num_degraded = dict() + pool_read_ops = dict() + pool_read_data = dict() + pool_write_ops = dict() + pool_write_data = dict() + + pool_name_length = 5 + pool_id_length = 3 + pool_size_length = 5 + pool_num_objects_length = 6 + pool_num_clones_length = 7 + pool_num_copies_length = 7 + pool_num_degraded_length = 9 + pool_read_ops_length = 4 + pool_read_data_length = 5 + pool_write_ops_length = 4 + pool_write_data_length = 5 + + for pool in pool_list: + # Set the Pool name length + _pool_name_length = len(pool) + 1 + if _pool_name_length > pool_name_length: + pool_name_length = _pool_name_length + + # Get stats + pool_stats = getPoolInformation(zk_conn, pool) + + # Set the parent node and length + try: + pool_id[pool] = pool_stats['id'] + # If this happens, the node hasn't checked in fully yet, so just ignore it + if not pool_id[pool]: + continue + except KeyError: + continue + + # Set the id and length + pool_id[pool] = pool_stats['id'] + _pool_id_length = len(str(pool_id[pool])) + 1 + if _pool_id_length > pool_id_length: + pool_id_length = _pool_id_length + + # Set the size and length + pool_size[pool] = pool_stats['size_formatted'] + _pool_size_length = len(str(pool_size[pool])) + 1 + if _pool_size_length > pool_size_length: + pool_size_length = _pool_size_length + + # Set the num_objects and length + pool_num_objects[pool] = pool_stats['num_objects'] + _pool_num_objects_length = len(str(pool_num_objects[pool])) + 1 + if _pool_num_objects_length > pool_num_objects_length: + pool_num_objects_length = _pool_num_objects_length + + # Set the num_clones and length + pool_num_clones[pool] = pool_stats['num_object_clones'] + _pool_num_clones_length = len(str(pool_num_clones[pool])) + 1 + if _pool_num_clones_length > pool_num_clones_length: + pool_num_clones_length = _pool_num_clones_length + + # Set the num_copies and length + pool_num_copies[pool] = pool_stats['num_object_copies'] + _pool_num_copies_length = len(str(pool_num_copies[pool])) + 1 + if _pool_num_copies_length > pool_num_copies_length: + pool_num_copies_length = _pool_num_copies_length + + # Set the num_degraded and length + pool_num_degraded[pool] = pool_stats['num_objects_degraded'] + _pool_num_degraded_length = len(str(pool_num_degraded[pool])) + 1 + if _pool_num_degraded_length > pool_num_degraded_length: + pool_num_degraded_length = _pool_num_degraded_length + + # Set the write IOPS/data and length + pool_write_ops[pool] = pool_stats['write_ops'] + _pool_write_ops_length = len(str(pool_write_ops[pool])) + 1 + if _pool_write_ops_length > pool_write_ops_length: + pool_write_ops_length = _pool_write_ops_length + 
pool_write_data[pool] = pool_stats['write_formatted'] + _pool_write_data_length = len(pool_write_data[pool]) + 1 + if _pool_write_data_length > pool_write_data_length: + pool_write_data_length = _pool_write_data_length + + # Set the read IOPS/data and length + pool_read_ops[pool] = pool_stats['read_ops'] + _pool_read_ops_length = len(str(pool_read_ops[pool])) + 1 + if _pool_read_ops_length > pool_read_ops_length: + pool_read_ops_length = _pool_read_ops_length + pool_read_data[pool] = pool_stats['read_formatted'] + _pool_read_data_length = len(pool_read_data[pool]) + 1 + if _pool_read_data_length > pool_read_data_length: + pool_read_data_length = _pool_read_data_length + + # Format the output header + pool_list_output_header = '{bold}\ +{pool_id: <{pool_id_length}} \ +{pool_name: <{pool_name_length}} \ +{pool_size: <{pool_size_length}} \ +Objects: {pool_objects: <{pool_objects_length}} \ +{pool_clones: <{pool_clones_length}} \ +{pool_copies: <{pool_copies_length}} \ +{pool_degraded: <{pool_degraded_length}} \ +Write: {pool_write_ops: <{pool_write_ops_length}} \ +{pool_write_data: <{pool_write_data_length}} \ +Read: {pool_read_ops: <{pool_read_ops_length}} \ +{pool_read_data: <{pool_read_data_length}} \ +{end_bold}'.format( + bold=ansiprint.bold(), + end_bold=ansiprint.end(), + pool_id_length=pool_id_length, + pool_name_length=pool_name_length, + pool_size_length=pool_size_length, + pool_objects_length=pool_num_objects_length, + pool_clones_length=pool_num_clones_length, + pool_copies_length=pool_num_copies_length, + pool_degraded_length=pool_num_degraded_length, + pool_write_ops_length=pool_write_ops_length, + pool_write_data_length=pool_write_data_length, + pool_read_ops_length=pool_read_ops_length, + pool_read_data_length=pool_read_data_length, + pool_id='ID', + pool_name='Name', + pool_size='Used', + pool_objects='Count', + pool_clones='Clones', + pool_copies='Copies', + pool_degraded='Degraded', + pool_write_ops='OPS', + pool_write_data='Data', + pool_read_ops='OPS', + pool_read_data='Data' + ) + + for pool in pool_list: + # Format the output header + pool_list_output.append('{bold}\ +{pool_id: <{pool_id_length}} \ +{pool_name: <{pool_name_length}} \ +{pool_size: <{pool_size_length}} \ + {pool_objects: <{pool_objects_length}} \ +{pool_clones: <{pool_clones_length}} \ +{pool_copies: <{pool_copies_length}} \ +{pool_degraded: <{pool_degraded_length}} \ + {pool_write_ops: <{pool_write_ops_length}} \ +{pool_write_data: <{pool_write_data_length}} \ + {pool_read_ops: <{pool_read_ops_length}} \ +{pool_read_data: <{pool_read_data_length}} \ +{end_bold}'.format( + bold=ansiprint.bold(), + end_bold=ansiprint.end(), + pool_id_length=pool_id_length, + pool_name_length=pool_name_length, + pool_size_length=pool_size_length, + pool_objects_length=pool_num_objects_length, + pool_clones_length=pool_num_clones_length, + pool_copies_length=pool_num_copies_length, + pool_degraded_length=pool_num_degraded_length, + pool_write_ops_length=pool_write_ops_length, + pool_write_data_length=pool_write_data_length, + pool_read_ops_length=pool_read_ops_length, + pool_read_data_length=pool_read_data_length, + pool_id=pool_id[pool], + pool_name=pool, + pool_size=pool_size[pool], + pool_objects=pool_num_objects[pool], + pool_clones=pool_num_clones[pool], + pool_copies=pool_num_copies[pool], + pool_degraded=pool_num_degraded[pool], + pool_write_ops=pool_write_ops[pool], + pool_write_data=pool_write_data[pool], + pool_read_ops=pool_read_ops[pool], + pool_read_data=pool_read_data[pool] + ) + ) + + output_string = 
pool_list_output_header + '\n' + '\n'.join(sorted(pool_list_output)) + return output_string + # # Direct functions # @@ -259,49 +541,60 @@ def add_osd(zk_conn, node, device): if not common.verifyNode(zk_conn, node): return False, 'ERROR: No node named "{}" is present in the cluster.'.format(node) + # Verify target block device isn't in use + block_osd = verifyOSDBlock(zk_conn, node, device) + if block_osd: + return False, 'ERROR: Block device {} on node {} is used by OSD {}'.format(device, node, block_osd) + # Tell the cluster to create a new OSD for the host - add_osd_string = 'add {},{}'.format(node, device) - zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': add_osd_string}) + add_osd_string = 'osd_add {},{}'.format(node, device) + zkhandler.writedata(zk_conn, {'/ceph/cmd': add_osd_string}) # Wait 1/2 second for the cluster to get the message and start working time.sleep(0.5) # Acquire a read lock, so we get the return exclusively - lock = zkhandler.readlock(zk_conn, '/ceph/osd_cmd') + lock = zkhandler.readlock(zk_conn, '/ceph/cmd') with lock: - result = zkhandler.readdata(zk_conn, '/ceph/osd_cmd').split()[0] - if result == 'success-add': - success = True - else: + try: + result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] + if result == 'success-osd_add': + message = 'Created new OSD with block device {} on node {}.'.format(device, node) + success = True + else: + message = 'ERROR: Failed to create new OSD; check node logs for details.' + success = False + except: + message = 'ERROR: Command ignored by node.' success = False - zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': ''}) - - if success: - return True, 'Created new OSD with block device {} on node {}.'.format(device, node) - else: - return False, 'Failed to create new OSD; check node logs for details.' + + zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) + return success, message def remove_osd(zk_conn, osd_id): - if not common.verifyOSD(zk_conn, osd_id): + if not verifyOSD(zk_conn, osd_id): return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(osd_id) # Tell the cluster to remove an OSD - remove_osd_string = 'remove {}'.format(osd_id) - zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': remove_osd_string}) + remove_osd_string = 'osd_remove {}'.format(osd_id) + zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_osd_string}) # Wait 1/2 second for the cluster to get the message and start working time.sleep(0.5) # Acquire a read lock, so we get the return exclusively - lock = zkhandler.readlock(zk_conn, '/ceph/osd_cmd') + lock = zkhandler.readlock(zk_conn, '/ceph/cmd') with lock: - result = zkhandler.readdata(zk_conn, '/ceph/osd_cmd').split()[0] - if result == 'success-remove': - success = True - else: + try: + result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] + if result == 'success-osd_remove': + message = 'Removed OSD {} from the cluster.'.format(osd_id) + success = True + else: + message = 'ERROR: Failed to remove OSD; check node logs for details.' + success = False + except: success = False - zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': ''}) + message = 'ERROR Command ignored by node.' - if success: - return True, 'Removed OSD {} from the cluster.'.format(osd_id) - else: - return False, 'Failed to remove OSD; check node logs for details.' 
+ zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) + return success, message def get_list_osd(zk_conn, limit): osd_list = [] @@ -332,3 +625,82 @@ def get_list_osd(zk_conn, limit): click.echo(output_string) return True, '' + +def add_pool(zk_conn, name, pgs): + # Tell the cluster to create a new pool + add_pool_string = 'pool_add {},{}'.format(name, pgs) + zkhandler.writedata(zk_conn, {'/ceph/cmd': add_pool_string}) + # Wait 1/2 second for the cluster to get the message and start working + time.sleep(0.5) + # Acquire a read lock, so we get the return exclusively + lock = zkhandler.readlock(zk_conn, '/ceph/cmd') + with lock: + try: + result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] + if result == 'success-pool_add': + message = 'Created new RBD pool {} with {} PGs.'.format(name, pgs) + success = True + else: + message = 'ERROR: Failed to create new pool; check node logs for details.' + success = False + except: + message = 'ERROR: Command ignored by node.' + success = False + + zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) + return success, message + +def remove_pool(zk_conn, name): + # Tell the cluster to create a new pool + remove_pool_string = 'pool_remove {}'.format(name) + zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_pool_string}) + # Wait 1/2 second for the cluster to get the message and start working + time.sleep(0.5) + # Acquire a read lock, so we get the return exclusively + lock = zkhandler.readlock(zk_conn, '/ceph/cmd') + with lock: + try: + result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] + if result == 'success-pool_remove': + message = 'Removed RBD pool {} and all volumes.'.format(name, pgs) + success = True + else: + message = 'ERROR: Failed to remove pool; check node logs for details.' + success = False + except: + message = 'ERROR: Command ignored by node.' 
+ success = False + + zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) + return success, message + +def get_list_pool(zk_conn, limit): + pool_list = [] + full_pool_list = getCephPools(zk_conn) + + if limit: + try: + # Implicitly assume fuzzy limits + if re.match('\^.*', limit) == None: + limit = '.*' + limit + if re.match('.*\$', limit) == None: + limit = limit + '.*' + except Exception as e: + return False, 'Regex Error: {}'.format(e) + + for pool in full_pool_list: + valid_pool = False + if limit: + if re.match(limit, pool['pool_id']) != None: + valid_pool = True + else: + valid_pool = True + + if valid_pool: + pool_list.append(pool) + + output_string = formatPoolList(zk_conn, pool_list) + click.echo(output_string) + + return True, '' + diff --git a/client-common/common.py b/client-common/common.py index e66133b3..5e1e4f63 100644 --- a/client-common/common.py +++ b/client-common/common.py @@ -175,15 +175,6 @@ def verifyNode(zk_conn, node): else: return False -# -# Verify OSD is valid in cluster -# -def verifyOSD(zk_conn, osd_id): - if zkhandler.exists(zk_conn, '/ceph/osds/{}'.format(osd_id)): - return True - else: - return False - # # Get the primary coordinator node # diff --git a/node-daemon/pvcd.service b/node-daemon/pvcd.service index ffe834f3..24269562 100644 --- a/node-daemon/pvcd.service +++ b/node-daemon/pvcd.service @@ -9,7 +9,7 @@ WorkingDirectory = /usr/share/pvc Environment = PYTHONUNBUFFERED=true Environment = PVCD_CONFIG_FILE=/etc/pvc/pvcd.conf ExecStart = /usr/share/pvc/pvcd.py -Restart = on-failure +Restart = never [Install] WantedBy = multi-user.target diff --git a/node-daemon/pvcd/CephInstance.py b/node-daemon/pvcd/CephInstance.py index e84e6906..2525d27d 100644 --- a/node-daemon/pvcd/CephInstance.py +++ b/node-daemon/pvcd/CephInstance.py @@ -30,10 +30,6 @@ import pvcd.zkhandler as zkhandler import pvcd.fencing as fencing import pvcd.common as common -class CephInstance(object): - def __init__(self): - pass - class CephOSDInstance(object): def __init__(self, zk_conn, this_node, osd_id): self.zk_conn = zk_conn @@ -149,6 +145,7 @@ def add_osd(zk_conn, logger, node, device): zkhandler.writedata(zk_conn, { '/ceph/osds/{}'.format(osd_id): '', '/ceph/osds/{}/node'.format(osd_id): node, + '/ceph/osds/{}/device'.format(osd_id): device, '/ceph/osds/{}/stats'.format(osd_id): '{}' }) @@ -186,7 +183,6 @@ def remove_osd(zk_conn, logger, osd_id, osd_obj): for osd in dump_string: if str(osd['osd']) == osd_id: osd_string = osd - print(osd_string) num_pgs = osd_string['num_pgs'] if num_pgs > 0: time.sleep(5) @@ -245,6 +241,99 @@ def remove_osd(zk_conn, logger, osd_id, osd_obj): logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e') return False -class CephPool(object): - def __init__(self): - pass +class CephPoolInstance(object): + def __init__(self, zk_conn, this_node, name): + self.zk_conn = zk_conn + self.this_node = this_node + self.name = name + self.pgs = '' + self.stats = dict() + + @self.zk_conn.DataWatch('/ceph/pools/{}/pgs'.format(self.name)) + def watch_pool_node(data, stat, event=''): + if event and event.type == 'DELETED': + # The key has been deleted after existing before; terminate this watcher + # because this class instance is about to be reaped in Daemon.py + return False + + try: + data = data.decode('ascii') + except AttributeError: + data = '' + + if data and data != self.pgs: + self.pgs = data + + @self.zk_conn.DataWatch('/ceph/pools/{}/stats'.format(self.name)) + def watch_pool_stats(data, stat, event=''): + if event and event.type == 
'DELETED': + # The key has been deleted after existing before; terminate this watcher + # because this class instance is about to be reaped in Daemon.py + return False + + try: + data = data.decode('ascii') + except AttributeError: + data = '' + + if data and data != self.stats: + self.stats = json.loads(data) + +def add_pool(zk_conn, logger, name, pgs): + # We are ready to create a new pool on this node + logger.out('Creating new RBD pool {}'.format(name), state='i') + try: + # 1. Create the pool + retcode, stdout, stderr = common.run_os_command('ceph osd pool create {} {} replicated'.format(name, pgs)) + if retcode: + print('ceph osd pool create') + print(stdout) + print(stderr) + raise + + # 2. Enable RBD application + retcode, stdout, stderr = common.run_os_command('ceph osd pool application enable {} rbd'.format(name)) + if retcode: + print('ceph osd pool application enable') + print(stdout) + print(stderr) + raise + + # 3. Add the new pool to ZK + zkhandler.writedata(zk_conn, { + '/ceph/pools/{}'.format(name): '', + '/ceph/pools/{}/pgs'.format(name): pgs, + '/ceph/pools/{}/stats'.format(name): '{}' + }) + + # Log it + logger.out('Created new RBD pool {}'.format(name), state='o') + return True + except Exception as e: + # Log it + logger.out('Failed to create new RBD pool {}: {}'.format(name, e), state='e') + return False + +def remove_pool(zk_conn, logger, name): + # We are ready to create a new pool on this node + logger.out('Removing RBD pool {}'.format(name), state='i') + try: + # Delete pool from ZK + zkhandler.deletekey(zk_conn, '/ceph/pools/{}'.format(name)) + + # Remove the pool + retcode, stdout, stderr = common.run_os_command('ceph osd pool rm {pool} {pool} --yes-i-really-really-mean-it'.format(pool=name)) + if retcode: + print('ceph osd pool rm') + print(stdout) + print(stderr) + raise + + # Log it + logger.out('Removed RBD pool {}'.format(name), state='o') + return True + except Exception as e: + # Log it + logger.out('Failed to remove RBD pool {}: {}'.format(name, e), state='e') + return False + diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py index fa9f5f27..e33e7091 100644 --- a/node-daemon/pvcd/Daemon.py +++ b/node-daemon/pvcd/Daemon.py @@ -525,10 +525,12 @@ d_node = dict() d_network = dict() d_domain = dict() d_osd = dict() +d_pool = dict() node_list = [] network_list = [] domain_list = [] osd_list = [] +pool_list = [] # Create an instance of the DNS Aggregator if we're a coordinator if config['daemon_mode'] == 'coordinator': @@ -654,8 +656,8 @@ def update_domains(new_domain_list): d_node[node].update_domain_list(d_domain) # Ceph OSD provisioning key -@zk_conn.DataWatch('/ceph/osd_cmd') -def osd_cmd(data, stat, event=''): +@zk_conn.DataWatch('/ceph/cmd') +def cmd(data, stat, event=''): if data: data = data.decode('ascii') else: @@ -666,41 +668,84 @@ def osd_cmd(data, stat, event=''): command, args = data.split() # Adding a new OSD - if command == 'add': + if command == 'osd_add': node, device = args.split(',') if node == this_node.name: # Lock the command queue - lock = zkhandler.writelock(zk_conn, '/ceph/osd_cmd') + lock = zkhandler.writelock(zk_conn, '/ceph/cmd') with lock: # Add the OSD result = CephInstance.add_osd(zk_conn, logger, node, device) # Command succeeded if result: # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'success-{}'.format(data)}) + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) # Command failed else: # Update the command queue - zkhandler.writedata(zk_conn, 
{'/ceph/osd_cmd': 'failure-{}'.format(data)}) + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) + # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock + time.sleep(0.5) # Removing an OSD - elif command == 'remove': + elif command == 'osd_remove': osd_id = args # Verify osd_id is in the list if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: # Lock the command queue - lock = zkhandler.writelock(zk_conn, '/ceph/osd_cmd') + lock = zkhandler.writelock(zk_conn, '/ceph/cmd') with lock: # Remove the OSD result = CephInstance.remove_osd(zk_conn, logger, osd_id, d_osd[osd_id]) # Command succeeded if result: # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'success-{}'.format(data)}) + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) # Command failed else: # Update the command queue - zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'failure-{}'.format(data)}) + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) + # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock + time.sleep(0.5) + # Adding a new pool + if command == 'pool_add': + name, pgs = args.split(',') + if this_node.router_state == 'primary': + # Lock the command queue + lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with lock: + # Add the pool + result = CephInstance.add_pool(zk_conn, logger, name, pgs) + # Command succeeded + if result: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) + # Command failed + else: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) + # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock + time.sleep(0.5) + # Removing a pool + elif command == 'pool_remove': + name = args + + if this_node.router_state == 'primary': + # Lock the command queue + lock = zkhandler.writelock(zk_conn, '/ceph/cmd') + with lock: + # Remove the pool + result = CephInstance.remove_pool(zk_conn, logger, name) + # Command succeeded + if result: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) + # Command failed + else: + # Update the command queue + zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) + # Wait 0.5 seconds before we free the lock, to ensure the client hits the lock + time.sleep(0.5) # OSD objects @zk_conn.ChildrenWatch('/ceph/osds') @@ -722,6 +767,26 @@ def update_osds(new_osd_list): osd_list = new_osd_list logger.out('{}OSD list:{} {}'.format(logger.fmt_blue, logger.fmt_end, ' '.join(osd_list)), state='i') +# Pool objects +@zk_conn.ChildrenWatch('/ceph/pools') +def update_pools(new_pool_list): + global pool_list, d_pool + + # Add any missing Pools to the list + for pool in new_pool_list: + if not pool in pool_list: + d_pool[pool] = CephInstance.CephPoolInstance(zk_conn, this_node, pool) + + # Remove any deleted Pools from the list + for pool in pool_list: + if not pool in new_pool_list: + # Delete the object + del(d_pool[pool]) + + # Update and print new list + pool_list = new_pool_list + logger.out('{}Pool list:{} {}'.format(logger.fmt_blue, logger.fmt_end, ' '.join(pool_list)), state='i') + ############################################################################### # PHASE 9 - Run the daemon ############################################################################### @@ -764,6 +829,36 @@ def update_zookeeper(): logger.out('Failed to set Ceph status data', 
state='e') return + # Set pool information in zookeeper (primary only) + if this_node.router_state == 'primary': + # Get pool info + pool_df = dict() + retcode, stdout, stderr = common.run_os_command('rados df --format json') + pool_df_raw = json.loads(stdout)['pools'] + for pool in pool_df_raw: + pool_df.update({ + str(pool['name']): { + 'id': pool['id'], + 'size_bytes': pool['size_bytes'], + 'num_objects': pool['num_objects'], + 'num_object_clones': pool['num_object_clones'], + 'num_object_copies': pool['num_object_copies'], + 'num_objects_missing_on_primary': pool['num_objects_missing_on_primary'], + 'num_objects_unfound': pool['num_objects_unfound'], + 'num_objects_degraded': pool['num_objects_degraded'], + 'read_ops': pool['read_ops'], + 'read_bytes': pool['read_bytes'], + 'write_ops': pool['write_ops'], + 'write_bytes': pool['write_bytes'] + } + }) + + # Trigger updates for each OSD on this node + for pool in pool_list: + zkhandler.writedata(zk_conn, { + '/ceph/pools/{}/stats'.format(pool): str(json.dumps(pool_df[pool])) + }) + # Get data from Ceph OSDs # Parse the dump data osd_dump = dict() @@ -779,6 +874,18 @@ def update_zookeeper(): 'primary_affinity': osd['primary_affinity'] } }) + # Parse the df data + osd_df = dict() + retcode, stdout, stderr = common.run_os_command('ceph osd df --format json') + osd_df_raw = json.loads(stdout)['nodes'] + for osd in osd_df_raw: + osd_df.update({ + str(osd['id']): { + 'utilization': osd['utilization'], + 'var': osd['var'], + 'pgs': osd['pgs'], + } + }) # Parse the status data osd_status = dict() retcode, stdout, stderr = common.run_os_command('ceph osd status') @@ -814,6 +921,7 @@ def update_zookeeper(): osd_stats = dict() for osd in osd_list: this_dump = osd_dump[osd] + this_dump.update(osd_df[osd]) this_dump.update(osd_status[osd]) osd_stats[osd] = this_dump @@ -938,13 +1046,15 @@ def update_zookeeper(): logger.out( '{bold}Ceph cluster status:{nofmt} {health_colour}{health}{nofmt} ' '{bold}Total OSDs:{nofmt} {total_osds} ' - '{bold}Node OSDs:{nofmt} {node_osds}'.format( + '{bold}Node OSDs:{nofmt} {node_osds} ' + '{bold}Pools:{nofmt} {total_pools} '.format( bold=logger.fmt_bold, health_colour=ceph_health_colour, nofmt=logger.fmt_end, health=ceph_health, total_osds=len(osd_list), - node_osds=osds_this_node + node_osds=osds_this_node, + total_pools=len(pool_list) ), )
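
Example usage of the new pool interface (a minimal sketch, not part of the patch itself; it assumes a running PVC cluster, and the client_lib import paths and the Zookeeper host string shown here are illustrative assumptions — the CLI's own cleanup() helper normally handles closing the connection after each command):

    #!/usr/bin/env python3
    # Sketch of driving the new pool functions from a client script, mirroring
    # the "pvc ceph pool" CLI commands added above. Each call writes a "pool_*"
    # command string to the /ceph/cmd key in Zookeeper and waits for the primary
    # coordinator to report success or failure.

    import client_lib.common as pvc_common   # assumed import path
    import client_lib.ceph as pvc_ceph       # assumed import path

    zk_host = 'hv1:2181,hv2:2181,hv3:2181'   # assumed coordinator Zookeeper hosts

    # Open a Zookeeper connection, as the CLI does before each command
    zk_conn = pvc_common.startZKConnection(zk_host)

    # Create an RBD pool named "vms" with 128 placement groups
    retcode, retmsg = pvc_ceph.add_pool(zk_conn, 'vms', 128)
    print(retmsg)

    # List pools; passing None lists all, or pass a name regex to filter
    pvc_ceph.get_list_pool(zk_conn, None)

    # Remove the pool again, destroying all volumes on it
    retcode, retmsg = pvc_ceph.remove_pool(zk_conn, 'vms')
    print(retmsg)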