Implement Ceph volume resize and rename
Includes a simple implementation of a zookeeper "rename" facility, allowing a key and all data to be replaced by a new key with a different name but containing all the same child elements and data. [2/2] Implements #44
This commit is contained in:
parent
d5f263bdd6
commit
35363671a0
|
@ -1506,7 +1506,7 @@ def ceph_volume_resize(pool, name, size):
|
||||||
###############################################################################
|
###############################################################################
|
||||||
# pvc storage ceph volume rename
|
# pvc storage ceph volume rename
|
||||||
###############################################################################
|
###############################################################################
|
||||||
@click.command(name='rename', short_help'Rename RBD volume.')
|
@click.command(name='rename', short_help='Rename RBD volume.')
|
||||||
@click.argument(
|
@click.argument(
|
||||||
'pool'
|
'pool'
|
||||||
)
|
)
|
||||||
|
|
|
@ -489,7 +489,7 @@ class CephVolumeInstance(object):
|
||||||
|
|
||||||
def add_volume(zk_conn, logger, pool, name, size):
|
def add_volume(zk_conn, logger, pool, name, size):
|
||||||
# We are ready to create a new volume on this node
|
# We are ready to create a new volume on this node
|
||||||
logger.out('Creating new RBD volume {} on pool {}'.format(name, pool), state='i')
|
logger.out('Creating new RBD volume {} on pool {} of size {}'.format(name, pool, size), state='i')
|
||||||
try:
|
try:
|
||||||
# Create the volume
|
# Create the volume
|
||||||
retcode, stdout, stderr = common.run_os_command('rbd create --size {} --image-feature layering,exclusive-lock {}/{}'.format(size, pool, name))
|
retcode, stdout, stderr = common.run_os_command('rbd create --size {} --image-feature layering,exclusive-lock {}/{}'.format(size, pool, name))
|
||||||
|
@ -518,6 +518,70 @@ def add_volume(zk_conn, logger, pool, name, size):
|
||||||
logger.out('Failed to create new RBD volume {} on pool {}: {}'.format(name, pool, e), state='e')
|
logger.out('Failed to create new RBD volume {} on pool {}: {}'.format(name, pool, e), state='e')
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def resize_volume(zk_conn, logger, pool, name, size):
|
||||||
|
logger.out('Resizing RBD volume {} on pool {} to size {}'.format(name, pool, size), state='i')
|
||||||
|
try:
|
||||||
|
# Resize the volume
|
||||||
|
retcode, stdout, stderr = common.run_os_command('rbd resize --size {} {}/{}'.format(size, pool, name))
|
||||||
|
if retcode:
|
||||||
|
print('rbd resize')
|
||||||
|
print(stdout)
|
||||||
|
print(stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Get volume stats
|
||||||
|
retcode, stdout, stderr = common.run_os_command('rbd info --format json {}/{}'.format(pool, name))
|
||||||
|
volstats = stdout
|
||||||
|
|
||||||
|
# Update the volume to ZK
|
||||||
|
zkhandler.writedata(zk_conn, {
|
||||||
|
'/ceph/volumes/{}/{}'.format(pool, name): '',
|
||||||
|
'/ceph/volumes/{}/{}/stats'.format(pool, name): volstats,
|
||||||
|
'/ceph/snapshots/{}/{}'.format(pool, name): '',
|
||||||
|
})
|
||||||
|
|
||||||
|
# Log it
|
||||||
|
logger.out('Created new RBD volume {} on pool {}'.format(name, pool), state='o')
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
# Log it
|
||||||
|
logger.out('Failed to resize RBD volume {} on pool {}: {}'.format(name, pool, e), state='e')
|
||||||
|
return False
|
||||||
|
|
||||||
|
def rename_volume(zk_conn, logger, pool, name, new_name):
|
||||||
|
logger.out('Renaming RBD volume {} to {} on pool {}'.format(name, new_name, pool))
|
||||||
|
try:
|
||||||
|
# Rename the volume
|
||||||
|
retcode, stdout, stderr = common.run_os_command('rbd rename {}/{} {}'.format(pool, name, new_name))
|
||||||
|
if retcode:
|
||||||
|
print('rbd rename')
|
||||||
|
print(stdout)
|
||||||
|
print(stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Rename the volume in ZK
|
||||||
|
zkhandler.renamekey(zk_conn, {
|
||||||
|
'/ceph/volumes/{}/{}'.format(pool, name): '/ceph/volumes/{}/{}'.format(pool, new_name),
|
||||||
|
'/ceph/snapshots/{}/{}'.format(pool, name): '/ceph/snapshots/{}/{}'.format(pool, new_name),
|
||||||
|
})
|
||||||
|
|
||||||
|
# Get volume stats
|
||||||
|
retcode, stdout, stderr = common.run_os_command('rbd info --format json {}/{}'.format(pool, new_name))
|
||||||
|
volstats = stdout
|
||||||
|
|
||||||
|
# Update the volume stats in ZK
|
||||||
|
zkhandler.writedata(zk_conn, {
|
||||||
|
'/ceph/volumes/{}/{}/stats'.format(pool, new_name): volstats,
|
||||||
|
})
|
||||||
|
|
||||||
|
# Log it
|
||||||
|
logger.out('Renamed RBD volume {} to {} on pool {}'.format(name, new_name, pool), state='o')
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
# Log it
|
||||||
|
logger.out('Failed to rename RBD volume {} on pool {}: {}'.format(name, pool, e), state='e')
|
||||||
|
return False
|
||||||
|
|
||||||
def remove_volume(zk_conn, logger, pool, name):
|
def remove_volume(zk_conn, logger, pool, name):
|
||||||
# We are ready to create a new volume on this node
|
# We are ready to create a new volume on this node
|
||||||
logger.out('Removing RBD volume {} from pool {}'.format(name, pool), state='i')
|
logger.out('Removing RBD volume {} from pool {}'.format(name, pool), state='i')
|
||||||
|
@ -811,6 +875,48 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
|
||||||
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
|
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Resizing a volume
|
||||||
|
elif command == 'volume_resize':
|
||||||
|
pool, name, size = args.split(',')
|
||||||
|
|
||||||
|
if this_node.router_state == 'primary':
|
||||||
|
# Lock the command queue
|
||||||
|
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
|
||||||
|
with zk_lock:
|
||||||
|
# Add the volume
|
||||||
|
result = resize_volume(zk_conn, logger, pool, name, size)
|
||||||
|
# Command succeeded
|
||||||
|
if result:
|
||||||
|
# Update the command queue
|
||||||
|
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
|
||||||
|
# Command failed
|
||||||
|
else:
|
||||||
|
# Update the command queue
|
||||||
|
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
|
||||||
|
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Renaming a new volume
|
||||||
|
elif command == 'volume_rename':
|
||||||
|
pool, name, new_name = args.split(',')
|
||||||
|
|
||||||
|
if this_node.router_state == 'primary':
|
||||||
|
# Lock the command queue
|
||||||
|
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
|
||||||
|
with zk_lock:
|
||||||
|
# Add the volume
|
||||||
|
result = rename_volume(zk_conn, logger, pool, name, new_name)
|
||||||
|
# Command succeeded
|
||||||
|
if result:
|
||||||
|
# Update the command queue
|
||||||
|
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
|
||||||
|
# Command failed
|
||||||
|
else:
|
||||||
|
# Update the command queue
|
||||||
|
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
|
||||||
|
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
# Removing a volume
|
# Removing a volume
|
||||||
elif command == 'volume_remove':
|
elif command == 'volume_remove':
|
||||||
pool, name = args.split(',')
|
pool, name = args.split(',')
|
||||||
|
|
|
@ -81,6 +81,53 @@ def writedata(zk_conn, kv):
|
||||||
except Exception:
|
except Exception:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# Key rename function
|
||||||
|
def renamekey(zk_conn, kv):
|
||||||
|
# Start up a transaction
|
||||||
|
zk_transaction = zk_conn.transaction()
|
||||||
|
|
||||||
|
# Proceed one KV pair at a time
|
||||||
|
for key in sorted(kv):
|
||||||
|
old_name = key
|
||||||
|
new_name = kv[key]
|
||||||
|
|
||||||
|
old_data = zk_conn.get(old_name)
|
||||||
|
|
||||||
|
# Find the children of old_name recursively
|
||||||
|
child_keys = list()
|
||||||
|
def get_children(key):
|
||||||
|
children = zk_conn.get_children(key)
|
||||||
|
if not children:
|
||||||
|
child_keys.append(key)
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
for ckey in children:
|
||||||
|
get_children(key)
|
||||||
|
get_children(old_name)
|
||||||
|
|
||||||
|
# Get the data out of each of the child keys
|
||||||
|
child_data = dict()
|
||||||
|
for ckey in child_keys:
|
||||||
|
child_data[ckey] = zk_conn.get(ckey)
|
||||||
|
|
||||||
|
# Create the new parent key
|
||||||
|
zk_transaction.create(new_name, old_data)
|
||||||
|
|
||||||
|
# For each child key, create the key and add the data
|
||||||
|
for ckey in child_keys:
|
||||||
|
new_ckey_name = ckey.replace(old_name, new_name)
|
||||||
|
zk_transaction.create(new_ckey_name, child_data[ckey])
|
||||||
|
|
||||||
|
# Remove recursively the old key
|
||||||
|
zk_transaction.delete(old_name, recursive=True)
|
||||||
|
|
||||||
|
# Commit the transaction
|
||||||
|
try:
|
||||||
|
zk_transaction.commit()
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
# Write lock function
|
# Write lock function
|
||||||
def writelock(zk_conn, key):
|
def writelock(zk_conn, key):
|
||||||
lock_id = str(uuid.uuid1())
|
lock_id = str(uuid.uuid1())
|
||||||
|
|
Loading…
Reference in New Issue