Move OSD handling to CephInstance file

This commit is contained in:
Joshua Boniface 2018-11-23 20:02:50 -05:00
parent 790ed16a42
commit b8a5073a35
2 changed files with 174 additions and 165 deletions

View File

@ -417,3 +417,176 @@ def remove_pool(zk_conn, logger, name):
logger.out('Failed to remove RBD pool {}: {}'.format(name, e), state='e')
return False
def run_command(zk_conn, command):
# Get the command and args
command, args = data.split()
# Adding a new OSD
if command == 'osd_add':
node, device, weight = args.split(',')
if node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Add the OSD
result = add_osd(zk_conn, logger, node, device, weight)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Removing an OSD
elif command == 'osd_remove':
osd_id = args
# Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Remove the OSD
result = remove_osd(zk_conn, logger, osd_id, d_osd[osd_id])
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Online an OSD
elif command == 'osd_in':
osd_id = args
# Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Online the OSD
result = in_osd(zk_conn, logger, osd_id)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Offline an OSD
elif command == 'osd_out':
osd_id = args
# Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Offline the OSD
result = out_osd(zk_conn, logger, osd_id)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Set a property
elif command == 'osd_set':
option = args
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Set the property
result = set_property(zk_conn, logger, option)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Unset a property
elif command == 'osd_unset':
option = args
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Unset the property
result = unset_property(zk_conn, logger, option)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Adding a new pool
if command == 'pool_add':
name, pgs = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Add the pool
result = add_pool(zk_conn, logger, name, pgs)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)
# Removing a pool
elif command == 'pool_remove':
name = args
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Remove the pool
result = remove_pool(zk_conn, logger, name)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 1 seconds before we free the lock, to ensure the client hits the lock
time.sleep(1)

View File

@ -674,171 +674,7 @@ def cmd(data, stat, event=''):
data = ''
if data:
# Get the command and args
command, args = data.split()
# Adding a new OSD
if command == 'osd_add':
node, device, weight = args.split(',')
if node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Add the OSD
result = CephInstance.add_osd(zk_conn, logger, node, device, weight)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 0.5 seconds before we free the lock, to ensure the client hits the lock
time.sleep(0.5)
# Removing an OSD
elif command == 'osd_remove':
osd_id = args
# Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Remove the OSD
result = CephInstance.remove_osd(zk_conn, logger, osd_id, d_osd[osd_id])
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 0.5 seconds before we free the lock, to ensure the client hits the lock
time.sleep(0.5)
# Online an OSD
elif command == 'osd_in':
osd_id = args
# Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Online the OSD
result = CephInstance.in_osd(zk_conn, logger, osd_id)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 0.5 seconds before we free the lock, to ensure the client hits the lock
time.sleep(0.5)
# Offline an OSD
elif command == 'osd_out':
osd_id = args
# Verify osd_id is in the list
if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Offline the OSD
result = CephInstance.out_osd(zk_conn, logger, osd_id)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 0.5 seconds before we free the lock, to ensure the client hits the lock
time.sleep(0.5)
# Set a property
elif command == 'osd_set':
option = args
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Set the property
result = CephInstance.set_property(zk_conn, logger, option)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 0.5 seconds before we free the lock, to ensure the client hits the lock
time.sleep(0.5)
# Unset a property
elif command == 'osd_unset':
option = args
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Unset the property
result = CephInstance.unset_property(zk_conn, logger, option)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 0.5 seconds before we free the lock, to ensure the client hits the lock
time.sleep(0.5)
# Adding a new pool
if command == 'pool_add':
name, pgs = args.split(',')
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Add the pool
result = CephInstance.add_pool(zk_conn, logger, name, pgs)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 0.5 seconds before we free the lock, to ensure the client hits the lock
time.sleep(0.5)
# Removing a pool
elif command == 'pool_remove':
name = args
if this_node.router_state == 'primary':
# Lock the command queue
zk_lock = zkhandler.writelock(zk_conn, '/ceph/cmd')
with zk_lock:
# Remove the pool
result = CephInstance.remove_pool(zk_conn, logger, name)
# Command succeeded
if result:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'success-{}'.format(data)})
# Command failed
else:
# Update the command queue
zkhandler.writedata(zk_conn, {'/ceph/cmd': 'failure-{}'.format(data)})
# Wait 0.5 seconds before we free the lock, to ensure the client hits the lock
time.sleep(0.5)
CephInstance.run_command(data)
# OSD objects
@zk_conn.ChildrenWatch('/ceph/osds')