From 2880a761c0f010b57e0afebcc069bbcdbee2a919 Mon Sep 17 00:00:00 2001
From: "Joshua M. Boniface"
Date: Wed, 7 Aug 2019 14:47:27 -0400
Subject: [PATCH] Move Ceph command pipe to new location

Matching the new /cmd/domain pipe, move Ceph pipe to /cmd/ceph.
---
 client-common/ceph.py            | 148 +++++++++++++++----------
 node-daemon/pvcd/CephInstance.py |  90 +++++++++----------
 node-daemon/pvcd/Daemon.py       |   2 +-
 3 files changed, 120 insertions(+), 120 deletions(-)

diff --git a/client-common/ceph.py b/client-common/ceph.py
index 10aabb2f..463127c1 100644
--- a/client-common/ceph.py
+++ b/client-common/ceph.py
@@ -212,14 +212,14 @@ def add_osd(zk_conn, node, device, weight):
 
     # Tell the cluster to create a new OSD for the host
     add_osd_string = 'osd_add {},{},{}'.format(node, device, weight)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': add_osd_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': add_osd_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-osd_add':
                 message = 'Created new OSD with block device "{}" on node "{}".'.format(device, node)
                 success = True
@@ -231,10 +231,10 @@ def add_osd(zk_conn, node, device, weight):
             success = False
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(5)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
@@ -244,14 +244,14 @@ def remove_osd(zk_conn, osd_id):
 
     # Tell the cluster to remove an OSD
     remove_osd_string = 'osd_remove {}'.format(osd_id)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_osd_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': remove_osd_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-osd_remove':
                 message = 'Removed OSD "{}" from the cluster.'.format(osd_id)
                 success = True
@@ -263,10 +263,10 @@ def remove_osd(zk_conn, osd_id):
             message = 'ERROR Command ignored by node.'
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(5)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
@@ -276,14 +276,14 @@ def in_osd(zk_conn, osd_id):
 
     # Tell the cluster to online an OSD
     in_osd_string = 'osd_in {}'.format(osd_id)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': in_osd_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': in_osd_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-osd_in':
                 message = 'Set OSD {} online in the cluster.'.format(osd_id)
                 success = True
@@ -295,10 +295,10 @@ def in_osd(zk_conn, osd_id):
             message = 'ERROR Command ignored by node.'
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(1)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
@@ -308,14 +308,14 @@ def out_osd(zk_conn, osd_id):
 
     # Tell the cluster to offline an OSD
     out_osd_string = 'osd_out {}'.format(osd_id)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': out_osd_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': out_osd_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-osd_out':
                 message = 'Set OSD {} offline in the cluster.'.format(osd_id)
                 success = True
@@ -327,24 +327,24 @@ def out_osd(zk_conn, osd_id):
             message = 'ERROR Command ignored by node.'
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(1)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
 def set_osd(zk_conn, option):
     # Tell the cluster to set an OSD property
     set_osd_string = 'osd_set {}'.format(option)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': set_osd_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': set_osd_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-osd_set':
                 message = 'Set OSD property {} on the cluster.'.format(option)
                 success = True
@@ -355,20 +355,20 @@ def set_osd(zk_conn, option):
             success = False
             message = 'ERROR Command ignored by node.'
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
 def unset_osd(zk_conn, option):
     # Tell the cluster to unset an OSD property
     unset_osd_string = 'osd_unset {}'.format(option)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': unset_osd_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': unset_osd_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-osd_unset':
                 message = 'Unset OSD property {} on the cluster.'.format(option)
                 success = True
@@ -380,10 +380,10 @@ def unset_osd(zk_conn, option):
             message = 'ERROR Command ignored by node.'
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(1)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
@@ -661,14 +661,14 @@ def getPoolInformation(zk_conn, pool):
 def add_pool(zk_conn, name, pgs):
     # Tell the cluster to create a new pool
     add_pool_string = 'pool_add {},{}'.format(name, pgs)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': add_pool_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': add_pool_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-pool_add':
                 message = 'Created new RBD pool "{}" with "{}" PGs.'.format(name, pgs)
                 success = True
@@ -680,10 +680,10 @@ def add_pool(zk_conn, name, pgs):
             success = False
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(3)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
@@ -693,14 +693,14 @@ def remove_pool(zk_conn, name):
 
     # Tell the cluster to create a new pool
     remove_pool_string = 'pool_remove {}'.format(name)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_pool_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': remove_pool_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-pool_remove':
                 message = 'Removed RBD pool "{}" and all volumes.'.format(name)
                 success = True
@@ -712,10 +712,10 @@ def remove_pool(zk_conn, name):
             success = False
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(3)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
@@ -940,14 +940,14 @@ def add_volume(zk_conn, pool, name, size):
     # Tell the cluster to create a new volume
     databytes = format_bytes_fromhuman(size)
     add_volume_string = 'volume_add {},{},{}'.format(pool, name, databytes)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': add_volume_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': add_volume_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-volume_add':
                 message = 'Created new RBD volume "{}" of size "{}" on pool "{}".'.format(name, size, pool)
                 success = True
@@ -959,10 +959,10 @@ def add_volume(zk_conn, pool, name, size):
             success = False
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(1)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
@@ -970,14 +970,14 @@ def resize_volume(zk_conn, pool, name, size):
     # Tell the cluster to resize the volume
     databytes = format_bytes_fromhuman(size)
     resize_volume_string = 'volume_resize {},{},{}'.format(pool, name, databytes)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': resize_volume_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': resize_volume_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-volume_resize':
                 message = 'Resized RBD volume "{}" to size "{}" on pool "{}".'.format(name, size, pool)
                 success = True
@@ -989,24 +989,24 @@ def resize_volume(zk_conn, pool, name, size):
             success = False
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(1)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
 def rename_volume(zk_conn, pool, name, new_name):
     # Tell the cluster to rename
     rename_volume_string = 'volume_rename {},{},{}'.format(pool, name, new_name)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': rename_volume_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': rename_volume_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-volume_rename':
                 message = 'Renamed RBD volume "{}" to "{}" on pool "{}".'.format(name, new_name, pool)
                 success = True
@@ -1018,10 +1018,10 @@ def rename_volume(zk_conn, pool, name, new_name):
             success = False
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(1)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
@@ -1031,14 +1031,14 @@ def remove_volume(zk_conn, pool, name):
 
     # Tell the cluster to create a new volume
     remove_volume_string = 'volume_remove {},{}'.format(pool, name)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_volume_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': remove_volume_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-volume_remove':
                 message = 'Removed RBD volume "{}" in pool "{}".'.format(name, pool)
                 success = True
@@ -1050,10 +1050,10 @@ def remove_volume(zk_conn, pool, name):
             success = False
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(1)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
@@ -1216,14 +1216,14 @@ def getCephSnapshots(zk_conn, pool, volume):
 def add_snapshot(zk_conn, pool, volume, name):
     # Tell the cluster to create a new snapshot
     add_snapshot_string = 'snapshot_add {},{},{}'.format(pool, volume, name)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': add_snapshot_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': add_snapshot_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-snapshot_add':
                 message = 'Created new RBD snapshot "{}" of volume "{}" on pool "{}".'.format(name, volume, pool)
                 success = True
@@ -1235,24 +1235,24 @@ def add_snapshot(zk_conn, pool, volume, name):
             success = False
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(1)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
 def rename_snapshot(zk_conn, pool, volume, name, new_name):
     # Tell the cluster to rename
     rename_snapshot_string = 'snapshot_rename {},{},{}'.format(pool, name, new_name)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': rename_snapshot_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': rename_snapshot_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-snapshot_rename':
                 message = 'Renamed RBD volume snapshot "{}" to "{}" for volume {} on pool "{}".'.format(name, new_name, volume, pool)
                 success = True
@@ -1264,10 +1264,10 @@ def rename_snapshot(zk_conn, pool, volume, name, new_name):
             success = False
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
     with lock:
         time.sleep(1)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
 
@@ -1277,14 +1277,14 @@ def remove_snapshot(zk_conn, pool, volume, name):
 
     # Tell the cluster to create a new snapshot
     remove_snapshot_string = 'snapshot_remove {},{},{}'.format(pool, volume, name)
-    zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_snapshot_string})
+    zkhandler.writedata(zk_conn, {'/cmd/ceph': remove_snapshot_string})
     # Wait 1/2 second for the cluster to get the message and start working
     time.sleep(0.5)
     # Acquire a read lock, so we get the return exclusively
-    lock = zkhandler.readlock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
     with lock:
         try:
-            result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0]
+            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
             if result == 'success-snapshot_remove':
                 message = 'Removed RBD snapshot "{}" of volume "{}" in pool "{}".'.format(name, volume, pool)
                 success = True
@@ -1296,10 +1296,10 @@ def remove_snapshot(zk_conn, pool, volume, name):
             success = False
 
     # Acquire a write lock to ensure things go smoothly
-    lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
    with lock:
         time.sleep(1)
-        zkhandler.writedata(zk_conn, {'/ceph/cmd': ''})
+        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
 
     return success, message
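Every one of the client-side helpers above repeats the same four-step handshake against the pipe: write the command string, give the cluster half a second to pick it up, take a read lock to collect the success-/failure- response, then take a write lock and wipe the key for the next caller. Since the key now lives at /cmd/ceph, the rename has to land on every step of every helper. Below is a minimal sketch of that shared pattern; the send_ceph_command() name, its wait_time parameter, and the bare zkhandler import are illustrative only, but the zkhandler call signatures match the ones used throughout this patch.

import time

import zkhandler  # assumed importable as in client-common

def send_ceph_command(zk_conn, action, args, success_message, wait_time=1):
    # Post the command to the pipe at its new location
    zkhandler.writedata(zk_conn, {'/cmd/ceph': '{} {}'.format(action, args)})
    # Wait 1/2 second for the cluster to get the message and start working
    time.sleep(0.5)
    # Acquire a read lock, so we get the return exclusively
    lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
    with lock:
        try:
            # The node answers with 'success-<command>' or 'failure-<command>'
            result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
            success = (result == 'success-{}'.format(action))
            message = success_message if success else 'ERROR Command failed.'
        except Exception:
            success = False
            message = 'ERROR Command ignored by node.'
    # Acquire a write lock, hold it briefly, then clear the pipe for the next command
    lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
    with lock:
        time.sleep(wait_time)
        zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
    return success, message

With a helper like this, add_osd() above would collapse to a single call, passing wait_time=5 to preserve its longer hold on the cleanup lock.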
diff --git a/node-daemon/pvcd/CephInstance.py b/node-daemon/pvcd/CephInstance.py
index 3f45a3c2..5deb4b48 100644
--- a/node-daemon/pvcd/CephInstance.py
+++ b/node-daemon/pvcd/CephInstance.py
@@ -718,18 +718,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
         node, device, weight = args.split(',')
         if node == this_node.name:
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Add the OSD
                 result = add_osd(zk_conn, logger, node, device, weight)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -740,18 +740,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
         # Verify osd_id is in the list
         if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Remove the OSD
                 result = remove_osd(zk_conn, logger, osd_id, d_osd[osd_id])
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -762,18 +762,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
         # Verify osd_id is in the list
         if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Online the OSD
                 result = in_osd(zk_conn, logger, osd_id)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -784,18 +784,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
         # Verify osd_id is in the list
         if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Offline the OSD
                 result = out_osd(zk_conn, logger, osd_id)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -805,18 +805,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
 
         if this_node.router_state == 'primary':
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Set the property
                 result = set_property(zk_conn, logger, option)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -826,18 +826,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
 
         if this_node.router_state == 'primary':
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Unset the property
                 result = unset_property(zk_conn, logger, option)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -847,18 +847,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
 
         if this_node.router_state == 'primary':
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Add the pool
                 result = add_pool(zk_conn, logger, name, pgs)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -868,18 +868,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
 
         if this_node.router_state == 'primary':
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Remove the pool
                 result = remove_pool(zk_conn, logger, name)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -889,18 +889,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
 
         if this_node.router_state == 'primary':
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Add the volume
                 result = add_volume(zk_conn, logger, pool, name, size)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -910,18 +910,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
 
         if this_node.router_state == 'primary':
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Add the volume
                 result = resize_volume(zk_conn, logger, pool, name, size)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -931,18 +931,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
 
         if this_node.router_state == 'primary':
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Add the volume
                 result = rename_volume(zk_conn, logger, pool, name, new_name)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -952,18 +952,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
 
         if this_node.router_state == 'primary':
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Remove the volume
                 result = remove_volume(zk_conn, logger, pool, name)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -973,18 +973,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
 
         if this_node.router_state == 'primary':
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Add the snapshot
                 result = add_snapshot(zk_conn, logger, pool, volume, name)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -994,18 +994,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
 
         if this_node.router_state == 'primary':
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Add the snapshot
                 result = rename_snapshot(zk_conn, logger, pool, volume, name, new_name)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
@@ -1015,17 +1015,17 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
 
         if this_node.router_state == 'primary':
             # Lock the command queue
-            zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
+            zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
             with zk_lock:
                 # Remove the snapshot
                 result = remove_snapshot(zk_conn, logger, pool, volume, name)
                 # Command succeeded
                 if result:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
                 # Command failed
                 else:
                     # Update the command queue
-                    zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
+                    zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
                 # Wait 1 seconds before we free the lock, to ensure the client hits the lock
                 time.sleep(1)
 
diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py
index 1ccdf0e6..97a48047 100644
--- a/node-daemon/pvcd/Daemon.py
+++ b/node-daemon/pvcd/Daemon.py
@@ -862,7 +862,7 @@ if enable_hypervisor:
 
 if enable_storage:
     # Ceph command pipeline key
-    @zk_conn.DataWatch('/ceph/cmd')
+    @zk_conn.DataWatch('/cmd/ceph')
     def cmd(data, stat, event=''):
         if data:
             CephInstance.run_command(zk_conn, logger, this_node, data.decode('ascii'), d_osd)
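The daemon side of the pipe is the DataWatch registered in Daemon.py above: every write to /cmd/ceph wakes cmd(), which hands the payload to CephInstance.run_command(), and run_command() answers under a write lock so that the client's read lock cannot be granted until the result is in place. The following is a minimal standalone sketch of that watch-and-respond loop using kazoo directly; the ZooKeeper address and the handle() dispatcher are hypothetical stand-ins for the daemon's coordinator connection and run_command().

import time

from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')  # assumed coordinator address
zk.start()
zk.ensure_path('/cmd/ceph')  # the pipe key must exist before it can be written

def handle(command):
    # Hypothetical stand-in for CephInstance.run_command(); the real dispatcher
    # splits the verb (e.g. 'osd_add') from its comma-separated arguments
    print('would dispatch: {}'.format(command))
    return True

@zk.DataWatch('/cmd/ceph')
def cmd(data, stat, event=None):
    if not data:
        return  # the pipe was just cleared; nothing to dispatch
    command = data.decode('ascii')
    if command.split('-', 1)[0] in ('success', 'failure'):
        return  # our own response re-triggered the watch; ignore it
    # Answer under a write lock so the client's read lock blocks until the
    # result has been written back, mirroring run_command() above
    with zk.WriteLock('/cmd/ceph'):
        status = 'success' if handle(command) else 'failure'
        zk.set('/cmd/ceph', '{}-{}'.format(status, command).encode('ascii'))
        time.sleep(1)  # hold the lock briefly so the client reliably hits it

Because both ends now lock and watch the same /cmd/ceph path, the client library and the node daemon have to move to the new location together; a cluster running mixed versions would leave clients writing to a pipe no node is watching.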