diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py index fc96ea4f..924ba0c0 100644 --- a/node-daemon/pvcd/Daemon.py +++ b/node-daemon/pvcd/Daemon.py @@ -58,7 +58,7 @@ import pvcd.CephInstance as CephInstance ############################################################################### # PVCD - node daemon startup program ############################################################################### -# +# # The PVC daemon starts a node and configures all the required components for # the node to run. It determines which of the 3 daemon modes it should be in # during initial setup based on hostname and the config file, and then starts @@ -201,14 +201,14 @@ def readConfig(pvcd_config_file, myhostname): address_key = '{}_dev_ip'.format(net) floating_key = '{}_floating_ip'.format(net) network_key = '{}_network'.format(net) - + # Verify the network provided is valid try: network = ipaddress.ip_network(config[network_key]) except Exception as e: print('ERROR: Network address {} for {} is not valid!'.format(config[network_key], network_key)) exit(1) - + # If we should be autoselected if config[address_key] == 'by-id': # Construct an IP from the relevant network @@ -216,9 +216,9 @@ def readConfig(pvcd_config_file, myhostname): address_id = int(mynodeid) - 1 # Grab the nth address from the network config[address_key] = '{}/{}'.format(list(network.hosts())[address_id], network.prefixlen) - + # Verify that the floating IP is valid - + try: # Set the ipaddr floating_addr = ipaddress.ip_address(config[floating_key].split('/')[0]) @@ -228,7 +228,7 @@ def readConfig(pvcd_config_file, myhostname): except Exception as e: print('ERROR: Floating address {} for {} is not valid!'.format(config[floating_key], floating_key)) exit(1) - + # Handle the storage config if config['enable_storage']: try: @@ -246,7 +246,7 @@ def readConfig(pvcd_config_file, myhostname): # Get the config object from readConfig() config = readConfig(pvcd_config_file, myhostname) - + # Handle the enable values enable_hypervisor = config['enable_hypervisor'] enable_networking = config['enable_networking'] @@ -357,19 +357,19 @@ if enable_networking: # Enable routing functions common.run_os_command('sysctl net.ipv4.ip_forward=1') common.run_os_command('sysctl net.ipv6.ip_forward=1') - + # Send redirects common.run_os_command('sysctl net.ipv4.conf.all.send_redirects=1') common.run_os_command('sysctl net.ipv4.conf.default.send_redirects=1') common.run_os_command('sysctl net.ipv6.conf.all.send_redirects=1') common.run_os_command('sysctl net.ipv6.conf.default.send_redirects=1') - + # Accept source routes common.run_os_command('sysctl net.ipv4.conf.all.accept_source_route=1') common.run_os_command('sysctl net.ipv4.conf.default.accept_source_route=1') common.run_os_command('sysctl net.ipv6.conf.all.accept_source_route=1') common.run_os_command('sysctl net.ipv6.conf.default.accept_source_route=1') - + # Disable RP filtering on the VNI dev and bridge interfaces (to allow traffic pivoting) common.run_os_command('sysctl net.ipv4.conf.{}.rp_filter=0'.format(config['vni_dev'])) common.run_os_command('sysctl net.ipv4.conf.{}.rp_filter=0'.format(config['upstream_dev'])) @@ -912,104 +912,111 @@ def update_zookeeper(): } }) - # Trigger updates for each OSD on this node + # Trigger updates for each pool on this node for pool in pool_list: zkhandler.writedata(zk_conn, { '/ceph/pools/{}/stats'.format(pool): str(json.dumps(pool_df[pool])) }) - # Get data from Ceph OSDs - if debug: - print("Get data from Ceph OSDs") - # Parse the dump data - osd_dump = dict() - retcode, stdout, stderr = common.run_os_command('ceph osd dump --format json') - osd_dump_raw = json.loads(stdout)['osds'] - if debug: - print("Loop through OSD dump") - for osd in osd_dump_raw: - osd_dump.update({ - str(osd['osd']): { - 'uuid': osd['uuid'], - 'up': osd['up'], - 'in': osd['in'], - 'primary_affinity': osd['primary_affinity'] - } - }) - # Parse the df data - if debug: - print("Parse the OSD df data") - osd_df = dict() - retcode, stdout, stderr = common.run_os_command('ceph osd df --format json') - osd_df_raw = json.loads(stdout)['nodes'] - if debug: - print("Loop through OSD df") - for osd in osd_df_raw: - osd_df.update({ - str(osd['id']): { - 'utilization': osd['utilization'], - 'var': osd['var'], - 'pgs': osd['pgs'], - 'kb': osd['kb'], - 'weight': osd['crush_weight'], - 'reweight': osd['reweight'], - } - }) - # Parse the status data - if debug: - print("Parse the OSD status data") - osd_status = dict() - retcode, stdout, stderr = common.run_os_command('ceph osd status') - if debug: - print("Loop through OSD status data") - for line in stderr.split('\n'): - # Strip off colour - line = re.sub(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))', '', line) - # Split it for parsing - line = line.split() - if len(line) > 1 and line[1].isdigit(): - # This is an OSD line so parse it - osd_id = line[1] - node = line[3].split('.')[0] - used = line[5] - avail = line[7] - wr_ops = line[9] - wr_data = line[11] - rd_ops = line[13] - rd_data = line[15] - state = line[17] - osd_status.update({ - str(osd_id): { - 'node': node, - 'used': used, - 'avail': avail, - 'wr_ops': wr_ops, - 'wr_data': wr_data, - 'rd_ops': rd_ops, - 'rd_data': rd_data, - 'state': state + # Only grab OSD stats if there are OSDs to grab (otherwise `ceph osd df` hangs) + osds_this_node = 0 + if len(osd_list) > 0: + # Get data from Ceph OSDs + if debug: + print("Get data from Ceph OSDs") + # Parse the dump data + osd_dump = dict() + retcode, stdout, stderr = common.run_os_command('ceph osd dump --format json') + osd_dump_raw = json.loads(stdout)['osds'] + if debug: + print("Loop through OSD dump") + for osd in osd_dump_raw: + osd_dump.update({ + str(osd['osd']): { + 'uuid': osd['uuid'], + 'up': osd['up'], + 'in': osd['in'], + 'primary_affinity': osd['primary_affinity'] } }) - # Merge them together into a single meaningful dict - if debug: - print("Merge OSD data together") - osd_stats = dict() - for osd in osd_list: - this_dump = osd_dump[osd] - this_dump.update(osd_df[osd]) - this_dump.update(osd_status[osd]) - osd_stats[osd] = this_dump - # Trigger updates for each OSD on this node - if debug: - print("Trigger updates for each OSD on this node") - osds_this_node = 0 - for osd in osd_list: - if d_osd[osd].node == myhostname: - zkhandler.writedata(zk_conn, { - '/ceph/osds/{}/stats'.format(osd): str(json.dumps(osd_stats[osd])) + # Parse the df data + if debug: + print("Parse the OSD df data") + osd_df = dict() + retcode, stdout, stderr = common.run_os_command('ceph osd df --format json') + try: + osd_df_raw = json.loads(stdout)['nodes'] + except: + logger.out('Failed to parse OSD list', state='w') + + if debug: + print("Loop through OSD df") + for osd in osd_df_raw: + osd_df.update({ + str(osd['id']): { + 'utilization': osd['utilization'], + 'var': osd['var'], + 'pgs': osd['pgs'], + 'kb': osd['kb'], + 'weight': osd['crush_weight'], + 'reweight': osd['reweight'], + } }) - osds_this_node += 1 + # Parse the status data + if debug: + print("Parse the OSD status data") + osd_status = dict() + retcode, stdout, stderr = common.run_os_command('ceph osd status') + if debug: + print("Loop through OSD status data") + for line in stderr.split('\n'): + # Strip off colour + line = re.sub(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))', '', line) + # Split it for parsing + line = line.split() + if len(line) > 1 and line[1].isdigit(): + # This is an OSD line so parse it + osd_id = line[1] + node = line[3].split('.')[0] + used = line[5] + avail = line[7] + wr_ops = line[9] + wr_data = line[11] + rd_ops = line[13] + rd_data = line[15] + state = line[17] + osd_status.update({ + str(osd_id): { + 'node': node, + 'used': used, + 'avail': avail, + 'wr_ops': wr_ops, + 'wr_data': wr_data, + 'rd_ops': rd_ops, + 'rd_data': rd_data, + 'state': state + } + }) + # Merge them together into a single meaningful dict + if debug: + print("Merge OSD data together") + osd_stats = dict() + for osd in osd_list: + this_dump = osd_dump[osd] + this_dump.update(osd_df[osd]) + this_dump.update(osd_status[osd]) + osd_stats[osd] = this_dump + + # Trigger updates for each OSD on this node + if debug: + print("Trigger updates for each OSD on this node") + for osd in osd_list: + if d_osd[osd].node == myhostname: + zkhandler.writedata(zk_conn, { + '/ceph/osds/{}/stats'.format(osd): str(json.dumps(osd_stats[osd])) + }) + osds_this_node += 1 memalloc = 0 vcpualloc = 0 @@ -1030,7 +1037,7 @@ def update_zookeeper(): except Exception as e: # Toggle a state "change" zkhandler.writedata(zk_conn, { '/domains/{}/state'.format(domain): instance.getstate() }) - + # Connect to libvirt if debug: print("Connect to libvirt") @@ -1039,7 +1046,7 @@ def update_zookeeper(): if lv_conn == None: logger.out('Failed to open connection to "{}"'.format(libvirt_name), state='e') return - + # Ensure that any running VMs are readded to the domain_list if debug: print("Ensure that any running VMs are readded to the domain_list")