From 9b65d3271af47fc9d4daba9e2bb65f448af353eb Mon Sep 17 00:00:00 2001
From: "Joshua M. Boniface" <joshua@boniface.me>
Date: Sat, 6 Jun 2020 22:29:32 -0400
Subject: [PATCH] Improve handling of Ceph status gathering

Use the Rados library instead of random OS commands, which massively
improves the performance of these tasks.

Closes #97
---
 node-daemon/pvcnoded/Daemon.py | 101 +++++++++++++++++++++------------
 1 file changed, 65 insertions(+), 36 deletions(-)

diff --git a/node-daemon/pvcnoded/Daemon.py b/node-daemon/pvcnoded/Daemon.py
index 60eae130..6c1e304e 100644
--- a/node-daemon/pvcnoded/Daemon.py
+++ b/node-daemon/pvcnoded/Daemon.py
@@ -263,8 +263,8 @@ def readConfig(pvcnoded_config_file, myhostname):
     if config['enable_storage']:
         try:
             config_storage = {
-                'ceph_config_file': o_config['pvc']['cluster']['storage']['ceph_config_file'],
-                'ceph_admin_keyring': o_config['pvc']['cluster']['storage']['ceph_admin_keyring']
+                'ceph_config_file': o_config['pvc']['system']['configuration']['storage']['ceph_config_file'],
+                'ceph_admin_keyring': o_config['pvc']['system']['configuration']['storage']['ceph_admin_keyring']
             }
         except Exception as e:
             print('ERROR: Failed to load configuration: {}'.format(e))
@@ -1019,37 +1019,48 @@ if enable_storage:
     # Ceph stats update function
     def collect_ceph_stats(queue):
-        # Get Ceph cluster health (for local printing)
-        if debug:
-            print("Get Ceph cluster health (for local printing)")
-        retcode, stdout, stderr = common.run_os_command('ceph health', timeout=1)
-        ceph_health = stdout.rstrip()
-        if 'HEALTH_OK' in ceph_health:
+        # Connect to the Ceph cluster
+        try:
+            ceph_conn = Rados(conffile=config['ceph_config_file'], conf=dict(keyring=config['ceph_admin_keyring']))
+            ceph_conn.connect()
+        except Exception as e:
+            logger.out('Failed to open connection to Ceph cluster: {}'.format(e), state='e')
+            return
+
+        # Get Ceph cluster health for local status output
+        command = { "prefix": "health", "format": "json" }
+        try:
+            health_status = json.loads(ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1])
+            ceph_health = health_status['status']
+        except Exception as e:
+            logger.out('Failed to obtain Ceph health data: {}'.format(e), state='e')
+            return
+
+        if ceph_health == 'HEALTH_OK':
             ceph_health_colour = fmt_green
-        elif 'HEALTH_WARN' in ceph_health:
+        elif ceph_health == 'HEALTH_WARN':
             ceph_health_colour = fmt_yellow
         else:
             ceph_health_colour = fmt_red
 
-        # Set ceph health information in zookeeper (primary only)
+        # Primary-only functions
         if this_node.router_state == 'primary':
             if debug:
                 print("Set ceph health information in zookeeper (primary only)")
-            # Get status info
-            retcode, stdout, stderr = common.run_os_command('ceph status', timeout=1)
-            ceph_status = stdout
+
+            command = { "prefix": "status", "format": "pretty" }
+            ceph_status = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1]
             try:
                 zkhandler.writedata(zk_conn, {
                     '/ceph': str(ceph_status)
                 })
-            except:
-                logger.out('Failed to set Ceph status data', state='e')
+            except Exception as e:
+                logger.out('Failed to set Ceph status data: {}'.format(e), state='e')
                 return
 
-        # Set ceph rados df information in zookeeper (primary only)
-        if this_node.router_state == 'primary':
             if debug:
                 print("Set ceph rados df information in zookeeper (primary only)")
+
             # Get rados df info
             retcode, stdout, stderr = common.run_os_command('rados df', timeout=1)
             rados_df = stdout
@@ -1057,28 +1068,26 @@ def collect_ceph_stats(queue):
                 zkhandler.writedata(zk_conn, {
                     '/ceph/radosdf': str(rados_df)
                 })
-            except:
-                logger.out('Failed to set Rados space data', state='e')
+            except Exception as e:
+                logger.out('Failed to set Rados space data: {}'.format(e), state='e')
                 return
 
-        # Set pool information in zookeeper (primary only)
-        if this_node.router_state == 'primary':
             if debug:
                 print("Set pool information in zookeeper (primary only)")
 
             # Get pool info
-            retcode, stdout, stderr = common.run_os_command('ceph df --format json', timeout=1)
+            command = { "prefix": "df", "format": "json" }
             try:
-                ceph_pool_df_raw = json.loads(stdout)['pools']
-            except json.decoder.JSONDecodeError:
-                logger.out('Failed to obtain Pool data (ceph df)', state='w')
+                ceph_pool_df_raw = json.loads(ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1])['pools']
+            except Exception as e:
+                logger.out('Failed to obtain Pool data (ceph df): {}'.format(e), state='w')
                 ceph_pool_df_raw = []
 
             retcode, stdout, stderr = common.run_os_command('rados df --format json', timeout=1)
             try:
                 rados_pool_df_raw = json.loads(stdout)['pools']
-            except json.decoder.JSONDecodeError:
-                logger.out('Failed to obtain Pool data (rados df)', state='w')
+            except Exception as e:
+                logger.out('Failed to obtain Pool data (rados df): {}'.format(e), state='w')
                 rados_pool_df_raw = []
 
             pool_count = len(ceph_pool_df_raw)
@@ -1134,13 +1143,15 @@ def collect_ceph_stats(queue):
         # Get data from Ceph OSDs
         if debug:
             print("Get data from Ceph OSDs")
+
         # Parse the dump data
         osd_dump = dict()
-        retcode, stdout, stderr = common.run_os_command('ceph osd dump --format json', timeout=1)
+
+        command = { "prefix": "osd dump", "format": "json" }
         try:
-            osd_dump_raw = json.loads(stdout)['osds']
-        except json.decoder.JSONDecodeError:
-            logger.out('Failed to obtain OSD data', state='w')
+            osd_dump_raw = json.loads(ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1])['osds']
+        except Exception as e:
+            logger.out('Failed to obtain OSD data: {}'.format(e), state='w')
             osd_dump_raw = []
 
         if debug:
@@ -1158,12 +1169,14 @@ def collect_ceph_stats(queue):
         # Parse the df data
         if debug:
             print("Parse the OSD df data")
+
         osd_df = dict()
-        retcode, stdout, stderr = common.run_os_command('ceph osd df --format json', timeout=1)
+
+        command = { "prefix": "osd df", "format": "json" }
         try:
-            osd_df_raw = json.loads(stdout)['nodes']
+            osd_df_raw = json.loads(ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1])['nodes']
         except Exception as e:
-            logger.out('Failed to parse OSD list: {}'.format(e), state='w')
+            logger.out('Failed to obtain OSD data: {}'.format(e), state='w')
             osd_df_raw = []
 
         if debug:
@@ -1179,14 +1192,24 @@ def collect_ceph_stats(queue):
                 'reweight': osd['reweight'],
             }
         })
+
         # Parse the status data
         if debug:
             print("Parse the OSD status data")
+
         osd_status = dict()
-        retcode, stdout, stderr = common.run_os_command('ceph osd status', timeout=1)
+
+        command = { "prefix": "osd status", "format": "pretty" }
+        try:
+            osd_status_raw = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii')
+        except Exception as e:
+            logger.out('Failed to obtain OSD status data: {}'.format(e), state='w')
+            osd_status_raw = ''
+
         if debug:
             print("Loop through OSD status data")
-        for line in stdout.split('\n'):
+
+        for line in osd_status_raw.split('\n'):
             # Strip off colour
             line = re.sub(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))', '', line)
             # Split it for parsing
@@ -1214,10 +1237,13 @@ def collect_ceph_stats(queue):
                 'state': state
             }
         })
+
         # Merge them together into a single meaningful dict
         if debug:
             print("Merge OSD data together")
+
         osd_stats = dict()
+
         for osd in osd_list:
             try:
                 this_dump = osd_dump[osd]
@@ -1231,6 +1257,7 @@ def collect_ceph_stats(queue):
         # Trigger updates for each OSD on this node
         if debug:
             print("Trigger updates for each OSD on this node")
+
         for osd in osd_list:
             if d_osd[osd].node == myhostname:
                 try:
@@ -1243,6 +1270,8 @@ def collect_ceph_stats(queue):
                     logger.out('Failed to upload OSD stats from dictionary: {}'.format(e), state='w')
                 osds_this_node += 1
 
+        ceph_conn.shutdown()
+
         queue.put(ceph_health_colour)
         queue.put(ceph_health)
         queue.put(osds_this_node)