From 01959cb9e3fa287a4b48f436d36db8865dc4bd8c Mon Sep 17 00:00:00 2001 From: "Joshua M. Boniface" Date: Wed, 19 Jun 2019 00:12:44 -0400 Subject: [PATCH] Implementation of RBD volumes and snapshots Adds the ability to manage RBD volumes (add/remove) and RBD snapshots (add/remove). (Working) list functions to come. --- client-cli/pvc.py | 165 ++++++++- client-common/ceph.py | 580 +++++++++++++++++++++++++++++++ node-daemon/pvcd/CephInstance.py | 248 +++++++++++++ 3 files changed, 991 insertions(+), 2 deletions(-) diff --git a/client-cli/pvc.py b/client-cli/pvc.py index b110585f..63599c2f 100755 --- a/client-cli/pvc.py +++ b/client-cli/pvc.py @@ -1239,7 +1239,7 @@ def ceph_osd_unset(osd_property): ) def ceph_osd_list(limit): """ - List all Ceph OSDs in the cluster; optinally only match elements matching ID regex LIMIT. + List all Ceph OSDs in the cluster; optionally only match elements matching ID regex LIMIT. """ zk_conn = pvc_common.startZKConnection(zk_host) @@ -1309,13 +1309,164 @@ def ceph_pool_remove(name): ) def ceph_pool_list(limit): """ - List all Ceph RBD pools in the cluster; optinally only match elements matching name regex LIMIT. + List all Ceph RBD pools in the cluster; optionally only match elements matching name regex LIMIT. """ zk_conn = pvc_common.startZKConnection(zk_host) retcode, retmsg = pvc_ceph.get_list_pool(zk_conn, limit) cleanup(retcode, retmsg, zk_conn) +############################################################################### +# pvc ceph volume add +############################################################################### +@click.command(name='add', short_help='Add new RBD volume.') +@click.argument( + 'pool' +) +@click.argument( + 'name' +) +@click.argument( + 'size' +) +def ceph_volume_add(pool, name, size): + """ + Add a new Ceph RBD volume with name NAME and size SIZE [GiB] to pool POOL. + """ + + zk_conn = pvc_common.startZKConnection(zk_host) + retcode, retmsg = pvc_ceph.add_volume(zk_conn, pool, name, size) + cleanup(retcode, retmsg, zk_conn) + +############################################################################### +# pvc ceph volume remove +############################################################################### +@click.command(name='remove', short_help='Remove RBD volume.') +@click.argument( + 'pool' +) +@click.argument( + 'name' +) +def ceph_volume_remove(pool, name): + """ + Remove a Ceph RBD volume with name NAME from pool POOL. + """ + + click.echo('DANGER: This will completely remove volume {} from pool {} and all data contained in it.'.format(name, pool)) + choice = input('Are you sure you want to do this? (y/N) ') + if choice == 'y' or choice == 'Y': + zk_conn = pvc_common.startZKConnection(zk_host) + retcode, retmsg = pvc_ceph.remove_volume(zk_conn, pool, name) + cleanup(retcode, retmsg, zk_conn) + else: + click.echo('Aborting.') + +############################################################################### +# pvc ceph volume list +############################################################################### +@click.command(name='list', short_help='List cluster RBD volumes.') +@click.argument( + 'pool', default='all', required=False +) +@click.argument( + 'limit', default=None, required=False +) +def ceph_volume_list(pool, limit): + """ + List all Ceph RBD volumes in the cluster or in pool POOL; optionally only match elements matching name regex LIMIT. 
+    """
+
+    zk_conn = pvc_common.startZKConnection(zk_host)
+    retcode, retmsg = pvc_ceph.get_list_volume(zk_conn, pool, limit)
+    cleanup(retcode, retmsg, zk_conn)
+
+###############################################################################
+# pvc ceph volume snapshot add
+###############################################################################
+@click.command(name='add', short_help='Add new RBD volume snapshot.')
+@click.argument(
+    'pool'
+)
+@click.argument(
+    'volume'
+)
+@click.argument(
+    'name'
+)
+def ceph_volume_snapshot_add(pool, volume, name):
+    """
+    Add a snapshot with name NAME of Ceph RBD volume VOLUME in pool POOL.
+    """
+
+    zk_conn = pvc_common.startZKConnection(zk_host)
+    retcode, retmsg = pvc_ceph.add_snapshot(zk_conn, pool, volume, name)
+    cleanup(retcode, retmsg, zk_conn)
+
+###############################################################################
+# pvc ceph volume snapshot remove
+###############################################################################
+@click.command(name='remove', short_help='Remove RBD volume snapshot.')
+@click.argument(
+    'pool'
+)
+@click.argument(
+    'volume'
+)
+@click.argument(
+    'name'
+)
+def ceph_volume_snapshot_remove(pool, volume, name):
+    """
+    Remove a Ceph RBD snapshot with name NAME of volume VOLUME in pool POOL.
+    """
+
+    click.echo('DANGER: This will completely remove snapshot {} of volume {} in pool {} and all data contained in it.'.format(name, volume, pool))
+    choice = input('Are you sure you want to do this? (y/N) ')
+    if choice == 'y' or choice == 'Y':
+        zk_conn = pvc_common.startZKConnection(zk_host)
+        retcode, retmsg = pvc_ceph.remove_snapshot(zk_conn, pool, volume, name)
+        cleanup(retcode, retmsg, zk_conn)
+    else:
+        click.echo('Aborting.')
+
+###############################################################################
+# pvc ceph volume snapshot list
+###############################################################################
+@click.command(name='list', short_help='List cluster RBD volume snapshots.')
+@click.argument(
+    'pool', default='all', required=False
+)
+@click.argument(
+    'volume', default='all', required=False
+)
+@click.argument(
+    'limit', default=None, required=False
+)
+def ceph_volume_snapshot_list(pool, volume, limit):
+    """
+    List all Ceph RBD volume snapshots in the cluster, in pool POOL, or of volume VOLUME; optionally only match elements matching name regex LIMIT.
+    """
+
+    zk_conn = pvc_common.startZKConnection(zk_host)
+    retcode, retmsg = pvc_ceph.get_list_snapshot(zk_conn, pool, volume, limit)
+    cleanup(retcode, retmsg, zk_conn)
+
+
+###############################################################################
+# pvc init
+###############################################################################
+
+@click.command(name='init', short_help='Initialize a new cluster.')
+def init_cluster():
+    """
+    Perform initialization of a new PVC cluster.
+ """ + + import pvc_init + pvc_init.init_zookeeper(zk_host) + + ############################################################################### # pvc init @@ -1426,9 +1577,19 @@ ceph_pool.add_command(ceph_pool_add) ceph_pool.add_command(ceph_pool_remove) ceph_pool.add_command(ceph_pool_list) +ceph_volume.add_command(ceph_volume_add) +ceph_volume.add_command(ceph_volume_remove) +ceph_volume.add_command(ceph_volume_list) +ceph_volume.add_command(ceph_volume_snapshot) + +ceph_volume_snapshot.add_command(ceph_volume_snapshot_add) +ceph_volume_snapshot.add_command(ceph_volume_snapshot_remove) +ceph_volume_snapshot.add_command(ceph_volume_snapshot_list) + cli_ceph.add_command(ceph_status) cli_ceph.add_command(ceph_osd) cli_ceph.add_command(ceph_pool) +cli_ceph.add_command(ceph_volume) cli.add_command(cli_node) cli.add_command(cli_vm) diff --git a/client-common/ceph.py b/client-common/ceph.py index d6189176..f4b25ed1 100644 --- a/client-common/ceph.py +++ b/client-common/ceph.py @@ -48,6 +48,20 @@ def verifyPool(zk_conn, name): else: return False +# Verify Volume is valid in cluster +def verifyVolume(zk_conn, pool, name): + if zkhandler.exists(zk_conn, '/ceph/volumes/{}/{}'.format(pool, name)): + return True + else: + return False + +# Verify Snapshot is valid in cluster +def verifySnapshot(zk_conn, pool, volume, name): + if zkhandler.exists(zk_conn, '/ceph/snapshots/{}/{}/{}'.format(pool, volume, name)): + return True + else: + return False + # Verify OSD path is valid in cluster def verifyOSDBlock(zk_conn, node, device): for osd in zkhandler.listchildren(zk_conn, '/ceph/osds'): @@ -567,6 +581,390 @@ Wr: {pool_write_ops: <{pool_write_ops_length}} \ output_string = pool_list_output_header + '\n' + '\n'.join(sorted(pool_list_output)) return output_string +def getCephVolumes(zk_conn, pool): + + pool_list = list() + if pool == 'all': + for pool in zkhandler.listchildren(zk_conn, '/ceph/pools'): + pool_list += zkhandler.listchildren(zk_conn, '/ceph/volumes/{}'.format(pool)) + else: + pool_list += zkhandler.listchildren(zk_conn, '/ceph/volumes/{}'.format(pool)) + return pool_list + +def formatVolumeList(zk_conn, volume_list): + volume_list_output = [] + + volume_id = dict() + volume_size = dict() + volume_num_objects = dict() + volume_num_clones = dict() + volume_num_copies = dict() + volume_num_degraded = dict() + volume_read_ops = dict() + volume_read_data = dict() + volume_write_ops = dict() + volume_write_data = dict() + + volume_name_length = 5 + volume_id_length = 3 + volume_size_length = 5 + volume_num_objects_length = 6 + volume_num_clones_length = 7 + volume_num_copies_length = 7 + volume_num_degraded_length = 9 + volume_read_ops_length = 4 + volume_read_data_length = 5 + volume_write_ops_length = 4 + volume_write_data_length = 5 + + for volume in volume_list: + # Set the Volume name length + _volume_name_length = len(volume) + 1 + if _volume_name_length > volume_name_length: + volume_name_length = _volume_name_length + + # Get stats + volume_stats = getVolumeInformation(zk_conn, volume) + + # Set the parent node and length + try: + volume_id[volume] = volume_stats['id'] + # If this happens, the node hasn't checked in fully yet, so just ignore it + if not volume_id[volume]: + continue + except KeyError: + continue + + # Set the id and length + volume_id[volume] = volume_stats['id'] + _volume_id_length = len(str(volume_id[volume])) + 1 + if _volume_id_length > volume_id_length: + volume_id_length = _volume_id_length + + # Set the size and length + volume_size[volume] = 
volume_stats['size_formatted'] + _volume_size_length = len(str(volume_size[volume])) + 1 + if _volume_size_length > volume_size_length: + volume_size_length = _volume_size_length + + # Set the num_objects and length + volume_num_objects[volume] = volume_stats['num_objects'] + _volume_num_objects_length = len(str(volume_num_objects[volume])) + 1 + if _volume_num_objects_length > volume_num_objects_length: + volume_num_objects_length = _volume_num_objects_length + + # Set the num_clones and length + volume_num_clones[volume] = volume_stats['num_object_clones'] + _volume_num_clones_length = len(str(volume_num_clones[volume])) + 1 + if _volume_num_clones_length > volume_num_clones_length: + volume_num_clones_length = _volume_num_clones_length + + # Set the num_copies and length + volume_num_copies[volume] = volume_stats['num_object_copies'] + _volume_num_copies_length = len(str(volume_num_copies[volume])) + 1 + if _volume_num_copies_length > volume_num_copies_length: + volume_num_copies_length = _volume_num_copies_length + + # Set the num_degraded and length + volume_num_degraded[volume] = volume_stats['num_objects_degraded'] + _volume_num_degraded_length = len(str(volume_num_degraded[volume])) + 1 + if _volume_num_degraded_length > volume_num_degraded_length: + volume_num_degraded_length = _volume_num_degraded_length + + # Set the write IOPS/data and length + volume_write_ops[volume] = volume_stats['write_ops'] + _volume_write_ops_length = len(str(volume_write_ops[volume])) + 1 + if _volume_write_ops_length > volume_write_ops_length: + volume_write_ops_length = _volume_write_ops_length + volume_write_data[volume] = volume_stats['write_formatted'] + _volume_write_data_length = len(volume_write_data[volume]) + 1 + if _volume_write_data_length > volume_write_data_length: + volume_write_data_length = _volume_write_data_length + + # Set the read IOPS/data and length + volume_read_ops[volume] = volume_stats['read_ops'] + _volume_read_ops_length = len(str(volume_read_ops[volume])) + 1 + if _volume_read_ops_length > volume_read_ops_length: + volume_read_ops_length = _volume_read_ops_length + volume_read_data[volume] = volume_stats['read_formatted'] + _volume_read_data_length = len(volume_read_data[volume]) + 1 + if _volume_read_data_length > volume_read_data_length: + volume_read_data_length = _volume_read_data_length + + # Format the output header + volume_list_output_header = '{bold}\ +{volume_id: <{volume_id_length}} \ +{volume_name: <{volume_name_length}} \ +{volume_size: <{volume_size_length}} \ +Obj: {volume_objects: <{volume_objects_length}} \ +{volume_clones: <{volume_clones_length}} \ +{volume_copies: <{volume_copies_length}} \ +{volume_degraded: <{volume_degraded_length}} \ +Rd: {volume_read_ops: <{volume_read_ops_length}} \ +{volume_read_data: <{volume_read_data_length}} \ +Wr: {volume_write_ops: <{volume_write_ops_length}} \ +{volume_write_data: <{volume_write_data_length}} \ +{end_bold}'.format( + bold=ansiprint.bold(), + end_bold=ansiprint.end(), + volume_id_length=volume_id_length, + volume_name_length=volume_name_length, + volume_size_length=volume_size_length, + volume_objects_length=volume_num_objects_length, + volume_clones_length=volume_num_clones_length, + volume_copies_length=volume_num_copies_length, + volume_degraded_length=volume_num_degraded_length, + volume_write_ops_length=volume_write_ops_length, + volume_write_data_length=volume_write_data_length, + volume_read_ops_length=volume_read_ops_length, + volume_read_data_length=volume_read_data_length, + volume_id='ID', + 
volume_name='Name', + volume_size='Used', + volume_objects='Count', + volume_clones='Clones', + volume_copies='Copies', + volume_degraded='Degraded', + volume_write_ops='OPS', + volume_write_data='Data', + volume_read_ops='OPS', + volume_read_data='Data' + ) + + for volume in volume_list: + # Format the output header + volume_list_output.append('{bold}\ +{volume_id: <{volume_id_length}} \ +{volume_name: <{volume_name_length}} \ +{volume_size: <{volume_size_length}} \ + {volume_objects: <{volume_objects_length}} \ +{volume_clones: <{volume_clones_length}} \ +{volume_copies: <{volume_copies_length}} \ +{volume_degraded: <{volume_degraded_length}} \ + {volume_read_ops: <{volume_read_ops_length}} \ +{volume_read_data: <{volume_read_data_length}} \ + {volume_write_ops: <{volume_write_ops_length}} \ +{volume_write_data: <{volume_write_data_length}} \ +{end_bold}'.format( + bold=ansiprint.bold(), + end_bold=ansiprint.end(), + volume_id_length=volume_id_length, + volume_name_length=volume_name_length, + volume_size_length=volume_size_length, + volume_objects_length=volume_num_objects_length, + volume_clones_length=volume_num_clones_length, + volume_copies_length=volume_num_copies_length, + volume_degraded_length=volume_num_degraded_length, + volume_write_ops_length=volume_write_ops_length, + volume_write_data_length=volume_write_data_length, + volume_read_ops_length=volume_read_ops_length, + volume_read_data_length=volume_read_data_length, + volume_id=volume_id[volume], + volume_name=volume, + volume_size=volume_size[volume], + volume_objects=volume_num_objects[volume], + volume_clones=volume_num_clones[volume], + volume_copies=volume_num_copies[volume], + volume_degraded=volume_num_degraded[volume], + volume_write_ops=volume_write_ops[volume], + volume_write_data=volume_write_data[volume], + volume_read_ops=volume_read_ops[volume], + volume_read_data=volume_read_data[volume] + ) + ) + + output_string = volume_list_output_header + '\n' + '\n'.join(sorted(volume_list_output)) + return output_string + +def getCephSnapshots(zk_conn): + snapshot_list = zkhandler.listchildren(zk_conn, '/ceph/snapshots') + return snapshot_list + +def formatSnapshotList(zk_conn, snapshot_list): + snapshot_list_output = [] + + snapshot_id = dict() + snapshot_size = dict() + snapshot_num_objects = dict() + snapshot_num_clones = dict() + snapshot_num_copies = dict() + snapshot_num_degraded = dict() + snapshot_read_ops = dict() + snapshot_read_data = dict() + snapshot_write_ops = dict() + snapshot_write_data = dict() + + snapshot_name_length = 5 + snapshot_id_length = 3 + snapshot_size_length = 5 + snapshot_num_objects_length = 6 + snapshot_num_clones_length = 7 + snapshot_num_copies_length = 7 + snapshot_num_degraded_length = 9 + snapshot_read_ops_length = 4 + snapshot_read_data_length = 5 + snapshot_write_ops_length = 4 + snapshot_write_data_length = 5 + + for snapshot in snapshot_list: + # Set the Snapshot name length + _snapshot_name_length = len(snapshot) + 1 + if _snapshot_name_length > snapshot_name_length: + snapshot_name_length = _snapshot_name_length + + # Get stats + snapshot_stats = getSnapshotInformation(zk_conn, snapshot) + + # Set the parent node and length + try: + snapshot_id[snapshot] = snapshot_stats['id'] + # If this happens, the node hasn't checked in fully yet, so just ignore it + if not snapshot_id[snapshot]: + continue + except KeyError: + continue + + # Set the id and length + snapshot_id[snapshot] = snapshot_stats['id'] + _snapshot_id_length = len(str(snapshot_id[snapshot])) + 1 + if 
_snapshot_id_length > snapshot_id_length: + snapshot_id_length = _snapshot_id_length + + # Set the size and length + snapshot_size[snapshot] = snapshot_stats['size_formatted'] + _snapshot_size_length = len(str(snapshot_size[snapshot])) + 1 + if _snapshot_size_length > snapshot_size_length: + snapshot_size_length = _snapshot_size_length + + # Set the num_objects and length + snapshot_num_objects[snapshot] = snapshot_stats['num_objects'] + _snapshot_num_objects_length = len(str(snapshot_num_objects[snapshot])) + 1 + if _snapshot_num_objects_length > snapshot_num_objects_length: + snapshot_num_objects_length = _snapshot_num_objects_length + + # Set the num_clones and length + snapshot_num_clones[snapshot] = snapshot_stats['num_object_clones'] + _snapshot_num_clones_length = len(str(snapshot_num_clones[snapshot])) + 1 + if _snapshot_num_clones_length > snapshot_num_clones_length: + snapshot_num_clones_length = _snapshot_num_clones_length + + # Set the num_copies and length + snapshot_num_copies[snapshot] = snapshot_stats['num_object_copies'] + _snapshot_num_copies_length = len(str(snapshot_num_copies[snapshot])) + 1 + if _snapshot_num_copies_length > snapshot_num_copies_length: + snapshot_num_copies_length = _snapshot_num_copies_length + + # Set the num_degraded and length + snapshot_num_degraded[snapshot] = snapshot_stats['num_objects_degraded'] + _snapshot_num_degraded_length = len(str(snapshot_num_degraded[snapshot])) + 1 + if _snapshot_num_degraded_length > snapshot_num_degraded_length: + snapshot_num_degraded_length = _snapshot_num_degraded_length + + # Set the write IOPS/data and length + snapshot_write_ops[snapshot] = snapshot_stats['write_ops'] + _snapshot_write_ops_length = len(str(snapshot_write_ops[snapshot])) + 1 + if _snapshot_write_ops_length > snapshot_write_ops_length: + snapshot_write_ops_length = _snapshot_write_ops_length + snapshot_write_data[snapshot] = snapshot_stats['write_formatted'] + _snapshot_write_data_length = len(snapshot_write_data[snapshot]) + 1 + if _snapshot_write_data_length > snapshot_write_data_length: + snapshot_write_data_length = _snapshot_write_data_length + + # Set the read IOPS/data and length + snapshot_read_ops[snapshot] = snapshot_stats['read_ops'] + _snapshot_read_ops_length = len(str(snapshot_read_ops[snapshot])) + 1 + if _snapshot_read_ops_length > snapshot_read_ops_length: + snapshot_read_ops_length = _snapshot_read_ops_length + snapshot_read_data[snapshot] = snapshot_stats['read_formatted'] + _snapshot_read_data_length = len(snapshot_read_data[snapshot]) + 1 + if _snapshot_read_data_length > snapshot_read_data_length: + snapshot_read_data_length = _snapshot_read_data_length + + # Format the output header + snapshot_list_output_header = '{bold}\ +{snapshot_id: <{snapshot_id_length}} \ +{snapshot_name: <{snapshot_name_length}} \ +{snapshot_size: <{snapshot_size_length}} \ +Obj: {snapshot_objects: <{snapshot_objects_length}} \ +{snapshot_clones: <{snapshot_clones_length}} \ +{snapshot_copies: <{snapshot_copies_length}} \ +{snapshot_degraded: <{snapshot_degraded_length}} \ +Rd: {snapshot_read_ops: <{snapshot_read_ops_length}} \ +{snapshot_read_data: <{snapshot_read_data_length}} \ +Wr: {snapshot_write_ops: <{snapshot_write_ops_length}} \ +{snapshot_write_data: <{snapshot_write_data_length}} \ +{end_bold}'.format( + bold=ansiprint.bold(), + end_bold=ansiprint.end(), + snapshot_id_length=snapshot_id_length, + snapshot_name_length=snapshot_name_length, + snapshot_size_length=snapshot_size_length, + snapshot_objects_length=snapshot_num_objects_length, 
+ snapshot_clones_length=snapshot_num_clones_length, + snapshot_copies_length=snapshot_num_copies_length, + snapshot_degraded_length=snapshot_num_degraded_length, + snapshot_write_ops_length=snapshot_write_ops_length, + snapshot_write_data_length=snapshot_write_data_length, + snapshot_read_ops_length=snapshot_read_ops_length, + snapshot_read_data_length=snapshot_read_data_length, + snapshot_id='ID', + snapshot_name='Name', + snapshot_size='Used', + snapshot_objects='Count', + snapshot_clones='Clones', + snapshot_copies='Copies', + snapshot_degraded='Degraded', + snapshot_write_ops='OPS', + snapshot_write_data='Data', + snapshot_read_ops='OPS', + snapshot_read_data='Data' + ) + + for snapshot in snapshot_list: + # Format the output header + snapshot_list_output.append('{bold}\ +{snapshot_id: <{snapshot_id_length}} \ +{snapshot_name: <{snapshot_name_length}} \ +{snapshot_size: <{snapshot_size_length}} \ + {snapshot_objects: <{snapshot_objects_length}} \ +{snapshot_clones: <{snapshot_clones_length}} \ +{snapshot_copies: <{snapshot_copies_length}} \ +{snapshot_degraded: <{snapshot_degraded_length}} \ + {snapshot_read_ops: <{snapshot_read_ops_length}} \ +{snapshot_read_data: <{snapshot_read_data_length}} \ + {snapshot_write_ops: <{snapshot_write_ops_length}} \ +{snapshot_write_data: <{snapshot_write_data_length}} \ +{end_bold}'.format( + bold=ansiprint.bold(), + end_bold=ansiprint.end(), + snapshot_id_length=snapshot_id_length, + snapshot_name_length=snapshot_name_length, + snapshot_size_length=snapshot_size_length, + snapshot_objects_length=snapshot_num_objects_length, + snapshot_clones_length=snapshot_num_clones_length, + snapshot_copies_length=snapshot_num_copies_length, + snapshot_degraded_length=snapshot_num_degraded_length, + snapshot_write_ops_length=snapshot_write_ops_length, + snapshot_write_data_length=snapshot_write_data_length, + snapshot_read_ops_length=snapshot_read_ops_length, + snapshot_read_data_length=snapshot_read_data_length, + snapshot_id=snapshot_id[snapshot], + snapshot_name=snapshot, + snapshot_size=snapshot_size[snapshot], + snapshot_objects=snapshot_num_objects[snapshot], + snapshot_clones=snapshot_num_clones[snapshot], + snapshot_copies=snapshot_num_copies[snapshot], + snapshot_degraded=snapshot_num_degraded[snapshot], + snapshot_write_ops=snapshot_write_ops[snapshot], + snapshot_write_data=snapshot_write_data[snapshot], + snapshot_read_ops=snapshot_read_ops[snapshot], + snapshot_read_data=snapshot_read_data[snapshot] + ) + ) + + output_string = snapshot_list_output_header + '\n' + '\n'.join(sorted(snapshot_list_output)) + return output_string + # # Direct functions # @@ -861,3 +1259,185 @@ def get_list_pool(zk_conn, limit): return True, '' +def add_volume(zk_conn, pool, name, size): + # Tell the cluster to create a new volume + add_volume_string = 'volume_add {},{},{}'.format(pool, name, size) + zkhandler.writedata(zk_conn, {'/ceph/cmd': add_volume_string}) + # Wait 1/2 second for the cluster to get the message and start working + time.sleep(0.5) + # Acquire a read lock, so we get the return exclusively + lock = zkhandler.readlock(zk_conn, '/ceph/cmd') + with lock: + try: + result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] + if result == 'success-volume_add': + message = 'Created new RBD volume {} of size {} GiB on pool {}.'.format(name, size, pool) + success = True + else: + message = 'ERROR: Failed to create new volume; check node logs for details.' + success = False + except: + message = 'ERROR: Command ignored by node.' 
+            success = False
+
+    # Acquire a write lock to ensure things go smoothly
+    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    with lock:
+        time.sleep(1)
+        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+
+    return success, message
+
+def remove_volume(zk_conn, pool, name):
+    if not verifyVolume(zk_conn, pool, name):
+        return False, 'ERROR: No volume with name "{}" is present in pool {}.'.format(name, pool)
+
+    # Tell the cluster to remove the volume
+    remove_volume_string = 'volume_remove {},{}'.format(pool, name)
+    zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_volume_string})
+    # Wait 1/2 second for the cluster to get the message and start working
+    time.sleep(0.5)
+    # Acquire a read lock, so we get the return exclusively
+    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    with lock:
+        try:
+            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            if result == 'success-volume_remove':
+                message = 'Removed RBD volume {} in pool {}.'.format(name, pool)
+                success = True
+            else:
+                message = 'ERROR: Failed to remove volume; check node logs for details.'
+                success = False
+        except Exception as e:
+            message = 'ERROR: Command ignored by node: {}'.format(e)
+            success = False
+
+    # Acquire a write lock to ensure things go smoothly
+    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    with lock:
+        time.sleep(1)
+        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+
+    return success, message
+
+def get_list_volume(zk_conn, pool, limit):
+    volume_list = []
+    full_volume_list = getCephVolumes(zk_conn, pool)
+
+    if limit:
+        try:
+            # Implicitly assume fuzzy limits
+            if re.match('\^.*', limit) == None:
+                limit = '.*' + limit
+            if re.match('.*\$', limit) == None:
+                limit = limit + '.*'
+        except Exception as e:
+            return False, 'Regex Error: {}'.format(e)
+
+    for volume in full_volume_list:
+        valid_volume = False
+        if limit:
+            if re.match(limit, volume['volume_id']) != None:
+                valid_volume = True
+        else:
+            valid_volume = True
+
+        if valid_volume:
+            volume_list.append(volume)
+
+    output_string = formatVolumeList(zk_conn, volume_list)
+    click.echo(output_string)
+
+    return True, ''
+
+def add_snapshot(zk_conn, pool, volume, name):
+    # Tell the cluster to create a new snapshot
+    add_snapshot_string = 'snapshot_add {},{},{}'.format(pool, volume, name)
+    zkhandler.writedata(zk_conn, {'/ceph/cmd': add_snapshot_string})
+    # Wait 1/2 second for the cluster to get the message and start working
+    time.sleep(0.5)
+    # Acquire a read lock, so we get the return exclusively
+    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    with lock:
+        try:
+            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            if result == 'success-snapshot_add':
+                message = 'Created new RBD snapshot {} of volume {} on pool {}.'.format(name, volume, pool)
+                success = True
+            else:
+                message = 'ERROR: Failed to create new snapshot; check node logs for details.'
+                success = False
+        except:
+            message = 'ERROR: Command ignored by node.'
+            success = False
+
+    # Acquire a write lock to ensure things go smoothly
+    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    with lock:
+        time.sleep(1)
+        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+
+    return success, message
+
+def remove_snapshot(zk_conn, pool, volume, name):
+    if not verifySnapshot(zk_conn, pool, volume, name):
+        return False, 'ERROR: No snapshot with name "{}" is present of volume {} on pool {}.'.format(name, volume, pool)
+
+    # Tell the cluster to remove the snapshot
+    remove_snapshot_string = 'snapshot_remove {},{},{}'.format(pool, volume, name)
+    zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_snapshot_string})
+    # Wait 1/2 second for the cluster to get the message and start working
+    time.sleep(0.5)
+    # Acquire a read lock, so we get the return exclusively
+    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    with lock:
+        try:
+            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            if result == 'success-snapshot_remove':
+                message = 'Removed RBD snapshot {} of volume {} in pool {}.'.format(name, volume, pool)
+                success = True
+            else:
+                message = 'ERROR: Failed to remove snapshot; check node logs for details.'
+                success = False
+        except Exception as e:
+            message = 'ERROR: Command ignored by node: {}'.format(e)
+            success = False
+
+    # Acquire a write lock to ensure things go smoothly
+    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    with lock:
+        time.sleep(1)
+        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+
+    return success, message
+
+def get_list_snapshot(zk_conn, pool, volume, limit):
+    snapshot_list = []
+    full_snapshot_list = getCephSnapshots(zk_conn, pool, volume)
+
+    if limit:
+        try:
+            # Implicitly assume fuzzy limits
+            if re.match('\^.*', limit) == None:
+                limit = '.*' + limit
+            if re.match('.*\$', limit) == None:
+                limit = limit + '.*'
+        except Exception as e:
+            return False, 'Regex Error: {}'.format(e)
+
+    for snapshot in full_snapshot_list:
+        valid_snapshot = False
+        if limit:
+            if re.match(limit, snapshot['snapshot_id']) != None:
+                valid_snapshot = True
+        else:
+            valid_snapshot = True
+
+        if valid_snapshot:
+            snapshot_list.append(snapshot)
+
+    output_string = formatSnapshotList(zk_conn, snapshot_list)
+    click.echo(output_string)
+
+    return True, ''
+
diff --git a/node-daemon/pvcd/CephInstance.py b/node-daemon/pvcd/CephInstance.py
index 86f239f8..1a66875a 100644
--- a/node-daemon/pvcd/CephInstance.py
+++ b/node-daemon/pvcd/CephInstance.py
@@ -447,6 +447,170 @@ def remove_pool(zk_conn, logger, name):
         logger.out('Failed to remove RBD pool {}: {}'.format(name, e), state='e')
         return False
 
+class CephVolumeInstance(object):
+    def __init__(self, zk_conn, this_node, pool, name):
+        self.zk_conn = zk_conn
+        self.this_node = this_node
+        self.pool = pool
+        self.name = name
+        self.size = ''
+        self.stats = dict()
+
+        @self.zk_conn.DataWatch('/ceph/volumes/{}/{}/size'.format(self.pool, self.name))
+        def watch_volume_node(data, stat, event=''):
+            if event and event.type == 'DELETED':
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode('ascii')
+            except AttributeError:
+                data = ''
+
+            if data and data != self.size:
+                self.size = data
+
+        @self.zk_conn.DataWatch('/ceph/volumes/{}/{}/stats'.format(self.pool, self.name))
+        def watch_volume_stats(data, stat, event=''):
+            if event and event.type == 'DELETED':
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode('ascii')
+            except AttributeError:
+                data = ''
+
+            if data and data != self.stats:
+                self.stats = json.loads(data)
+
+def add_volume(zk_conn, logger, pool, name, size):
+    # We are ready to create a new volume on this node
+    logger.out('Creating new RBD volume {} on pool {}'.format(name, pool), state='i')
+    try:
+        # 1. Create the volume
+        sizeMiB = int(size) * 1024
+        retcode, stdout, stderr = common.run_os_command('rbd create --size {} {}/{}'.format(sizeMiB, pool, name))
+        if retcode:
+            print('rbd create')
+            print(stdout)
+            print(stderr)
+            raise
+
+        # 2. Add the new volume to ZK
+        zkhandler.writedata(zk_conn, {
+            '/ceph/volumes/{}/{}'.format(pool, name): '',
+            '/ceph/volumes/{}/{}/size'.format(pool, name): size,
+            '/ceph/volumes/{}/{}/stats'.format(pool, name): '{}'
+        })
+
+        # Log it
+        logger.out('Created new RBD volume {} on pool {}'.format(name, pool), state='o')
+        return True
+    except Exception as e:
+        # Log it
+        logger.out('Failed to create new RBD volume {} on pool {}: {}'.format(name, pool, e), state='e')
+        return False
+
+def remove_volume(zk_conn, logger, pool, name):
+    # We are ready to remove a volume from this node
+    logger.out('Removing RBD volume {} from pool {}'.format(name, pool), state='i')
+    try:
+        # Delete volume from ZK
+        zkhandler.deletekey(zk_conn, '/ceph/volumes/{}/{}'.format(pool, name))
+
+        # Remove the volume
+        retcode, stdout, stderr = common.run_os_command('rbd rm {}/{}'.format(pool, name))
+        if retcode:
+            print('rbd rm')
+            print(stdout)
+            print(stderr)
+            raise
+
+        # Log it
+        logger.out('Removed RBD volume {} from pool {}'.format(name, pool), state='o')
+        return True
+    except Exception as e:
+        # Log it
+        logger.out('Failed to remove RBD volume {} from pool {}: {}'.format(name, pool, e), state='e')
+        return False
+
+class CephSnapshotInstance(object):
+    def __init__(self, zk_conn, this_node, pool, volume, name):
+        self.zk_conn = zk_conn
+        self.this_node = this_node
+        self.pool = pool
+        self.volume = volume
+        self.name = name
+        self.stats = dict()
+
+        @self.zk_conn.DataWatch('/ceph/snapshots/{}/{}/{}/stats'.format(self.pool, self.volume, self.name))
+        def watch_snapshot_stats(data, stat, event=''):
+            if event and event.type == 'DELETED':
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode('ascii')
+            except AttributeError:
+                data = ''
+
+            if data and data != self.stats:
+                self.stats = json.loads(data)
+
+def add_snapshot(zk_conn, logger, pool, volume, name):
+    # We are ready to create a new snapshot on this node
+    logger.out('Creating new RBD snapshot {} of volume {} on pool {}'.format(name, volume, pool), state='i')
+    try:
+        # 1. Create the snapshot
+        retcode, stdout, stderr = common.run_os_command('rbd snap create {}/{}@{}'.format(pool, volume, name))
+        if retcode:
+            print('rbd snap create')
+            print(stdout)
+            print(stderr)
+            raise
+
+        # 2. Add the new snapshot to ZK
+        zkhandler.writedata(zk_conn, {
+            '/ceph/snapshots/{}/{}/{}'.format(pool, volume, name): '',
+            '/ceph/snapshots/{}/{}/{}/stats'.format(pool, volume, name): '{}'
+        })
+
+        # Log it
+        logger.out('Created new RBD snapshot {} of volume {} on pool {}'.format(name, volume, pool), state='o')
+        return True
+    except Exception as e:
+        # Log it
+        logger.out('Failed to create new RBD snapshot {} of volume {} on pool {}: {}'.format(name, volume, pool, e), state='e')
+        return False
+
+def remove_snapshot(zk_conn, logger, pool, volume, name):
+    # We are ready to remove a snapshot from this node
+    logger.out('Removing RBD snapshot {} of volume {} on pool {}'.format(name, volume, pool), state='i')
+    try:
+        # Delete snapshot from ZK
+        zkhandler.deletekey(zk_conn, '/ceph/snapshots/{}/{}/{}'.format(pool, volume, name))
+
+        # Remove the snapshot
+        retcode, stdout, stderr = common.run_os_command('rbd snap rm {}/{}@{}'.format(pool, volume, name))
+        if retcode:
+            print('rbd snap rm')
+            print(stdout)
+            print(stderr)
+            raise
+
+        # Log it
+        logger.out('Removed RBD snapshot {} of volume {} on pool {}'.format(name, volume, pool), state='o')
+        return True
+    except Exception as e:
+        # Log it
+        logger.out('Failed to remove RBD snapshot {} of volume {} on pool {}: {}'.format(name, volume, pool, e), state='e')
+        return False
+
+# Primary command function
 def run_command(zk_conn, logger, this_node, data, d_osd):
     # Get the command and args
     command, args = data.split()
@@ -620,3 +784,87 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
                     zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
+
+    # Adding a new volume
+    elif command == 'volume_add':
+        pool, name, size = args.split(',')
+
+        if this_node.router_state == 'primary':
+            # Lock the command queue
+            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            with zk_lock:
+                # Add the volume
+                result = add_volume(zk_conn, logger, pool, name, size)
+                # Command succeeded
+                if result:
+                    # Update the command queue
+                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                # Command failed
+                else:
+                    # Update the command queue
+                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                # Wait 1 seconds before we free the lock, to ensure the client hits the lock
+                time.sleep(1)
+
+    # Removing a volume
+    elif command == 'volume_remove':
+        pool, name = args.split(',')
+
+        if this_node.router_state == 'primary':
+            # Lock the command queue
+            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            with zk_lock:
+                # Remove the volume
+                result = remove_volume(zk_conn, logger, pool, name)
+                # Command succeeded
+                if result:
+                    # Update the command queue
+                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                # Command failed
+                else:
+                    # Update the command queue
+                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                # Wait 1 seconds before we free the lock, to ensure the client hits the lock
+                time.sleep(1)
+
+    # Adding a new snapshot
+    elif command == 'snapshot_add':
+        pool, volume, name = args.split(',')
+
+        if this_node.router_state == 'primary':
+            # Lock the command queue
+            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            with zk_lock:
+                # Add the snapshot
+                result = add_snapshot(zk_conn, logger, pool, volume, name)
+                # Command succeeded
+                if result:
+                    # Update the command queue
+                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                # Command failed
+                else:
+                    # Update the command queue
+                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                # Wait 1 seconds before we free the lock, to ensure the client hits the lock
+                time.sleep(1)
+
+    # Removing a snapshot
+    elif command == 'snapshot_remove':
+        pool, volume, name = args.split(',')
+
+        if this_node.router_state == 'primary':
+            # Lock the command queue
+            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            with zk_lock:
+                # Remove the snapshot
+                result = remove_snapshot(zk_conn, logger, pool, volume, name)
+                # Command succeeded
+                if result:
+                    # Update the command queue
+                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                # Command failed
+                else:
+                    # Update the command queue
+                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                # Wait 1 seconds before we free the lock, to ensure the client hits the lock
+                time.sleep(1)