Implementation of RBD volumes and snapshots

Adds the ability to manage RBD volumes (add/remove) and RBD
snapshots (add/remove). Working list functions to come.
Joshua Boniface 2019-06-19 00:12:44 -04:00
parent b50b2a827b
commit 01959cb9e3
3 changed files with 991 additions and 2 deletions
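The changes below all follow one pattern: the CLI client writes a command string to the Zookeeper key /ceph/cmd, the primary node daemon picks it up, performs the RBD operation, and overwrites the key with a success- or failure- prefixed copy of the command. A minimal sketch of that round trip from the client side, assuming a local Zookeeper and using kazoo directly (the zkhandler calls in this diff are assumed to be a thin wrapper over it):

import time
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
zk.ensure_path('/ceph/cmd')

# Post a command for the primary node daemon to pick up
zk.set('/ceph/cmd', b'volume_add testpool,testvol,64')
time.sleep(0.5)

# The daemon rewrites the key as 'success-...' or 'failure-...'
result = zk.get('/ceph/cmd')[0].decode('ascii')
print(result.split()[0])

zk.stop()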

View File

@@ -1239,7 +1239,7 @@ def ceph_osd_unset(osd_property):
)
def ceph_osd_list(limit):
"""
List all Ceph OSDs in the cluster; optinally only match elements matching ID regex LIMIT.
List all Ceph OSDs in the cluster; optionally only match elements matching ID regex LIMIT.
"""
zk_conn = pvc_common.startZKConnection(zk_host)
@@ -1309,13 +1309,164 @@ def ceph_pool_remove(name):
)
def ceph_pool_list(limit):
"""
List all Ceph RBD pools in the cluster; optinally only match elements matching name regex LIMIT.
List all Ceph RBD pools in the cluster; optionally only match elements matching name regex LIMIT.
"""
zk_conn = pvc_common.startZKConnection(zk_host)
retcode, retmsg = pvc_ceph.get_list_pool(zk_conn, limit)
cleanup(retcode, retmsg, zk_conn)
###############################################################################
# pvc ceph volume add
###############################################################################
@click.command(name='add', short_help='Add new RBD volume.')
@click.argument(
'pool'
)
@click.argument(
'name'
)
@click.argument(
'size'
)
def ceph_volume_add(pool, name, size):
"""
Add a new Ceph RBD volume with name NAME and size SIZE [GiB] to pool POOL.
"""
zk_conn = pvc_common.startZKConnection(zk_host)
retcode, retmsg = pvc_ceph.add_volume(zk_conn, pool, name, size)
cleanup(retcode, retmsg, zk_conn)
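The new volume commands take their arguments positionally. A sketch of exercising one through click's test runner, assuming ceph_volume_add is importable from this module and a reachable cluster:

from click.testing import CliRunner

runner = CliRunner()
result = runner.invoke(ceph_volume_add, ['testpool', 'testvol', '64'])
print(result.output)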
###############################################################################
# pvc ceph volume remove
###############################################################################
@click.command(name='remove', short_help='Remove RBD volume.')
@click.argument(
'pool'
)
@click.argument(
'name'
)
def ceph_volume_remove(pool, name):
"""
Remove a Ceph RBD volume with name NAME from pool POOL.
"""
click.echo('DANGER: This will completely remove volume {} from pool {} and all data contained in it.'.format(name, pool))
choice = input('Are you sure you want to do this? (y/N) ')
if choice == 'y' or choice == 'Y':
zk_conn = pvc_common.startZKConnection(zk_host)
retcode, retmsg = pvc_ceph.remove_volume(zk_conn, pool, name)
cleanup(retcode, retmsg, zk_conn)
else:
click.echo('Aborting.')
###############################################################################
# pvc ceph volume list
###############################################################################
@click.command(name='list', short_help='List cluster RBD volumes.')
@click.argument(
'pool', default='all', required=False
)
@click.argument(
'limit', default=None, required=False
)
def ceph_volume_list(pool, limit):
"""
List all Ceph RBD volumes in the cluster or in pool POOL; optionally only match elements matching name regex LIMIT.
"""
zk_conn = pvc_common.startZKConnection(zk_host)
retcode, retmsg = pvc_ceph.get_list_volume(zk_conn, pool, limit)
cleanup(retcode, retmsg, zk_conn)
###############################################################################
# pvc ceph volume snapshot add
###############################################################################
@click.command(name='add', short_help='Add new RBD volume snapshot.')
@click.argument(
'pool'
)
@click.argument(
'volume'
)
@click.argument(
'name'
)
def ceph_volume_snapshot_add(pool, volume, name):
"""
    Add a snapshot with name NAME of Ceph RBD volume VOLUME in pool POOL.
"""
zk_conn = pvc_common.startZKConnection(zk_host)
retcode, retmsg = pvc_ceph.add_snapshot(zk_conn, pool, volume, name)
cleanup(retcode, retmsg, zk_conn)
###############################################################################
# pvc ceph volume snapshot remove
###############################################################################
@click.command(name='remove', short_help='Remove RBD volume snapshot.')
@click.argument(
'pool'
)
@click.argument(
'volume'
)
@click.argument(
'name'
)
def ceph_volume_snapshot_remove(pool, volume, name):
    """
    Remove a snapshot with name NAME of Ceph RBD volume VOLUME from pool POOL.
    """
    click.echo('DANGER: This will completely remove snapshot {} of volume {} from pool {} and all data contained in it.'.format(name, volume, pool))
    choice = input('Are you sure you want to do this? (y/N) ')
    if choice == 'y' or choice == 'Y':
        zk_conn = pvc_common.startZKConnection(zk_host)
        retcode, retmsg = pvc_ceph.remove_snapshot(zk_conn, pool, volume, name)
cleanup(retcode, retmsg, zk_conn)
else:
click.echo('Aborting.')
###############################################################################
# pvc ceph volume snapshot list
###############################################################################
@click.command(name='list', short_help='List cluster RBD volume snapshots.')
@click.argument(
'pool', default='all', required=False
)
@click.argument(
'volume', default='all', required=False
)
@click.argument(
'limit', default=None, required=False
)
def ceph_volume_snapshot_list(pool, volume, limit):
"""
List all Ceph RBD volume snapshots, in the cluster or in pool POOL, for all volumes or volume VOLUME; optionally only match elements matching name regex LIMIT.
"""
zk_conn = pvc_common.startZKConnection(zk_host)
    retcode, retmsg = pvc_ceph.get_list_snapshot(zk_conn, pool, volume, limit)
cleanup(retcode, retmsg, zk_conn)
###############################################################################
# pvc init
###############################################################################
@click.command(name='init', short_help='Initialize a new cluster.')
def init_cluster():
"""
Perform initialization of a new PVC cluster.
"""
import pvc_init
pvc_init.init_zookeeper(zk_host)
###############################################################################
# pvc init
@@ -1426,9 +1577,19 @@ ceph_pool.add_command(ceph_pool_add)
ceph_pool.add_command(ceph_pool_remove)
ceph_pool.add_command(ceph_pool_list)
ceph_volume.add_command(ceph_volume_add)
ceph_volume.add_command(ceph_volume_remove)
ceph_volume.add_command(ceph_volume_list)
ceph_volume.add_command(ceph_volume_snapshot)
ceph_volume_snapshot.add_command(ceph_volume_snapshot_add)
ceph_volume_snapshot.add_command(ceph_volume_snapshot_remove)
ceph_volume_snapshot.add_command(ceph_volume_snapshot_list)
cli_ceph.add_command(ceph_status)
cli_ceph.add_command(ceph_osd)
cli_ceph.add_command(ceph_pool)
cli_ceph.add_command(ceph_volume)
cli.add_command(cli_node)
cli.add_command(cli_vm)

View File

@@ -48,6 +48,20 @@ def verifyPool(zk_conn, name):
else:
return False
# Verify Volume is valid in cluster
def verifyVolume(zk_conn, pool, name):
if zkhandler.exists(zk_conn, '/ceph/volumes/{}/{}'.format(pool, name)):
return True
else:
return False
# Verify Snapshot is valid in cluster
def verifySnapshot(zk_conn, pool, volume, name):
if zkhandler.exists(zk_conn, '/ceph/snapshots/{}/{}/{}'.format(pool, volume, name)):
return True
else:
return False
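These helpers fix the Zookeeper keyspace for the new objects. A sketch of the layout as it can be read from this diff, with illustrative path builders (volume_path and snapshot_path are hypothetical, not part of the commit):

# /ceph/volumes/<pool>/<name>                  -> ''
# /ceph/volumes/<pool>/<name>/size             -> size in GiB
# /ceph/volumes/<pool>/<name>/stats            -> JSON stats blob
# /ceph/snapshots/<pool>/<volume>/<name>       -> ''
# /ceph/snapshots/<pool>/<volume>/<name>/stats -> JSON stats blob

def volume_path(pool, name):
    return '/ceph/volumes/{}/{}'.format(pool, name)

def snapshot_path(pool, volume, name):
    return '/ceph/snapshots/{}/{}/{}'.format(pool, volume, name)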
# Verify OSD path is valid in cluster
def verifyOSDBlock(zk_conn, node, device):
for osd in zkhandler.listchildren(zk_conn, '/ceph/osds'):
@@ -567,6 +581,390 @@ Wr: {pool_write_ops: <{pool_write_ops_length}} \
output_string = pool_list_output_header + '\n' + '\n'.join(sorted(pool_list_output))
return output_string
def getCephVolumes(zk_conn, pool):
    volume_list = list()
    if pool == 'all':
        for pool in zkhandler.listchildren(zk_conn, '/ceph/pools'):
            volume_list += zkhandler.listchildren(zk_conn, '/ceph/volumes/{}'.format(pool))
    else:
        volume_list += zkhandler.listchildren(zk_conn, '/ceph/volumes/{}'.format(pool))
    return volume_list
def formatVolumeList(zk_conn, volume_list):
volume_list_output = []
volume_id = dict()
volume_size = dict()
volume_num_objects = dict()
volume_num_clones = dict()
volume_num_copies = dict()
volume_num_degraded = dict()
volume_read_ops = dict()
volume_read_data = dict()
volume_write_ops = dict()
volume_write_data = dict()
volume_name_length = 5
volume_id_length = 3
volume_size_length = 5
volume_num_objects_length = 6
volume_num_clones_length = 7
volume_num_copies_length = 7
volume_num_degraded_length = 9
volume_read_ops_length = 4
volume_read_data_length = 5
volume_write_ops_length = 4
volume_write_data_length = 5
for volume in volume_list:
# Set the Volume name length
_volume_name_length = len(volume) + 1
if _volume_name_length > volume_name_length:
volume_name_length = _volume_name_length
# Get stats
volume_stats = getVolumeInformation(zk_conn, volume)
        # Set the ID and length
        try:
            volume_id[volume] = volume_stats['id']
            # If this happens, the volume stats haven't been populated yet, so just ignore it
            if not volume_id[volume]:
                continue
        except KeyError:
            continue
_volume_id_length = len(str(volume_id[volume])) + 1
if _volume_id_length > volume_id_length:
volume_id_length = _volume_id_length
# Set the size and length
volume_size[volume] = volume_stats['size_formatted']
_volume_size_length = len(str(volume_size[volume])) + 1
if _volume_size_length > volume_size_length:
volume_size_length = _volume_size_length
# Set the num_objects and length
volume_num_objects[volume] = volume_stats['num_objects']
_volume_num_objects_length = len(str(volume_num_objects[volume])) + 1
if _volume_num_objects_length > volume_num_objects_length:
volume_num_objects_length = _volume_num_objects_length
# Set the num_clones and length
volume_num_clones[volume] = volume_stats['num_object_clones']
_volume_num_clones_length = len(str(volume_num_clones[volume])) + 1
if _volume_num_clones_length > volume_num_clones_length:
volume_num_clones_length = _volume_num_clones_length
# Set the num_copies and length
volume_num_copies[volume] = volume_stats['num_object_copies']
_volume_num_copies_length = len(str(volume_num_copies[volume])) + 1
if _volume_num_copies_length > volume_num_copies_length:
volume_num_copies_length = _volume_num_copies_length
# Set the num_degraded and length
volume_num_degraded[volume] = volume_stats['num_objects_degraded']
_volume_num_degraded_length = len(str(volume_num_degraded[volume])) + 1
if _volume_num_degraded_length > volume_num_degraded_length:
volume_num_degraded_length = _volume_num_degraded_length
# Set the write IOPS/data and length
volume_write_ops[volume] = volume_stats['write_ops']
_volume_write_ops_length = len(str(volume_write_ops[volume])) + 1
if _volume_write_ops_length > volume_write_ops_length:
volume_write_ops_length = _volume_write_ops_length
volume_write_data[volume] = volume_stats['write_formatted']
_volume_write_data_length = len(volume_write_data[volume]) + 1
if _volume_write_data_length > volume_write_data_length:
volume_write_data_length = _volume_write_data_length
# Set the read IOPS/data and length
volume_read_ops[volume] = volume_stats['read_ops']
_volume_read_ops_length = len(str(volume_read_ops[volume])) + 1
if _volume_read_ops_length > volume_read_ops_length:
volume_read_ops_length = _volume_read_ops_length
volume_read_data[volume] = volume_stats['read_formatted']
_volume_read_data_length = len(volume_read_data[volume]) + 1
if _volume_read_data_length > volume_read_data_length:
volume_read_data_length = _volume_read_data_length
# Format the output header
volume_list_output_header = '{bold}\
{volume_id: <{volume_id_length}} \
{volume_name: <{volume_name_length}} \
{volume_size: <{volume_size_length}} \
Obj: {volume_objects: <{volume_objects_length}} \
{volume_clones: <{volume_clones_length}} \
{volume_copies: <{volume_copies_length}} \
{volume_degraded: <{volume_degraded_length}} \
Rd: {volume_read_ops: <{volume_read_ops_length}} \
{volume_read_data: <{volume_read_data_length}} \
Wr: {volume_write_ops: <{volume_write_ops_length}} \
{volume_write_data: <{volume_write_data_length}} \
{end_bold}'.format(
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
volume_id_length=volume_id_length,
volume_name_length=volume_name_length,
volume_size_length=volume_size_length,
volume_objects_length=volume_num_objects_length,
volume_clones_length=volume_num_clones_length,
volume_copies_length=volume_num_copies_length,
volume_degraded_length=volume_num_degraded_length,
volume_write_ops_length=volume_write_ops_length,
volume_write_data_length=volume_write_data_length,
volume_read_ops_length=volume_read_ops_length,
volume_read_data_length=volume_read_data_length,
volume_id='ID',
volume_name='Name',
volume_size='Used',
volume_objects='Count',
volume_clones='Clones',
volume_copies='Copies',
volume_degraded='Degraded',
volume_write_ops='OPS',
volume_write_data='Data',
volume_read_ops='OPS',
volume_read_data='Data'
)
for volume in volume_list:
        # Format the output line
volume_list_output.append('{bold}\
{volume_id: <{volume_id_length}} \
{volume_name: <{volume_name_length}} \
{volume_size: <{volume_size_length}} \
{volume_objects: <{volume_objects_length}} \
{volume_clones: <{volume_clones_length}} \
{volume_copies: <{volume_copies_length}} \
{volume_degraded: <{volume_degraded_length}} \
{volume_read_ops: <{volume_read_ops_length}} \
{volume_read_data: <{volume_read_data_length}} \
{volume_write_ops: <{volume_write_ops_length}} \
{volume_write_data: <{volume_write_data_length}} \
{end_bold}'.format(
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
volume_id_length=volume_id_length,
volume_name_length=volume_name_length,
volume_size_length=volume_size_length,
volume_objects_length=volume_num_objects_length,
volume_clones_length=volume_num_clones_length,
volume_copies_length=volume_num_copies_length,
volume_degraded_length=volume_num_degraded_length,
volume_write_ops_length=volume_write_ops_length,
volume_write_data_length=volume_write_data_length,
volume_read_ops_length=volume_read_ops_length,
volume_read_data_length=volume_read_data_length,
volume_id=volume_id[volume],
volume_name=volume,
volume_size=volume_size[volume],
volume_objects=volume_num_objects[volume],
volume_clones=volume_num_clones[volume],
volume_copies=volume_num_copies[volume],
volume_degraded=volume_num_degraded[volume],
volume_write_ops=volume_write_ops[volume],
volume_write_data=volume_write_data[volume],
volume_read_ops=volume_read_ops[volume],
volume_read_data=volume_read_data[volume]
)
)
output_string = volume_list_output_header + '\n' + '\n'.join(sorted(volume_list_output))
return output_string
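formatVolumeList sizes every column to the widest value it will print, seeded with the header widths. The same two-pass pattern reduced to its core, with hypothetical data:

rows = [('testvol1', '8.5GiB'), ('a-much-longer-volume', '120GiB')]
name_length = max(len('Name'), max(len(n) for n, s in rows)) + 1
size_length = max(len('Used'), max(len(s) for n, s in rows)) + 1
print('{: <{}} {: <{}}'.format('Name', name_length, 'Used', size_length))
for name, size in rows:
    print('{: <{}} {: <{}}'.format(name, name_length, size, size_length))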
def getCephSnapshots(zk_conn, pool, volume):
    # NOTE: 'all' handling for POOL and VOLUME is not yet implemented here
    snapshot_list = zkhandler.listchildren(zk_conn, '/ceph/snapshots/{}/{}'.format(pool, volume))
    return snapshot_list
def formatSnapshotList(zk_conn, snapshot_list):
snapshot_list_output = []
snapshot_id = dict()
snapshot_size = dict()
snapshot_num_objects = dict()
snapshot_num_clones = dict()
snapshot_num_copies = dict()
snapshot_num_degraded = dict()
snapshot_read_ops = dict()
snapshot_read_data = dict()
snapshot_write_ops = dict()
snapshot_write_data = dict()
snapshot_name_length = 5
snapshot_id_length = 3
snapshot_size_length = 5
snapshot_num_objects_length = 6
snapshot_num_clones_length = 7
snapshot_num_copies_length = 7
snapshot_num_degraded_length = 9
snapshot_read_ops_length = 4
snapshot_read_data_length = 5
snapshot_write_ops_length = 4
snapshot_write_data_length = 5
for snapshot in snapshot_list:
# Set the Snapshot name length
_snapshot_name_length = len(snapshot) + 1
if _snapshot_name_length > snapshot_name_length:
snapshot_name_length = _snapshot_name_length
# Get stats
snapshot_stats = getSnapshotInformation(zk_conn, snapshot)
        # Set the ID and length
        try:
            snapshot_id[snapshot] = snapshot_stats['id']
            # If this happens, the snapshot stats haven't been populated yet, so just ignore it
            if not snapshot_id[snapshot]:
                continue
        except KeyError:
            continue
_snapshot_id_length = len(str(snapshot_id[snapshot])) + 1
if _snapshot_id_length > snapshot_id_length:
snapshot_id_length = _snapshot_id_length
# Set the size and length
snapshot_size[snapshot] = snapshot_stats['size_formatted']
_snapshot_size_length = len(str(snapshot_size[snapshot])) + 1
if _snapshot_size_length > snapshot_size_length:
snapshot_size_length = _snapshot_size_length
# Set the num_objects and length
snapshot_num_objects[snapshot] = snapshot_stats['num_objects']
_snapshot_num_objects_length = len(str(snapshot_num_objects[snapshot])) + 1
if _snapshot_num_objects_length > snapshot_num_objects_length:
snapshot_num_objects_length = _snapshot_num_objects_length
# Set the num_clones and length
snapshot_num_clones[snapshot] = snapshot_stats['num_object_clones']
_snapshot_num_clones_length = len(str(snapshot_num_clones[snapshot])) + 1
if _snapshot_num_clones_length > snapshot_num_clones_length:
snapshot_num_clones_length = _snapshot_num_clones_length
# Set the num_copies and length
snapshot_num_copies[snapshot] = snapshot_stats['num_object_copies']
_snapshot_num_copies_length = len(str(snapshot_num_copies[snapshot])) + 1
if _snapshot_num_copies_length > snapshot_num_copies_length:
snapshot_num_copies_length = _snapshot_num_copies_length
# Set the num_degraded and length
snapshot_num_degraded[snapshot] = snapshot_stats['num_objects_degraded']
_snapshot_num_degraded_length = len(str(snapshot_num_degraded[snapshot])) + 1
if _snapshot_num_degraded_length > snapshot_num_degraded_length:
snapshot_num_degraded_length = _snapshot_num_degraded_length
# Set the write IOPS/data and length
snapshot_write_ops[snapshot] = snapshot_stats['write_ops']
_snapshot_write_ops_length = len(str(snapshot_write_ops[snapshot])) + 1
if _snapshot_write_ops_length > snapshot_write_ops_length:
snapshot_write_ops_length = _snapshot_write_ops_length
snapshot_write_data[snapshot] = snapshot_stats['write_formatted']
_snapshot_write_data_length = len(snapshot_write_data[snapshot]) + 1
if _snapshot_write_data_length > snapshot_write_data_length:
snapshot_write_data_length = _snapshot_write_data_length
# Set the read IOPS/data and length
snapshot_read_ops[snapshot] = snapshot_stats['read_ops']
_snapshot_read_ops_length = len(str(snapshot_read_ops[snapshot])) + 1
if _snapshot_read_ops_length > snapshot_read_ops_length:
snapshot_read_ops_length = _snapshot_read_ops_length
snapshot_read_data[snapshot] = snapshot_stats['read_formatted']
_snapshot_read_data_length = len(snapshot_read_data[snapshot]) + 1
if _snapshot_read_data_length > snapshot_read_data_length:
snapshot_read_data_length = _snapshot_read_data_length
# Format the output header
snapshot_list_output_header = '{bold}\
{snapshot_id: <{snapshot_id_length}} \
{snapshot_name: <{snapshot_name_length}} \
{snapshot_size: <{snapshot_size_length}} \
Obj: {snapshot_objects: <{snapshot_objects_length}} \
{snapshot_clones: <{snapshot_clones_length}} \
{snapshot_copies: <{snapshot_copies_length}} \
{snapshot_degraded: <{snapshot_degraded_length}} \
Rd: {snapshot_read_ops: <{snapshot_read_ops_length}} \
{snapshot_read_data: <{snapshot_read_data_length}} \
Wr: {snapshot_write_ops: <{snapshot_write_ops_length}} \
{snapshot_write_data: <{snapshot_write_data_length}} \
{end_bold}'.format(
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
snapshot_id_length=snapshot_id_length,
snapshot_name_length=snapshot_name_length,
snapshot_size_length=snapshot_size_length,
snapshot_objects_length=snapshot_num_objects_length,
snapshot_clones_length=snapshot_num_clones_length,
snapshot_copies_length=snapshot_num_copies_length,
snapshot_degraded_length=snapshot_num_degraded_length,
snapshot_write_ops_length=snapshot_write_ops_length,
snapshot_write_data_length=snapshot_write_data_length,
snapshot_read_ops_length=snapshot_read_ops_length,
snapshot_read_data_length=snapshot_read_data_length,
snapshot_id='ID',
snapshot_name='Name',
snapshot_size='Used',
snapshot_objects='Count',
snapshot_clones='Clones',
snapshot_copies='Copies',
snapshot_degraded='Degraded',
snapshot_write_ops='OPS',
snapshot_write_data='Data',
snapshot_read_ops='OPS',
snapshot_read_data='Data'
)
for snapshot in snapshot_list:
        # Format the output line
snapshot_list_output.append('{bold}\
{snapshot_id: <{snapshot_id_length}} \
{snapshot_name: <{snapshot_name_length}} \
{snapshot_size: <{snapshot_size_length}} \
{snapshot_objects: <{snapshot_objects_length}} \
{snapshot_clones: <{snapshot_clones_length}} \
{snapshot_copies: <{snapshot_copies_length}} \
{snapshot_degraded: <{snapshot_degraded_length}} \
{snapshot_read_ops: <{snapshot_read_ops_length}} \
{snapshot_read_data: <{snapshot_read_data_length}} \
{snapshot_write_ops: <{snapshot_write_ops_length}} \
{snapshot_write_data: <{snapshot_write_data_length}} \
{end_bold}'.format(
bold=ansiprint.bold(),
end_bold=ansiprint.end(),
snapshot_id_length=snapshot_id_length,
snapshot_name_length=snapshot_name_length,
snapshot_size_length=snapshot_size_length,
snapshot_objects_length=snapshot_num_objects_length,
snapshot_clones_length=snapshot_num_clones_length,
snapshot_copies_length=snapshot_num_copies_length,
snapshot_degraded_length=snapshot_num_degraded_length,
snapshot_write_ops_length=snapshot_write_ops_length,
snapshot_write_data_length=snapshot_write_data_length,
snapshot_read_ops_length=snapshot_read_ops_length,
snapshot_read_data_length=snapshot_read_data_length,
snapshot_id=snapshot_id[snapshot],
snapshot_name=snapshot,
snapshot_size=snapshot_size[snapshot],
snapshot_objects=snapshot_num_objects[snapshot],
snapshot_clones=snapshot_num_clones[snapshot],
snapshot_copies=snapshot_num_copies[snapshot],
snapshot_degraded=snapshot_num_degraded[snapshot],
snapshot_write_ops=snapshot_write_ops[snapshot],
snapshot_write_data=snapshot_write_data[snapshot],
snapshot_read_ops=snapshot_read_ops[snapshot],
snapshot_read_data=snapshot_read_data[snapshot]
)
)
output_string = snapshot_list_output_header + '\n' + '\n'.join(sorted(snapshot_list_output))
return output_string
#
# Direct functions
#
@@ -861,3 +1259,185 @@ def get_list_pool(zk_conn, limit):
return True, ''
def add_volume(zk_conn, pool, name, size):
# Tell the cluster to create a new volume
add_volume_string = 'volume_add {},{},{}'.format(pool, name, size)
zkhandler.writedata(zk_conn, {'/ceph/cmd': add_volume_string})
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
with lock:
try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
if result == 'success-volume_add':
message = 'Created new RBD volume {} of size {} GiB on pool {}.'.format(name, size, pool)
success = True
else:
message = 'ERROR: Failed to create new volume; check node logs for details.'
success = False
        except Exception:
message = 'ERROR: Command ignored by node.'
success = False
# Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with lock:
time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
return success, message
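The sequencing here matters: the client posts the command, sleeps briefly so the daemon can grab its own lock first, takes a read lock to wait for the result, then takes a write lock to clear the key. A sketch of the same dance in terms of kazoo's lock recipes, which zkhandler.readlock and zkhandler.writelock are assumed to wrap:

from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

with zk.ReadLock('/ceph/cmd', 'client'):
    # Blocks until the daemon releases its write lock, then reads the result
    result = zk.get('/ceph/cmd')[0].decode('ascii')

with zk.WriteLock('/ceph/cmd', 'client'):
    zk.set('/ceph/cmd', b'')  # clear the queue for the next command

zk.stop()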
def remove_volume(zk_conn, pool, name):
if not verifyVolume(zk_conn, pool, name):
return False, 'ERROR: No volume with name "{}" is present in pool {}.'.format(name, pool)
    # Tell the cluster to remove the volume
remove_volume_string = 'volume_remove {},{}'.format(pool, name)
zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_volume_string})
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
with lock:
try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
if result == 'success-volume_remove':
message = 'Removed RBD volume {} in pool {}.'.format(name, pool)
success = True
else:
message = 'ERROR: Failed to remove volume; check node logs for details.'
success = False
except Exception as e:
message = 'ERROR: Command ignored by node: {}'.format(e)
success = False
# Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with lock:
time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
return success, message
def get_list_volume(zk_conn, pool, limit):
volume_list = []
full_volume_list = getCephVolumes(zk_conn, pool)
if limit:
try:
# Implicitly assume fuzzy limits
            if re.match(r'\^.*', limit) is None:
                limit = '.*' + limit
            if re.match(r'.*\$', limit) is None:
                limit = limit + '.*'
except Exception as e:
return False, 'Regex Error: {}'.format(e)
for volume in full_volume_list:
valid_volume = False
if limit:
            if re.match(limit, volume) is not None:
valid_volume = True
else:
valid_volume = True
if valid_volume:
volume_list.append(volume)
output_string = formatVolumeList(zk_conn, volume_list)
click.echo(output_string)
return True, ''
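The limit handling treats an unanchored pattern as fuzzy: unless the caller anchors it with ^ or $ themselves, it is wrapped in .* on both sides. A standalone mirror of that logic:

import re

def fuzzify(limit):
    if re.match(r'\^.*', limit) is None:
        limit = '.*' + limit
    if re.match(r'.*\$', limit) is None:
        limit = limit + '.*'
    return limit

print(fuzzify('vol'))    # '.*vol.*' -- matches anywhere in the name
print(fuzzify('^vol$'))  # '^vol$'   -- explicit anchors are respected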
def add_snapshot(zk_conn, pool, volume, name):
# Tell the cluster to create a new snapshot
    add_snapshot_string = 'snapshot_add {},{},{}'.format(pool, volume, name)
zkhandler.writedata(zk_conn, {'/ceph/cmd': add_snapshot_string})
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
with lock:
try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
if result == 'success-snapshot_add':
message = 'Created new RBD snapshot {} of volume {} on pool {}.'.format(name, volume, pool)
success = True
else:
message = 'ERROR: Failed to create new snapshot; check node logs for details.'
success = False
        except Exception:
message = 'ERROR: Command ignored by node.'
success = False
# Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with lock:
time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
return success, message
def remove_snapshot(zk_conn, pool, volume, name):
if not verifySnapshot(zk_conn, pool, volume, name):
return False, 'ERROR: No snapshot with name "{}" is present of volume {} on pool {}.'.format(name, volume, pool)
    # Tell the cluster to remove the snapshot
    remove_snapshot_string = 'snapshot_remove {},{},{}'.format(pool, volume, name)
zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_snapshot_string})
# Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
with lock:
try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
if result == 'success-snapshot_remove':
                message = 'Removed RBD snapshot {} of volume {} from pool {}.'.format(name, volume, pool)
success = True
else:
message = 'ERROR: Failed to remove snapshot; check node logs for details.'
success = False
except Exception as e:
message = 'ERROR: Command ignored by node: {}'.format(e)
success = False
# Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with lock:
time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
return success, message
def get_list_snapshot(zk_conn, pool, volume, limit):
snapshot_list = []
full_snapshot_list = getCephSnapshots(zk_conn, pool, volume)
if limit:
try:
# Implicitly assume fuzzy limits
            if re.match(r'\^.*', limit) is None:
                limit = '.*' + limit
            if re.match(r'.*\$', limit) is None:
                limit = limit + '.*'
except Exception as e:
return False, 'Regex Error: {}'.format(e)
for snapshot in full_snapshot_list:
valid_snapshot = False
if limit:
            if re.match(limit, snapshot) is not None:
valid_snapshot = True
else:
valid_snapshot = True
if valid_snapshot:
snapshot_list.append(snapshot)
output_string = formatSnapshotList(zk_conn, snapshot_list)
click.echo(output_string)
return True, ''

View File

@@ -447,6 +447,170 @@ def remove_pool(zk_conn, logger, name):
logger.out('Failed to remove RBD pool {}: {}'.format(name, e), state='e')
return False
class CephVolumeInstance(object):
def __init__(self, zk_conn, this_node, pool, name):
self.zk_conn = zk_conn
self.this_node = this_node
self.pool = pool
self.name = name
self.size = ''
self.stats = dict()
@self.zk_conn.DataWatch('/ceph/volumes/{}/{}/size'.format(self.pool, self.name))
def watch_volume_node(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.size:
self.size = data
@self.zk_conn.DataWatch('/ceph/volumes/{}/{}/stats'.format(self.pool, self.name))
def watch_volume_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
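The class keeps its size and stats current with kazoo-style DataWatch callbacks; returning False from the callback on a DELETED event unregisters the watch so the instance can be reaped. The same pattern in isolation, against a hypothetical key:

import json
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

@zk.DataWatch('/ceph/volumes/testpool/testvol/stats')
def watch_stats(data, stat, event=None):
    if event and event.type == 'DELETED':
        return False  # deregister this watcher once the key is gone
    if data:
        print(json.loads(data.decode('ascii')))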
def add_volume(zk_conn, logger, pool, name, size):
# We are ready to create a new volume on this node
logger.out('Creating new RBD volume {} on pool {}'.format(name, pool), state='i')
try:
        # 1. Create the volume
        # rbd create --size takes MiB by default, so convert the GiB value
        sizeMiB = int(size) * 1024
        retcode, stdout, stderr = common.run_os_command('rbd create --size {} {}/{}'.format(sizeMiB, pool, name))
if retcode:
print('rbd create')
print(stdout)
print(stderr)
raise
# 2. Add the new volume to ZK
zkhandler.writedata(zk_conn, {
'/ceph/volumes/{}/{}'.format(pool, name): '',
'/ceph/volumes/{}/{}/size'.format(pool, name): size,
'/ceph/volumes/{}/{}/stats'.format(pool, name): '{}'
})
# Log it
logger.out('Created new RBD volume {} on pool {}'.format(name, pool), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to create new RBD volume {} on pool {}: {}'.format(name, pool, e), state='e')
return False
def remove_volume(zk_conn, logger, pool, name):
    # We are ready to remove a volume from this node
logger.out('Removing RBD volume {} from pool {}'.format(name, pool), state='i')
try:
# Delete volume from ZK
zkhandler.deletekey(zk_conn, '/ceph/volumes/{}/{}'.format(pool, name))
# Remove the volume
retcode, stdout, stderr = common.run_os_command('rbd rm {}/{}'.format(pool, name))
if retcode:
            print('rbd rm')
print(stdout)
print(stderr)
raise
# Log it
logger.out('Removed RBD volume {} from pool {}'.format(name, pool), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to remove RBD volume {} from pool {}: {}'.format(name, pool, e), state='e')
return False
class CephSnapshotInstance(object):
    def __init__(self, zk_conn, this_node, pool, volume, name):
self.zk_conn = zk_conn
self.this_node = this_node
self.pool = pool
self.volume = volume
self.name = name
self.stats = dict()
@self.zk_conn.DataWatch('/ceph/snapshots/{}/{}/{}/stats'.format(self.pool, self.volume, self.name))
def watch_snapshot_stats(data, stat, event=''):
if event and event.type == 'DELETED':
# The key has been deleted after existing before; terminate this watcher
# because this class instance is about to be reaped in Daemon.py
return False
try:
data = data.decode('ascii')
except AttributeError:
data = ''
if data and data != self.stats:
self.stats = json.loads(data)
def add_snapshot(zk_conn, logger, pool, volume, name):
# We are ready to create a new snapshot on this node
logger.out('Creating new RBD snapshot {} of volume {} on pool {}'.format(name, volume, pool), state='i')
try:
# 1. Create the snapshot
retcode, stdout, stderr = common.run_os_command('rbd snap create {}/{}@{}'.format(pool, volume, name))
if retcode:
print('rbd snap create')
print(stdout)
print(stderr)
raise
# 2. Add the new snapshot to ZK
zkhandler.writedata(zk_conn, {
'/ceph/snapshots/{}/{}/{}'.format(pool, volume, name): '',
'/ceph/snapshots/{}/{}/{}/stats'.format(pool, volume, name): '{}'
})
# Log it
logger.out('Created new RBD snapshot {} of volume {} on pool {}'.format(name, volume, pool), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to create new RBD snapshot {} of volume {} on pool {}: {}'.format(name, volume, pool, e), state='e')
return False
def remove_snapshot(zk_conn, logger, pool, volume, name):
    # We are ready to remove a snapshot on this node
logger.out('Removing RBD snapshot {} of volume {} on pool {}'.format(name, volume, pool), state='i')
try:
# Delete snapshot from ZK
zkhandler.deletekey(zk_conn, '/ceph/snapshots/{}/{}/{}'.format(pool, volume, name))
# Remove the snapshot
retcode, stdout, stderr = common.run_os_command('rbd snap rm {}/{}@{}'.format(pool, volume, name))
if retcode:
print('rbd snap rm')
print(stdout)
print(stderr)
raise
# Log it
logger.out('Removed RBD snapshot {} of volume {} on pool {}'.format(name, volume, pool), state='o')
return True
except Exception as e:
# Log it
logger.out('Failed to remove RBD snapshot {} of volume {} on pool {}: {}'.format(name, volume, pool, e), state='e')
return False
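The working list functions the commit message defers would need the same plumbing in reverse. A hypothetical sketch, reusing common.run_os_command from this file to ask Ceph itself for the snapshots of a volume (the output parsing assumes rbd's default header-plus-columns format):

def list_snapshots(pool, volume):
    # 'rbd snap ls' prints one snapshot per line after a header row
    retcode, stdout, stderr = common.run_os_command('rbd snap ls {}/{}'.format(pool, volume))
    if retcode:
        return []
    return [line.split()[1] for line in stdout.splitlines()[1:] if line.strip()]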
# Primary command function
def run_command(zk_conn, logger, this_node, data, d_osd):
# Get the command and args
command, args = data.split()
@@ -620,3 +784,87 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
    # Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Adding a new volume
elif command == 'volume_add':
pool, name, size = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Add the volume
result = add_volume(zk_conn, logger, pool, name, size)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
        # Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Removing a volume
elif command == 'volume_remove':
pool, name = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Remove the volume
result = remove_volume(zk_conn, logger, pool, name)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
        # Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Adding a new snapshot
elif command == 'snapshot_add':
pool, volume, name = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Add the snapshot
result = add_snapshot(zk_conn, logger, pool, volume, name)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
        # Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Removing a snapshot
elif command == 'snapshot_remove':
        pool, volume, name = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Remove the snapshot
result = remove_snapshot(zk_conn, logger, pool, volume, name)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
        # Wait 1 second before we free the lock, to ensure the client hits the lock
time.sleep(1)
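All four new branches repeat the same lock/execute/acknowledge shape. A sketch of how that could be factored, using the same zkhandler calls as the diff (run_locked is hypothetical, not part of the commit):

def run_locked(zk_conn, data, action):
    # Lock the command queue, run the action, and acknowledge the result
    zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
    with zk_lock:
        status = 'success' if action() else 'failure'
        zkhandler.writedata(zk_conn, {'/ceph/cmd': '{}-{}'.format(status, data)})
        # Wait 1 second before we free the lock, to ensure the client hits the lock
        time.sleep(1)

This would reduce each branch to an args.split(',') followed by a call like run_locked(zk_conn, data, lambda: add_volume(zk_conn, logger, pool, name, size)).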