Move Ceph command pipe to new location

Matching the new /cmd/domain pipe, move Ceph pipe to /cmd/ceph.
This commit is contained in:
Joshua Boniface 2019-08-07 14:47:27 -04:00
parent ea2426fa73
commit 2880a761c0
3 changed files with 120 additions and 120 deletions

View File

@ -212,14 +212,14 @@ def add_osd(zk_conn, node, device, weight):
# Tell the cluster to create a new OSD for the host # Tell the cluster to create a new OSD for the host
add_osd_string = 'osd_add {},{},{}'.format(node, device, weight) add_osd_string = 'osd_add {},{},{}'.format(node, device, weight)
zkhandler.writedata(zk_conn, {'/ceph/cmd': add_osd_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': add_osd_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-osd_add': if result == 'success-osd_add':
message = 'Created new OSD with block device "{}" on node "{}".'.format(device, node) message = 'Created new OSD with block device "{}" on node "{}".'.format(device, node)
success = True success = True
@ -231,10 +231,10 @@ def add_osd(zk_conn, node, device, weight):
success = False success = False
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(5) time.sleep(5)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
@ -244,14 +244,14 @@ def remove_osd(zk_conn, osd_id):
# Tell the cluster to remove an OSD # Tell the cluster to remove an OSD
remove_osd_string = 'osd_remove {}'.format(osd_id) remove_osd_string = 'osd_remove {}'.format(osd_id)
zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_osd_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': remove_osd_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-osd_remove': if result == 'success-osd_remove':
message = 'Removed OSD "{}" from the cluster.'.format(osd_id) message = 'Removed OSD "{}" from the cluster.'.format(osd_id)
success = True success = True
@ -263,10 +263,10 @@ def remove_osd(zk_conn, osd_id):
message = 'ERROR Command ignored by node.' message = 'ERROR Command ignored by node.'
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(5) time.sleep(5)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
@ -276,14 +276,14 @@ def in_osd(zk_conn, osd_id):
# Tell the cluster to online an OSD # Tell the cluster to online an OSD
in_osd_string = 'osd_in {}'.format(osd_id) in_osd_string = 'osd_in {}'.format(osd_id)
zkhandler.writedata(zk_conn, {'/ceph/cmd': in_osd_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': in_osd_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-osd_in': if result == 'success-osd_in':
message = 'Set OSD {} online in the cluster.'.format(osd_id) message = 'Set OSD {} online in the cluster.'.format(osd_id)
success = True success = True
@ -295,10 +295,10 @@ def in_osd(zk_conn, osd_id):
message = 'ERROR Command ignored by node.' message = 'ERROR Command ignored by node.'
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(1) time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
@ -308,14 +308,14 @@ def out_osd(zk_conn, osd_id):
# Tell the cluster to offline an OSD # Tell the cluster to offline an OSD
out_osd_string = 'osd_out {}'.format(osd_id) out_osd_string = 'osd_out {}'.format(osd_id)
zkhandler.writedata(zk_conn, {'/ceph/cmd': out_osd_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': out_osd_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-osd_out': if result == 'success-osd_out':
message = 'Set OSD {} offline in the cluster.'.format(osd_id) message = 'Set OSD {} offline in the cluster.'.format(osd_id)
success = True success = True
@ -327,24 +327,24 @@ def out_osd(zk_conn, osd_id):
message = 'ERROR Command ignored by node.' message = 'ERROR Command ignored by node.'
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(1) time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
def set_osd(zk_conn, option): def set_osd(zk_conn, option):
# Tell the cluster to set an OSD property # Tell the cluster to set an OSD property
set_osd_string = 'osd_set {}'.format(option) set_osd_string = 'osd_set {}'.format(option)
zkhandler.writedata(zk_conn, {'/ceph/cmd': set_osd_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': set_osd_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-osd_set': if result == 'success-osd_set':
message = 'Set OSD property {} on the cluster.'.format(option) message = 'Set OSD property {} on the cluster.'.format(option)
success = True success = True
@ -355,20 +355,20 @@ def set_osd(zk_conn, option):
success = False success = False
message = 'ERROR Command ignored by node.' message = 'ERROR Command ignored by node.'
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
def unset_osd(zk_conn, option): def unset_osd(zk_conn, option):
# Tell the cluster to unset an OSD property # Tell the cluster to unset an OSD property
unset_osd_string = 'osd_unset {}'.format(option) unset_osd_string = 'osd_unset {}'.format(option)
zkhandler.writedata(zk_conn, {'/ceph/cmd': unset_osd_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': unset_osd_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-osd_unset': if result == 'success-osd_unset':
message = 'Unset OSD property {} on the cluster.'.format(option) message = 'Unset OSD property {} on the cluster.'.format(option)
success = True success = True
@ -380,10 +380,10 @@ def unset_osd(zk_conn, option):
message = 'ERROR Command ignored by node.' message = 'ERROR Command ignored by node.'
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(1) time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
@ -661,14 +661,14 @@ def getPoolInformation(zk_conn, pool):
def add_pool(zk_conn, name, pgs): def add_pool(zk_conn, name, pgs):
# Tell the cluster to create a new pool # Tell the cluster to create a new pool
add_pool_string = 'pool_add {},{}'.format(name, pgs) add_pool_string = 'pool_add {},{}'.format(name, pgs)
zkhandler.writedata(zk_conn, {'/ceph/cmd': add_pool_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': add_pool_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-pool_add': if result == 'success-pool_add':
message = 'Created new RBD pool "{}" with "{}" PGs.'.format(name, pgs) message = 'Created new RBD pool "{}" with "{}" PGs.'.format(name, pgs)
success = True success = True
@ -680,10 +680,10 @@ def add_pool(zk_conn, name, pgs):
success = False success = False
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(3) time.sleep(3)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
@ -693,14 +693,14 @@ def remove_pool(zk_conn, name):
# Tell the cluster to create a new pool # Tell the cluster to create a new pool
remove_pool_string = 'pool_remove {}'.format(name) remove_pool_string = 'pool_remove {}'.format(name)
zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_pool_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': remove_pool_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-pool_remove': if result == 'success-pool_remove':
message = 'Removed RBD pool "{}" and all volumes.'.format(name) message = 'Removed RBD pool "{}" and all volumes.'.format(name)
success = True success = True
@ -712,10 +712,10 @@ def remove_pool(zk_conn, name):
success = False success = False
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(3) time.sleep(3)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
@ -940,14 +940,14 @@ def add_volume(zk_conn, pool, name, size):
# Tell the cluster to create a new volume # Tell the cluster to create a new volume
databytes = format_bytes_fromhuman(size) databytes = format_bytes_fromhuman(size)
add_volume_string = 'volume_add {},{},{}'.format(pool, name, databytes) add_volume_string = 'volume_add {},{},{}'.format(pool, name, databytes)
zkhandler.writedata(zk_conn, {'/ceph/cmd': add_volume_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': add_volume_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-volume_add': if result == 'success-volume_add':
message = 'Created new RBD volume "{}" of size "{}" on pool "{}".'.format(name, size, pool) message = 'Created new RBD volume "{}" of size "{}" on pool "{}".'.format(name, size, pool)
success = True success = True
@ -959,10 +959,10 @@ def add_volume(zk_conn, pool, name, size):
success = False success = False
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(1) time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
@ -970,14 +970,14 @@ def resize_volume(zk_conn, pool, name, size):
# Tell the cluster to resize the volume # Tell the cluster to resize the volume
databytes = format_bytes_fromhuman(size) databytes = format_bytes_fromhuman(size)
resize_volume_string = 'volume_resize {},{},{}'.format(pool, name, databytes) resize_volume_string = 'volume_resize {},{},{}'.format(pool, name, databytes)
zkhandler.writedata(zk_conn, {'/ceph/cmd': resize_volume_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': resize_volume_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-volume_resize': if result == 'success-volume_resize':
message = 'Resized RBD volume "{}" to size "{}" on pool "{}".'.format(name, size, pool) message = 'Resized RBD volume "{}" to size "{}" on pool "{}".'.format(name, size, pool)
success = True success = True
@ -989,24 +989,24 @@ def resize_volume(zk_conn, pool, name, size):
success = False success = False
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(1) time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
def rename_volume(zk_conn, pool, name, new_name): def rename_volume(zk_conn, pool, name, new_name):
# Tell the cluster to rename # Tell the cluster to rename
rename_volume_string = 'volume_rename {},{},{}'.format(pool, name, new_name) rename_volume_string = 'volume_rename {},{},{}'.format(pool, name, new_name)
zkhandler.writedata(zk_conn, {'/ceph/cmd': rename_volume_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': rename_volume_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-volume_rename': if result == 'success-volume_rename':
message = 'Renamed RBD volume "{}" to "{}" on pool "{}".'.format(name, new_name, pool) message = 'Renamed RBD volume "{}" to "{}" on pool "{}".'.format(name, new_name, pool)
success = True success = True
@ -1018,10 +1018,10 @@ def rename_volume(zk_conn, pool, name, new_name):
success = False success = False
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(1) time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
@ -1031,14 +1031,14 @@ def remove_volume(zk_conn, pool, name):
# Tell the cluster to create a new volume # Tell the cluster to create a new volume
remove_volume_string = 'volume_remove {},{}'.format(pool, name) remove_volume_string = 'volume_remove {},{}'.format(pool, name)
zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_volume_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': remove_volume_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-volume_remove': if result == 'success-volume_remove':
message = 'Removed RBD volume "{}" in pool "{}".'.format(name, pool) message = 'Removed RBD volume "{}" in pool "{}".'.format(name, pool)
success = True success = True
@ -1050,10 +1050,10 @@ def remove_volume(zk_conn, pool, name):
success = False success = False
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(1) time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
@ -1216,14 +1216,14 @@ def getCephSnapshots(zk_conn, pool, volume):
def add_snapshot(zk_conn, pool, volume, name): def add_snapshot(zk_conn, pool, volume, name):
# Tell the cluster to create a new snapshot # Tell the cluster to create a new snapshot
add_snapshot_string = 'snapshot_add {},{},{}'.format(pool, volume, name) add_snapshot_string = 'snapshot_add {},{},{}'.format(pool, volume, name)
zkhandler.writedata(zk_conn, {'/ceph/cmd': add_snapshot_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': add_snapshot_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-snapshot_add': if result == 'success-snapshot_add':
message = 'Created new RBD snapshot "{}" of volume "{}" on pool "{}".'.format(name, volume, pool) message = 'Created new RBD snapshot "{}" of volume "{}" on pool "{}".'.format(name, volume, pool)
success = True success = True
@ -1235,24 +1235,24 @@ def add_snapshot(zk_conn, pool, volume, name):
success = False success = False
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(1) time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
def rename_snapshot(zk_conn, pool, volume, name, new_name): def rename_snapshot(zk_conn, pool, volume, name, new_name):
# Tell the cluster to rename # Tell the cluster to rename
rename_snapshot_string = 'snapshot_rename {},{},{}'.format(pool, name, new_name) rename_snapshot_string = 'snapshot_rename {},{},{}'.format(pool, name, new_name)
zkhandler.writedata(zk_conn, {'/ceph/cmd': rename_snapshot_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': rename_snapshot_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-snapshot_rename': if result == 'success-snapshot_rename':
message = 'Renamed RBD volume snapshot "{}" to "{}" for volume {} on pool "{}".'.format(name, new_name, volume, pool) message = 'Renamed RBD volume snapshot "{}" to "{}" for volume {} on pool "{}".'.format(name, new_name, volume, pool)
success = True success = True
@ -1264,10 +1264,10 @@ def rename_snapshot(zk_conn, pool, volume, name, new_name):
success = False success = False
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(1) time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message
@ -1277,14 +1277,14 @@ def remove_snapshot(zk_conn, pool, volume, name):
# Tell the cluster to create a new snapshot # Tell the cluster to create a new snapshot
remove_snapshot_string = 'snapshot_remove {},{},{}'.format(pool, volume, name) remove_snapshot_string = 'snapshot_remove {},{},{}'.format(pool, volume, name)
zkhandler.writedata(zk_conn, {'/ceph/cmd': remove_snapshot_string}) zkhandler.writedata(zk_conn, {'/cmd/ceph': remove_snapshot_string})
# Wait 1/2 second for the cluster to get the message and start working # Wait 1/2 second for the cluster to get the message and start working
time.sleep(0.5) time.sleep(0.5)
# Acquire a read lock, so we get the return exclusively # Acquire a read lock, so we get the return exclusively
lock = zkhandler.readlock(zk_conn, '/ceph/cmd') lock = zkhandler.readlock(zk_conn, '/cmd/ceph')
with lock: with lock:
try: try:
result = zkhandler.readdata(zk_conn, '/ceph/cmd').split()[0] result = zkhandler.readdata(zk_conn, '/cmd/ceph').split()[0]
if result == 'success-snapshot_remove': if result == 'success-snapshot_remove':
message = 'Removed RBD snapshot "{}" of volume "{}" in pool "{}".'.format(name, volume, pool) message = 'Removed RBD snapshot "{}" of volume "{}" in pool "{}".'.format(name, volume, pool)
success = True success = True
@ -1296,10 +1296,10 @@ def remove_snapshot(zk_conn, pool, volume, name):
success = False success = False
# Acquire a write lock to ensure things go smoothly # Acquire a write lock to ensure things go smoothly
lock = zkhandler.writelock(zk_conn, '/ceph/cmd') lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with lock: with lock:
time.sleep(1) time.sleep(1)
zkhandler.writedata(zk_conn, {'/ceph/cmd': ''}) zkhandler.writedata(zk_conn, {'/cmd/ceph': ''})
return success, message return success, message

View File

@ -718,18 +718,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
node, device, weight = args.split(',') node, device, weight = args.split(',')
if node == this_node.name: if node == this_node.name:
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Add the OSD # Add the OSD
result = add_osd(zk_conn, logger, node, device, weight) result = add_osd(zk_conn, logger, node, device, weight)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -740,18 +740,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
# Verify osd_id is in the list # Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Remove the OSD # Remove the OSD
result = remove_osd(zk_conn, logger, osd_id, d_osd[osd_id]) result = remove_osd(zk_conn, logger, osd_id, d_osd[osd_id])
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -762,18 +762,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
# Verify osd_id is in the list # Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Online the OSD # Online the OSD
result = in_osd(zk_conn, logger, osd_id) result = in_osd(zk_conn, logger, osd_id)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -784,18 +784,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
# Verify osd_id is in the list # Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name: if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Offline the OSD # Offline the OSD
result = out_osd(zk_conn, logger, osd_id) result = out_osd(zk_conn, logger, osd_id)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -805,18 +805,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Set the property # Set the property
result = set_property(zk_conn, logger, option) result = set_property(zk_conn, logger, option)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -826,18 +826,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Unset the property # Unset the property
result = unset_property(zk_conn, logger, option) result = unset_property(zk_conn, logger, option)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -847,18 +847,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Add the pool # Add the pool
result = add_pool(zk_conn, logger, name, pgs) result = add_pool(zk_conn, logger, name, pgs)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -868,18 +868,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Remove the pool # Remove the pool
result = remove_pool(zk_conn, logger, name) result = remove_pool(zk_conn, logger, name)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -889,18 +889,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Add the volume # Add the volume
result = add_volume(zk_conn, logger, pool, name, size) result = add_volume(zk_conn, logger, pool, name, size)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -910,18 +910,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Add the volume # Add the volume
result = resize_volume(zk_conn, logger, pool, name, size) result = resize_volume(zk_conn, logger, pool, name, size)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -931,18 +931,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Add the volume # Add the volume
result = rename_volume(zk_conn, logger, pool, name, new_name) result = rename_volume(zk_conn, logger, pool, name, new_name)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -952,18 +952,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Remove the volume # Remove the volume
result = remove_volume(zk_conn, logger, pool, name) result = remove_volume(zk_conn, logger, pool, name)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -973,18 +973,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Add the snapshot # Add the snapshot
result = add_snapshot(zk_conn, logger, pool, volume, name) result = add_snapshot(zk_conn, logger, pool, volume, name)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -994,18 +994,18 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Add the snapshot # Add the snapshot
result = rename_snapshot(zk_conn, logger, pool, volume, name, new_name) result = rename_snapshot(zk_conn, logger, pool, volume, name, new_name)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)
@ -1015,17 +1015,17 @@ def run_command(zk_conn, logger, this_node, data, d_osd):
if this_node.router_state == 'primary': if this_node.router_state == 'primary':
# Lock the command queue # Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd') zk_lock = zkhandler.writelock(zk_conn, '/cmd/ceph')
with zk_lock: with zk_lock:
# Remove the snapshot # Remove the snapshot
result = remove_snapshot(zk_conn, logger, pool, volume, name) result = remove_snapshot(zk_conn, logger, pool, volume, name)
# Command succeeded # Command succeeded
if result: if result:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'success-{}'.format(data)})
# Command failed # Command failed
else: else:
# Update the command queue # Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)}) zkhandler.writedata(zk_conn, {'/cmd/ceph': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock # Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1) time.sleep(1)

View File

@ -862,7 +862,7 @@ if enable_hypervisor:
if enable_storage: if enable_storage:
# Ceph command pipeline key # Ceph command pipeline key
@zk_conn.DataWatch('/ceph/cmd') @zk_conn.DataWatch('/cmd/ceph')
def cmd(data, stat, event=''): def cmd(data, stat, event=''):
if data: if data:
CephInstance.run_command(zk_conn, logger, this_node, data.decode('ascii'), d_osd) CephInstance.run_command(zk_conn, logger, this_node, data.decode('ascii'), d_osd)