Finish setup of Ceph OSD addition and basic management

Joshua Boniface 2018-10-29 17:51:08 -04:00
parent 59472ae374
commit bfbe9188ce
6 changed files with 363 additions and 51 deletions

View File

@@ -964,7 +964,7 @@ def ceph_osd_add(node, device):
 @click.argument(
     'osdid'
 )
-def ceph_osd_remove(node, device):
+def ceph_osd_remove(osdid):
     """
     Remove a Ceph OSD with ID OSDID from the cluster.
     """
@@ -1113,7 +1113,7 @@ net_acl.add_command(net_acl_remove)
 net_acl.add_command(net_acl_list)
 ceph_osd.add_command(ceph_osd_add)
-#ceph_osd.add_command(ceph_osd_remove)
+ceph_osd.add_command(ceph_osd_remove)
 #ceph_osd.add_command(ceph_osd_in)
 #ceph_osd.add_command(ceph_osd_out)
 #ceph_osd.add_command(ceph_osd_set)
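
For context, the remove command enabled by the second hunk is wired the same way as ceph_osd_add. A sketch of what the full definition plausibly looks like; the Zookeeper setup and teardown helpers (pvc_common.startzk, cleanup) and the zk_host variable are assumptions about surrounding CLI code not shown in this diff:

    @click.command(name='remove', short_help='Remove OSD from the cluster.')
    @click.argument(
        'osdid'
    )
    def ceph_osd_remove(osdid):
        """
        Remove a Ceph OSD with ID OSDID from the cluster.
        """
        zk_conn = pvc_common.startzk(zk_host)    # assumed connection helper
        retcode, retmsg = pvc_ceph.remove_osd(zk_conn, osdid)
        cleanup(retcode, retmsg, zk_conn)        # assumed teardown helper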

View File

@@ -59,13 +59,13 @@ def add_osd(zk_conn, node, device):
         return False, 'ERROR: No node named "{}" is present in the cluster.'.format(node)
 
     # Tell the cluster to create a new OSD for the host
-    new_osd_string = 'new {},{}'.format(node, device)
-    zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': new_osd_string})
+    add_osd_string = 'add {},{}'.format(node, device)
+    zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': add_osd_string})
     click.echo('Created new OSD with block device {} on node {}.'.format(device, node))
     return True, ''
 
 def remove_osd(zk_conn, osd_id):
     remove_osd_string = 'remove {}'.format(osd_id)
     zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': remove_osd_string})
-    click.echo('Remove OSD {} from the cluster.'.format(osd_id))
+    click.echo('Removed OSD with ID {} from the cluster.'.format(osd_id))
     return True, ''
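
For context, the "command queue" these functions feed is a single Zookeeper key rather than a real queue: the client writes a one-shot command string to /ceph/osd_cmd, and the daemon on the target node consumes and clears it (see the Daemon.py watcher below). A minimal sketch of the write side, assuming a plain kazoo client and illustrative node/device names:

    from kazoo.client import KazooClient

    zk = KazooClient(hosts='127.0.0.1:2181')
    zk.start()

    # Ask the cluster to create an OSD backed by /dev/sdb on node "hv1";
    # the daemon on hv1 reacts via its DataWatch on this key and clears it
    zk.ensure_path('/ceph/osd_cmd')
    zk.set('/ceph/osd_cmd', 'add hv1,/dev/sdb'.encode('ascii'))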

View File

@@ -0,0 +1,194 @@
#!/usr/bin/env python3
# CephInstance.py - Class implementing a PVC node Ceph instance
# Part of the Parallel Virtual Cluster (PVC) system
#
# Copyright (C) 2018 Joshua M. Boniface <joshua@boniface.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import time
import ast

import pvcd.log as log
import pvcd.zkhandler as zkhandler
import pvcd.fencing as fencing
import pvcd.common as common

class CephInstance(object):
    def __init__(self):
        pass
class CephOSDInstance(object):
    def __init__(self, zk_conn, this_node, osd_id):
        self.zk_conn = zk_conn
        self.this_node = this_node
        self.osd_id = osd_id
        self.node = None
        self.size = None
        self.stats = dict()

        @self.zk_conn.DataWatch('/ceph/osds/{}/node'.format(self.osd_id))
        def watch_osd_node(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = ''

            if data != self.node:
                self.node = data

        @self.zk_conn.DataWatch('/ceph/osds/{}/size'.format(self.osd_id))
        def watch_osd_size(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = ''

            if data != self.size:
                self.size = data

        @self.zk_conn.DataWatch('/ceph/osds/{}/stats'.format(self.osd_id))
        def watch_osd_stats(data, stat, event=''):
            if event and event.type == 'DELETED':
                # The key has been deleted after existing before; terminate this watcher
                # because this class instance is about to be reaped in Daemon.py
                return False

            try:
                data = data.decode('ascii')
            except AttributeError:
                data = ''

            # Only parse non-empty data; literal_eval raises on ''
            if data:
                self.stats.update(ast.literal_eval(data))
def add_osd(zk_conn, logger, node, device):
    # We are ready to create a new OSD on this host
    logger.out('Creating new OSD disk', state='i')
    try:
        # 1. Create an OSD; we do this so we know what ID will be gen'd
        retcode, stdout, stderr = common.run_os_command('ceph osd create')
        if retcode != 0:
            print(stdout)
            print(stderr)
            raise
        osd_id = stdout.rstrip()

        # 2. Remove that newly-created OSD
        retcode, stdout, stderr = common.run_os_command('ceph osd rm {}'.format(osd_id))
        if retcode != 0:
            print(stdout)
            print(stderr)
            raise

        # 3. Create the OSD for real
        retcode, stdout, stderr = common.run_os_command(
            'ceph-volume lvm prepare --bluestore --data {device}'.format(
                device=device
            )
        )
        if retcode != 0:
            print(stdout)
            print(stderr)
            raise

        # 4. Activate the OSD
        retcode, stdout, stderr = common.run_os_command(
            'ceph-volume lvm activate --bluestore {osdid}'.format(
                osdid=osd_id
            )
        )
        if retcode != 0:
            print(stdout)
            print(stderr)
            raise

        # 5. Add it to the crush map
        retcode, stdout, stderr = common.run_os_command(
            'ceph osd crush add osd.{osdid} 1.0 root=default host={node}'.format(
                osdid=osd_id,
                node=node
            )
        )
        if retcode != 0:
            print(stdout)
            print(stderr)
            raise

        time.sleep(0.5)

        # 6. Verify it started
        retcode, stdout, stderr = common.run_os_command(
            'systemctl status ceph-osd@{osdid}'.format(
                osdid=osd_id
            )
        )
        if retcode != 0:
            print(stdout)
            print(stderr)
            raise

        # 7. Add the new OSD to the list
        zkhandler.writedata(zk_conn, {
            '/ceph/osds/{}'.format(osd_id): '',
            '/ceph/osds/{}/node'.format(osd_id): node,
            '/ceph/osds/{}/size'.format(osd_id): '',
            '/ceph/osds/{}/stats'.format(osd_id): '{}'
        })

        # Log it
        logger.out('Created new OSD disk with ID {}'.format(osd_id), state='o')
    except Exception as e:
        # Log it
        logger.out('Failed to create new OSD disk: {}'.format(e), state='e')
def remove_osd(zk_conn, logger, osd_id, osd_obj):
    logger.out('Removing OSD disk {}'.format(osd_id), state='i')
    try:
        # 1. Verify the OSD is present
        retcode, stdout, stderr = common.run_os_command('ceph osd ls')
        osd_list = stdout.split('\n')
        if osd_id not in osd_list:
            logger.out('Could not find OSD {} in the cluster'.format(osd_id), state='e')
            return True

        # 2. Set the OSD out so it will flush
        retcode, stdout, stderr = common.run_os_command('ceph osd out {}'.format(osd_id))
        if retcode != 0:
            print(stdout)
            print(stderr)

        # 3. Wait for the OSD to flush
        while True:
            retcode, stdout, stderr = common.run_os_command('ceph health')
            health_string = stdout
            # Placeholder exit condition so the loop can terminate; the full
            # flush-verify-and-destroy sequence is not implemented yet
            if 'HEALTH_OK' in health_string:
                break
            time.sleep(5)
    except Exception as e:
        logger.out('Failed to remove OSD disk {}: {}'.format(osd_id, e), state='e')
class CephPool(object):
    def __init__(self):
        pass
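
One observation for a later pass: the retcode/print/raise boilerplate in add_osd repeats identically for every step, and the bare raise statements surface in the except block as an unhelpful RuntimeError. A hypothetical helper (not part of this commit) could collapse the pattern:

    def run_checked(command):
        # Run a command via common.run_os_command and raise a descriptive
        # error, rather than a bare raise, if it exits non-zero
        retcode, stdout, stderr = common.run_os_command(command)
        if retcode != 0:
            raise RuntimeError('"{}" failed:\n{}\n{}'.format(command, stdout, stderr))
        return stdout

    # Step 1 would then reduce to:
    #     osd_id = run_checked('ceph osd create').rstrip()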

View File

@@ -37,6 +37,7 @@ import time
 import re
 import configparser
 import threading
+import json
 import apscheduler.schedulers.background
 
 import pvcd.log as log
@@ -48,6 +49,7 @@ import pvcd.DomainInstance as DomainInstance
 import pvcd.NodeInstance as NodeInstance
 import pvcd.VXNetworkInstance as VXNetworkInstance
 import pvcd.DNSAggregatorInstance as DNSAggregatorInstance
+import pvcd.CephInstance as CephInstance
 
 ###############################################################################
 # PVCD - node daemon startup program
@@ -522,9 +524,11 @@ logger.out('Setting up objects', state='i')
 d_node = dict()
 d_network = dict()
 d_domain = dict()
+d_osd = dict()
 node_list = []
 network_list = []
 domain_list = []
+osd_list = []
 
 # Create an instance of the DNS Aggregator if we're a coordinator
 if config['daemon_mode'] == 'coordinator':
@@ -561,7 +565,7 @@ this_node = d_node[myhostname]
 
 # Primary node
 @zk_conn.DataWatch('/primary_node')
-def update_primart(new_primary, stat, event=''):
+def update_primary(new_primary, stat, event=''):
     try:
         new_primary = new_primary.decode('ascii')
     except AttributeError:
@@ -633,7 +637,7 @@ def update_domains(new_domain_list):
     # Add any missing domains to the list
     for domain in new_domain_list:
         if not domain in domain_list:
-            d_domain[domain] = DomainInstance.DomainInstance(domain, zk_conn, config, logger, this_node);
+            d_domain[domain] = DomainInstance.DomainInstance(domain, zk_conn, config, logger, this_node)
 
     # Remove any deleted domains from the list
     for domain in domain_list:
@@ -649,6 +653,60 @@ def update_domains(new_domain_list):
     for node in d_node:
         d_node[node].update_domain_list(d_domain)
 
+# Ceph OSD provisioning key
+@zk_conn.DataWatch('/ceph/osd_cmd')
+def osd_cmd(data, stat, event=''):
+    if data:
+        data = data.decode('ascii')
+    else:
+        data = ''
+
+    if data:
+        # Get the command and args
+        command, args = data.split()
+
+        # Adding a new OSD
+        if command == 'add':
+            node, device = args.split(',')
+            if node == this_node.name:
+                # Clean up the command queue
+                zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': ''})
+                # Add the OSD
+                CephInstance.add_osd(zk_conn, logger, node, device)
+
+        # Removing an OSD
+        elif command == 'remove':
+            osd_id = args
+
+            # Verify osd_id is in the list
+            if osd_id not in d_osd:
+                return True
+
+            if d_osd[osd_id].node == this_node.name:
+                # Clean up the command queue
+                zkhandler.writedata(zk_conn, {'/ceph/osd_cmd': ''})
+                # Remove the OSD
+                CephInstance.remove_osd(zk_conn, logger, osd_id, d_osd[osd_id])
+
+# OSD objects
+@zk_conn.ChildrenWatch('/ceph/osds')
+def update_osds(new_osd_list):
+    global osd_list, d_osd
+
+    # Add any missing OSDs to the list
+    for osd in new_osd_list:
+        if not osd in osd_list:
+            d_osd[osd] = CephInstance.CephOSDInstance(zk_conn, this_node, osd)
+
+    # Remove any deleted OSDs from the list
+    for osd in osd_list:
+        if not osd in new_osd_list:
+            # Delete the object
+            del(d_osd[osd])
+
+    # Update and print new list
+    osd_list = new_osd_list
+    logger.out('{}OSD list:{} {}'.format(logger.fmt_blue, logger.fmt_end, ' '.join(osd_list)), state='i')
+
 ###############################################################################
 # PHASE 9 - Run the daemon
 ###############################################################################
@@ -668,6 +726,93 @@ def update_zookeeper():
     if zkhandler.readdata(zk_conn, '/primary_node') != this_node.name:
         zkhandler.writedata(zk_conn, {'/primary_node': this_node.name})
 
+    # Get Ceph cluster health (for local printing)
+    retcode, stdout, stderr = common.run_os_command('ceph health')
+    ceph_health = stdout.rstrip()
+    if 'HEALTH_OK' in ceph_health:
+        ceph_health_colour = logger.fmt_green
+    elif 'HEALTH_WARN' in ceph_health:
+        ceph_health_colour = logger.fmt_yellow
+    else:
+        ceph_health_colour = logger.fmt_red
+
+    # Set ceph health information in zookeeper (primary only)
+    if this_node.router_state == 'primary':
+        # Get status info
+        retcode, stdout, stderr = common.run_os_command('ceph status')
+        ceph_status = stdout
+        try:
+            zkhandler.writedata(zk_conn, {
+                '/ceph': str(ceph_status)
+            })
+        except:
+            logger.out('Failed to set Ceph status data', state='e')
+            return
+
+    # Get data from Ceph OSDs
+    # Parse the dump data
+    osd_dump = dict()
+    retcode, stdout, stderr = common.run_os_command('ceph osd dump --format json')
+    osd_dump_raw = json.loads(stdout)['osds']
+    for osd in osd_dump_raw:
+        osd_dump.update({
+            str(osd['osd']): {
+                'uuid': osd['uuid'],
+                'up': osd['up'],
+                'in': osd['in'],
+                'weight': osd['weight'],
+                'primary_affinity': osd['primary_affinity']
+            }
+        })
+
+    # Parse the status data; `ceph osd status` prints its table on stderr
+    osd_status = dict()
+    retcode, stdout, stderr = common.run_os_command('ceph osd status')
+    for line in stderr.split('\n'):
+        # Strip off colour
+        line = re.sub(r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))', '', line)
+        # Split it for parsing
+        line = line.split()
+        if len(line) > 1 and line[1].isdigit():
+            # This is an OSD line so parse it
+            osd_id = line[1]
+            host = line[3].split('.')[0]
+            used = line[5]
+            avail = line[7]
+            wr_ops = line[9]
+            wr_data = line[11]
+            rd_ops = line[13]
+            rd_data = line[15]
+            state = line[17]
+            osd_status.update({
+                str(osd_id): {
+                    'host': host,
+                    'used': used,
+                    'avail': avail,
+                    'wr_ops': wr_ops,
+                    'wr_data': wr_data,
+                    'rd_ops': rd_ops,
+                    'rd_data': rd_data,
+                    'state': state
+                }
+            })
+
+    # Merge them together into a single meaningful dict
+    osd_stats = dict()
+    for osd in osd_list:
+        this_dump = osd_dump[osd]
+        this_dump.update(osd_status[osd])
+        osd_stats[osd] = this_dump
+
+    # Trigger updates for each OSD on this node
+    osds_this_host = 0
+    for osd in osd_list:
+        if d_osd[osd].node == myhostname:
+            zkhandler.writedata(zk_conn, {
+                '/ceph/osds/{}/stats'.format(osd): str(osd_stats[osd])
+            })
+            osds_this_host += 1
+
     # Toggle state management of dead VMs to restart them
     memalloc = 0
     vcpualloc = 0
@@ -699,29 +844,6 @@ def update_zookeeper():
             if domain_uuid not in this_node.domain_list:
                 this_node.domain_list.append(domain_uuid)
 
-    # Set ceph health information in zookeeper (primary only)
-    if this_node.router_state == 'primary':
-        # Get status info
-        retcode, stdout, stderr = common.run_os_command('ceph status')
-        ceph_status = stdout
-        try:
-            zkhandler.writedata(zk_conn, {
-                '/ceph': str(ceph_status)
-            })
-        except:
-            logger.out('Failed to set Ceph status data', state='e')
-            return
-
-    # Get cluster health
-    retcode, stdout, stderr = common.run_os_command('ceph health')
-    ceph_health = stdout.rstrip()
-    if ceph_health == 'HEALTH_OK':
-        ceph_health_colour = logger.fmt_green
-    elif ceph_health == 'HEALTH_WARN':
-        ceph_health_colour = logger.fmt_yellow
-    else:
-        ceph_health_colour = logger.fmt_red
-
     # Set our information in zookeeper
     #this_node.name = lv_conn.getHostname()
     this_node.memused = int(psutil.virtual_memory().used / 1024 / 1024)
@@ -738,7 +860,6 @@ def update_zookeeper():
         '/nodes/{}/memalloc'.format(this_node.name): str(this_node.memalloc),
         '/nodes/{}/vcpualloc'.format(this_node.name): str(this_node.vcpualloc),
         '/nodes/{}/cpuload'.format(this_node.name): str(this_node.cpuload),
-        '/nodes/{}/networkscount'.format(this_node.name): str(this_node.networks_count),
         '/nodes/{}/domainscount'.format(this_node.name): str(this_node.domains_count),
         '/nodes/{}/runningdomains'.format(this_node.name): ' '.join(this_node.domain_list),
         '/nodes/{}/keepalive'.format(this_node.name): str(keepalive_time)
@@ -797,15 +918,19 @@ def update_zookeeper():
             usedmem=this_node.memused,
             load=this_node.cpuload,
             allocmem=this_node.memalloc,
-            netcount=this_node.networks_count
+            netcount=len(network_list)
         ),
     )
     logger.out(
-        '{bold}Ceph health status:{nofmt} {health_colour}{health}{nofmt}'.format(
+        '{bold}Ceph cluster status:{nofmt} {health_colour}{health}{nofmt} '
+        '{bold}Total OSDs:{nofmt} {total_osds} '
+        '{bold}Host OSDs:{nofmt} {host_osds}'.format(
             bold=logger.fmt_bold,
             health_colour=ceph_health_colour,
             nofmt=logger.fmt_end,
-            health=ceph_health
+            health=ceph_health,
+            total_osds=len(osd_list),
+            host_osds=osds_this_host
         ),
     )
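
For reference, the fields the keepalive loop pulls from ceph osd dump --format json correspond to per-OSD entries shaped roughly as follows (illustrative values, abridged to the keys actually read above):

    osd_dump_entry = {
        'osd': 0,             # numeric ID; stringified for the osd_dump key
        'uuid': '12345678-abcd-ef01-2345-6789abcdef01',
        'up': 1,
        'in': 1,
        'weight': 1.0,
        'primary_affinity': 1.0
    }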

View File

@@ -62,7 +62,6 @@ class NodeInstance(object):
         self.network_list = []
         self.domain_list = []
         # Node resources
-        self.networks_count = 0
         self.domains_count = 0
         self.memused = 0
         self.memfree = 0
@@ -212,21 +211,6 @@ class NodeInstance(object):
             if data != self.domain_list:
                 self.domain_list = data
 
-        @self.zk_conn.DataWatch('/nodes/{}/networkscount'.format(self.name))
-        def watch_node_networkscount(data, stat, event=''):
-            if event and event.type == 'DELETED':
-                # The key has been deleted after existing before; terminate this watcher
-                # because this class instance is about to be reaped in Daemon.py
-                return False
-
-            try:
-                data = data.decode('ascii')
-            except AttributeError:
-                data = 0
-
-            if data != self.networks_count:
-                self.networks_count = data
-
         @self.zk_conn.DataWatch('/nodes/{}/domainscount'.format(self.name))
         def watch_node_domainscount(data, stat, event=''):
             if event and event.type == 'DELETED':

View File

@@ -80,7 +80,16 @@ def run_os_command(command_string, background=False, environment=None):
         stdout=subprocess.PIPE,
         stderr=subprocess.PIPE,
     )
-    return command_output.returncode, command_output.stdout.decode('ascii'), command_output.stderr.decode('ascii')
+    retcode = command_output.returncode
+    try:
+        stdout = command_output.stdout.decode('ascii')
+    except:
+        stdout = ''
+    try:
+        stderr = command_output.stderr.decode('ascii')
+    except:
+        stderr = ''
+    return retcode, stdout, stderr
 
 # Reload the firewall rules of the system
 def reload_firewall_rules(logger, rules_file):
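
With this change, callers always get strings back, even when a command emits output that cannot be decoded as ASCII; a typical call site, mirroring the daemon's own usage, looks like:

    retcode, stdout, stderr = run_os_command('ceph health')
    if retcode == 0:
        print(stdout.rstrip())    # e.g. 'HEALTH_OK'
    else:
        print(stderr)             # '' if the output could not be decoded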