Improve handling of Ceph status gathering
Use the Rados library instead of random OS commands, which massively improves the performance of these tasks. Closes #97
This commit is contained in:
		| @@ -263,8 +263,8 @@ def readConfig(pvcnoded_config_file, myhostname): | |||||||
|     if config['enable_storage']: |     if config['enable_storage']: | ||||||
|         try: |         try: | ||||||
|             config_storage = { |             config_storage = { | ||||||
|                 'ceph_config_file': o_config['pvc']['cluster']['storage']['ceph_config_file'], |                 'ceph_config_file': o_config['pvc']['system']['configuration']['storage']['ceph_config_file'], | ||||||
|                 'ceph_admin_keyring': o_config['pvc']['cluster']['storage']['ceph_admin_keyring'] |                 'ceph_admin_keyring': o_config['pvc']['system']['configuration']['storage']['ceph_admin_keyring'] | ||||||
|             } |             } | ||||||
|         except Exception as e: |         except Exception as e: | ||||||
|             print('ERROR: Failed to load configuration: {}'.format(e)) |             print('ERROR: Failed to load configuration: {}'.format(e)) | ||||||
| @@ -1019,37 +1019,48 @@ if enable_storage: | |||||||
|  |  | ||||||
| # Ceph stats update function | # Ceph stats update function | ||||||
| def collect_ceph_stats(queue): | def collect_ceph_stats(queue): | ||||||
|     # Get Ceph cluster health (for local printing) |     # Connect to the Ceph cluster | ||||||
|     if debug: |     try: | ||||||
|         print("Get Ceph cluster health (for local printing)") |         ceph_conn = Rados(conffile=config['ceph_config_file'], conf=dict(keyring=config['ceph_admin_keyring'])) | ||||||
|     retcode, stdout, stderr = common.run_os_command('ceph health', timeout=1) |         ceph_conn.connect() | ||||||
|     ceph_health = stdout.rstrip() |     except Exception as e: | ||||||
|     if 'HEALTH_OK' in ceph_health: |         logger.out('Failed to open connection to Ceph cluster: {}'.format(e), state='e') | ||||||
|  |         return | ||||||
|  |  | ||||||
|  |     # Get Ceph cluster health for local status output | ||||||
|  |     command = { "prefix": "health", "format": "json" } | ||||||
|  |     try: | ||||||
|  |         health_status = json.loads(ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1]) | ||||||
|  |         ceph_health = health_status['status'] | ||||||
|  |     except Exception as e: | ||||||
|  |         logger.out('Failed to obtain Ceph health data: {}'.format(e), state='e') | ||||||
|  |         return | ||||||
|  |  | ||||||
|  |     if ceph_health == 'HEALTH_OK': | ||||||
|         ceph_health_colour = fmt_green |         ceph_health_colour = fmt_green | ||||||
|     elif 'HEALTH_WARN' in ceph_health: |     elif ceph_health == 'HEALTH_WARN': | ||||||
|         ceph_health_colour = fmt_yellow |         ceph_health_colour = fmt_yellow | ||||||
|     else: |     else: | ||||||
|         ceph_health_colour = fmt_red |         ceph_health_colour = fmt_red | ||||||
|  |  | ||||||
|     # Set ceph health information in zookeeper (primary only) |     # Primary-only functions | ||||||
|     if this_node.router_state == 'primary': |     if this_node.router_state == 'primary': | ||||||
|         if debug: |         if debug: | ||||||
|             print("Set ceph health information in zookeeper (primary only)") |             print("Set ceph health information in zookeeper (primary only)") | ||||||
|         # Get status info |  | ||||||
|         retcode, stdout, stderr = common.run_os_command('ceph status', timeout=1) |         command = { "prefix": "status", "format": "pretty" } | ||||||
|         ceph_status = stdout |         ceph_status = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1] | ||||||
|         try: |         try: | ||||||
|             zkhandler.writedata(zk_conn, { |             zkhandler.writedata(zk_conn, { | ||||||
|                 '/ceph': str(ceph_status) |                 '/ceph': str(ceph_status) | ||||||
|             }) |             }) | ||||||
|         except: |         except Exception as e: | ||||||
|             logger.out('Failed to set Ceph status data', state='e') |             logger.out('Failed to set Ceph status data: {}'.format(e), state='e') | ||||||
|             return |             return | ||||||
|  |  | ||||||
|     # Set ceph rados df information in zookeeper (primary only) |  | ||||||
|     if this_node.router_state == 'primary': |  | ||||||
|         if debug: |         if debug: | ||||||
|             print("Set ceph rados df information in zookeeper (primary only)") |             print("Set ceph rados df information in zookeeper (primary only)") | ||||||
|  |  | ||||||
|         # Get rados df info |         # Get rados df info | ||||||
|         retcode, stdout, stderr = common.run_os_command('rados df', timeout=1) |         retcode, stdout, stderr = common.run_os_command('rados df', timeout=1) | ||||||
|         rados_df = stdout |         rados_df = stdout | ||||||
| @@ -1057,28 +1068,26 @@ def collect_ceph_stats(queue): | |||||||
|             zkhandler.writedata(zk_conn, { |             zkhandler.writedata(zk_conn, { | ||||||
|                 '/ceph/radosdf': str(rados_df) |                 '/ceph/radosdf': str(rados_df) | ||||||
|             }) |             }) | ||||||
|         except: |         except Exception as e: | ||||||
|             logger.out('Failed to set Rados space data', state='e') |             logger.out('Failed to set Rados space data: {}'.format(e), state='e') | ||||||
|             return |             return | ||||||
|  |  | ||||||
|     # Set pool information in zookeeper (primary only) |  | ||||||
|     if this_node.router_state == 'primary': |  | ||||||
|         if debug: |         if debug: | ||||||
|             print("Set pool information in zookeeper (primary only)") |             print("Set pool information in zookeeper (primary only)") | ||||||
|  |  | ||||||
|         # Get pool info |         # Get pool info | ||||||
|         retcode, stdout, stderr = common.run_os_command('ceph df --format json', timeout=1) |         command = { "prefix": "df", "format": "json" } | ||||||
|         try: |         try: | ||||||
|             ceph_pool_df_raw = json.loads(stdout)['pools'] |             ceph_pool_df_raw = json.loads(ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1])['pools'] | ||||||
|         except json.decoder.JSONDecodeError: |         except Exception as e: | ||||||
|             logger.out('Failed to obtain Pool data (ceph df)', state='w') |             logger.out('Failed to obtain Pool data (ceph df): {}'.format(e), state='w') | ||||||
|             ceph_pool_df_raw = [] |             ceph_pool_df_raw = [] | ||||||
|  |  | ||||||
|         retcode, stdout, stderr = common.run_os_command('rados df --format json', timeout=1) |         retcode, stdout, stderr = common.run_os_command('rados df --format json', timeout=1) | ||||||
|         try: |         try: | ||||||
|             rados_pool_df_raw = json.loads(stdout)['pools'] |             rados_pool_df_raw = json.loads(stdout)['pools'] | ||||||
|         except json.decoder.JSONDecodeError: |         except Exception as e: | ||||||
|             logger.out('Failed to obtain Pool data (rados df)', state='w') |             logger.out('Failed to obtain Pool data (rados df): {}'.format(e), state='w') | ||||||
|             rados_pool_df_raw = [] |             rados_pool_df_raw = [] | ||||||
|  |  | ||||||
|         pool_count = len(ceph_pool_df_raw) |         pool_count = len(ceph_pool_df_raw) | ||||||
| @@ -1134,13 +1143,15 @@ def collect_ceph_stats(queue): | |||||||
|         # Get data from Ceph OSDs |         # Get data from Ceph OSDs | ||||||
|         if debug: |         if debug: | ||||||
|             print("Get data from Ceph OSDs") |             print("Get data from Ceph OSDs") | ||||||
|  |  | ||||||
|         # Parse the dump data |         # Parse the dump data | ||||||
|         osd_dump = dict() |         osd_dump = dict() | ||||||
|         retcode, stdout, stderr = common.run_os_command('ceph osd dump --format json', timeout=1) |  | ||||||
|  |         command = { "prefix": "osd dump", "format": "json" } | ||||||
|         try: |         try: | ||||||
|             osd_dump_raw = json.loads(stdout)['osds'] |             osd_dump_raw = json.loads(ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1])['osds'] | ||||||
|         except json.decoder.JSONDecodeError: |         except Exception as e: | ||||||
|             logger.out('Failed to obtain OSD data', state='w') |             logger.out('Failed to obtain OSD data: {}'.format(e), state='w') | ||||||
|             osd_dump_raw = [] |             osd_dump_raw = [] | ||||||
|  |  | ||||||
|         if debug: |         if debug: | ||||||
| @@ -1158,12 +1169,14 @@ def collect_ceph_stats(queue): | |||||||
|         # Parse the df data |         # Parse the df data | ||||||
|         if debug: |         if debug: | ||||||
|             print("Parse the OSD df data") |             print("Parse the OSD df data") | ||||||
|  |  | ||||||
|         osd_df = dict() |         osd_df = dict() | ||||||
|         retcode, stdout, stderr = common.run_os_command('ceph osd df --format json', timeout=1) |  | ||||||
|  |         command = { "prefix": "osd df", "format": "json" } | ||||||
|         try: |         try: | ||||||
|             osd_df_raw = json.loads(stdout)['nodes'] |             osd_df_raw = json.loads(ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1])['nodes'] | ||||||
|         except Exception as e: |         except Exception as e: | ||||||
|             logger.out('Failed to parse OSD list: {}'.format(e), state='w') |             logger.out('Failed to obtain OSD data: {}'.format(e), state='w') | ||||||
|             osd_df_raw = [] |             osd_df_raw = [] | ||||||
|  |  | ||||||
|         if debug: |         if debug: | ||||||
| @@ -1179,14 +1192,24 @@ def collect_ceph_stats(queue): | |||||||
|                     'reweight': osd['reweight'], |                     'reweight': osd['reweight'], | ||||||
|                 } |                 } | ||||||
|             }) |             }) | ||||||
|  |  | ||||||
|         # Parse the status data |         # Parse the status data | ||||||
|         if debug: |         if debug: | ||||||
|             print("Parse the OSD status data") |             print("Parse the OSD status data") | ||||||
|  |  | ||||||
|         osd_status = dict() |         osd_status = dict() | ||||||
|         retcode, stdout, stderr = common.run_os_command('ceph osd status', timeout=1) |  | ||||||
|  |         command = { "prefix": "osd status", "format": "pretty" } | ||||||
|  |         try: | ||||||
|  |             osd_status_raw = ceph_conn.mon_command(json.dumps(command), b'', timeout=1)[1].decode('ascii') | ||||||
|  |         except Exception as e: | ||||||
|  |             logger.out('Failed to obtain OSD status data: {}'.format(e), state='w') | ||||||
|  |             osd_status_raw = [] | ||||||
|  |  | ||||||
|         if debug: |         if debug: | ||||||
|             print("Loop through OSD status data") |             print("Loop through OSD status data") | ||||||
|         for line in stdout.split('\n'): |  | ||||||
|  |         for line in osd_status_raw.split('\n'): | ||||||
|             # Strip off colour |             # Strip off colour | ||||||
|             line = re.sub(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))', '', line) |             line = re.sub(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))', '', line) | ||||||
|             # Split it for parsing |             # Split it for parsing | ||||||
| @@ -1214,10 +1237,13 @@ def collect_ceph_stats(queue): | |||||||
|                         'state': state |                         'state': state | ||||||
|                     } |                     } | ||||||
|                 }) |                 }) | ||||||
|  |  | ||||||
|         # Merge them together into a single meaningful dict |         # Merge them together into a single meaningful dict | ||||||
|         if debug: |         if debug: | ||||||
|             print("Merge OSD data together") |             print("Merge OSD data together") | ||||||
|  |  | ||||||
|         osd_stats = dict() |         osd_stats = dict() | ||||||
|  |  | ||||||
|         for osd in osd_list: |         for osd in osd_list: | ||||||
|             try: |             try: | ||||||
|                 this_dump = osd_dump[osd] |                 this_dump = osd_dump[osd] | ||||||
| @@ -1231,6 +1257,7 @@ def collect_ceph_stats(queue): | |||||||
|         # Trigger updates for each OSD on this node |         # Trigger updates for each OSD on this node | ||||||
|         if debug: |         if debug: | ||||||
|             print("Trigger updates for each OSD on this node") |             print("Trigger updates for each OSD on this node") | ||||||
|  |  | ||||||
|         for osd in osd_list: |         for osd in osd_list: | ||||||
|             if d_osd[osd].node == myhostname: |             if d_osd[osd].node == myhostname: | ||||||
|                 try: |                 try: | ||||||
| @@ -1243,6 +1270,8 @@ def collect_ceph_stats(queue): | |||||||
|                     logger.out('Failed to upload OSD stats from dictionary: {}'.format(e), state='w') |                     logger.out('Failed to upload OSD stats from dictionary: {}'.format(e), state='w') | ||||||
|                 osds_this_node += 1 |                 osds_this_node += 1 | ||||||
|  |  | ||||||
|  |     ceph_conn.shutdown() | ||||||
|  |  | ||||||
|     queue.put(ceph_health_colour) |     queue.put(ceph_health_colour) | ||||||
|     queue.put(ceph_health) |     queue.put(ceph_health) | ||||||
|     queue.put(osds_this_node) |     queue.put(osds_this_node) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user