From bfbe9188ce830381f3f2fa1da11f1973f08eca8c Mon Sep 17 00:00:00 2001
From: Joshua Boniface
Date: Mon, 29 Oct 2018 17:51:08 -0400
Subject: [PATCH] Finish setup of Ceph OSD addition and basic management

---
 client-cli/pvc.py                |   4 +-
 client-common/ceph.py            |   6 +-
 node-daemon/pvcd/CephInstance.py | 194 +++++++++++++++++++++++++++++++
 node-daemon/pvcd/Daemon.py       | 183 ++++++++++++++++++++++++-----
 node-daemon/pvcd/NodeInstance.py |  16 ---
 node-daemon/pvcd/common.py       |  11 +-
 6 files changed, 363 insertions(+), 51 deletions(-)
 create mode 100644 node-daemon/pvcd/CephInstance.py

diff --git a/client-cli/pvc.py b/client-cli/pvc.py
index eab0ee40..b83ad10c 100755
--- a/client-cli/pvc.py
+++ b/client-cli/pvc.py
@@ -964,7 +964,7 @@ def ceph_osd_add(node, device):
 @click.argument(
     'osdid'
 )
-def ceph_osd_remove(node, device):
+def ceph_osd_remove(osdid):
     """
     Remove a Ceph OSD with ID OSDID from the cluster.
     """
@@ -1113,7 +1113,7 @@ net_acl.add_command(net_acl_remove)
 net_acl.add_command(net_acl_list)
 
 ceph_osd.add_command(ceph_osd_add)
-#ceph_osd.add_command(ceph_osd_remove)
+ceph_osd.add_command(ceph_osd_remove)
 #ceph_osd.add_command(ceph_osd_in)
 #ceph_osd.add_command(ceph_osd_out)
 #ceph_osd.add_command(ceph_osd_set)
diff --git a/client-common/ceph.py b/client-common/ceph.py
index 93c589e6..a74ec79a 100644
--- a/client-common/ceph.py
+++ b/client-common/ceph.py
@@ -59,13 +59,13 @@ def add_osd(zk_conn, node, device):
         return False, 'ERROR: No node named "{}" is present in the cluster.'.format(node)
 
     # Tell the cluster to create a new OSD for the host
-    new_osd_string = 'new {},{}'.format(node, device)
-    zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': new_osd_string})
+    add_osd_string = 'add {},{}'.format(node, device)
+    zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': add_osd_string})
     click.echo('Created new OSD with block device {} on node {}.'.format(device, node))
     return True, ''
 
 def remove_osd(zk_conn, osd_id):
     remove_osd_string = 'remove {}'.format(osd_id)
     zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': remove_osd_string})
-    click.echo('Remove OSD {} from the cluster.'.format(osd_id))
+    click.echo('Removed OSD with ID {} from the cluster.'.format(osd_id))
     return True, ''
diff --git a/node-daemon/pvcd/CephInstance.py b/node-daemon/pvcd/CephInstance.py
new file mode 100644
index 00000000..41620a22
--- /dev/null
+++ b/node-daemon/pvcd/CephInstance.py
@@ -0,0 +1,194 @@
+#!/usr/bin/env python3
+
+# CephInstance.py - Class implementing a PVC node Ceph instance
+# Part of the Parallel Virtual Cluster (PVC) system
+#
+# Copyright (C) 2018 Joshua M. Boniface
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+###############################################################################
+
+import time
+import ast
+
+import pvcd.log as log
+import pvcd.zkhandler as zkhandler
+import pvcd.fencing as fencing
+import pvcd.common as common
+
+class CephInstance(object):
+    def __init__(self):
+        pass
+
+class CephOSDInstance(object):
+    def __init__(self, zk_conn, this_node, osd_id):
+        self.zk_conn = zk_conn
+        self.this_node = this_node
+        self.osd_id = osd_id
+        self.node = None
+        self.size = None
+        self.stats = dict()
+
+        @self.zk_conn.DataWatch('/ceph/osds/{}/node'.format(self.osd_id))
+        def watch_osd_node(data, stat, event=''):
+            if event and event.type == 'DELETED':
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode('ascii')
+            except AttributeError:
+                data = ''
+
+            if data != self.node:
+                self.node = data
+
+        @self.zk_conn.DataWatch('/ceph/osds/{}/size'.format(self.osd_id))
+        def watch_osd_size(data, stat, event=''):
+            if event and event.type == 'DELETED':
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode('ascii')
+            except AttributeError:
+                data = ''
+
+            if data != self.size:
+                self.size = data
+
+        @self.zk_conn.DataWatch('/ceph/osds/{}/stats'.format(self.osd_id))
+        def watch_osd_stats(data, stat, event=''):
+            if event and event.type == 'DELETED':
+                # The key has been deleted after existing before; terminate this watcher
+                # because this class instance is about to be reaped in Daemon.py
+                return False
+
+            try:
+                data = data.decode('ascii')
+            except AttributeError:
+                data = ''
+
+            if data != self.stats:
+                self.stats.update(ast.literal_eval(data))
+
+def add_osd(zk_conn, logger, node, device):
+    # We are ready to create a new OSD on this host
+    logger.out('Creating new OSD disk', state='i')
+    try:
+        # 1. Create an OSD; we do this so we know what ID will be generated
+        retcode, stdout, stderr = common.run_os_command('ceph osd create')
+        if retcode != 0:
+            print(stdout)
+            print(stderr)
+            raise
+        osd_id = stdout.rstrip()
+
+        # 2. Remove that newly-created OSD
+        retcode, stdout, stderr = common.run_os_command('ceph osd rm {}'.format(osd_id))
+        if retcode != 0:
+            print(stdout)
+            print(stderr)
+            raise
+
+        # 3. Create the OSD for real
+        retcode, stdout, stderr = common.run_os_command(
+            'ceph-volume lvm prepare --bluestore --data {device}'.format(
+                osdid=osd_id,
+                device=device
+            )
+        )
+        if retcode != 0:
+            print(stdout)
+            print(stderr)
+            raise
+
+        # 4. Activate the OSD
+        retcode, stdout, stderr = common.run_os_command(
+            'ceph-volume lvm activate --bluestore {osdid}'.format(
+                osdid=osd_id
+            )
+        )
+        if retcode != 0:
+            print(stdout)
+            print(stderr)
+            raise
+
+        # 5. Add it to the crush map
+        retcode, stdout, stderr = common.run_os_command(
+            'ceph osd crush add osd.{osdid} 1.0 root=default host={node}'.format(
+                osdid=osd_id,
+                node=node
+            )
+        )
+        if retcode != 0:
+            print(stdout)
+            print(stderr)
+            raise
+        time.sleep(0.5)
+
+        # 6. Verify it started
+        retcode, stdout, stderr = common.run_os_command(
+            'systemctl status ceph-osd@{osdid}'.format(
+                osdid=osd_id
+            )
+        )
+        if retcode != 0:
+            print(stdout)
+            print(stderr)
+            raise
+
+        # 7. Add the new OSD to the list
+        zkhandler.writedata(zk_conn, {
+            '/ceph/osds/{}'.format(osd_id): '',
+            '/ceph/osds/{}/node'.format(osd_id): node,
+            '/ceph/osds/{}/size'.format(osd_id): '',
+            '/ceph/osds/{}/stats'.format(osd_id): '{}'
+        })
+
+        # Log it
+        logger.out('Created new OSD disk with ID {}'.format(osd_id), state='o')
+    except Exception as e:
+        # Log it
+        logger.out('Failed to create new OSD disk: {}'.format(e), state='e')
+
+def remove_osd(zk_conn, logger, osd_id, osd_obj):
+    logger.out('Removing OSD disk {}'.format(osd_id), state='i')
+    try:
+        # 1. Verify the OSD is present
+        retcode, stdout, stderr = common.run_os_command('ceph osd ls')
+        osd_list = stdout.split('\n')
+        if not osd_id in osd_list:
+            logger.out('Could not find OSD {} in the cluster'.format(osd_id), state='e')
+            return True
+
+        # 2. Set the OSD out so it will flush
+        retcode, stdout, stderr = common.run_os_command('ceph osd out {}'.format(osd_id))
+        if retcode != 0:
+            print(stdout)
+            print(stderr)
+
+        # 3. Wait for the OSD to flush
+        while True:
+            retcode, stdout, stderr = common.run_os_command('ceph health')
+            health_string = stdout
+    except:
+        pass
+
+class CephPool(object):
+    def __init__(self):
+        pass
diff --git a/node-daemon/pvcd/Daemon.py b/node-daemon/pvcd/Daemon.py
index e00501b9..8aa0c603 100644
--- a/node-daemon/pvcd/Daemon.py
+++ b/node-daemon/pvcd/Daemon.py
@@ -37,6 +37,7 @@ import time
 import re
 import configparser
 import threading
+import json
 import apscheduler.schedulers.background
 
 import pvcd.log as log
@@ -48,6 +49,7 @@ import pvcd.DomainInstance as DomainInstance
 import pvcd.NodeInstance as NodeInstance
 import pvcd.VXNetworkInstance as VXNetworkInstance
 import pvcd.DNSAggregatorInstance as DNSAggregatorInstance
+import pvcd.CephInstance as CephInstance
 
 ###############################################################################
 # PVCD - node daemon startup program
@@ -522,9 +524,11 @@ logger.out('Setting up objects', state='i')
 d_node = dict()
 d_network = dict()
 d_domain = dict()
+d_osd = dict()
 node_list = []
 network_list = []
 domain_list = []
+osd_list = []
 
 # Create an instance of the DNS Aggregator if we're a coordinator
 if config['daemon_mode'] == 'coordinator':
@@ -561,7 +565,7 @@ this_node = d_node[myhostname]
 
 # Primary node
 @zk_conn.DataWatch('/primary_node')
-def update_primart(new_primary, stat, event=''):
+def update_primary(new_primary, stat, event=''):
     try:
         new_primary = new_primary.decode('ascii')
     except AttributeError:
@@ -633,7 +637,7 @@ def update_domains(new_domain_list):
     # Add any missing domains to the list
     for domain in new_domain_list:
         if not domain in domain_list:
-            d_domain[domain] = DomainInstance.DomainInstance(domain, zk_conn, config, logger, this_node);
+            d_domain[domain] = DomainInstance.DomainInstance(domain, zk_conn, config, logger, this_node)
 
     # Remove any deleted domains from the list
     for domain in domain_list:
@@ -649,6 +653,60 @@ def update_domains(new_domain_list):
     for node in d_node:
         d_node[node].update_domain_list(d_domain)
 
+# Ceph OSD provisioning key
+@zk_conn.DataWatch('/ceph/osd_cmd')
+def osd_cmd(data, stat, event=''):
+    if data:
+        data = data.decode('ascii')
+    else:
+        data = ''
+
+    if data:
+        # Get the command and args
+        command, args = data.split()
+
+        # Adding a new OSD
+        if command == 'add':
+            node, device = args.split(',')
+            if node == this_node.name:
+                # Clean up the command queue
+                zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': ''})
+                # Add the OSD
+                CephInstance.add_osd(zk_conn, logger, node, device)
+        # Removing an OSD
+        elif command == 'remove':
+            osd_id = args
+
+            # Verify osd_id is in the list
+            if not osd_id in d_osd:
+                return True
+
+            if d_osd[osd_id].node == this_node.name:
+                # Clean up the command queue
+                zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': ''})
+                # Remove the OSD
+                CephInstance.remove_osd(zk_conn, logger, osd_id, d_osd[osd_id])
+
+# OSD objects
+@zk_conn.ChildrenWatch('/ceph/osds')
+def update_osds(new_osd_list):
+    global osd_list, d_osd
+
+    # Add any missing OSDs to the list
+    for osd in new_osd_list:
+        if not osd in osd_list:
+            d_osd[osd] = CephInstance.CephOSDInstance(zk_conn, this_node, osd)
+
+    # Remove any deleted OSDs from the list
+    for osd in osd_list:
+        if not osd in new_osd_list:
+            # Delete the object
+            del(d_osd[osd])
+
+    # Update and print new list
+    osd_list = new_osd_list
+    logger.out('{}OSD list:{} {}'.format(logger.fmt_blue, logger.fmt_end, ' '.join(osd_list)), state='i')
+
 ###############################################################################
 # PHASE 9 - Run the daemon
 ###############################################################################
@@ -668,6 +726,93 @@ def update_zookeeper():
         if zkhandler.readdata(zk_conn, '/primary_node') != this_node.name:
             zkhandler.writedata(zk_conn, {'/primary_node': this_node.name})
 
+    # Get Ceph cluster health (for local printing)
+    retcode, stdout, stderr = common.run_os_command('ceph health')
+    ceph_health = stdout.rstrip()
+    if 'HEALTH_OK' in ceph_health:
+        ceph_health_colour = logger.fmt_green
+    elif 'HEALTH_WARN' in ceph_health:
+        ceph_health_colour = logger.fmt_yellow
+    else:
+        ceph_health_colour = logger.fmt_red
+
+    # Set ceph health information in zookeeper (primary only)
+    if this_node.router_state == 'primary':
+        # Get status info
+        retcode, stdout, stderr = common.run_os_command('ceph status')
+        ceph_status = stdout
+        try:
+            zkhandler.writedata(zk_conn, {
+                '/ceph': str(ceph_status)
+            })
+        except:
+            logger.out('Failed to set Ceph status data', state='e')
+            return
+
+    # Get data from Ceph OSDs
+    # Parse the dump data
+    osd_dump = dict()
+    retcode, stdout, stderr = common.run_os_command('ceph osd dump --format json')
+    osd_dump_raw = json.loads(stdout)['osds']
+    for osd in osd_dump_raw:
+        osd_dump.update({
+            str(osd['osd']): {
+                'uuid': osd['uuid'],
+                'up': osd['up'],
+                'in': osd['in'],
+                'weight': osd['weight'],
+                'primary_affinity': osd['primary_affinity']
+            }
+        })
+    # Parse the status data
+    osd_status = dict()
+    retcode, stdout, stderr = common.run_os_command('ceph osd status')
+    for line in stderr.split('\n'):
+        # Strip off colour
+        line = re.sub(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))', '', line)
+        # Split it for parsing
+        line = line.split()
+        if len(line) > 1 and line[1].isdigit():
+            # This is an OSD line so parse it
+            osd_id = line[1]
+            host = line[3].split('.')[0]
+            used = line[5]
+            avail = line[7]
+            wr_ops = line[9]
+            wr_data = line[11]
+            rd_ops = line[13]
+            rd_data = line[15]
+            state = line[17]
+            osd_status.update({
+                # Keyed by the OSD ID, as a string
+                str(osd_id): {
+                    'host': host,
+                    'used': used,
+                    'avail': avail,
+                    'wr_ops': wr_ops,
+                    'wr_data': wr_data,
+                    'rd_ops': rd_ops,
+                    'rd_data': rd_data,
+                    'state': state
+                }
+            })
+    # Merge them together into a single meaningful dict
+    osd_stats = dict()
+    for osd in osd_list:
+        this_dump = osd_dump[osd]
+        this_dump.update(osd_status[osd])
+        osd_stats[osd] = this_dump
+
+    # Trigger updates for each OSD on this node
+    osds_this_host = 0
+    for osd in osd_list:
+        if d_osd[osd].node == myhostname:
+            zkhandler.writedata(zk_conn, {
+                '/ceph/osds/{}/stats'.format(osd): str(osd_stats[osd])
+            })
+            osds_this_host += 1
+
     # Toggle state management of dead VMs to restart them
     memalloc = 0
     vcpualloc = 0
@@ -699,29 +844,6 @@ def update_zookeeper():
             if domain_uuid not in this_node.domain_list:
                 this_node.domain_list.append(domain_uuid)
 
-    # Set ceph health information in zookeeper (primary only)
-    if this_node.router_state == 'primary':
-        # Get status info
-        retcode, stdout, stderr = common.run_os_command('ceph status')
-        ceph_status = stdout
-        try:
-            zkhandler.writedata(zk_conn, {
-                '/ceph': str(ceph_status)
-            })
-        except:
-            logger.out('Failed to set Ceph status data', state='e')
-            return
-
-    # Get cluster health
-    retcode, stdout, stderr = common.run_os_command('ceph health')
-    ceph_health = stdout.rstrip()
-    if ceph_health == 'HEALTH_OK':
-        ceph_health_colour = logger.fmt_green
-    elif ceph_health == 'HEALTH_WARN':
-        ceph_health_colour = logger.fmt_yellow
-    else:
-        ceph_health_colour = logger.fmt_red
-
     # Set our information in zookeeper
     #this_node.name = lv_conn.getHostname()
     this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024)
@@ -738,7 +860,6 @@ def update_zookeeper():
         '/nodes/{}/memalloc'.format(this_node.name): str(this_node.memalloc),
         '/nodes/{}/vcpualloc'.format(this_node.name): str(this_node.vcpualloc),
         '/nodes/{}/cpuload'.format(this_node.name): str(this_node.cpuload),
-        '/nodes/{}/networkscount'.format(this_node.name): str(this_node.networks_count),
         '/nodes/{}/domainscount'.format(this_node.name): str(this_node.domains_count),
         '/nodes/{}/runningdomains'.format(this_node.name): ' '.join(this_node.domain_list),
         '/nodes/{}/keepalive'.format(this_node.name): str(keepalive_time)
@@ -797,15 +918,19 @@ def update_zookeeper():
             usedmem=this_node.memused,
             load=this_node.cpuload,
             allocmem=this_node.memalloc,
-            netcount=this_node.networks_count
+            netcount=len(network_list)
         ),
     )
     logger.out(
-        '{bold}Ceph health status:{nofmt} {health_colour}{health}{nofmt}'.format(
+        '{bold}Ceph cluster status:{nofmt} {health_colour}{health}{nofmt} '
+        '{bold}Total OSDs:{nofmt} {total_osds} '
+        '{bold}Host OSDs:{nofmt} {host_osds}'.format(
             bold=logger.fmt_bold,
             health_colour=ceph_health_colour,
             nofmt=logger.fmt_end,
-            health=ceph_health
+            health=ceph_health,
+            total_osds=len(osd_list),
+            host_osds=osds_this_host
         ),
     )
diff --git a/node-daemon/pvcd/NodeInstance.py b/node-daemon/pvcd/NodeInstance.py
index 33e3b430..e306e620 100644
--- a/node-daemon/pvcd/NodeInstance.py
+++ b/node-daemon/pvcd/NodeInstance.py
@@ -62,7 +62,6 @@ class NodeInstance(object):
         self.network_list = []
         self.domain_list = []
         # Node resources
-        self.networks_count = 0
         self.domains_count = 0
         self.memused = 0
         self.memfree = 0
@@ -212,21 +211,6 @@ class NodeInstance(object):
             if data != self.domain_list:
                 self.domain_list = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/networkscount'.format(self.name))
-        def watch_node_networkscount(data, stat, event=''):
-            if event and event.type == 'DELETED':
-                # The key has been deleted after existing before; terminate this watcher
-                # because this class instance is about to be reaped in Daemon.py
-                return False
-
-            try:
-                data = data.decode('ascii')
-            except AttributeError:
-                data = 0
-
-            if data != self.networks_count:
-                self.networks_count = data
-
         @self.zk_conn.DataWatch('/nodes/{}/domainscount'.format(self.name))
         def watch_node_domainscount(data, stat, event=''):
             if event and event.type == 'DELETED':
diff --git a/node-daemon/pvcd/common.py b/node-daemon/pvcd/common.py
index 5f7a5185..b4921325 100644
--- a/node-daemon/pvcd/common.py
+++ b/node-daemon/pvcd/common.py
@@ -80,7 +80,16 @@ def run_os_command(command_string, background=False, environment=None):
         stdout=subprocess.PIPE,
         stderr=subprocess.PIPE,
     )
-    return command_output.returncode, command_output.stdout.decode('ascii'), command_output.stderr.decode('ascii')
+    retcode = command_output.returncode
+    try:
+        stdout = command_output.stdout.decode('ascii')
+    except:
+        stdout = ''
+    try:
+        stderr = command_output.stderr.decode('ascii')
+    except:
+        stderr = ''
+    return retcode, stdout, stderr
 
 # Reload the firewall rules of the system
 def reload_firewall_rules(logger, rules_file):
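
Note on the control flow this patch wires together: the CLI client never talks to Ceph directly. It writes a one-shot command string ('add <node>,<device>' or 'remove <osdid>') to the ZooKeeper key /ceph/osd_cmd, every node daemon watches that key, and the daemon whose name (or OSD ownership) matches claims the command, clears the key, and runs the ceph/ceph-volume steps locally. Below is a minimal sketch of that handshake written directly against kazoo (the client library whose DataWatch decorator the daemon code above uses); the ZooKeeper address, node name "hv1", and device "/dev/sdb" are illustrative placeholders, not values from the patch.

    from kazoo.client import KazooClient

    zk = KazooClient(hosts='127.0.0.1:2181')
    zk.start()

    # Client side (client-common/ceph.py): queue an OSD addition for node "hv1".
    zk.ensure_path('/ceph/osd_cmd')
    zk.set('/ceph/osd_cmd', 'add hv1,/dev/sdb'.encode('ascii'))

    # Daemon side (pvcd/Daemon.py): the DataWatch on /ceph/osd_cmd fires, the
    # payload is split into a command and its arguments, and the matching node
    # empties the key before acting on it.
    data, stat = zk.get('/ceph/osd_cmd')
    command, args = data.decode('ascii').split()
    if command == 'add':
        node, device = args.split(',')
        # The daemon whose name equals `node` would clear the key and call
        # CephInstance.add_osd(zk_conn, logger, node, device) here.
    elif command == 'remove':
        osd_id = args
        # The daemon owning OSD `osd_id` would clear the key and call
        # CephInstance.remove_osd(zk_conn, logger, osd_id, d_osd[osd_id]) here.

    zk.stop()

From the command line this surfaces as the newly enabled click commands, roughly "pvc ceph osd add <node> <device>" and "pvc ceph osd remove <osdid>" (the exact invocation depends on how the ceph/osd click groups are registered outside this diff).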