Move Ceph statistics gathering into thread

This commit is contained in:
Joshua Boniface 2020-06-06 13:23:24 -04:00
parent cebb4bbc1a
commit 2ad6860dfe
1 changed file with 245 additions and 224 deletions

View File

@ -44,6 +44,8 @@ import apscheduler.schedulers.background
from distutils.util import strtobool from distutils.util import strtobool
from queue import Queue
import pvcnoded.log as log import pvcnoded.log as log
import pvcnoded.zkhandler as zkhandler import pvcnoded.zkhandler as zkhandler
import pvcnoded.fencing as fencing import pvcnoded.fencing as fencing
@ -1011,6 +1013,236 @@ if enable_storage:
# PHASE 9 - Run the daemon # PHASE 9 - Run the daemon
############################################################################### ###############################################################################
# Ceph stats update function
def collect_ceph_stats(queue):
    """Gather Ceph statistics, publish them to Zookeeper and report a summary.

    Meant to be run as the target of a worker thread. The caller joins the
    thread and then reads exactly three items from ``queue``, in this order:
    the health colour, the health text, and the number of OSDs on this node.

    Those three items are ALWAYS put on the queue, even when collection is
    aborted early (a failed Zookeeper write) or an unexpected exception
    escapes; otherwise the caller's blocking ``get()`` calls would never
    return.

    Args:
        queue: queue object (anything with ``put``) receiving the results.
    """
    # Values reported if collection stops before the real ones are known;
    # they mirror what an empty `ceph health` output produces below.
    ceph_health_colour = fmt_red
    ceph_health = ''
    osds_this_node = 0

    try:
        ceph_health, ceph_health_colour = _ceph_stats_get_health()

        # Cluster-wide data is written by the primary node only. A failed
        # status or rados df write aborts the remainder of this run.
        if this_node.router_state == 'primary':
            if not _ceph_stats_write_status():
                return
        if this_node.router_state == 'primary':
            if not _ceph_stats_write_rados_df():
                return
        if this_node.router_state == 'primary':
            _ceph_stats_write_pools()

        # Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs)
        if len(osd_list) > 0:
            osds_this_node = _ceph_stats_write_osds()
    finally:
        # Runs on normal completion, on the early returns above, and while an
        # exception propagates, so the consumer is never left waiting.
        queue.put(ceph_health_colour)
        queue.put(ceph_health)
        queue.put(osds_this_node)


def _ceph_stats_get_health():
    """Run `ceph health` and return (health text, matching colour code)."""
    if debug:
        print("Get Ceph cluster health (for local printing)")
    retcode, stdout, stderr = common.run_os_command('ceph health', timeout=1)
    ceph_health = stdout.rstrip()
    if 'HEALTH_OK' in ceph_health:
        ceph_health_colour = fmt_green
    elif 'HEALTH_WARN' in ceph_health:
        ceph_health_colour = fmt_yellow
    else:
        ceph_health_colour = fmt_red
    return ceph_health, ceph_health_colour


def _ceph_stats_write_status():
    """Store raw `ceph status` output at /ceph; return False if the write failed."""
    if debug:
        print("Set ceph health information in zookeeper (primary only)")
    # Get status info
    retcode, stdout, stderr = common.run_os_command('ceph status', timeout=1)
    ceph_status = stdout
    try:
        zkhandler.writedata(zk_conn, {
            '/ceph': str(ceph_status)
        })
    except Exception:
        logger.out('Failed to set Ceph status data', state='e')
        return False
    return True


def _ceph_stats_write_rados_df():
    """Store raw `rados df` output at /ceph/radosdf; return False if the write failed."""
    if debug:
        print("Set ceph rados df information in zookeeper (primary only)")
    # Get rados df info
    retcode, stdout, stderr = common.run_os_command('rados df', timeout=1)
    rados_df = stdout
    try:
        zkhandler.writedata(zk_conn, {
            '/ceph/radosdf': str(rados_df)
        })
    except Exception:
        logger.out('Failed to set Rados space data', state='e')
        return False
    return True


def _ceph_stats_write_pools():
    """Merge `ceph df` and `rados df` per-pool data and store it for each known pool."""
    if debug:
        print("Set pool information in zookeeper (primary only)")

    # Get pool info. Besides malformed JSON (ValueError), tolerate a missing
    # 'pools' key and non-string output, any of which means "no usable data".
    retcode, stdout, stderr = common.run_os_command('ceph df --format json', timeout=1)
    try:
        ceph_pool_df_raw = json.loads(stdout)['pools']
    except (ValueError, KeyError, TypeError):
        logger.out('Failed to obtain Pool data (ceph df)', state='w')
        ceph_pool_df_raw = []

    retcode, stdout, stderr = common.run_os_command('rados df --format json', timeout=1)
    try:
        rados_pool_df_raw = json.loads(stdout)['pools']
    except (ValueError, KeyError, TypeError):
        logger.out('Failed to obtain Pool data (rados df)', state='w')
        rados_pool_df_raw = []

    pool_count = len(ceph_pool_df_raw)
    if debug:
        print("Getting info for {} pools".format(pool_count))

    for pool_idx in range(0, pool_count):
        try:
            # Combine all the data for this pool; the two listings are paired
            # by position, so a shorter rados listing raises IndexError here
            # and is reported by the handler below.
            pool = ceph_pool_df_raw[pool_idx]
            pool.update(rados_pool_df_raw[pool_idx])

            # Ignore any pools that aren't in our pool list
            if pool['name'] not in pool_list:
                if debug:
                    print("Pool {} not in pool list {}".format(pool['name'], pool_list))
                continue
            if debug:
                print("Parsing data for pool {}".format(pool['name']))

            # Assemble a useful data structure
            pool_df = {
                'id': pool['id'],
                'free_bytes': pool['stats']['max_avail'],
                'used_bytes': pool['stats']['bytes_used'],
                'used_percent': pool['stats']['percent_used'],
                'num_objects': pool['stats']['objects'],
                'num_object_clones': pool['num_object_clones'],
                'num_object_copies': pool['num_object_copies'],
                'num_objects_missing_on_primary': pool['num_objects_missing_on_primary'],
                'num_objects_unfound': pool['num_objects_unfound'],
                'num_objects_degraded': pool['num_objects_degraded'],
                'read_ops': pool['read_ops'],
                'read_bytes': pool['read_bytes'],
                'write_ops': pool['write_ops'],
                'write_bytes': pool['write_bytes']
            }

            # Write the pool data to Zookeeper
            zkhandler.writedata(zk_conn, {
                '/ceph/pools/{}/stats'.format(pool['name']): str(json.dumps(pool_df))
            })
        except Exception:
            # One or more of the status commands timed out, just continue
            logger.out('Failed to format and send pool data', state='w')


def _ceph_stats_parse_osd_status(stdout):
    """Parse the text table printed by `ceph osd status` into a dict keyed by OSD ID."""
    # Matches terminal colour/control escape sequences so they can be removed
    colour_regex = re.compile(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))')

    osd_status = dict()
    for line in stdout.split('\n'):
        # Strip off colour
        line = colour_regex.sub('', line)
        # Split it for parsing
        fields = line.split()
        # An OSD line has a numeric ID in the second field; it must also be
        # long enough for the highest field read below (index 17), so that a
        # truncated line is skipped instead of raising IndexError.
        if len(fields) > 17 and fields[1].isdigit():
            osd_status.update({
                str(fields[1]): {
                    'node': fields[3].split('.')[0],
                    'used': fields[5],
                    'avail': fields[7],
                    'wr_ops': fields[9],
                    'wr_data': fields[11],
                    'rd_ops': fields[13],
                    'rd_data': fields[15],
                    'state': fields[17]
                }
            })
    return osd_status


def _ceph_stats_write_osds():
    """Collect per-OSD data, store stats for this node's OSDs, and return how many it has."""
    # Get data from Ceph OSDs
    if debug:
        print("Get data from Ceph OSDs")

    # Parse the dump data
    osd_dump = dict()
    retcode, stdout, stderr = common.run_os_command('ceph osd dump --format json', timeout=1)
    try:
        osd_dump_raw = json.loads(stdout)['osds']
    except (ValueError, KeyError, TypeError):
        logger.out('Failed to obtain OSD data', state='w')
        osd_dump_raw = []
    if debug:
        print("Loop through OSD dump")
    for osd in osd_dump_raw:
        osd_dump.update({
            str(osd['osd']): {
                'uuid': osd['uuid'],
                'up': osd['up'],
                'in': osd['in'],
                'primary_affinity': osd['primary_affinity']
            }
        })

    # Parse the df data
    if debug:
        print("Parse the OSD df data")
    osd_df = dict()
    retcode, stdout, stderr = common.run_os_command('ceph osd df --format json', timeout=1)
    try:
        osd_df_raw = json.loads(stdout)['nodes']
    except Exception as e:
        logger.out('Failed to parse OSD list: {}'.format(e), state='w')
        osd_df_raw = []
    if debug:
        print("Loop through OSD df")
    for osd in osd_df_raw:
        osd_df.update({
            str(osd['id']): {
                'utilization': osd['utilization'],
                'var': osd['var'],
                'pgs': osd['pgs'],
                'kb': osd['kb'],
                'weight': osd['crush_weight'],
                'reweight': osd['reweight'],
            }
        })

    # Parse the status data
    if debug:
        print("Parse the OSD status data")
    retcode, stdout, stderr = common.run_os_command('ceph osd status', timeout=1)
    if debug:
        print("Loop through OSD status data")
    osd_status = _ceph_stats_parse_osd_status(stdout)

    # Merge them together into a single meaningful dict
    if debug:
        print("Merge OSD data together")
    osd_stats = dict()
    for osd in osd_list:
        try:
            this_dump = osd_dump[osd]
            this_dump.update(osd_df[osd])
            this_dump.update(osd_status[osd])
            osd_stats[osd] = this_dump
        except KeyError as e:
            # One or more of the status commands timed out, just continue
            logger.out('Failed to parse OSD stats into dictionary: {}'.format(e), state='w')

    # Trigger updates for each OSD on this node
    if debug:
        print("Trigger updates for each OSD on this node")
    osds_this_node = 0
    for osd in osd_list:
        if d_osd[osd].node == myhostname:
            try:
                stats = json.dumps(osd_stats[osd])
                zkhandler.writedata(zk_conn, {
                    '/ceph/osds/{}/stats'.format(osd): str(stats)
                })
            except KeyError as e:
                # One or more of the status commands timed out, just continue
                logger.out('Failed to upload OSD stats from dictionary: {}'.format(e), state='w')
            # Counted whether or not the upload succeeded
            osds_this_node += 1
    return osds_this_node
# Keepalive update function # Keepalive update function
def node_keepalive(): def node_keepalive():
# Set the upstream IP in Zookeeper for clients to read # Set the upstream IP in Zookeeper for clients to read
@ -1039,230 +1271,11 @@ def node_keepalive():
if zkhandler.readdata(zk_conn, '/primary_node') != this_node.name: if zkhandler.readdata(zk_conn, '/primary_node') != this_node.name:
zkhandler.writedata(zk_conn, {'/primary_node': this_node.name}) zkhandler.writedata(zk_conn, {'/primary_node': this_node.name})
# Run Ceph status collection in separate thread for parallelization
if enable_storage: if enable_storage:
# Get Ceph cluster health (for local printing) ceph_thread_queue = Queue()
if debug: ceph_stats_thread = threading.Thread(target=collect_ceph_stats, args=(ceph_thread_queue,), kwargs={})
print("Get Ceph cluster health (for local printing)") ceph_stats_thread.start()
retcode, stdout, stderr = common.run_os_command('ceph health', timeout=1)
ceph_health = stdout.rstrip()
if 'HEALTH_OK' in ceph_health:
ceph_health_colour = fmt_green
elif 'HEALTH_WARN' in ceph_health:
ceph_health_colour = fmt_yellow
else:
ceph_health_colour = fmt_red
# Set ceph health information in zookeeper (primary only)
if this_node.router_state == 'primary':
if debug:
print("Set ceph health information in zookeeper (primary only)")
# Get status info
retcode, stdout, stderr = common.run_os_command('ceph status', timeout=1)
ceph_status = stdout
try:
zkhandler.writedata(zk_conn, {
'/ceph': str(ceph_status)
})
except:
logger.out('Failed to set Ceph status data', state='e')
return
# Set ceph rados df information in zookeeper (primary only)
if this_node.router_state == 'primary':
if debug:
print("Set ceph rados df information in zookeeper (primary only)")
# Get rados df info
retcode, stdout, stderr = common.run_os_command('rados df', timeout=1)
rados_df = stdout
try:
zkhandler.writedata(zk_conn, {
'/ceph/radosdf': str(rados_df)
})
except:
logger.out('Failed to set Rados space data', state='e')
return
# Set pool information in zookeeper (primary only)
if this_node.router_state == 'primary':
if debug:
print("Set pool information in zookeeper (primary only)")
# Get pool info
retcode, stdout, stderr = common.run_os_command('ceph df --format json', timeout=1)
try:
ceph_pool_df_raw = json.loads(stdout)['pools']
except json.decoder.JSONDecodeError:
logger.out('Failed to obtain Pool data (ceph df)', state='w')
ceph_pool_df_raw = []
retcode, stdout, stderr = common.run_os_command('rados df --format json', timeout=1)
try:
rados_pool_df_raw = json.loads(stdout)['pools']
except json.decoder.JSONDecodeError:
logger.out('Failed to obtain Pool data (rados df)', state='w')
rados_pool_df_raw = []
pool_count = len(ceph_pool_df_raw)
if debug:
print("Getting info for {} pools".format(pool_count))
for pool_idx in range(0, pool_count):
try:
# Combine all the data for this pool
ceph_pool_df = ceph_pool_df_raw[pool_idx]
rados_pool_df = rados_pool_df_raw[pool_idx]
pool = ceph_pool_df
pool.update(rados_pool_df)
# Ignore any pools that aren't in our pool list
if pool['name'] not in pool_list:
if debug:
print("Pool {} not in pool list {}".format(pool['name'], pool_list))
continue
else:
if debug:
print("Parsing data for pool {}".format(pool['name']))
# Assemble a useful data structure
pool_df = {
'id': pool['id'],
'free_bytes': pool['stats']['max_avail'],
'used_bytes': pool['stats']['bytes_used'],
'used_percent': pool['stats']['percent_used'],
'num_objects': pool['stats']['objects'],
'num_object_clones': pool['num_object_clones'],
'num_object_copies': pool['num_object_copies'],
'num_objects_missing_on_primary': pool['num_objects_missing_on_primary'],
'num_objects_unfound': pool['num_objects_unfound'],
'num_objects_degraded': pool['num_objects_degraded'],
'read_ops': pool['read_ops'],
'read_bytes': pool['read_bytes'],
'write_ops': pool['write_ops'],
'write_bytes': pool['write_bytes']
}
# Write the pool data to Zookeeper
zkhandler.writedata(zk_conn, {
'/ceph/pools/{}/stats'.format(pool['name']): str(json.dumps(pool_df))
})
except Exception as e:
# One or more of the status commands timed out, just continue
logger.out('Failed to format and send pool data', state='w')
pass
# Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs)
osds_this_node = 0
if len(osd_list) > 0:
# Get data from Ceph OSDs
if debug:
print("Get data from Ceph OSDs")
# Parse the dump data
osd_dump = dict()
retcode, stdout, stderr = common.run_os_command('ceph osd dump --format json', timeout=1)
try:
osd_dump_raw = json.loads(stdout)['osds']
except json.decoder.JSONDecodeError:
logger.out('Failed to obtain OSD data', state='w')
osd_dump_raw = []
if debug:
print("Loop through OSD dump")
for osd in osd_dump_raw:
osd_dump.update({
str(osd['osd']): {
'uuid': osd['uuid'],
'up': osd['up'],
'in': osd['in'],
'primary_affinity': osd['primary_affinity']
}
})
# Parse the df data
if debug:
print("Parse the OSD df data")
osd_df = dict()
retcode, stdout, stderr = common.run_os_command('ceph osd df --format json', timeout=1)
try:
osd_df_raw = json.loads(stdout)['nodes']
except Exception as e:
logger.out('Failed to parse OSD list: {}'.format(e), state='w')
osd_df_raw = []
if debug:
print("Loop through OSD df")
for osd in osd_df_raw:
osd_df.update({
str(osd['id']): {
'utilization': osd['utilization'],
'var': osd['var'],
'pgs': osd['pgs'],
'kb': osd['kb'],
'weight': osd['crush_weight'],
'reweight': osd['reweight'],
}
})
# Parse the status data
if debug:
print("Parse the OSD status data")
osd_status = dict()
retcode, stdout, stderr = common.run_os_command('ceph osd status', timeout=1)
if debug:
print("Loop through OSD status data")
for line in stdout.split('\n'):
# Strip off colour
line = re.sub(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))', '', line)
# Split it for parsing
line = line.split()
if len(line) > 1 and line[1].isdigit():
# This is an OSD line so parse it
osd_id = line[1]
node = line[3].split('.')[0]
used = line[5]
avail = line[7]
wr_ops = line[9]
wr_data = line[11]
rd_ops = line[13]
rd_data = line[15]
state = line[17]
osd_status.update({
str(osd_id): {
'node': node,
'used': used,
'avail': avail,
'wr_ops': wr_ops,
'wr_data': wr_data,
'rd_ops': rd_ops,
'rd_data': rd_data,
'state': state
}
})
# Merge them together into a single meaningful dict
if debug:
print("Merge OSD data together")
osd_stats = dict()
for osd in osd_list:
try:
this_dump = osd_dump[osd]
this_dump.update(osd_df[osd])
this_dump.update(osd_status[osd])
osd_stats[osd] = this_dump
except KeyError as e:
# One or more of the status commands timed out, just continue
logger.out('Failed to parse OSD stats into dictionary: {}'.format(e), state='w')
# Trigger updates for each OSD on this node
if debug:
print("Trigger updates for each OSD on this node")
for osd in osd_list:
if d_osd[osd].node == myhostname:
try:
stats = json.dumps(osd_stats[osd])
zkhandler.writedata(zk_conn, {
'/ceph/osds/{}/stats'.format(osd): str(stats)
})
except KeyError as e:
# One or more of the status commands timed out, just continue
logger.out('Failed to upload OSD stats from dictionary: {}'.format(e), state='w')
osds_this_node += 1
# Normalize running VM status # Normalize running VM status
memalloc = 0 memalloc = 0
@ -1318,6 +1331,14 @@ def node_keepalive():
else: else:
this_node.domains_count = 0 this_node.domains_count = 0
# Wait for Ceph thread completion
if enable_storage:
ceph_stats_thread.join()
ceph_health_colour = ceph_thread_queue.get()
ceph_health = ceph_thread_queue.get()
osds_this_node = ceph_thread_queue.get()
# Set our information in zookeeper # Set our information in zookeeper
keepalive_time = int(time.time()) keepalive_time = int(time.time())
if debug: if debug: