diff --git a/client-common/ceph.py b/client-common/ceph.py
index 91bedc85..51de8cc2 100644
--- a/client-common/ceph.py
+++ b/client-common/ceph.py
@@ -23,6 +23,7 @@
 import re
 import click
 import ast
+import time
 
 import client_lib.ansiprint as ansiprint
 import client_lib.zkhandler as zkhandler
@@ -90,7 +91,14 @@ def formatOSDList(zk_conn, osd_list):
         osd_stats = getOSDInformation(zk_conn, osd)
 
         # Set the parent node and length
-        osd_node[osd] = osd_stats['node']
+        try:
+            osd_node[osd] = osd_stats['node']
+            # If this happens, the node hasn't checked in fully yet, so just ignore it
+            if osd_node[osd] == '|':
+                continue
+        except KeyError:
+            continue
+
         _osd_node_length = len(osd_node[osd]) + 1
         if _osd_node_length > osd_node_length:
             osd_node_length = _osd_node_length
@@ -247,20 +255,51 @@ def get_status(zk_conn):
     return True, ''
 
 def add_osd(zk_conn, node, device):
+    # Verify the target node exists
     if not common.verifyNode(zk_conn, node):
         return False, 'ERROR: No node named "{}" is present in the cluster.'.format(node)
 
     # Tell the cluster to create a new OSD for the host
     add_osd_string = 'add {},{}'.format(node, device)
     zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': add_osd_string})
-    click.echo('Created new OSD with block device {} on node {}.'.format(device, node))
-    return True, ''
+    # Wait 1/2 second for the cluster to get the message and start working
+    time.sleep(0.5)
+    # Acquire a read lock, so we get the return exclusively
+    lock = zkhandler.readlock(zk_conn, '/ceph/osd_cmd')
+    with lock:
+        result = zkhandler.readdata(zk_conn, '/ceph/osd_cmd').split()[0]
+        if result == 'success-add':
+            success = True
+        else:
+            success = False
+
+    if success:
+        return True, 'Created new OSD with block device {} on node {}.'.format(device, node)
+    else:
+        return False, 'Failed to create new OSD; check node logs for details.'
 
 def remove_osd(zk_conn, osd_id):
+    if not common.verifyOSD(zk_conn, osd_id):
+        return False, 'ERROR: No OSD with ID "{}" is present in the cluster.'.format(osd_id)
+
+    # Tell the cluster to remove an OSD
     remove_osd_string = 'remove {}'.format(osd_id)
     zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': remove_osd_string})
-    click.echo('Removed OSD with ID {} from the cluster.'.format(osd_id))
-    return True, ''
+    # Wait 1/2 second for the cluster to get the message and start working
+    time.sleep(0.5)
+    # Acquire a read lock, so we get the return exclusively
+    lock = zkhandler.readlock(zk_conn, '/ceph/osd_cmd')
+    with lock:
+        result = zkhandler.readdata(zk_conn, '/ceph/osd_cmd').split()[0]
+        if result == 'success-remove':
+            success = True
+        else:
+            success = False
+
+    if success:
+        return True, 'Removed OSD {} from the cluster.'.format(osd_id)
+    else:
+        return False, 'Failed to remove OSD; check node logs for details.'
 
 def get_list_osd(zk_conn, limit):
     osd_list = []
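Both client calls above share the same command/acknowledge handshake over the /ceph/osd_cmd key: write the command, give the owning node a moment to grab its write lock, then take a read lock and read back the success-/failure- string it leaves behind. A minimal sketch of that pattern, pulled out of add_osd()/remove_osd() for clarity (the wait_for_result() helper name is illustrative and not part of the patch):

    import time
    import client_lib.zkhandler as zkhandler

    def wait_for_result(zk_conn, command, expected):
        # Post the command for the node daemons to pick up
        zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': command})
        # Give the target node time to notice the command and take its write lock
        time.sleep(0.5)
        # The read lock queues behind the daemon's write lock, so by the time it
        # acquires, the key holds the daemon's result rather than our own command
        with zkhandler.readlock(zk_conn, '/ceph/osd_cmd'):
            result = zkhandler.readdata(zk_conn, '/ceph/osd_cmd').split()[0]
        return result == expected

    # e.g. wait_for_result(zk_conn, 'add node1,/dev/sdb', 'success-add')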
diff --git a/client-common/common.py b/client-common/common.py
index 5e1e4f63..e66133b3 100644
--- a/client-common/common.py
+++ b/client-common/common.py
@@ -175,6 +175,15 @@ def verifyNode(zk_conn, node):
     else:
         return False
 
+#
+# Verify OSD is valid in cluster
+#
+def verifyOSD(zk_conn, osd_id):
+    if zkhandler.exists(zk_conn, '/ceph/osds/{}'.format(osd_id)):
+        return True
+    else:
+        return False
+
 #
 # Get the primary coordinator node
 #
diff --git a/client-common/zkhandler.py b/client-common/zkhandler.py
index 69099f2f..6e7d0b8c 100644
--- a/client-common/zkhandler.py
+++ b/client-common/zkhandler.py
@@ -21,6 +21,8 @@
 ###############################################################################
 
 import kazoo.client
+import uuid
+
 import client_lib.ansiprint as ansiprint
 
 # Exists function
@@ -38,10 +40,7 @@ def listchildren(zk_conn, key):
 
 # Delete key function
 def deletekey(zk_conn, key, recursive=True):
-    try:
-        zk_conn.delete(key, recursive=recursive)
-    except:
-        pass
+    zk_conn.delete(key, recursive=recursive)
 
 # Data read function
 def readdata(zk_conn, key):
@@ -88,3 +87,14 @@ def writedata(zk_conn, kv):
     except Exception:
         return False
 
+# Write lock function
+def writelock(zk_conn, key):
+    lock_id = str(uuid.uuid1())
+    lock = zk_conn.WriteLock('{}'.format(key), lock_id)
+    return lock
+
+# Read lock function
+def readlock(zk_conn, key):
+    lock_id = str(uuid.uuid1())
+    lock = zk_conn.ReadLock('{}'.format(key), lock_id)
+    return lock
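The new readlock()/writelock() helpers are thin wrappers around kazoo's ReadLock and WriteLock recipes: a write lock is exclusive, while read locks are shared but queue behind any pending write lock, which is what makes the handshake above work. A rough usage sketch, assuming a reachable ZooKeeper and an already-initialized /ceph hierarchy (the host, node, and device names are illustrative):

    from kazoo.client import KazooClient

    import client_lib.zkhandler as zkhandler

    zk_conn = KazooClient(hosts='localhost:2181')  # illustrative host
    zk_conn.start()

    # Exclusive writer: any readers queue until this block exits
    with zkhandler.writelock(zk_conn, '/ceph/osd_cmd'):
        zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'add node1,/dev/sdb'})

    # Shared reader: acquires once no write lock is held
    with zkhandler.readlock(zk_conn, '/ceph/osd_cmd'):
        print(zkhandler.readdata(zk_conn, '/ceph/osd_cmd'))

    zk_conn.stop()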
diff --git a/node-daemon/pvcd/CephInstance.py b/node-daemon/pvcd/CephInstance.py
index 7f005d4a..e84e6906 100644
--- a/node-daemon/pvcd/CephInstance.py
+++ b/node-daemon/pvcd/CephInstance.py
@@ -22,6 +22,8 @@
 
 import time
 import ast
+import json
+import psutil
 
 import pvcd.log as log
 import pvcd.zkhandler as zkhandler
@@ -53,7 +55,7 @@ class CephOSDInstance(object):
             except AttributeError:
                 data = ''
 
-            if data != self.node:
+            if data and data != self.node:
                 self.node = data
 
         @self.zk_conn.DataWatch('/ceph/osds/{}/stats'.format(self.osd_id))
@@ -68,8 +70,8 @@ class CephOSDInstance(object):
             except AttributeError:
                 data = ''
 
-            if data != self.stats:
-                self.stats = dict(ast.literal_eval(data))
+            if data and data != self.stats:
+                self.stats = json.loads(data)
 
 def add_osd(zk_conn, logger, node, device):
     # We are ready to create a new OSD on this node
@@ -77,7 +79,8 @@
     try:
         # 1. Create an OSD; we do this so we know what ID will be gen'd
         retcode, stdout, stderr = common.run_os_command('ceph osd create')
-        if retcode != 0:
+        if retcode:
+            print('ceph osd create')
             print(stdout)
             print(stderr)
             raise
@@ -85,7 +88,8 @@
 
         # 2. Remove that newly-created OSD
         retcode, stdout, stderr = common.run_os_command('ceph osd rm {}'.format(osd_id))
-        if retcode != 0:
+        if retcode:
+            print('ceph osd rm')
             print(stdout)
             print(stderr)
             raise
@@ -97,7 +101,8 @@
                 device=device
             )
         )
-        if retcode != 0:
+        if retcode:
+            print('ceph-volume lvm prepare')
             print(stdout)
             print(stderr)
             raise
@@ -108,7 +113,8 @@
                 osdid=osd_id
             )
         )
-        if retcode != 0:
+        if retcode:
+            print('ceph-volume lvm activate')
             print(stdout)
             print(stderr)
             raise
@@ -120,7 +126,8 @@
                 node=node
             )
         )
-        if retcode != 0:
+        if retcode:
+            print('ceph osd crush add')
             print(stdout)
             print(stderr)
             raise
@@ -132,7 +139,8 @@
                 osdid=osd_id
             )
         )
-        if retcode != 0:
+        if retcode:
+            print('systemctl status')
             print(stdout)
             print(stderr)
             raise
@@ -141,15 +149,16 @@
         zkhandler.writedata(zk_conn, {
             '/ceph/osds/{}'.format(osd_id): '',
             '/ceph/osds/{}/node'.format(osd_id): node,
-            '/ceph/osds/{}/size'.format(osd_id): '',
             '/ceph/osds/{}/stats'.format(osd_id): '{}'
         })
 
         # Log it
         logger.out('Created new OSD disk with ID {}'.format(osd_id), state='o')
+        return True
     except Exception as e:
         # Log it
         logger.out('Failed to create new OSD disk: {}'.format(e), state='e')
+        return False
 
 def remove_osd(zk_conn, logger, osd_id, osd_obj):
     logger.out('Removing OSD disk {}'.format(osd_id), state='i')
@@ -163,16 +172,78 @@ def remove_osd(zk_conn, logger, osd_id, osd_obj):
         # 1. Set the OSD out so it will flush
         retcode, stdout, stderr = common.run_os_command('ceph osd out {}'.format(osd_id))
-        if retcode != 0:
+        if retcode:
+            print('ceph osd out')
             print(stdout)
             print(stderr)
+            raise
 
         # 2. Wait for the OSD to flush
+        osd_string = str()
         while True:
-            retcode, stdout, stderr = common.run_os_command('ceph health')
-            health_string = stdout
-    except:
-        pass
+            retcode, stdout, stderr = common.run_os_command('ceph pg dump osds --format json')
+            dump_string = json.loads(stdout)
+            for osd in dump_string:
+                if str(osd['osd']) == osd_id:
+                    osd_string = osd
+            print(osd_string)
+            num_pgs = osd_string['num_pgs']
+            if num_pgs > 0:
+                time.sleep(5)
+            else:
+                break
+
+        # 3. Stop the OSD process and wait for it to be terminated
+        retcode, stdout, stderr = common.run_os_command('systemctl stop ceph-osd@{}'.format(osd_id))
+        if retcode:
+            print('systemctl stop')
+            print(stdout)
+            print(stderr)
+            raise
+
+        # FIXME: There has to be a better way to do this /shrug
+        while True:
+            is_osd_up = False
+            # Find if there is a process named ceph-osd with arg '--id {id}'
+            for p in psutil.process_iter(attrs=['name', 'cmdline']):
+                if 'ceph-osd' == p.info['name'] and '--id {}'.format(osd_id) in ' '.join(p.info['cmdline']):
+                    is_osd_up = True
+            # If there isn't, continue
+            if not is_osd_up:
+                break
+
+        # 4. Delete OSD from ZK
+        zkhandler.deletekey(zk_conn, '/ceph/osds/{}'.format(osd_id))
+
+        # 5. Determine the block devices
+        retcode, stdout, stderr = common.run_os_command('readlink /var/lib/ceph/osd/ceph-{}/block'.format(osd_id))
+        vg_name = stdout.split('/')[-2] # e.g. /dev/ceph-/osd-block-
+        retcode, stdout, stderr = common.run_os_command('vgs --separator , --noheadings -o pv_name {}'.format(vg_name))
+        pv_block = stdout
+
+        # 6. Zap the volumes
+        retcode, stdout, stderr = common.run_os_command('ceph-volume lvm zap --destroy {}'.format(pv_block))
+        if retcode:
+            print('ceph-volume lvm zap')
+            print(stdout)
+            print(stderr)
+            raise
+
+        # 7. Purge the OSD from Ceph
+        retcode, stdout, stderr = common.run_os_command('ceph osd purge {} --yes-i-really-mean-it'.format(osd_id))
+        if retcode:
+            print('ceph osd purge')
+            print(stdout)
+            print(stderr)
+            raise
+
+        # Log it
+        logger.out('Purged OSD disk with ID {}'.format(osd_id), state='o')
+        return True
+    except Exception as e:
+        # Log it
+        logger.out('Failed to purge OSD disk with ID {}: {}'.format(osd_id, e), state='e')
+        return False
 
 
 class CephPool(object):
     def __init__(self):
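remove_osd() above now waits at two points: first for the OSD's placement groups to drain after it is marked out, then for the ceph-osd process to actually exit after systemctl stop. A standalone sketch of those two loops, assuming the module's existing common.run_os_command() helper (its import path is an assumption here) and an illustrative OSD ID; the wait_for_drain()/wait_for_stop() names are not part of the patch:

    import json
    import time

    import psutil

    import pvcd.common as common  # assumed import path for run_os_command()

    def wait_for_drain(osd_id):
        while True:
            # 'ceph pg dump osds --format json' reports per-OSD stats, including num_pgs
            retcode, stdout, stderr = common.run_os_command('ceph pg dump osds --format json')
            stats = [o for o in json.loads(stdout) if str(o['osd']) == osd_id]
            if stats and stats[0]['num_pgs'] > 0:
                time.sleep(5)
            else:
                return

    def wait_for_stop(osd_id):
        while True:
            # Look for a ceph-osd process that was started with '--id <osd_id>'
            running = any(
                p.info['name'] == 'ceph-osd' and '--id {}'.format(osd_id) in ' '.join(p.info['cmdline'])
                for p in psutil.process_iter(attrs=['name', 'cmdline'])
            )
            if not running:
                return
            time.sleep(1)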
diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py
index c6520a1d..fa9f5f27 100644
--- a/node-daemon/pvcd/Daemon.py
+++ b/node-daemon/pvcd/Daemon.py
@@ -308,7 +308,7 @@ def zk_listener(state):
 
         # Start keepalive thread
         if update_timer:
-            update_timer = createKeepaliveTimer()
+            update_timer = startKeepaliveTimer()
         else:
             pass
 zk_conn.add_listener(zk_listener)
@@ -669,23 +669,38 @@ def osd_cmd(data, stat, event=''):
     if command == 'add':
         node, device = args.split(',')
         if node == this_node.name:
-            # Clean up the command queue
-            zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': ''})
-            # Add the OSD
-            CephInstance.add_osd(zk_conn, logger, node, device)
+            # Lock the command queue
+            lock = zkhandler.writelock(zk_conn, '/ceph/osd_cmd')
+            with lock:
+                # Add the OSD
+                result = CephInstance.add_osd(zk_conn, logger, node, device)
+                # Command succeeded
+                if result:
+                    # Update the command queue
+                    zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'success-{}'.format(data)})
+                # Command failed
+                else:
+                    # Update the command queue
+                    zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'failure-{}'.format(data)})
     # Removing an OSD
     elif command == 'remove':
         osd_id = args
 
         # Verify osd_id is in the list
-        if not d_osd[osd_id]:
-            return True
-
-        if d_osd[osd_id].node == this_node.name:
-            # Clean up the command queue
-            zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': ''})
-            # Remove the OSD
-            CephInstance.remove_osd(zk_conn, logger, osd_id, d_osd[osd_id])
+        if d_osd[osd_id] and d_osd[osd_id].node == this_node.name:
+            # Lock the command queue
+            lock = zkhandler.writelock(zk_conn, '/ceph/osd_cmd')
+            with lock:
+                # Remove the OSD
+                result = CephInstance.remove_osd(zk_conn, logger, osd_id, d_osd[osd_id])
+                # Command succeeded
+                if result:
+                    # Update the command queue
+                    zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'success-{}'.format(data)})
+                # Command failed
+                else:
+                    # Update the command queue
+                    zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': 'failure-{}'.format(data)})
 
 # OSD objects
 @zk_conn.ChildrenWatch('/ceph/osds')
@@ -784,7 +799,6 @@ def update_zookeeper():
                 rd_data = line[15]
                 state = line[17]
                 osd_status.update({
-#                osd_stats.update({
                     str(osd_id): {
                         'node': node,
                         'used': used,
@@ -808,7 +822,7 @@ def update_zookeeper():
     for osd in osd_list:
         if d_osd[osd].node == myhostname:
             zkhandler.writedata(zk_conn, {
-                '/ceph/osds/{}/stats'.format(osd): str(osd_stats[osd])
+                '/ceph/osds/{}/stats'.format(osd): str(json.dumps(osd_stats[osd]))
             })
             osds_this_node += 1
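The osd_cmd() changes above are the daemon-side half of the handshake: the node that owns the command takes the write lock before doing the work, so the client's read lock (and its half-second head start) only acquires after a success-/failure- result has been written back. A condensed, hypothetical sketch of that flow for the 'add' case; the handle_osd_command() wrapper, its command parsing, and the CephInstance import path are illustrative, not the daemon's actual watch callback:

    import pvcd.zkhandler as zkhandler
    import pvcd.CephInstance as CephInstance  # assumed import path

    def handle_osd_command(zk_conn, logger, this_node, data):
        command, args = data.split(None, 1)  # simplified parsing
        if command == 'add':
            node, device = args.split(',')
            if node != this_node.name:
                return
            # Hold the write lock while working so client read locks queue behind us
            with zkhandler.writelock(zk_conn, '/ceph/osd_cmd'):
                ok = CephInstance.add_osd(zk_conn, logger, node, device)
                state = 'success' if ok else 'failure'
                zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': '{}-{}'.format(state, data)})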
diff --git a/node-daemon/pvcd/zkhandler.py b/node-daemon/pvcd/zkhandler.py
index 91db9b76..279495cb 100644
--- a/node-daemon/pvcd/zkhandler.py
+++ b/node-daemon/pvcd/zkhandler.py
@@ -21,7 +21,9 @@
 ###############################################################################
 
 import kazoo.client
-import pvcd.log as log
+import uuid
+
+#import pvcd.log as log
 
 # Child list function
 def listchildren(zk_conn, key):
@@ -29,8 +31,8 @@ def listchildren(zk_conn, key):
     return children
 
 # Key deletion function
-def delete(zk_conn, key):
-    zk_conn.delete(key, recursive=True)
+def deletekey(zk_conn, key, recursive=True):
+    zk_conn.delete(key, recursive=recursive)
 
 # Data read function
 def readdata(zk_conn, key):
@@ -53,7 +55,7 @@ def writedata(zk_conn, kv):
         # Check if this key already exists or not
         if not zk_conn.exists(key):
             # We're creating a new key
-            zk_transaction.create(key, data.encode('ascii'))
+            zk_transaction.create(key, str(data).encode('ascii'))
         else:
             # We're updating a key with version validation
             orig_data = zk_conn.get(key)
@@ -63,7 +65,7 @@ def writedata(zk_conn, kv):
             new_version = version + 1
 
             # Update the data
-            zk_transaction.set_data(key, data.encode('ascii'))
+            zk_transaction.set_data(key, str(data).encode('ascii'))
 
             # Set up the check
             try:
@@ -79,3 +81,14 @@ def writedata(zk_conn, kv):
     except Exception:
         return False
 
+# Write lock function
+def writelock(zk_conn, key):
+    lock_id = str(uuid.uuid1())
+    lock = zk_conn.WriteLock('{}'.format(key), lock_id)
+    return lock
+
+# Read lock function
+def readlock(zk_conn, key):
+    lock_id = str(uuid.uuid1())
+    lock = zk_conn.ReadLock('{}'.format(key), lock_id)
+    return lock
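These node-daemon helpers mirror the client-common ones: delete() becomes deletekey() with a recursive flag, writedata() now coerces values through str() before encoding so non-string values survive, and the new lock functions return kazoo lock recipes keyed on the target path. A brief usage sketch, assuming a reachable ZooKeeper and an existing /ceph/osds tree; the host, OSD ID, and node name are illustrative:

    from kazoo.client import KazooClient

    import pvcd.zkhandler as zkhandler

    zk_conn = KazooClient(hosts='localhost:2181')  # illustrative host
    zk_conn.start()

    # Create the OSD's keys, as add_osd() does (writedata() creates keys that don't yet exist)
    zkhandler.writedata(zk_conn, {
        '/ceph/osds/0': '',
        '/ceph/osds/0/node': 'node1',
        '/ceph/osds/0/stats': '{}',
    })

    # Recursively delete the OSD's subtree, as remove_osd() now does via deletekey()
    zkhandler.deletekey(zk_conn, '/ceph/osds/0', recursive=True)

    zk_conn.stop()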